• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mostafa / xk6-kafka / 3902377300

12 Jan 2023 12:49PM UTC coverage: 75.771% (-1.0%) from 76.738%
3902377300

Pull #189

github

Mostafa Moradian
Remove opinionated default value for MaxWait
Pull Request #189: Fix `ReaderConfig`'s default values

50 of 50 new or added lines in 2 files covered. (100.0%)

1376 of 1816 relevant lines covered (75.77%)

12.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.35
/reader.go
1
package kafka
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "errors"
7
        "io"
8
        "time"
9

10
        "github.com/dop251/goja"
11
        kafkago "github.com/segmentio/kafka-go"
12
        "go.k6.io/k6/js/common"
13
        "go.k6.io/k6/metrics"
14
)
15

16
var (
17
        // Group balancers.
18
        groupBalancerRange        = "group_balancer_range"
19
        groupBalancerRoundRobin   = "group_balancer_round_robin"
20
        groupBalancerRackAffinity = "group_balancer_rack_affinity"
21

22
        GroupBalancers map[string]kafkago.GroupBalancer
23

24
        // Isolation levels.
25
        isolationLevelReadUncommitted = "isolation_level_read_uncommitted"
26
        isolationLevelReadCommitted   = "isolation_level_read_committed"
27

28
        IsolationLevels map[string]kafkago.IsolationLevel
29

30
        // Start offsets.
31
        lastOffset  = "start_offset_last_offset"
32
        firstOffset = "start_offset_first_offset"
33

34
        StartOffsets map[string]int64
35

36
        RebalanceTimeout       = time.Second * 5
37
        HeartbeatInterval      = time.Second * 3
38
        SessionTimeout         = time.Second * 30
39
        PartitionWatchInterval = time.Second * 5
40
        JoinGroupBackoff       = time.Second * 5
41
        RetentionTime          = time.Hour * 24
42
)
43

44
type ReaderConfig struct {
45
        WatchPartitionChanges  bool          `json:"watchPartitionChanges"`
46
        ConnectLogger          bool          `json:"connectLogger"`
47
        Partition              int           `json:"partition"`
48
        QueueCapacity          int           `json:"queueCapacity"`
49
        MinBytes               int           `json:"minBytes"`
50
        MaxBytes               int           `json:"maxBytes"`
51
        MaxAttempts            int           `json:"maxAttempts"`
52
        GroupID                string        `json:"groupId"`
53
        Topic                  string        `json:"topic"`
54
        IsolationLevel         string        `json:"isolationLevel"`
55
        StartOffset            string        `json:"startOffset"`
56
        Offset                 int64         `json:"offset"`
57
        Brokers                []string      `json:"brokers"`
58
        GroupTopics            []string      `json:"groupTopics"`
59
        GroupBalancers         []string      `json:"groupBalancers"`
60
        MaxWait                time.Duration `json:"maxWait"`
61
        ReadBatchTimeout       time.Duration `json:"readBatchTimeout"`
62
        ReadLagInterval        time.Duration `json:"readLagInterval"`
63
        HeartbeatInterval      time.Duration `json:"heartbeatInterval"`
64
        CommitInterval         time.Duration `json:"commitInterval"`
65
        PartitionWatchInterval time.Duration `json:"partitionWatchInterval"`
66
        SessionTimeout         time.Duration `json:"sessionTimeout"`
67
        RebalanceTimeout       time.Duration `json:"rebalanceTimeout"`
68
        JoinGroupBackoff       time.Duration `json:"joinGroupBackoff"`
69
        RetentionTime          time.Duration `json:"retentionTime"`
70
        ReadBackoffMin         time.Duration `json:"readBackoffMin"`
71
        ReadBackoffMax         time.Duration `json:"readBackoffMax"`
72
        SASL                   SASLConfig    `json:"sasl"`
73
        TLS                    TLSConfig     `json:"tls"`
74
}
75

76
// ConsumeConfig is the argument object of the reader's consume method.
type ConsumeConfig struct {
	// Limit is the number of messages consume tries to read in one call.
	// consume treats a value of zero or less as 1.
	Limit int64 `json:"limit"`
}
79

80
// readerClass is a wrapper around kafkago.reader and acts as a JS constructor
81
// for this extension, thus it must be called with new operator, e.g. new Reader(...).
82
// nolint: funlen
83
func (k *Kafka) readerClass(call goja.ConstructorCall) *goja.Object {
1✔
84
        runtime := k.vu.Runtime()
1✔
85
        var readerConfig *ReaderConfig
1✔
86
        if len(call.Arguments) == 0 {
1✔
87
                common.Throw(runtime, ErrNotEnoughArguments)
×
88
        }
×
89

90
        if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
91
                if b, err := json.Marshal(params); err != nil {
1✔
92
                        common.Throw(runtime, err)
×
93
                } else {
1✔
94
                        if err = json.Unmarshal(b, &readerConfig); err != nil {
1✔
95
                                common.Throw(runtime, err)
×
96
                        }
×
97
                }
98
        }
99

100
        reader := k.reader(readerConfig)
1✔
101

1✔
102
        readerObject := runtime.NewObject()
1✔
103
        // This is the reader object itself
1✔
104
        if err := readerObject.Set("This", reader); err != nil {
1✔
105
                common.Throw(runtime, err)
×
106
        }
×
107

108
        err := readerObject.Set("consume", func(call goja.FunctionCall) goja.Value {
2✔
109
                var consumeConfig *ConsumeConfig
1✔
110
                if len(call.Arguments) == 0 {
1✔
111
                        common.Throw(runtime, ErrNotEnoughArguments)
×
112
                }
×
113

114
                if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
115
                        if b, err := json.Marshal(params); err != nil {
1✔
116
                                common.Throw(runtime, err)
×
117
                        } else {
1✔
118
                                if err = json.Unmarshal(b, &consumeConfig); err != nil {
1✔
119
                                        common.Throw(runtime, err)
×
120
                                }
×
121
                        }
122
                }
123

124
                return runtime.ToValue(k.consume(reader, consumeConfig))
1✔
125
        })
126
        if err != nil {
1✔
127
                common.Throw(runtime, err)
×
128
        }
×
129

130
        // This is unnecessary, but it's here for reference purposes
131
        err = readerObject.Set("close", func(call goja.FunctionCall) goja.Value {
2✔
132
                if err := reader.Close(); err != nil {
1✔
133
                        common.Throw(runtime, err)
×
134
                }
×
135

136
                return goja.Undefined()
1✔
137
        })
138
        if err != nil {
1✔
139
                common.Throw(runtime, err)
×
140
        }
×
141

142
        freeze(readerObject)
1✔
143

1✔
144
        return runtime.ToValue(readerObject).ToObject(runtime)
1✔
145
}
146

147
// reader creates a Kafka reader with the given configuration
148
// nolint: funlen
149
func (k *Kafka) reader(readerConfig *ReaderConfig) *kafkago.Reader {
6✔
150
        dialer, err := GetDialer(readerConfig.SASL, readerConfig.TLS)
6✔
151
        if err != nil {
6✔
152
                if err.Unwrap() != nil {
×
153
                        logger.WithField("error", err).Error(err)
×
154
                }
×
155
                common.Throw(k.vu.Runtime(), err)
×
156
        }
157

158
        if readerConfig.Partition != 0 && readerConfig.GroupID != "" {
6✔
159
                common.Throw(k.vu.Runtime(), ErrPartitionAndGroupID)
×
160
        }
×
161

162
        if readerConfig.Topic != "" && readerConfig.GroupID != "" {
6✔
163
                common.Throw(k.vu.Runtime(), ErrTopicAndGroupID)
×
164
        }
×
165

166
        if readerConfig.GroupID != "" &&
6✔
167
                len(readerConfig.GroupTopics) >= 0 &&
6✔
168
                readerConfig.HeartbeatInterval == 0 {
6✔
169
                readerConfig.HeartbeatInterval = HeartbeatInterval
×
170
        }
×
171

172
        if readerConfig.GroupID != "" && readerConfig.SessionTimeout == 0 {
6✔
173
                readerConfig.SessionTimeout = SessionTimeout
×
174
        }
×
175

176
        if readerConfig.GroupID != "" && readerConfig.RebalanceTimeout == 0 {
6✔
177
                readerConfig.RebalanceTimeout = RebalanceTimeout
×
178
        }
×
179

180
        if readerConfig.GroupID != "" && readerConfig.JoinGroupBackoff == 0 {
6✔
181
                readerConfig.JoinGroupBackoff = JoinGroupBackoff
×
182
        }
×
183

184
        if readerConfig.GroupID != "" && readerConfig.PartitionWatchInterval == 0 {
6✔
185
                readerConfig.PartitionWatchInterval = PartitionWatchInterval
×
186
        }
×
187

188
        if readerConfig.GroupID != "" && readerConfig.RetentionTime == 0 {
6✔
189
                readerConfig.RetentionTime = RetentionTime
×
190
        }
×
191

192
        var groupBalancers []kafkago.GroupBalancer
6✔
193
        if readerConfig.GroupID != "" {
6✔
194
                groupBalancers = make([]kafkago.GroupBalancer, 0, len(readerConfig.GroupBalancers))
×
195
                for _, balancer := range readerConfig.GroupBalancers {
×
196
                        if b, ok := GroupBalancers[balancer]; ok {
×
197
                                groupBalancers = append(groupBalancers, b)
×
198
                        }
×
199
                }
200
                if len(groupBalancers) == 0 {
×
201
                        // Default to [Range, RoundRobin] if no balancer is specified
×
202
                        groupBalancers = append(groupBalancers, GroupBalancers[groupBalancerRange])
×
203
                        groupBalancers = append(groupBalancers, GroupBalancers[groupBalancerRoundRobin])
×
204
                }
×
205
        }
206

207
        isolationLevel := IsolationLevels[isolationLevelReadUncommitted]
6✔
208
        if readerConfig.IsolationLevel == "" {
12✔
209
                isolationLevel = IsolationLevels[readerConfig.IsolationLevel]
6✔
210
        }
6✔
211

212
        var startOffset int64
6✔
213
        if readerConfig.GroupID != "" && readerConfig.StartOffset != "" {
6✔
214
                startOffset = StartOffsets[firstOffset] // Default to FirstOffset
×
215
                if s, ok := StartOffsets[readerConfig.StartOffset]; ok {
×
216
                        startOffset = s
×
217
                }
×
218
        }
219

220
        consolidatedConfig := kafkago.ReaderConfig{
6✔
221
                Brokers:                readerConfig.Brokers,
6✔
222
                GroupID:                readerConfig.GroupID,
6✔
223
                GroupTopics:            readerConfig.GroupTopics,
6✔
224
                Topic:                  readerConfig.Topic,
6✔
225
                Partition:              readerConfig.Partition,
6✔
226
                QueueCapacity:          readerConfig.QueueCapacity,
6✔
227
                MinBytes:               readerConfig.MinBytes,
6✔
228
                MaxBytes:               readerConfig.MaxBytes,
6✔
229
                MaxWait:                readerConfig.MaxWait,
6✔
230
                ReadBatchTimeout:       readerConfig.ReadBatchTimeout,
6✔
231
                ReadLagInterval:        readerConfig.ReadLagInterval,
6✔
232
                GroupBalancers:         groupBalancers,
6✔
233
                HeartbeatInterval:      readerConfig.HeartbeatInterval,
6✔
234
                CommitInterval:         readerConfig.CommitInterval,
6✔
235
                PartitionWatchInterval: readerConfig.PartitionWatchInterval,
6✔
236
                WatchPartitionChanges:  readerConfig.WatchPartitionChanges,
6✔
237
                SessionTimeout:         readerConfig.SessionTimeout,
6✔
238
                RebalanceTimeout:       readerConfig.RebalanceTimeout,
6✔
239
                JoinGroupBackoff:       readerConfig.JoinGroupBackoff,
6✔
240
                RetentionTime:          readerConfig.RetentionTime,
6✔
241
                StartOffset:            startOffset,
6✔
242
                ReadBackoffMin:         readerConfig.ReadBackoffMin,
6✔
243
                ReadBackoffMax:         readerConfig.ReadBackoffMax,
6✔
244
                IsolationLevel:         isolationLevel,
6✔
245
                MaxAttempts:            readerConfig.MaxAttempts,
6✔
246
                Dialer:                 dialer,
6✔
247
        }
6✔
248

6✔
249
        if readerConfig.ConnectLogger {
6✔
250
                consolidatedConfig.Logger = logger
×
251
        }
×
252

253
        reader := kafkago.NewReader(consolidatedConfig)
6✔
254

6✔
255
        if readerConfig.Offset > 0 {
8✔
256
                if readerConfig.GroupID == "" {
4✔
257
                        if err := reader.SetOffset(readerConfig.Offset); err != nil {
2✔
258
                                wrappedError := NewXk6KafkaError(
×
259
                                        failedSetOffset, "Unable to set offset, yet returning the reader.", err)
×
260
                                logger.WithField("error", wrappedError).Warn(wrappedError)
×
261
                                return reader
×
262
                        }
×
263
                } else {
×
264
                        err := NewXk6KafkaError(
×
265
                                failedSetOffset, "Offset and groupID are mutually exclusive options, "+
×
266
                                        "so offset is not set, yet returning the reader.", nil)
×
267
                        logger.WithField("error", err).Warn(err)
×
268
                        return reader
×
269
                }
×
270
        }
271

272
        return reader
6✔
273
}
274

275
// consume consumes messages from the given reader.
276
// nolint: funlen
277
func (k *Kafka) consume(
278
        reader *kafkago.Reader, consumeConfig *ConsumeConfig,
279
) []map[string]interface{} {
11✔
280
        if state := k.vu.State(); state == nil {
11✔
281
                logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext)
×
282
                common.Throw(k.vu.Runtime(), ErrForbiddenInInitContext)
×
283
        }
×
284

285
        var ctx context.Context
11✔
286
        if ctx = k.vu.Context(); ctx == nil {
11✔
287
                err := NewXk6KafkaError(noContextError, "No context.", nil)
×
288
                logger.WithField("error", err).Info(err)
×
289
                common.Throw(k.vu.Runtime(), err)
×
290
        }
×
291

292
        if consumeConfig.Limit <= 0 {
11✔
293
                consumeConfig.Limit = 1
×
294
        }
×
295

296
        messages := make([]map[string]interface{}, 0)
11✔
297

11✔
298
        for i := int64(0); i < consumeConfig.Limit; i++ {
22✔
299
                msg, err := reader.ReadMessage(ctx)
11✔
300

11✔
301
                if errors.Is(err, io.EOF) {
11✔
302
                        k.reportReaderStats(reader.Stats())
×
303

×
304
                        err = NewXk6KafkaError(noMoreMessages, "No more messages.", nil)
×
305
                        logger.WithField("error", err).Info(err)
×
306
                        return messages
×
307
                }
×
308

309
                if err != nil {
12✔
310
                        k.reportReaderStats(reader.Stats())
1✔
311

1✔
312
                        err = NewXk6KafkaError(failedReadMessage, "Unable to read messages.", nil)
1✔
313
                        logger.WithField("error", err).Error(err)
1✔
314
                        return messages
1✔
315
                }
1✔
316

317
                // Rest of the fields of a given message
318
                message := map[string]interface{}{
10✔
319
                        "topic":         msg.Topic,
10✔
320
                        "partition":     msg.Partition,
10✔
321
                        "offset":        msg.Offset,
10✔
322
                        "time":          time.Unix(msg.Time.Unix(), 0).Format(time.RFC3339),
10✔
323
                        "highWaterMark": msg.HighWaterMark,
10✔
324
                        "headers":       make(map[string]interface{}),
10✔
325
                }
10✔
326

10✔
327
                if headers, ok := message["headers"].(map[string]interface{}); ok {
20✔
328
                        for _, header := range msg.Headers {
10✔
329
                                headers[header.Key] = header.Value
×
330
                        }
×
331
                } else {
×
332
                        err = NewXk6KafkaError(failedTypeCast, "Failed to cast to map.", nil)
×
333
                        logger.WithField("error", err).Error(err)
×
334
                }
×
335

336
                if len(msg.Key) > 0 {
12✔
337
                        message["key"] = msg.Key
2✔
338
                }
2✔
339

340
                if len(msg.Value) > 0 {
20✔
341
                        message["value"] = msg.Value
10✔
342
                }
10✔
343

344
                messages = append(messages, message)
10✔
345
        }
346

347
        k.reportReaderStats(reader.Stats())
10✔
348
        return messages
10✔
349
}
350

351
// reportReaderStats reports the reader stats
352
// nolint:funlen
353
func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats) {
11✔
354
        state := k.vu.State()
11✔
355
        if state == nil {
11✔
356
                logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext)
×
357
                common.Throw(k.vu.Runtime(), ErrForbiddenInInitContext)
×
358
        }
×
359

360
        ctx := k.vu.Context()
11✔
361
        if ctx == nil {
11✔
362
                err := NewXk6KafkaError(noContextError, "No context.", nil)
×
363
                logger.WithField("error", err).Info(err)
×
364
                common.Throw(k.vu.Runtime(), err)
×
365
        }
×
366

367
        ctm := k.vu.State().Tags.GetCurrentValues()
11✔
368
        sampleTags := ctm.Tags.With("topic", currentStats.Topic)
11✔
369
        sampleTags = sampleTags.With("clientid", currentStats.ClientID)
11✔
370
        sampleTags = sampleTags.With("partition", currentStats.Partition)
11✔
371

11✔
372
        now := time.Now()
11✔
373
        metrics.PushIfNotDone(ctx, state.Samples, metrics.ConnectedSamples{
11✔
374
                Samples: []metrics.Sample{
11✔
375
                        {
11✔
376
                                Time: now,
11✔
377
                                TimeSeries: metrics.TimeSeries{
11✔
378
                                        Metric: k.metrics.ReaderDials,
11✔
379
                                        Tags:   sampleTags,
11✔
380
                                },
11✔
381
                                Value:    float64(currentStats.Dials),
11✔
382
                                Metadata: ctm.Metadata,
11✔
383
                        },
11✔
384
                        {
11✔
385
                                Time: now,
11✔
386
                                TimeSeries: metrics.TimeSeries{
11✔
387
                                        Metric: k.metrics.ReaderFetches,
11✔
388
                                        Tags:   sampleTags,
11✔
389
                                },
11✔
390
                                Value:    float64(currentStats.Fetches),
11✔
391
                                Metadata: ctm.Metadata,
11✔
392
                        },
11✔
393
                        {
11✔
394
                                Time: now,
11✔
395
                                TimeSeries: metrics.TimeSeries{
11✔
396
                                        Metric: k.metrics.ReaderMessages,
11✔
397
                                        Tags:   sampleTags,
11✔
398
                                },
11✔
399
                                Value:    float64(currentStats.Messages),
11✔
400
                                Metadata: ctm.Metadata,
11✔
401
                        },
11✔
402
                        {
11✔
403
                                Time: now,
11✔
404
                                TimeSeries: metrics.TimeSeries{
11✔
405
                                        Metric: k.metrics.ReaderBytes,
11✔
406
                                        Tags:   sampleTags,
11✔
407
                                },
11✔
408
                                Value:    float64(currentStats.Bytes),
11✔
409
                                Metadata: ctm.Metadata,
11✔
410
                        },
11✔
411
                        {
11✔
412
                                Time: now,
11✔
413
                                TimeSeries: metrics.TimeSeries{
11✔
414
                                        Metric: k.metrics.ReaderRebalances,
11✔
415
                                        Tags:   sampleTags,
11✔
416
                                },
11✔
417
                                Value:    float64(currentStats.Rebalances),
11✔
418
                                Metadata: ctm.Metadata,
11✔
419
                        },
11✔
420
                        {
11✔
421
                                Time: now,
11✔
422
                                TimeSeries: metrics.TimeSeries{
11✔
423
                                        Metric: k.metrics.ReaderTimeouts,
11✔
424
                                        Tags:   sampleTags,
11✔
425
                                },
11✔
426
                                Value:    float64(currentStats.Timeouts),
11✔
427
                                Metadata: ctm.Metadata,
11✔
428
                        },
11✔
429
                        {
11✔
430
                                Time: now,
11✔
431
                                TimeSeries: metrics.TimeSeries{
11✔
432
                                        Metric: k.metrics.ReaderErrors,
11✔
433
                                        Tags:   sampleTags,
11✔
434
                                },
11✔
435
                                Value:    float64(currentStats.Errors),
11✔
436
                                Metadata: ctm.Metadata,
11✔
437
                        },
11✔
438
                        {
11✔
439
                                Time: now,
11✔
440
                                TimeSeries: metrics.TimeSeries{
11✔
441
                                        Metric: k.metrics.ReaderDialTime,
11✔
442
                                        Tags:   sampleTags,
11✔
443
                                },
11✔
444
                                Value:    metrics.D(currentStats.DialTime.Avg),
11✔
445
                                Metadata: ctm.Metadata,
11✔
446
                        },
11✔
447
                        {
11✔
448
                                Time: now,
11✔
449
                                TimeSeries: metrics.TimeSeries{
11✔
450
                                        Metric: k.metrics.ReaderReadTime,
11✔
451
                                        Tags:   sampleTags,
11✔
452
                                },
11✔
453
                                Value:    metrics.D(currentStats.ReadTime.Avg),
11✔
454
                                Metadata: ctm.Metadata,
11✔
455
                        },
11✔
456
                        {
11✔
457
                                Time: now,
11✔
458
                                TimeSeries: metrics.TimeSeries{
11✔
459
                                        Metric: k.metrics.ReaderWaitTime,
11✔
460
                                        Tags:   sampleTags,
11✔
461
                                },
11✔
462
                                Value:    metrics.D(currentStats.WaitTime.Avg),
11✔
463
                                Metadata: ctm.Metadata,
11✔
464
                        },
11✔
465
                        {
11✔
466
                                Time: now,
11✔
467
                                TimeSeries: metrics.TimeSeries{
11✔
468
                                        Metric: k.metrics.ReaderFetchSize,
11✔
469
                                        Tags:   sampleTags,
11✔
470
                                },
11✔
471
                                Value:    float64(currentStats.FetchSize.Avg),
11✔
472
                                Metadata: ctm.Metadata,
11✔
473
                        },
11✔
474
                        {
11✔
475
                                Time: now,
11✔
476
                                TimeSeries: metrics.TimeSeries{
11✔
477
                                        Metric: k.metrics.ReaderFetchBytes,
11✔
478
                                        Tags:   sampleTags,
11✔
479
                                },
11✔
480
                                Value:    float64(currentStats.FetchBytes.Min),
11✔
481
                                Metadata: ctm.Metadata,
11✔
482
                        },
11✔
483
                        {
11✔
484
                                Time: now,
11✔
485
                                TimeSeries: metrics.TimeSeries{
11✔
486
                                        Metric: k.metrics.ReaderFetchBytes,
11✔
487
                                        Tags:   sampleTags,
11✔
488
                                },
11✔
489
                                Value:    float64(currentStats.FetchBytes.Max),
11✔
490
                                Metadata: ctm.Metadata,
11✔
491
                        },
11✔
492
                        {
11✔
493
                                Time: now,
11✔
494
                                TimeSeries: metrics.TimeSeries{
11✔
495
                                        Metric: k.metrics.ReaderOffset,
11✔
496
                                        Tags:   sampleTags,
11✔
497
                                },
11✔
498
                                Value:    float64(currentStats.Offset),
11✔
499
                                Metadata: ctm.Metadata,
11✔
500
                        },
11✔
501
                        {
11✔
502
                                Time: now,
11✔
503
                                TimeSeries: metrics.TimeSeries{
11✔
504
                                        Metric: k.metrics.ReaderLag,
11✔
505
                                        Tags:   sampleTags,
11✔
506
                                },
11✔
507
                                Value:    float64(currentStats.Lag),
11✔
508
                                Metadata: ctm.Metadata,
11✔
509
                        },
11✔
510
                        {
11✔
511
                                Time: now,
11✔
512
                                TimeSeries: metrics.TimeSeries{
11✔
513
                                        Metric: k.metrics.ReaderMinBytes,
11✔
514
                                        Tags:   sampleTags,
11✔
515
                                },
11✔
516
                                Value:    float64(currentStats.MinBytes),
11✔
517
                                Metadata: ctm.Metadata,
11✔
518
                        },
11✔
519
                        {
11✔
520
                                Time: now,
11✔
521
                                TimeSeries: metrics.TimeSeries{
11✔
522
                                        Metric: k.metrics.ReaderMaxBytes,
11✔
523
                                        Tags:   sampleTags,
11✔
524
                                },
11✔
525
                                Value:    float64(currentStats.MaxBytes),
11✔
526
                                Metadata: ctm.Metadata,
11✔
527
                        },
11✔
528
                        {
11✔
529
                                Time: now,
11✔
530
                                TimeSeries: metrics.TimeSeries{
11✔
531
                                        Metric: k.metrics.ReaderMaxWait,
11✔
532
                                        Tags:   sampleTags,
11✔
533
                                },
11✔
534
                                Value:    metrics.D(currentStats.MaxWait),
11✔
535
                                Metadata: ctm.Metadata,
11✔
536
                        },
11✔
537
                        {
11✔
538
                                Time: now,
11✔
539
                                TimeSeries: metrics.TimeSeries{
11✔
540
                                        Metric: k.metrics.ReaderQueueLength,
11✔
541
                                        Tags:   sampleTags,
11✔
542
                                },
11✔
543
                                Value:    float64(currentStats.QueueLength),
11✔
544
                                Metadata: ctm.Metadata,
11✔
545
                        },
11✔
546
                        {
11✔
547
                                Time: now,
11✔
548
                                TimeSeries: metrics.TimeSeries{
11✔
549
                                        Metric: k.metrics.ReaderQueueCapacity,
11✔
550
                                        Tags:   sampleTags,
11✔
551
                                },
11✔
552
                                Value:    float64(currentStats.QueueCapacity),
11✔
553
                                Metadata: ctm.Metadata,
11✔
554
                        },
11✔
555
                },
11✔
556
                Tags: sampleTags,
11✔
557
                Time: now,
11✔
558
        })
11✔
559
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc