• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nihiyama / ffq / 13108879284

03 Feb 2025 08:01AM UTC coverage: 91.254% (+4.9%) from 86.359%
13108879284

push

github

web-flow
Merge pull request #38 from nihiyama/feature/issue-37

#37 fix ffq

518 of 550 new or added lines in 6 files covered. (94.18%)

5 existing lines in 3 files now uncovered.

866 of 949 relevant lines covered (91.25%)

289.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.7
/group_queue.go
1
// Package ffq provides a File-based FIFO Queue implementation that supports generic types.
2
package ffq
3

4
import (
5
        "encoding/json"
6
        "errors"
7
        "fmt"
8
        "io/fs"
9
        "os"
10
        "path/filepath"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14
)
15

16
// GroupQueue represents a group of queues with common configurations like size, encoder, and decoder.
17
// It manages multiple named queues and supports operations like enqueue, dequeue, and bulk enqueue/dequeue.
18
type GroupQueue[T any] struct {
19
        size            uint64 // The maximum number of items in the queue.
20
        maxPage         uint64
21
        name            string      // The name of the group queue.
22
        fileDir         string      // The directory where queue files are stored.
23
        queues          []*Queue[T] // A map of queue names to their corresponding Queue instances.
24
        nameIndices     sync.Map
25
        groupSize       int
26
        queueType       QueueType
27
        queueCount      uint32
28
        closeCount      uint32
29
        activeQueue     uint32
30
        isClose         atomic.Bool
31
        encoder         func(v any) ([]byte, error)    // Function to encode data before saving to the queue.
32
        decoder         func(data []byte, v any) error // Function to decode data when reading from the queue.
33
        initializeBlock chan struct{}                  // A channel to block until the queue is fully initialized.
34
        enqueueSig      chan struct{}                  // A signal channel to notify enqueue operations.
35
        closeSig        chan struct{}                  // A signal channel to notify that the queue is closed.
36
        mu              sync.RWMutex
37
}
38

39
// NewGroupQueue initializes a new GroupQueue with the given name and options.
40
// It sets up the queues, directory, encoder, and decoder for managing the queue data.
41
//
42
// Parameters:
43
//   - name: The name of the group queue.
44
//   - opts: Optional settings for the group queue.
45
//
46
// Returns:
47
//   - *GroupQueue: A pointer to the newly created GroupQueue.
48
//   - error: An error if any occurs during the queue creation.
49
//
50
// Example:
51
//
52
//        gq, err := NewGroupQueue[Data]("myGroupQueue", WithQueueSize(100), WithMaxPages(5))
53
//        if err != nil {
54
//            log.Fatal(err)
55
//        }
56
func NewGroupQueue[T any](name string, opts ...Option) (*GroupQueue[T], error) {
35✔
57
        var err error
35✔
58

35✔
59
        // check options and set default settings
35✔
60
        var options options
35✔
61
        for _, opt := range opts {
168✔
62
                err := opt(&options)
133✔
63
                if err != nil {
133✔
64
                        return nil, err
×
65
                }
×
66
        }
67

68
        var fileDir = "/tmp/ffq"
35✔
69
        if options.fileDir != nil {
70✔
70
                fileDir = *options.fileDir
35✔
71
        }
35✔
72
        err = createQueueDir(fileDir)
35✔
73
        if err != nil {
35✔
74
                return nil, err
×
75
        }
×
76

77
        var size uint64 = 1024
35✔
78
        if options.size != nil {
69✔
79
                size = *options.size
34✔
80
        }
34✔
81

82
        var maxPage uint64 = 2
35✔
83
        if options.maxPage != nil {
69✔
84
                maxPage = *options.maxPage
34✔
85
        }
34✔
86

87
        var queueType QueueType = SPSC
35✔
88
        if options.queueType != nil {
36✔
89
                queueType = *options.queueType
1✔
90
        }
1✔
91

92
        var groupSize int = 10
35✔
93
        if options.groupSize != nil {
62✔
94
                groupSize = *options.groupSize
27✔
95
        }
27✔
96

97
        var encoder func(v any) ([]byte, error) = json.Marshal
35✔
98
        if options.encoder != nil {
36✔
99
                encoder = *options.encoder
1✔
100
        }
1✔
101

102
        var decoder func(data []byte, v any) error = json.Unmarshal
35✔
103
        if options.decoder != nil {
36✔
104
                decoder = *options.decoder
1✔
105
        }
1✔
106

107
        queues := make([]*Queue[T], groupSize)
35✔
108
        enqueueSig := make(chan struct{}, 1)
35✔
109
        closeSig := make(chan struct{}, 1)
35✔
110

35✔
111
        gq := GroupQueue[T]{
35✔
112
                size:            size,
35✔
113
                maxPage:         maxPage,
35✔
114
                name:            name,
35✔
115
                fileDir:         fileDir,
35✔
116
                queues:          queues,
35✔
117
                nameIndices:     sync.Map{},
35✔
118
                groupSize:       groupSize,
35✔
119
                queueType:       queueType,
35✔
120
                queueCount:      0,
35✔
121
                closeCount:      0,
35✔
122
                activeQueue:     0,
35✔
123
                encoder:         encoder,
35✔
124
                decoder:         decoder,
35✔
125
                initializeBlock: make(chan struct{}),
35✔
126
                enqueueSig:      enqueueSig,
35✔
127
                closeSig:        closeSig,
35✔
128
        }
35✔
129

35✔
130
        gq.isClose.Store(false)
35✔
131

35✔
132
        go func() {
70✔
133
                gq.initialize()
35✔
134
        }()
35✔
135

136
        return &gq, nil
35✔
137
}
138

139
func (gq *GroupQueue[T]) addQueue(name string) error {
93✔
140
        if gq.isClose.Load() {
94✔
141
                return fmt.Errorf("already closed")
1✔
142
        }
1✔
143
        q, err := NewQueue[T](
92✔
144
                name,
92✔
145
                WithFileDir(filepath.Join(gq.fileDir, name)),
92✔
146
                WithMaxPage(gq.maxPage),
92✔
147
                WithQueueSize(gq.size),
92✔
148
                WithEncoder(gq.encoder),
92✔
149
                WithDecoder(gq.decoder),
92✔
150
                WithQueueType(gq.queueType),
92✔
151
        )
92✔
152
        if err != nil {
92✔
153
                return err
×
154
        }
×
155
        queueCount := atomic.AddUint32(&gq.queueCount, 1)
92✔
156
        if int(queueCount) > gq.groupSize {
93✔
157
                q.mu.Lock()
1✔
158
                q.WaitInitialize()
1✔
159
                os.RemoveAll(q.fileDir)
1✔
160
                q.mu.Unlock()
1✔
161
                return fmt.Errorf("reached group queue max size, %d", gq.groupSize)
1✔
162
        }
1✔
163
        gq.nameIndices.Store(name, queueCount-1)
91✔
164
        gq.mu.Lock()
91✔
165
        gq.queues[queueCount-1] = q
91✔
166
        gq.mu.Unlock()
91✔
167
        q.WaitInitialize()
91✔
168
        if q.Length() > 0 {
109✔
169
                gq.signalEnqueue()
18✔
170
        }
18✔
171
        return nil
91✔
172
}
173

174
// Enqueue adds a single item to the queue identified by the given name.
175
// If the queue does not exist, it will be created automatically.
176
//
177
// Parameters:
178
//   - name: The name of the queue to which the data should be enqueued.
179
//   - data: The data item to be added to the queue.
180
//
181
// Returns:
182
//   - error: An error if the enqueue operation fails.
183
//
184
// Example:
185
//
186
//         data := Data{...}
187
//                err := gq.Enqueue("queue1", &data)
188
//                if err != nil {
189
//                    log.Fatal(err)
190
//                }
191
func (gq *GroupQueue[T]) Enqueue(name string, item *T) error {
675✔
192
        var err error
675✔
193
        q, err := gq.getQueue(name)
675✔
194
        if err != nil {
741✔
195
                err = gq.addQueue(name)
66✔
196
                if err != nil {
68✔
197
                        return err
2✔
198
                }
2✔
199
                q, _ = gq.getQueue(name)
64✔
200
        }
201
        err = q.Enqueue(item)
673✔
202
        if err != nil {
673✔
203
                return err
×
204
        }
×
205
        gq.signalEnqueue()
673✔
206
        return nil
673✔
207
}
208

209
// BulkEnqueue adds multiple items to the queue identified by the given name.
210
// The items are added in batches, ensuring the queue size limit is respected.
211
//
212
// Parameters:
213
//   - name: The name of the queue to which the data should be enqueued.
214
//   - data: A slice of data items to be added to the queue.
215
//
216
// Returns:
217
//   - error: An error if the bulk enqueue operation fails.
218
//
219
// Example:
220
//
221
//        data := []*Data{{...},{...},...}
222
//        err := gq.BulkEnqueue("queue1", data)
223
//
224
//        if err != nil {
225
//            log.Fatal(err)
226
//        }
227
func (gq *GroupQueue[T]) BulkEnqueue(name string, items []*T) error {
33✔
228
        var err error
33✔
229
        q, err := gq.getQueue(name)
33✔
230
        if err != nil {
42✔
231
                err = gq.addQueue(name)
9✔
232
                if err != nil {
9✔
233
                        return err
×
234
                }
×
235
                q, _ = gq.getQueue(name)
9✔
236
        }
237
        var i uint64
33✔
238
        itemLength := uint64(len(items))
33✔
239
        for i < itemLength {
82✔
240
                var next uint64 = i + q.size - q.Length()
49✔
241
                if next == i {
55✔
242
                        time.Sleep(30 * time.Microsecond)
6✔
243
                        continue
6✔
244
                }
245
                if next >= itemLength {
76✔
246
                        err = q.BulkEnqueue(items[i:])
33✔
247
                } else {
43✔
248
                        err = q.BulkEnqueue(items[i:next])
10✔
249
                }
10✔
250
                if err != nil {
43✔
251
                        return err
×
252
                }
×
253
                gq.signalEnqueue()
43✔
254
                i = next
43✔
255
        }
256
        return nil
33✔
257
}
258

259
// Dequeue retrieves items from all non-empty queues in the GroupQueue.
260
// The items are sent to a channel for further processing.
261
//
262
// Returns:
263
//   - chan *Message[T]: A channel from which dequeued items can be received.
264
//   - error: An error if the dequeue operation fails.
265
//
266
// Example:
267
//
268
//        mCh, err := gq.Dequeue()
269
//        if err != nil {
270
//            log.Fatal(err)
271
//        }
272
//        for m := range mCh {
273
//            fmt.Println(m)
274
//        }
275
func (gq *GroupQueue[T]) Dequeue() (*Message[T], error) {
370✔
276
        // if queue has been closed, return ErrQueueClose
370✔
277
        for {
743✔
278
                err := gq.checkQueueSignal()
373✔
279
                if err != nil {
374✔
280
                        return nil, ErrQueueClose
1✔
281
                }
1✔
282
                q, _ := gq.getActiveQueue()
372✔
283
                m, err := q.Dequeue()
372✔
284
                gq.signalEnqueue()
372✔
285
                if m != nil {
741✔
286
                        return m, err
369✔
287
                }
369✔
288
                gq.manageQueueClose()
3✔
289
        }
290
}
291

292
// BulkDequeue retrieves multiple items from all non-empty queues in the GroupQueue
293
// and sends them in batches of the specified size.
294
//
295
// Parameters:
296
//   - size: The number of items to dequeue in each batch.
297
//   - lazy: A duration to wait between dequeue operations.
298
//
299
// Returns:
300
//   - chan []*Message[T]: A channel from which batches of dequeued items can be received.
301
//   - error: An error if the bulk dequeue operation fails.
302
//
303
// Example:
304
//
305
//        msCh, err := gq.BulkDequeue(10, 100*time.Millisecond)
306
//        if err != nil {
307
//            log.Fatal(err)
308
//        }
309
//        for ms := range msCh {
310
//            fmt.Println(ms)
311
//        }
312
func (gq *GroupQueue[T]) BulkDequeue(size uint64, lazy time.Duration) ([]*Message[T], error) {
25✔
313
        err := gq.checkQueueSignal()
25✔
314
        if err != nil {
28✔
315
                return nil, ErrQueueClose
3✔
316
        }
3✔
317

318
        // add enqueueSignal because get enqueueSignal first
319
        ms := make([]*Message[T], 0, size)
22✔
320
        batch := size / uint64(atomic.LoadUint32(&gq.queueCount))
22✔
321
        gq.signalEnqueue() // use next time select case
22✔
322
        timer := time.After(lazy)
22✔
323
        for {
105✔
324
                select {
83✔
325
                case <-timer:
11✔
326
                        return ms, nil
11✔
327
                case <-gq.closeSig:
1✔
328
                        gq.signalClose() // close next time
1✔
329
                        return ms, nil
1✔
330
                case <-gq.enqueueSig:
71✔
331
                        select {
71✔
332
                        case <-gq.closeSig:
2✔
333
                                gq.signalClose() // close next time
2✔
334
                                return ms, nil
2✔
335
                        default:
69✔
336
                        }
337

338
                        q, _ := gq.getActiveQueue()
69✔
339
                        n := batch
69✔
340
                        qlen := int(q.Length() + 1) // if q.Length() == 0, queue may be closed.
69✔
341
                        if newN := uint64(qlen); newN < n {
80✔
342
                                n = newN
11✔
343
                        }
11✔
344
                        if newN := size - uint64(len(ms)); newN < n {
75✔
345
                                n = newN
6✔
346
                        }
6✔
347
                        for i := uint64(0); i < n; i++ {
177✔
348
                                m, _ := q.Dequeue()
108✔
349
                                if m != nil {
207✔
350
                                        ms = append(ms, m)
99✔
351
                                } else {
108✔
352
                                        gq.manageQueueClose()
9✔
353
                                }
9✔
354
                        }
355
                        gq.signalEnqueue()
69✔
356
                        if uint64(len(ms)) == size {
77✔
357
                                return ms, nil
8✔
358
                        }
8✔
359
                }
360
        }
361
}
362

363
// FuncAfterDequeue applies a given function to each item after it is dequeued.
364
//
365
// Parameters:
366
//   - f: A function that will be applied to each dequeued item.
367
//
368
// Returns:
369
//   - error: An error if the operation fails.
370
//
371
// Example:
372
//
373
//        err := gq.FuncAfterDequeue(func(data *T) error {
374
//            fmt.Println("Processed:", data)
375
//            return nil
376
//        })
377
//        if err != nil {
378
//            log.Fatal(err)
379
//        }
380
func (gq *GroupQueue[T]) FuncAfterDequeue(f func(*T) error) error {
26✔
381
        // if queue has been closed, return ErrQueueClose
26✔
382
        for {
55✔
383
                err := gq.checkQueueSignal()
29✔
384
                if err != nil {
30✔
385
                        return ErrQueueClose
1✔
386
                }
1✔
387
                q, _ := gq.getActiveQueue()
28✔
388
                m, _ := q.Dequeue()
28✔
389
                gq.signalEnqueue()
28✔
390
                if m != nil {
53✔
391
                        var err error
25✔
392
                        fErr := f(m.item)
25✔
393
                        if fErr != nil {
26✔
394
                                err = errors.Join(err, fErr)
1✔
395
                        }
1✔
396
                        iErr := q.writeIndex(m.index)
25✔
397
                        if iErr != nil {
25✔
NEW
398
                                err = errors.Join(err, iErr)
×
UNCOV
399
                        }
×
400
                        return err
25✔
401
                }
402
                gq.manageQueueClose()
3✔
403
        }
404
}
405

406
// FuncAfterBulkDequeue applies a given function to multiple items after they are dequeued in batches.
407
//
408
// Parameters:
409
//   - size: The number of items to dequeue in each batch.
410
//   - lazy: A duration to wait between dequeue operations.
411
//   - f: A function that will be applied to each batch of dequeued items.
412
//
413
// Returns:
414
//   - error: An error if the operation fails.
415
//
416
// Example:
417
//
418
//        err := gq.FuncAfterBulkDequeue(10, 100*time.Millisecond, func(data []*T) error {
419
//            fmt.Println("Processed batch:", data)
420
//            return nil
421
//        })
422
//        if err != nil {
423
//            log.Fatal(err)
424
//        }
425
func (gq *GroupQueue[T]) FuncAfterBulkDequeue(size uint64, lazy time.Duration, f func([]*T) error) (int, error) {
22✔
426
        err := gq.checkQueueSignal()
22✔
427
        if err != nil {
25✔
428
                return 0, ErrQueueClose
3✔
429
        }
3✔
430

431
        // add enqueueSignal because get enqueueSignal first
432
        items := make([]*T, 0, size)
19✔
433
        batch := size / uint64(atomic.LoadUint32(&gq.queueCount))
19✔
434
        gq.signalEnqueue() // next time select case
19✔
435
        lastIndexMap := make(map[uint32]uint64, gq.maxPage)
19✔
436
        timer := time.After(lazy)
19✔
437
LOOP:
19✔
438
        for {
100✔
439
                select {
81✔
440
                case <-timer:
10✔
441
                        break LOOP
10✔
442
                case <-gq.closeSig:
3✔
443
                        gq.signalClose() // close next time
3✔
444
                        break LOOP
3✔
445
                case <-gq.enqueueSig:
68✔
446
                        select {
68✔
NEW
447
                        case <-gq.closeSig:
×
NEW
448
                                gq.signalClose() // close next time
×
NEW
449
                                break LOOP
×
450
                        default:
68✔
451
                        }
452

453
                        q, activeQueue := gq.getActiveQueue()
68✔
454
                        n := batch
68✔
455
                        qlen := int(q.Length() + 1) // if q.Length() == 0, queue may be closed.
68✔
456
                        if newN := uint64(qlen); newN < n {
76✔
457
                                n = newN
8✔
458
                        }
8✔
459
                        if newN := size - uint64(len(items)); newN < n {
68✔
NEW
460
                                n = newN
×
NEW
461
                        }
×
462
                        for i := uint64(0); i < n; i++ {
153✔
463
                                m, _ := q.Dequeue()
85✔
464
                                if m != nil {
161✔
465
                                        items = append(items, m.item)
76✔
466
                                        lastIndexMap[activeQueue] = m.index
76✔
467
                                } else {
85✔
468
                                        gq.manageQueueClose()
9✔
469
                                }
9✔
470
                        }
471
                        gq.signalEnqueue()
68✔
472
                        if uint64(len(items)) == size {
74✔
473
                                break LOOP
6✔
474
                        }
475
                }
476
        }
477
        fErr := f(items)
19✔
478
        if fErr != nil {
20✔
479
                err = errors.Join(err, fErr)
1✔
480
        }
1✔
481
        for i, index := range lastIndexMap {
65✔
482
                q := gq.queues[i]
46✔
483
                iErr := q.writeIndex(index)
46✔
484
                if iErr != nil {
46✔
NEW
485
                        err = errors.Join(err, iErr)
×
UNCOV
486
                }
×
487
        }
488
        return len(items), nil
19✔
489
}
490

491
func (gq *GroupQueue[T]) Length() ([]uint64, uint64) {
3✔
492
        qls := make([]uint64, 0, gq.groupSize)
3✔
493
        var total uint64 = 0
3✔
494
        for _, q := range gq.queues {
8✔
495
                if q == nil {
8✔
496
                        break
3✔
497
                }
498
                ql := q.Length()
2✔
499
                qls = append(qls, ql)
2✔
500
                total += ql
2✔
501
        }
502
        return qls, total
3✔
503
}
504

505
func (gq *GroupQueue[T]) initialize() {
35✔
506
        entries, err := os.ReadDir(gq.fileDir)
35✔
507
        if err != nil {
35✔
508
                panic(fmt.Sprintf("could not find directory, %s, %v", gq.fileDir, err))
×
509
        }
510

511
        var wg sync.WaitGroup
35✔
512

35✔
513
        for _, entry := range entries {
53✔
514
                if entry.IsDir() {
36✔
515
                        wg.Add(1)
18✔
516
                        go func(wg *sync.WaitGroup, entry fs.DirEntry) {
36✔
517
                                defer wg.Done()
18✔
518
                                gq.addQueue(entry.Name())
18✔
519
                        }(&wg, entry)
18✔
520
                }
521
        }
522
        // wait goroutine
523
        wg.Wait()
35✔
524

35✔
525
        // release blocking
35✔
526
        gq.initializeBlock <- struct{}{}
35✔
527
}
528

529
// WaitInitialize blocks until the group queue is fully initialized.
530
//
531
// Example:
532
//
533
//        gq, _ := NewGroupQueue(...)
534
//        // start dequeue
535
//        go func(){
536
//                for {
537
//                        mCh, _ := gq.Dequeue()
538
//                }
539
//        }
540
//        gq.WaitInitialize()
541
//        go func() {
542
//                gq.Enqueu(data)
543
//        }
544
func (gq *GroupQueue[T]) WaitInitialize() {
35✔
545
        <-gq.initializeBlock
35✔
546
}
35✔
547

548
func (gq *GroupQueue[T]) signalEnqueue() {
1,312✔
549
        select {
1,312✔
550
        case gq.enqueueSig <- struct{}{}:
609✔
551
        default:
703✔
552
        }
553
}
554

555
func (gq *GroupQueue[T]) signalClose() {
14✔
556
        select {
14✔
557
        case gq.closeSig <- struct{}{}:
14✔
NEW
558
        default:
×
559
        }
560
}
561

562
func (gq *GroupQueue[T]) getQueue(name string) (*Queue[T], error) {
1,600✔
563
        i, ok := gq.nameIndices.Load(name)
1,600✔
564
        if !ok {
1,678✔
565
                return nil, fmt.Errorf("queue name: %s, %v", name, ErrQueueNotFound)
78✔
566
        }
78✔
567
        q := gq.queues[i.(uint32)]
1,522✔
568
        return q, nil
1,522✔
569
}
570

571
// CloseQueue closes all queues in the group and signals the closure.
572
//
573
// Returns:
574
//   - error: An error if any of the queues fail to close.
575
//
576
// Example:
577
//
578
//        err := gq.CloseQueue()
579
//        if err != nil {
580
//            log.Fatal(err)
581
//        }
582
func (gq *GroupQueue[T]) CloseQueue() error {
33✔
583
        var err error
33✔
584
        gq.isClose.Store(true)
33✔
585
        for _, q := range gq.queues {
130✔
586
                if q == nil {
107✔
587
                        break
10✔
588
                }
589
                closeErr := q.CloseQueue()
87✔
590
                if err != nil {
87✔
591
                        err = errors.Join(err, closeErr)
×
592
                }
×
593
        }
594
        return err
33✔
595
}
596

597
func (gq *GroupQueue[T]) manageQueueClose() {
24✔
598
        closeCount := atomic.AddUint32(&gq.closeCount, 1)
24✔
599
        queueCount := atomic.LoadUint32(&gq.queueCount)
24✔
600
        if closeCount == queueCount {
32✔
601
                gq.signalClose()
8✔
602
        }
8✔
603
}
604

605
// CloseIndex closes the index files associated with all queues in the group.
606
//
607
// Returns:
608
//   - error: An error if any of the index files fail to close.
609
//
610
// Example:
611
//
612
//        err := gq.CloseIndex()
613
//        if err != nil {
614
//            log.Fatal(err)
615
//        }
616
func (gq *GroupQueue[T]) CloseIndex() error {
32✔
617
        var err error
32✔
618
        for _, q := range gq.queues {
126✔
619
                if q == nil {
104✔
620
                        break
10✔
621
                }
622
                if q.isQueueClosed.Load() && !q.isIndexClosed.Load() {
168✔
623
                        err = q.CloseIndex()
84✔
624
                        atomic.AddUint32(&gq.closeCount, 1)
84✔
625
                }
84✔
626
        }
627
        return err
32✔
628
}
629

630
// UpdateIndex updates the index of a given m in its corresponding queue.
631
//
632
// Parameters:
633
//   - m: The m whose index needs to be updated.
634
//
635
// Returns:
636
//   - error: An error if the update fails.
637
//
638
// Example:
639
//
640
//        err := gq.UpdateIndex(m)
641
//        if err != nil {
642
//            log.Fatal(err)
643
//        }
644
func (gq *GroupQueue[T]) UpdateIndex(m *Message[T]) error {
468✔
645
        var err error
468✔
646
        q, gqErr := gq.getQueue(m.name)
468✔
647
        if gqErr != nil {
468✔
648
                err = errors.Join(err, gqErr)
×
649
        }
×
650
        iErr := q.writeIndex(m.index)
468✔
651
        if iErr != nil {
468✔
652
                err = errors.Join(err, iErr)
×
653
        }
×
654
        return err
468✔
655
}
656

657
func (gq *GroupQueue[T]) getActiveQueue() (*Queue[T], uint32) {
540✔
658
        activeQueue := atomic.LoadUint32(&gq.activeQueue)
540✔
659
        startActiveQueue := activeQueue
540✔
660
        for {
8,915✔
661
                gq.mu.RLock()
8,375✔
662
                q := gq.queues[activeQueue]
8,375✔
663
                gq.mu.RUnlock()
8,375✔
664
                if q == nil {
8,376✔
665
                        activeQueue = 0
1✔
666
                        continue
1✔
667
                }
668
                if !q.isQueueClosedRecieved.Load() && (q.Length() > 0 || q.isQueueClosed.Load()) {
8,914✔
669
                        if activeQueue == uint32(gq.groupSize-1) {
718✔
670
                                atomic.StoreUint32(&gq.activeQueue, 0)
178✔
671
                        } else {
540✔
672
                                atomic.StoreUint32(&gq.activeQueue, activeQueue+1)
362✔
673
                        }
362✔
674
                        return q, activeQueue
540✔
675
                }
676
                if activeQueue == uint32(gq.groupSize-1) {
10,441✔
677
                        activeQueue = 0
2,607✔
678
                } else {
7,834✔
679
                        activeQueue++
5,227✔
680
                }
5,227✔
681
                if activeQueue == startActiveQueue {
10,421✔
682
                        time.Sleep(30 * time.Microsecond)
2,587✔
683
                }
2,587✔
684
        }
685
}
686

687
func (gq *GroupQueue[T]) checkQueueSignal() error {
449✔
688
        select {
449✔
689
        case <-gq.closeSig:
6✔
690
                return ErrQueueClose
6✔
691
        case <-gq.enqueueSig:
443✔
692
                select {
443✔
693
                case <-gq.closeSig:
2✔
694
                        return ErrQueueClose
2✔
695
                default:
441✔
696
                        return nil
441✔
697
                }
698
        }
699
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc