• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nihiyama / ffq / 20679210386

03 Jan 2026 03:25PM UTC coverage: 90.296% (-0.3%) from 90.603%
20679210386

Pull #47

github

nihiyama
add bench
Pull Request #47: add bench

884 of 979 relevant lines covered (90.3%)

293.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.94
/group_queue.go
1
// Package ffq provides a File-based FIFO Queue implementation that supports generic types.
2
package ffq
3

4
import (
5
        "encoding/json"
6
        "errors"
7
        "fmt"
8
        "io/fs"
9
        "os"
10
        "path/filepath"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14
)
15

16
// GroupQueue represents a group of queues with common configurations like size, encoder, and decoder.
17
// It manages multiple named queues and supports operations like enqueue, dequeue, and bulk enqueue/dequeue.
18
type GroupQueue[T any] struct {
19
        size            uint64                         // The maximum number of items in one file page.
20
        maxPage         uint64                         // The number of file pages for file rotation.
21
        queueCount      uint32                         // The current count of queues in the group.
22
        closeCount      uint32                         // The number of queues that have been closed.
23
        activeQueue     uint32                         // The index of the currently active queue.
24
        groupSize       int                            // The maximum number of individual queues allowed in the group.
25
        name            string                         // The name of the group queue.
26
        fileDir         string                         // The directory where queue files are stored.
27
        queues          []*Queue[T]                    // The collection of individual Queue instances.
28
        nameIndices     sync.Map                       // Maps queue names to their index within the group.
29
        queueType       QueueType                      // The operating mode (SPSC or MPSC) for the queues.
30
        isClose         atomic.Bool                    // Flag indicating if the group queue is closed.
31
        encoder         func(v any) ([]byte, error)    // Encoder function for serializing data.
32
        decoder         func(data []byte, v any) error // Decoder function for deserializing data.
33
        initializeBlock chan struct{}                  // Channel used to block until initialization is complete.
34
        enqueueSig      chan struct{}                  // Signal channel to notify enqueue operations.
35
        closeSig        chan struct{}                  // Signal channel to notify that the group queue is closed.
36
        mu              sync.RWMutex                   // Mutex to synchronize access to the group's queues.
37
}
38

39
// NewGroupQueue initializes a new GroupQueue with the given name and optional settings.
40
// It sets up the underlying queues, file directory, encoder, and decoder to manage the queue data.
41
//
42
// Parameters:
43
//   - name: The name of the group queue.
44
//   - opts: Optional settings for the group queue (e.g., WithQueueSize, WithMaxPage, WithQueueType, WithFileDir, WithGroupSize, WithEncoder, WithDecoder).
45
//
46
// Returns:
47
//   - *GroupQueue[T]: A pointer to the newly created GroupQueue.
48
//   - error: An error if any issues occur during initialization (e.g., failure to create the file directory).
49
//
50
// Example:
51
//
52
//        // Create a group queue for Data items with a queue size of 1024 and up to 2 pages.
53
//        gq, err := ffq.NewGroupQueue[Data]("myGroupQueue",
54
//             ffq.WithQueueSize(1024),
55
//             ffq.WithMaxPage(2),
56
//             ffq.WithGroupSize(10),
57
//             ffq.WithQueueType(ffq.SPSC))
58
//        if err != nil {
59
//            log.Fatalf("Failed to create group queue: %v", err)
60
//        }
61
//        // Wait until the group queue is fully initialized before use.
62
//        gq.WaitInitialize()
63
func NewGroupQueue[T any](name string, opts ...Option) (*GroupQueue[T], error) {
35✔
64
        var err error
35✔
65

35✔
66
        // check options and set default settings
35✔
67
        var options options
35✔
68
        for _, opt := range opts {
168✔
69
                err := opt(&options)
133✔
70
                if err != nil {
133✔
71
                        return nil, err
×
72
                }
×
73
        }
74

75
        var fileDir = "/tmp/ffq"
35✔
76
        if options.fileDir != nil {
70✔
77
                fileDir = *options.fileDir
35✔
78
        }
35✔
79
        err = createQueueDir(fileDir)
35✔
80
        if err != nil {
35✔
81
                return nil, err
×
82
        }
×
83

84
        var size uint64 = 1024
35✔
85
        if options.size != nil {
69✔
86
                size = *options.size
34✔
87
        }
34✔
88

89
        var maxPage uint64 = 2
35✔
90
        if options.maxPage != nil {
69✔
91
                maxPage = *options.maxPage
34✔
92
        }
34✔
93

94
        var queueType QueueType = SPSC
35✔
95
        if options.queueType != nil {
36✔
96
                queueType = *options.queueType
1✔
97
        }
1✔
98

99
        var groupSize int = 10
35✔
100
        if options.groupSize != nil {
62✔
101
                groupSize = *options.groupSize
27✔
102
        }
27✔
103

104
        var encoder func(v any) ([]byte, error) = json.Marshal
35✔
105
        if options.encoder != nil {
36✔
106
                encoder = *options.encoder
1✔
107
        }
1✔
108

109
        var decoder func(data []byte, v any) error = json.Unmarshal
35✔
110
        if options.decoder != nil {
36✔
111
                decoder = *options.decoder
1✔
112
        }
1✔
113

114
        queues := make([]*Queue[T], groupSize)
35✔
115
        enqueueSig := make(chan struct{}, 1)
35✔
116
        closeSig := make(chan struct{}, 1)
35✔
117

35✔
118
        gq := GroupQueue[T]{
35✔
119
                size:            size,
35✔
120
                maxPage:         maxPage,
35✔
121
                name:            name,
35✔
122
                fileDir:         fileDir,
35✔
123
                queues:          queues,
35✔
124
                nameIndices:     sync.Map{},
35✔
125
                groupSize:       groupSize,
35✔
126
                queueType:       queueType,
35✔
127
                queueCount:      0,
35✔
128
                closeCount:      0,
35✔
129
                activeQueue:     0,
35✔
130
                encoder:         encoder,
35✔
131
                decoder:         decoder,
35✔
132
                initializeBlock: make(chan struct{}),
35✔
133
                enqueueSig:      enqueueSig,
35✔
134
                closeSig:        closeSig,
35✔
135
        }
35✔
136

35✔
137
        gq.isClose.Store(false)
35✔
138

35✔
139
        go func() {
70✔
140
                gq.initialize()
35✔
141
        }()
35✔
142

143
        return &gq, nil
35✔
144
}
145

146
func (gq *GroupQueue[T]) addQueue(name string) error {
93✔
147
        if gq.isClose.Load() {
94✔
148
                return fmt.Errorf("already closed")
1✔
149
        }
1✔
150
        q, err := NewQueue[T](
92✔
151
                name,
92✔
152
                WithFileDir(filepath.Join(gq.fileDir, name)),
92✔
153
                WithMaxPage(gq.maxPage),
92✔
154
                WithQueueSize(gq.size),
92✔
155
                WithEncoder(gq.encoder),
92✔
156
                WithDecoder(gq.decoder),
92✔
157
                WithQueueType(gq.queueType),
92✔
158
        )
92✔
159
        if err != nil {
92✔
160
                return err
×
161
        }
×
162
        queueCount := atomic.AddUint32(&gq.queueCount, 1)
92✔
163
        if int(queueCount) > gq.groupSize {
93✔
164
                q.mu.Lock()
1✔
165
                q.WaitInitialize()
1✔
166
                os.RemoveAll(q.fileDir)
1✔
167
                q.mu.Unlock()
1✔
168
                return fmt.Errorf("reached group queue max size, %d", gq.groupSize)
1✔
169
        }
1✔
170
        gq.nameIndices.Store(name, queueCount-1)
91✔
171
        gq.mu.Lock()
91✔
172
        gq.queues[queueCount-1] = q
91✔
173
        gq.mu.Unlock()
91✔
174
        q.WaitInitialize()
91✔
175
        if q.Length() > 0 {
109✔
176
                gq.signalEnqueue()
18✔
177
        }
18✔
178
        return nil
91✔
179
}
180

181
// Enqueue adds a single item to the queue identified by the given name.
182
// If the queue does not exist, it will be created automatically.
183
//
184
// Parameters:
185
//   - name: The name of the individual queue to which the item should be enqueued.
186
//   - item: A pointer to the data item to be enqueued.
187
//
188
// Returns:
189
//   - error: An error if the enqueue operation fails.
190
//
191
// Example:
192
//
193
//        // Enqueue a Data item into the queue named "queue1".
194
//        data := Data{...}
195
//        if err := gq.Enqueue("queue1", &data); err != nil {
196
//            log.Fatalf("Enqueue failed: %v", err)
197
//        }
198
func (gq *GroupQueue[T]) Enqueue(name string, item *T) error {
675✔
199
        var err error
675✔
200
        q, err := gq.getQueue(name)
675✔
201
        if err != nil {
741✔
202
                err = gq.addQueue(name)
66✔
203
                if err != nil {
68✔
204
                        return err
2✔
205
                }
2✔
206
                q, _ = gq.getQueue(name)
64✔
207
        }
208
        err = q.Enqueue(item)
673✔
209
        if err != nil {
673✔
210
                return err
×
211
        }
×
212
        gq.signalEnqueue()
673✔
213
        return nil
673✔
214
}
215

216
// BulkEnqueue adds multiple items to the queue identified by the given name.
217
// The items are enqueued in batches, respecting the individual queue's size limit.
218
//
219
// Parameters:
220
//   - name: The name of the individual queue to which the items should be enqueued.
221
//   - items: A slice of pointers to data items to be enqueued.
222
//
223
// Returns:
224
//   - error: An error if the bulk enqueue operation fails.
225
//
226
// Example:
227
//
228
//        // Enqueue multiple Data items into the queue named "queue1".
229
//        dataItems := []*Data{
230
//            { ... },
231
//            { ... },
232
//        }
233
//        if err := gq.BulkEnqueue("queue1", dataItems); err != nil {
234
//            log.Fatalf("BulkEnqueue failed: %v", err)
235
//        }
236
func (gq *GroupQueue[T]) BulkEnqueue(name string, items []*T) error {
33✔
237
        var err error
33✔
238
        q, err := gq.getQueue(name)
33✔
239
        if err != nil {
42✔
240
                err = gq.addQueue(name)
9✔
241
                if err != nil {
9✔
242
                        return err
×
243
                }
×
244
                q, _ = gq.getQueue(name)
9✔
245
        }
246
        var i uint64
33✔
247
        itemLength := uint64(len(items))
33✔
248
        for i < itemLength {
82✔
249
                var next uint64 = i + q.size - q.Length()
49✔
250
                if next == i {
55✔
251
                        time.Sleep(30 * time.Microsecond)
6✔
252
                        continue
6✔
253
                }
254
                if next >= itemLength {
76✔
255
                        err = q.BulkEnqueue(items[i:])
33✔
256
                } else {
43✔
257
                        err = q.BulkEnqueue(items[i:next])
10✔
258
                }
10✔
259
                if err != nil {
43✔
260
                        return err
×
261
                }
×
262
                gq.signalEnqueue()
43✔
263
                i = next
43✔
264
        }
265
        return nil
33✔
266
}
267

268
// Dequeue retrieves a single item from the active queue in the group.
269
// It continuously monitors the underlying queues until an item is available,
270
// or the group queue has been closed.
271
//
272
// Returns:
273
//   - *Message[T]: The dequeued message containing the data item and metadata.
274
//   - error: An error if the dequeue operation fails or if the group queue is closed.
275
//
276
// Example:
277
//
278
//        // Dequeue a message from the group queue.
279
//        msg, err := gq.Dequeue()
280
//        if err != nil {
281
//            log.Fatalf("Dequeue failed: %v", err)
282
//        }
283
//        fmt.Printf("Dequeued message: %+v\n", msg)
284
func (gq *GroupQueue[T]) Dequeue() (*Message[T], error) {
370✔
285
        // if queue has been closed, return ErrQueueClose
370✔
286
        for {
743✔
287
                err := gq.checkQueueSignal()
373✔
288
                if err != nil {
374✔
289
                        return nil, ErrQueueClose
1✔
290
                }
1✔
291
                q, _ := gq.getActiveQueue()
372✔
292
                m, err := q.Dequeue()
372✔
293
                gq.signalEnqueue()
372✔
294
                if m != nil {
741✔
295
                        return m, err
369✔
296
                }
369✔
297
                gq.manageQueueClose()
3✔
298
        }
299
}
300

301
// BulkDequeue retrieves multiple items from the active queue in the group,
302
// returning a batch of messages once the specified size is reached or after the lazy duration expires.
303
//
304
// Parameters:
305
//   - size: The number of items to dequeue in one batch.
306
//   - lazy: The maximum time to wait for the batch to fill; when it elapses, the messages collected so far are returned.
307
//
308
// Returns:
309
//   - []*Message[T]: A slice of dequeued messages.
310
//   - error: An error if the bulk dequeue operation fails or if the group queue is closed.
311
//
312
// Example:
313
//
314
//        // Dequeue a batch of 10 messages, waiting up to 100 milliseconds for the batch to fill.
315
//        messages, err := gq.BulkDequeue(10, 100*time.Millisecond)
316
//        if err != nil {
317
//            log.Fatalf("BulkDequeue failed: %v", err)
318
//        }
319
//        for _, msg := range messages {
320
//            fmt.Printf("Dequeued message: %+v\n", msg)
321
//        }
322
func (gq *GroupQueue[T]) BulkDequeue(size uint64, lazy time.Duration) ([]*Message[T], error) {
26✔
323
        err := gq.checkQueueSignal()
26✔
324
        if err != nil {
29✔
325
                return nil, ErrQueueClose
3✔
326
        }
3✔
327

328
        // add enqueueSignal because get enqueueSignal first
329
        ms := make([]*Message[T], 0, size)
23✔
330
        batch := size / uint64(atomic.LoadUint32(&gq.queueCount))
23✔
331
        gq.signalEnqueue() // use next time select case
23✔
332
        timer := time.After(lazy)
23✔
333
        for {
107✔
334
                select {
84✔
335
                case <-timer:
14✔
336
                        return ms, nil
14✔
337
                case <-gq.closeSig:
2✔
338
                        gq.signalClose() // close next time
2✔
339
                        return ms, nil
2✔
340
                case <-gq.enqueueSig:
68✔
341
                        select {
68✔
342
                        case <-gq.closeSig:
×
343
                                gq.signalClose() // close next time
×
344
                                return ms, nil
×
345
                        default:
68✔
346
                        }
347

348
                        q, _ := gq.getActiveQueue()
68✔
349
                        n := batch
68✔
350
                        qlen := int(q.Length() + 1) // if q.Length() == 0, queue may be closed.
68✔
351
                        if newN := uint64(qlen); newN < n {
76✔
352
                                n = newN
8✔
353
                        }
8✔
354
                        if newN := size - uint64(len(ms)); newN < n {
74✔
355
                                n = newN
6✔
356
                        }
6✔
357
                        for i := uint64(0); i < n; i++ {
176✔
358
                                m, _ := q.Dequeue()
108✔
359
                                if m != nil {
207✔
360
                                        ms = append(ms, m)
99✔
361
                                } else {
108✔
362
                                        gq.manageQueueClose()
9✔
363
                                }
9✔
364
                        }
365
                        gq.signalEnqueue()
68✔
366
                        if uint64(len(ms)) == size {
75✔
367
                                return ms, nil
7✔
368
                        }
7✔
369
                }
370
        }
371
}
372

373
// FuncAfterDequeue applies the given function to a single dequeued item.
374
// After processing the item, the queue index is updated accordingly.
375
//
376
// Parameters:
377
//   - f: A function that processes the dequeued data item.
378
//     It should return an error if processing fails.
379
//
380
// Returns:
381
//   - error: An error if either the dequeue operation, the function application, or the index update fails.
382
//
383
// Example:
384
//
385
//        err := gq.FuncAfterDequeue(func(data *Data) error {
386
//            fmt.Println("Processed:", data)
387
//            return nil
388
//        })
389
//        if err != nil {
390
//            log.Fatalf("FuncAfterDequeue failed: %v", err)
391
//        }
392
func (gq *GroupQueue[T]) FuncAfterDequeue(f func(*T) error) error {
26✔
393
        // if queue has been closed, return ErrQueueClose
26✔
394
        for {
55✔
395
                err := gq.checkQueueSignal()
29✔
396
                if err != nil {
30✔
397
                        return ErrQueueClose
1✔
398
                }
1✔
399
                q, _ := gq.getActiveQueue()
28✔
400
                m, _ := q.Dequeue()
28✔
401
                gq.signalEnqueue()
28✔
402
                if m != nil {
53✔
403
                        var err error
25✔
404
                        fErr := f(m.item)
25✔
405
                        if fErr != nil {
26✔
406
                                err = errors.Join(err, fErr)
1✔
407
                        }
1✔
408
                        iErr := q.writeIndex(m.index)
25✔
409
                        if iErr != nil {
25✔
410
                                err = errors.Join(err, iErr)
×
411
                        }
×
412
                        return err
25✔
413
                }
414
                gq.manageQueueClose()
3✔
415
        }
416
}
417

418
// FuncAfterBulkDequeue applies the given function to a batch of dequeued items.
419
// Once the batch is processed, it updates the indices of the corresponding queues.
420
//
421
// Parameters:
422
//   - size: The maximum number of items to dequeue in one batch.
423
//   - lazy: The maximum time to wait for the batch to fill; when it elapses, the items collected so far are passed to f.
424
//   - f: A function that processes the batch of data items.
425
//     It should return an error if processing fails.
426
//
427
// Returns:
428
//   - int: The actual number of items processed.
429
//   - error: An error if the bulk dequeue operation, batch processing, or index updates fail.
430
//
431
// Example:
432
//
433
//        count, err := gq.FuncAfterBulkDequeue(10, 100*time.Millisecond, func(data []*Data) error {
434
//            fmt.Println("Processed batch:", data)
435
//            return nil
436
//        })
437
//        if err != nil {
438
//            log.Fatalf("FuncAfterBulkDequeue failed: %v", err)
439
//        }
440
//        fmt.Printf("Processed %d items\n", count)
441
func (gq *GroupQueue[T]) FuncAfterBulkDequeue(size uint64, lazy time.Duration, f func([]*T) error) (int, error) {
23✔
442
        err := gq.checkQueueSignal()
23✔
443
        if err != nil {
26✔
444
                return 0, ErrQueueClose
3✔
445
        }
3✔
446

447
        // add enqueueSignal because get enqueueSignal first
448
        items := make([]*T, 0, size)
20✔
449
        batch := size / uint64(atomic.LoadUint32(&gq.queueCount))
20✔
450
        gq.signalEnqueue() // next time select case
20✔
451
        lastIndexMap := make(map[uint32]uint64, gq.maxPage)
20✔
452
        timer := time.After(lazy)
20✔
453
LOOP:
20✔
454
        for {
103✔
455
                select {
83✔
456
                case <-timer:
11✔
457
                        break LOOP
11✔
458
                case <-gq.closeSig:
×
459
                        gq.signalClose() // close next time
×
460
                        break LOOP
×
461
                case <-gq.enqueueSig:
72✔
462
                        select {
72✔
463
                        case <-gq.closeSig:
3✔
464
                                gq.signalClose() // close next time
3✔
465
                                break LOOP
3✔
466
                        default:
69✔
467
                        }
468

469
                        q, activeQueue := gq.getActiveQueue()
69✔
470
                        n := batch
69✔
471
                        qlen := int(q.Length() + 1) // if q.Length() == 0, queue may be closed.
69✔
472
                        if newN := uint64(qlen); newN < n {
74✔
473
                                n = newN
5✔
474
                        }
5✔
475
                        if newN := size - uint64(len(items)); newN < n {
69✔
476
                                n = newN
×
477
                        }
×
478
                        for i := uint64(0); i < n; i++ {
154✔
479
                                m, _ := q.Dequeue()
85✔
480
                                if m != nil {
161✔
481
                                        items = append(items, m.item)
76✔
482
                                        lastIndexMap[activeQueue] = m.index
76✔
483
                                } else {
85✔
484
                                        gq.manageQueueClose()
9✔
485
                                }
9✔
486
                        }
487
                        gq.signalEnqueue()
69✔
488
                        if uint64(len(items)) == size {
75✔
489
                                break LOOP
6✔
490
                        }
491
                }
492
        }
493
        fErr := f(items)
20✔
494
        if fErr != nil {
21✔
495
                err = errors.Join(err, fErr)
1✔
496
        }
1✔
497
        for i, index := range lastIndexMap {
67✔
498
                q := gq.queues[i]
47✔
499
                iErr := q.writeIndex(index)
47✔
500
                if iErr != nil {
47✔
501
                        err = errors.Join(err, iErr)
×
502
                }
×
503
        }
504
        return len(items), nil
20✔
505
}
506

507
// Length returns the lengths of each individual queue in the group as well as the total number
508
// of unprocessed items across all queues.
509
//
510
// Returns:
511
//   - []uint64: A slice containing the length of each queue.
512
//   - uint64: The total number of unprocessed items in the group.
513
//
514
// Example:
515
//
516
//        lengths, total := gq.Length()
517
//        fmt.Printf("Queue lengths: %v, Total items: %d\n", lengths, total)
518
func (gq *GroupQueue[T]) Length() ([]uint64, uint64) {
3✔
519
        qls := make([]uint64, 0, gq.groupSize)
3✔
520
        var total uint64 = 0
3✔
521
        for _, q := range gq.queues {
8✔
522
                if q == nil {
8✔
523
                        break
3✔
524
                }
525
                ql := q.Length()
2✔
526
                qls = append(qls, ql)
2✔
527
                total += ql
2✔
528
        }
529
        return qls, total
3✔
530
}
531

532
func (gq *GroupQueue[T]) initialize() {
35✔
533
        entries, err := os.ReadDir(gq.fileDir)
35✔
534
        if err != nil {
35✔
535
                panic(fmt.Sprintf("could not find directory, %s, %v", gq.fileDir, err))
×
536
        }
537

538
        var wg sync.WaitGroup
35✔
539

35✔
540
        for _, entry := range entries {
53✔
541
                if entry.IsDir() {
36✔
542
                        wg.Add(1)
18✔
543
                        go func(wg *sync.WaitGroup, entry fs.DirEntry) {
36✔
544
                                defer wg.Done()
18✔
545
                                gq.addQueue(entry.Name())
18✔
546
                        }(&wg, entry)
18✔
547
                }
548
        }
549
        // wait goroutine
550
        wg.Wait()
35✔
551

35✔
552
        // release blocking
35✔
553
        gq.initializeBlock <- struct{}{}
35✔
554
}
555

556
// WaitInitialize blocks until the group queue has completed its initialization process.
557
// This ensures that all individual queues have been set up (for example, by reading existing data files)
558
// before the group queue is used.
559
//
560
// Example:
561
//
562
//        // Create a new group queue and wait for initialization to complete.
563
//        gq, err := ffq.NewGroupQueue[Data]("myGroupQueue", ffq.WithQueueSize(1024))
564
//        if err != nil {
565
//            log.Fatalf("Failed to create group queue: %v", err)
566
//        }
567
//        gq.WaitInitialize()
568
//        // Now it is safe to start enqueuing and dequeuing items.
569
//        go func() {
570
//            for {
571
//                msg, err := gq.Dequeue()
572
//                if err != nil {
573
//                    log.Println("Dequeue error:", err)
574
//                    break
575
//                }
576
//                fmt.Println("Dequeued:", msg)
577
//            }
578
//        }()
579
//        // Enqueue data into the group queue.
580
//        if err := gq.Enqueue("queue1", &dataItem); err != nil {
581
//            log.Fatal(err)
582
//        }
583
func (gq *GroupQueue[T]) WaitInitialize() {
35✔
584
        <-gq.initializeBlock
35✔
585
}
35✔
586

587
func (gq *GroupQueue[T]) signalEnqueue() {
1,314✔
588
        select {
1,314✔
589
        case gq.enqueueSig <- struct{}{}:
611✔
590
        default:
703✔
591
        }
592
}
593

594
func (gq *GroupQueue[T]) signalClose() {
13✔
595
        select {
13✔
596
        case gq.closeSig <- struct{}{}:
13✔
597
        default:
×
598
        }
599
}
600

601
func (gq *GroupQueue[T]) getQueue(name string) (*Queue[T], error) {
1,600✔
602
        i, ok := gq.nameIndices.Load(name)
1,600✔
603
        if !ok {
1,678✔
604
                return nil, fmt.Errorf("queue name: %s, %v", name, ErrQueueNotFound)
78✔
605
        }
78✔
606
        q := gq.queues[i.(uint32)]
1,522✔
607
        return q, nil
1,522✔
608
}
609

610
// CloseQueue closes all individual queues within the group and signals that no further enqueues are allowed.
611
//
612
// Returns:
613
//   - error: An error if any of the queues fail to close.
614
//
615
// Example:
616
//
617
//        if err := gq.CloseQueue(); err != nil {
618
//            log.Fatalf("CloseQueue failed: %v", err)
619
//        }
620
func (gq *GroupQueue[T]) CloseQueue() error {
33✔
621
        var err error
33✔
622
        gq.isClose.Store(true)
33✔
623
        for _, q := range gq.queues {
130✔
624
                if q == nil {
107✔
625
                        break
10✔
626
                }
627
                closeErr := q.CloseQueue()
87✔
628
                if err != nil {
87✔
629
                        err = errors.Join(err, closeErr)
×
630
                }
×
631
        }
632
        return err
33✔
633
}
634

635
func (gq *GroupQueue[T]) manageQueueClose() {
24✔
636
        closeCount := atomic.AddUint32(&gq.closeCount, 1)
24✔
637
        queueCount := atomic.LoadUint32(&gq.queueCount)
24✔
638
        if closeCount == queueCount {
32✔
639
                gq.signalClose()
8✔
640
        }
8✔
641
}
642

643
// CloseIndex closes the index files associated with all individual queues in the group.
644
//
645
// Returns:
646
//   - error: An error if any of the index files fail to close.
647
//
648
// Example:
649
//
650
//        if err := gq.CloseIndex(); err != nil {
651
//            log.Fatalf("CloseIndex failed: %v", err)
652
//        }
653
func (gq *GroupQueue[T]) CloseIndex() error {
32✔
654
        var err error
32✔
655
        for _, q := range gq.queues {
126✔
656
                if q == nil {
104✔
657
                        break
10✔
658
                }
659
                if q.isQueueClosed.Load() && !q.isIndexClosed.Load() {
168✔
660
                        err = q.CloseIndex()
84✔
661
                        atomic.AddUint32(&gq.closeCount, 1)
84✔
662
                }
84✔
663
        }
664
        return err
32✔
665
}
666

667
// UpdateIndex updates the index of the given message in its corresponding individual queue.
668
// This is used to record the position up to which the queue has been processed.
669
//
670
// Parameters:
671
//   - m: The message whose index should be updated. The message's name is used to identify the corresponding queue.
672
//
673
// Returns:
674
//   - error: An error if the index update fails.
675
//
676
// Example:
677
//
678
//        if err := gq.UpdateIndex(message); err != nil {
679
//            log.Fatalf("UpdateIndex failed: %v", err)
680
//        }
681
func (gq *GroupQueue[T]) UpdateIndex(m *Message[T]) error {
468✔
682
        var err error
468✔
683
        q, gqErr := gq.getQueue(m.name)
468✔
684
        if gqErr != nil {
468✔
685
                err = errors.Join(err, gqErr)
×
686
        }
×
687
        iErr := q.writeIndex(m.index)
468✔
688
        if iErr != nil {
468✔
689
                err = errors.Join(err, iErr)
×
690
        }
×
691
        return err
468✔
692
}
693

694
func (gq *GroupQueue[T]) getActiveQueue() (*Queue[T], uint32) {
540✔
695
        activeQueue := atomic.LoadUint32(&gq.activeQueue)
540✔
696
        startActiveQueue := activeQueue
540✔
697
        for {
9,592✔
698
                gq.mu.RLock()
9,052✔
699
                q := gq.queues[activeQueue]
9,052✔
700
                gq.mu.RUnlock()
9,052✔
701
                if q == nil {
9,053✔
702
                        activeQueue = 0
1✔
703
                        continue
1✔
704
                }
705
                if !q.isQueueClosedRecieved.Load() && (q.Length() > 0 || q.isQueueClosed.Load()) {
9,591✔
706
                        if activeQueue == uint32(gq.groupSize-1) {
719✔
707
                                atomic.StoreUint32(&gq.activeQueue, 0)
179✔
708
                        } else {
540✔
709
                                atomic.StoreUint32(&gq.activeQueue, activeQueue+1)
361✔
710
                        }
361✔
711
                        return q, activeQueue
540✔
712
                }
713
                if activeQueue == uint32(gq.groupSize-1) {
11,343✔
714
                        activeQueue = 0
2,832✔
715
                } else {
8,511✔
716
                        activeQueue++
5,679✔
717
                }
5,679✔
718
                if activeQueue == startActiveQueue {
11,324✔
719
                        time.Sleep(30 * time.Microsecond)
2,813✔
720
                }
2,813✔
721
        }
722
}
723

724
func (gq *GroupQueue[T]) checkQueueSignal() error {
451✔
725
        select {
451✔
726
        case <-gq.closeSig:
5✔
727
                return ErrQueueClose
5✔
728
        case <-gq.enqueueSig:
446✔
729
                select {
446✔
730
                case <-gq.closeSig:
3✔
731
                        return ErrQueueClose
3✔
732
                default:
443✔
733
                        return nil
443✔
734
                }
735
        }
736
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc