• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

nihiyama / ffq / 13108879284

03 Feb 2025 08:01AM UTC coverage: 91.254% (+4.9%) from 86.359%
13108879284

push

github

web-flow
Merge pull request #38 from nihiyama/feature/issue-37

#37 fix ffq

518 of 550 new or added lines in 6 files covered. (94.18%)

5 existing lines in 3 files now uncovered.

866 of 949 relevant lines covered (91.25%)

289.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.07
/simple_queue.go
1
// Package ffq provides a file-based FIFO queue implementation that supports generic types.
2
package ffq
3

4
import (
5
        "bufio"
6
        "bytes"
7
        "encoding/binary"
8
        "encoding/json"
9
        "errors"
10
        "fmt"
11
        "io"
12
        "os"
13
        "path/filepath"
14
        "sync"
15
        "sync/atomic"
16
        "time"
17
)
18

19
const (
20
        indexFilename = "index"
21
        queueFilename = "queue"
22
        queueFileDir  = "/tmp/ffq"
23
)
24

25
type QueueType string
26

27
const (
28
        SPSC QueueType = "SPSC"
29
        MPSC QueueType = "MPSC"
30
)
31

32
// Queue represents a file-based FIFO queue with a generic type T.
33
// It supports operations such as enqueue, dequeue, bulk enqueue/dequeue, and manages the queue across multiple pages.
34
type Queue[T any] struct {
35
        size                  uint64 // The maximum number of items in the queue.
36
        tail                  uint64
37
        maxPage               uint64
38
        currentPage           uint64
39
        name                  string                         // The name of the queue.
40
        fileDir               string                         // The directory where the queue files are stored.
41
        queueFile             atomic.Value                   // The file where the queue data is written.
42
        indexFile             *os.File                       // The file where the queue's index is stored.
43
        encoder               func(v any) ([]byte, error)    // Function to encode data before writing to the queue.
44
        decoder               func(data []byte, v any) error // Function to decode data when reading from the queue.
45
        enqueuer              func(item *T) error
46
        bulkEnqueuer          func(items []*T) error
47
        queue                 chan *Message[T]
48
        isQueueClosed         atomic.Bool
49
        isQueueClosedRecieved atomic.Bool
50
        isIndexClosed         atomic.Bool
51
        initializeBlock       chan struct{} // A channel to block until the queue is fully initialized.
52
        mu                    sync.Mutex
53
}
54

55
// NewQueue creates a new Queue with the given name and options.
56
//
57
// Parameters:
58
//   - name: The name of the queue.
59
//   - opts: A list of options to customize the queue (e.g., queue size, max pages, file directory).
60
//
61
// Returns:
62
//   - *Queue: A pointer to the newly created Queue.
63
//   - error: An error if the queue initialization fails.
64
//
65
// Example:
66
//
67
//        q, err := NewQueue[string]("myQueue", WithQueueSize(100), WithMaxPages(5))
68
//        if err != nil {
69
//            log.Fatal(err)
70
//        }
71
func NewQueue[T any](name string, opts ...Option) (*Queue[T], error) {
130✔
72
        var err error
130✔
73

130✔
74
        // check options and set default settings
130✔
75
        var options options
130✔
76
        for _, opt := range opts {
819✔
77
                err := opt(&options)
689✔
78
                if err != nil {
689✔
79
                        return nil, err
×
80
                }
×
81
        }
82

83
        var fileDir = queueFileDir
130✔
84
        if options.fileDir != nil {
260✔
85
                fileDir = *options.fileDir
130✔
86
        }
130✔
87
        err = createQueueDir(fileDir)
130✔
88
        if err != nil {
130✔
89
                return nil, err
×
90
        }
×
91

92
        var size uint64 = 1024
130✔
93
        if options.size != nil {
259✔
94
                size = *options.size
129✔
95
        }
129✔
96

97
        var maxPage uint64 = 2
130✔
98
        if options.maxPage != nil {
259✔
99
                maxPage = *options.maxPage
129✔
100
        }
129✔
101

102
        var queueType QueueType = SPSC
130✔
103
        if options.queueType != nil {
245✔
104
                queueType = *options.queueType
115✔
105
        }
115✔
106

107
        var encoder func(v any) ([]byte, error) = json.Marshal
130✔
108
        if options.encoder != nil {
223✔
109
                encoder = *options.encoder
93✔
110
        }
93✔
111

112
        var decoder func(data []byte, v any) error = json.Unmarshal
130✔
113
        if options.decoder != nil {
223✔
114
                decoder = *options.decoder
93✔
115
        }
93✔
116

117
        queue := make(chan *Message[T], size)
130✔
118

130✔
119
        // open index file
130✔
120
        indexFilePath := filepath.Join(fileDir, indexFilename)
130✔
121
        index := readIndex(indexFilePath)
130✔
122

130✔
123
        var tail uint64 = 0
130✔
124
        if index != nil {
146✔
125
                tail = *index + 1
16✔
126
        }
16✔
127

128
        indexFile, err := openIndexFile(indexFilePath)
130✔
129
        if err != nil {
130✔
130
                return nil, err
×
131
        }
×
132

133
        q := Queue[T]{
130✔
134
                size:            size,
130✔
135
                tail:            tail,
130✔
136
                maxPage:         maxPage,
130✔
137
                currentPage:     0,
130✔
138
                name:            name,
130✔
139
                fileDir:         fileDir,
130✔
140
                indexFile:       indexFile,
130✔
141
                encoder:         encoder,
130✔
142
                decoder:         decoder,
130✔
143
                queue:           queue,
130✔
144
                initializeBlock: make(chan struct{}),
130✔
145
        }
130✔
146

130✔
147
        switch queueType {
130✔
148
        case SPSC:
123✔
149
                q.enqueuer = q.spEnqueue
123✔
150
                q.bulkEnqueuer = q.spBulkEnqueue
123✔
151
        case MPSC:
7✔
152
                q.enqueuer = q.mpEnqueue
7✔
153
                q.bulkEnqueuer = q.mpBulkEnqueue
7✔
154
        }
155

156
        q.isQueueClosed.Store(false)
130✔
157
        q.isQueueClosedRecieved.Store(false)
130✔
158
        q.isIndexClosed.Store(false)
130✔
159

130✔
160
        go func() {
260✔
161
                q.initialize()
130✔
162
        }()
130✔
163

164
        return &q, nil
130✔
165
}
166

167
func (q *Queue[T]) spStoreTail(tail uint64, nums uint64) uint64 {
1,107✔
168
        if tail+nums < q.size*q.maxPage {
2,194✔
169
                return atomic.AddUint64(&q.tail, nums)
1,087✔
170
        } else {
1,107✔
171
                return atomic.AddUint64(&q.tail, nums-(q.size*q.maxPage))
20✔
172
        }
20✔
173
}
174

175
func (q *Queue[T]) mpStoreTail(nums uint64) {
63✔
176
        if q.tail+nums < q.size*q.maxPage {
124✔
177
                q.tail += nums
61✔
178
        } else {
63✔
179
                q.tail += (nums - (q.size * q.maxPage))
2✔
180
        }
2✔
181
}
182

183
func (q *Queue[T]) enqueue(tail uint64, item *T) error {
936✔
184
        buf, err := q.encoder([]*T{item})
936✔
185
        if err != nil {
936✔
NEW
186
                return err
×
NEW
187
        }
×
188
        err = q.writeQueue(buf, tail)
936✔
189
        if err != nil {
936✔
NEW
190
                return err
×
NEW
191
        }
×
192

193
        m := &Message[T]{
936✔
194
                index: tail,
936✔
195
                name:  q.name,
936✔
196
                item:  item,
936✔
197
        }
936✔
198
        q.queue <- m
936✔
199

936✔
200
        return nil
936✔
201
}
202

203
func (q *Queue[T]) bulkEnqueue(tail uint64, items []*T) error {
82✔
204
        buf, err := q.encoder(items)
82✔
205
        if err != nil {
82✔
NEW
206
                return err
×
NEW
207
        }
×
208
        err = q.writeQueue(buf, tail)
82✔
209
        if err != nil {
82✔
NEW
210
                return err
×
NEW
211
        }
×
212

213
        for _, item := range items {
386✔
214
                m := &Message[T]{
304✔
215
                        index: tail,
304✔
216
                        name:  q.name,
304✔
217
                        item:  item,
304✔
218
                }
304✔
219
                q.queue <- m
304✔
220
                tail++
304✔
221
        }
304✔
222
        return nil
82✔
223
}
224

225
func (q *Queue[T]) spEnqueue(item *T) error {
888✔
226
        var err error
888✔
227

888✔
228
        tail := atomic.LoadUint64(&q.tail)
888✔
229
        err = q.enqueue(tail, item)
888✔
230
        if err != nil {
888✔
NEW
231
                return err
×
NEW
232
        }
×
233
        q.spStoreTail(tail, 1)
888✔
234

888✔
235
        return err
888✔
236
}
237

238
func (q *Queue[T]) mpEnqueue(item *T) error {
48✔
239
        var err error
48✔
240
        q.mu.Lock()
48✔
241
        defer q.mu.Unlock()
48✔
242

48✔
243
        err = q.enqueue(q.tail, item)
48✔
244
        if err != nil {
48✔
NEW
245
                return err
×
NEW
246
        }
×
247
        q.mpStoreTail(1)
48✔
248

48✔
249
        return err
48✔
250
}
251

252
// Enqueue adds a single item to the queue.
253
//
254
// Parameters:
255
//   - data: The data to be added to the queue.
256
//
257
// Returns:
258
//   - error: An error if the enqueue operation fails.
259
//
260
// Example:
261
//
262
//        data := Data{...}
263
//        err := q.Enqueue(&dataItem)
264
//        if err != nil {
265
//                log.Fatal(err)
266
//        }
267
func (q *Queue[T]) Enqueue(item *T) error {
936✔
268
        return q.enqueuer(item)
936✔
269
}
936✔
270

271
func (q *Queue[T]) spBulkEnqueue(items []*T) error {
54✔
272
        var err error
54✔
273

54✔
274
        itemLength := uint64(len(items))
54✔
275
        var itemIndex uint64
54✔
276
        tail := atomic.LoadUint64(&q.tail)
54✔
277

54✔
278
        for itemIndex < itemLength {
121✔
279
                batch := q.size - (tail % q.size)
67✔
280
                if itemIndex+batch > itemLength {
118✔
281
                        batch = itemLength - itemIndex
51✔
282
                }
51✔
283
                err = q.bulkEnqueue(tail, items[itemIndex:itemIndex+batch])
67✔
284
                if err != nil {
67✔
NEW
285
                        return err
×
NEW
286
                }
×
287
                itemIndex += batch
67✔
288
                tail = q.spStoreTail(tail, batch)
67✔
289
        }
290
        return nil
54✔
291
}
292

293
func (q *Queue[T]) mpBulkEnqueue(items []*T) error {
13✔
294
        var err error
13✔
295
        q.mu.Lock()
13✔
296
        defer q.mu.Unlock()
13✔
297

13✔
298
        itemLength := uint64(len(items))
13✔
299
        var itemIndex uint64
13✔
300

13✔
301
        for itemIndex < itemLength {
28✔
302
                batch := q.size - (q.tail % q.size)
15✔
303
                if itemIndex+batch > itemLength {
27✔
304
                        batch = itemLength - itemIndex
12✔
305
                }
12✔
306
                err = q.bulkEnqueue(q.tail, items[itemIndex:itemIndex+batch])
15✔
307
                if err != nil {
15✔
NEW
308
                        return err
×
NEW
309
                }
×
310
                itemIndex += batch
15✔
311
                q.mpStoreTail(batch)
15✔
312
        }
313
        return nil
13✔
314
}
315

316
// BulkEnqueue adds multiple items to the queue in a single operation.
317
//
318
// Parameters:
319
//   - data: A slice of data items to be added to the queue.
320
//
321
// Returns:
322
//   - error: An error if the bulk enqueue operation fails.
323
//
324
// Example:
325
//
326
//        data := []*Data{{...},{...},...}
327
//        err := q.BulkEnqueue(data)
328
//        if err != nil {
329
//            log.Fatal(err)
330
//        }
331
func (q *Queue[T]) BulkEnqueue(items []*T) error {
67✔
332
        return q.bulkEnqueuer(items)
67✔
333
}
67✔
334

335
// Dequeue retrieves and returns a single message from the queue.
336
//
337
// Returns:
338
//   - *Message[T]: The dequeued message.
339
//   - error: An error if the dequeue operation fails or the queue is closed.
340
//
341
// Example:
342
//
343
//        m, err := q.Dequeue()
344
//        if err != nil {
345
//            log.Fatal(err)
346
//        }
347
//        fmt.Println("Dequeued message:", message)
348
func (q *Queue[T]) Dequeue() (*Message[T], error) {
812✔
349
        m, ok := <-q.queue
812✔
350
        if !ok {
844✔
351
                q.isQueueClosedRecieved.Store(true)
32✔
352
                return nil, ErrQueueClose
32✔
353
        }
32✔
354
        return m, nil
780✔
355
}
356

357
// BulkDequeue retrieves multiple messages from the queue and returns them in a slice.
358
//
359
// Parameters:
360
//   - size: The maximum number of messages to dequeue in one operation.
361
//   - lazy: A duration to wait between dequeue operations.
362
//
363
// Returns:
364
//   - []*Message[T]: A slice of dequeued messages.
365
//   - error: An error if the bulk dequeue operation fails or the queue is closed.
366
//
367
// Example:
368
//
369
//        ms, err := q.BulkDequeue(10, 100*time.Millisecond)
370
//        if err != nil {
371
//            log.Fatal(err)
372
//        }
373
//        fmt.Println("Bulk dequeued messages:", ms)
374
func (q *Queue[T]) BulkDequeue(size uint64, lazy time.Duration) ([]*Message[T], error) {
17✔
375
        var err error
17✔
376
        ms := make([]*Message[T], 0, size)
17✔
377

17✔
378
        m, err := q.Dequeue()
17✔
379
        if err != nil {
20✔
380
                return nil, err
3✔
381
        }
3✔
382
        ms = append(ms, m)
14✔
383

14✔
384
        timer := time.After(lazy)
14✔
385
        for {
38✔
386
                select {
24✔
387
                case <-timer:
10✔
388
                        return ms, nil
10✔
389
                case m, ok := <-q.queue:
14✔
390
                        if !ok {
16✔
391
                                // return ErrQueueClose at next time
2✔
392
                                return ms, nil
2✔
393
                        }
2✔
394
                        ms = append(ms, m)
12✔
395
                        if uint64(len(ms)) == size {
14✔
396
                                return ms, nil
2✔
397
                        }
2✔
398
                }
399
        }
400
}
401

402
// FuncAfterDequeue applies a given function to the data of a dequeued item.
403
//
404
// Parameters:
405
//   - f: A function that processes the dequeued item.
406
//
407
// Returns:
408
//   - error: An error if the dequeue or function application fails.
409
//
410
// Example:
411
//
412
//        err := q.FuncAfterDequeue(func(data *T) error {
413
//            fmt.Println("Processing item:", data)
414
//            return nil
415
//        })
416
//        if err != nil {
417
//            log.Fatal(err)
418
//        }
419
func (q *Queue[T]) FuncAfterDequeue(f func(*T) error) error {
10✔
420
        m, err := q.Dequeue()
10✔
421
        if err != nil {
11✔
422
                return err
1✔
423
        }
1✔
424

425
        fErr := f(m.item)
9✔
426
        if fErr != nil {
10✔
427
                err = errors.Join(err, fErr)
1✔
428
        }
1✔
429
        iErr := q.writeIndex(m.index)
9✔
430
        if iErr != nil {
9✔
NEW
431
                err = errors.Join(err, iErr)
×
NEW
432
        }
×
433
        return err
9✔
434
}
435

436
// FuncAfterBulkDequeue applies a given function to multiple dequeued items in a batch.
437
//
438
// Parameters:
439
//   - size: The maximum number of items to dequeue in one batch.
440
//   - lazy: A duration to wait between dequeue operations.
441
//   - f: A function that processes the batch of dequeued items.
442
//
443
// Returns:
444
//   - error: An error if the bulk dequeue or function application fails.
445
//
446
// Example:
447
//
448
//        err := q.FuncAfterBulkDequeue(10, 100*time.Millisecond, func(data []*T) error {
449
//            fmt.Println("Processing batch:", data)
450
//            return nil
451
//        })
452
//        if err != nil {
453
//            log.Fatal(err)
454
//        }
455
func (q *Queue[T]) FuncAfterBulkDequeue(size uint64, lazy time.Duration, f func([]*T) error) (int, error) {
16✔
456
        var err error
16✔
457
        items := make([]*T, 0, size)
16✔
458

16✔
459
        m, err := q.Dequeue()
16✔
460
        if err != nil {
19✔
461
                return 0, err
3✔
462
        }
3✔
463
        lastM := m
13✔
464
        items = append(items, m.item)
13✔
465

13✔
466
        timer := time.After(lazy)
13✔
467
LOOP:
13✔
468
        for {
38✔
469
                select {
25✔
470
                case <-timer:
10✔
471
                        break LOOP
10✔
472
                case m, ok := <-q.queue:
15✔
473
                        if !ok {
16✔
474
                                // return ErrQueueClose at next time
1✔
475
                                break LOOP
1✔
476
                        }
477
                        items = append(items, m.item)
14✔
478
                        lastM = m
14✔
479
                        if uint64(len(items)) == size {
16✔
480
                                break LOOP
2✔
481
                        }
482
                }
483
        }
484
        fErr := f(items)
13✔
485
        if fErr != nil {
14✔
486
                err = errors.Join(err, fErr)
1✔
487
        }
1✔
488
        iErr := q.writeIndex(lastM.index)
13✔
489
        if iErr != nil {
13✔
NEW
490
                err = errors.Join(err, iErr)
×
UNCOV
491
        }
×
492
        return len(items), err
13✔
493
}
494

495
func (q *Queue[T]) writeQueue(b []byte, tail uint64) error {
1,018✔
496
        var err error
1,018✔
497

1,018✔
498
        // file rotation
1,018✔
499
        currentPage := atomic.LoadUint64(&q.currentPage)
1,018✔
500
        if (tail / q.size) != currentPage {
1,075✔
501
                err = q.rotateFile()
57✔
502
                if err != nil {
57✔
NEW
503
                        return err
×
NEW
504
                }
×
505
        }
506

507
        buf := queueBufPool.Get().(*bytes.Buffer)
1,018✔
508
        buf.Reset()
1,018✔
509
        buf.Grow(len(b) + 1)
1,018✔
510
        defer queueBufPool.Put(buf)
1,018✔
511

1,018✔
512
        _, err = buf.Write(b)
1,018✔
513
        if err != nil {
1,018✔
514
                return err
×
515
        }
×
516
        // add LF
517
        _, err = buf.Write([]byte{'\n'})
1,018✔
518
        if err != nil {
1,018✔
519
                return err
×
520
        }
×
521

522
        queueFile := q.queueFile.Load().(*os.File)
1,018✔
523
        _, err = buf.WriteTo(queueFile)
1,018✔
524
        if err != nil {
1,018✔
525
                return err
×
526
        }
×
527

528
        return nil
1,018✔
529
}
530

531
func (q *Queue[T]) rotateFile() error {
57✔
532
        queueFile := q.queueFile.Load().(*os.File)
57✔
533
        queueFile.Close()
57✔
534
        currentPage := atomic.LoadUint64(&q.currentPage)
57✔
535
        currentPage++
57✔
536
        if currentPage == q.maxPage {
75✔
537
                currentPage = 0
18✔
538
        }
18✔
539
        atomic.StoreUint64(&q.currentPage, currentPage)
57✔
540
        newQueueFilepath := filepath.Join(q.fileDir, fmt.Sprintf("%s.%d", queueFilename, currentPage))
57✔
541
        newQueueFile, err := os.OpenFile(newQueueFilepath, fCreateFlag, 0644)
57✔
542
        if err != nil {
57✔
543
                return err
×
544
        }
×
545
        q.queueFile.Store(newQueueFile)
57✔
546
        return nil
57✔
547
}
548

549
// UpdateIndex updates the index of a given message in the queue.
550
//
551
// Parameters:
552
//   - message: The message whose index needs to be updated.
553
//
554
// Returns:
555
//   - error: An error if the index update fails.
556
//
557
// Example:
558
//
559
//        err := q.UpdateIndex(message)
560
//        if err != nil {
561
//            log.Fatal(err)
562
//        }
563
func (q *Queue[T]) UpdateIndex(m *Message[T]) error {
201✔
564
        return q.writeIndex(m.index)
201✔
565
}
201✔
566

567
func (q *Queue[T]) writeIndex(index uint64) error {
762✔
568
        var err error
762✔
569
        _, err = q.indexFile.Seek(0, io.SeekStart)
762✔
570
        if err != nil {
762✔
571
                return err
×
572
        }
×
573

574
        buf := indexBufPool.Get().(*[8]byte)
762✔
575

762✔
576
        // uint64 size is 8
762✔
577
        // | -- index(8) -- |
762✔
578
        binary.LittleEndian.PutUint64((*buf)[0:8], index)
762✔
579

762✔
580
        defer indexBufPool.Put(buf)
762✔
581

762✔
582
        _, err = q.indexFile.Write((*buf)[:])
762✔
583
        if err != nil {
762✔
584
                return err
×
585
        }
×
586
        return nil
762✔
587
}
588

589
// Length returns the current number of items in the queue.
590
//
591
// Returns:
592
//   - int: The number of items in the queue.
593
//
594
// Example:
595
//
596
//        length := q.Length()
597
//        fmt.Println("Queue length:", length)
598
func (q *Queue[T]) Length() uint64 {
8,655✔
599
        return uint64(len(q.queue))
8,655✔
600
}
8,655✔
601

602
func (q *Queue[T]) initialize() {
130✔
603
        var queueFile *os.File
130✔
604

130✔
605
        tail := atomic.LoadUint64(&q.tail)
130✔
606
        startTail := tail
130✔
607
        currentPage := tail / q.size
130✔
608

130✔
609
        for {
276✔
610
                queueFilepath := filepath.Join(q.fileDir, fmt.Sprintf("%s.%d", queueFilename, currentPage))
146✔
611
                stat, err := os.Stat(queueFilepath)
146✔
612
                if err != nil {
259✔
613
                        if os.IsNotExist(err) {
226✔
614
                                queueFile, err = os.OpenFile(queueFilepath, fCreateFlag, 0644)
113✔
615
                                if err != nil {
113✔
616
                                        panic(fmt.Sprintf("could not create file, %s, %v", queueFilepath, err))
×
617
                                }
618
                                atomic.StoreUint64(&q.currentPage, currentPage)
113✔
619
                                q.queueFile.Store(queueFile)
113✔
620
                                break
113✔
621
                        } else {
×
622
                                panic(err)
×
623
                        }
624
                }
625

626
                // read queue file and set queue
627
                queueFile, err = os.OpenFile(queueFilepath, fOpenFlag, 0644)
33✔
628
                if err != nil {
33✔
629
                        panic(fmt.Sprintf("could not open file, %s, %v", queueFilepath, err))
×
630
                }
631
                reader := bufio.NewReader(queueFile)
33✔
632
                var itemNums uint64 = 0
33✔
633
                for {
274✔
634
                        b, err := reader.ReadBytes('\n')
241✔
635
                        if err != nil {
274✔
636
                                if err == io.EOF {
66✔
637
                                        break
33✔
NEW
638
                                } else {
×
NEW
639
                                        panic(fmt.Sprintf("could not read file, %v", err))
×
640
                                }
641
                        }
642
                        var items []*T
208✔
643
                        err = q.decoder(b, &items)
208✔
644
                        if err != nil {
208✔
645
                                panic(fmt.Sprintf("could not UnMarshal data, %s, %v", string(b), err))
×
646
                        }
647
                        for _, item := range items {
464✔
648
                                if uint64(itemNums)+(currentPage*q.size) < startTail {
360✔
649
                                        itemNums++
104✔
650
                                        continue
104✔
651
                                }
652
                                m := &Message[T]{
152✔
653
                                        index: tail,
152✔
654
                                        name:  q.name,
152✔
655
                                        item:  item,
152✔
656
                                }
152✔
657
                                q.queue <- m
152✔
658
                                tail = q.spStoreTail(tail, 1)
152✔
659
                                if tail == 0 {
156✔
660
                                        // reset 0 page
4✔
661
                                        startTail = 0
4✔
662
                                }
4✔
663
                                itemNums++
152✔
664
                        }
665
                }
666

667
                // check next page
668
                if (tail / q.size) == currentPage {
50✔
669
                        atomic.StoreUint64(&q.currentPage, currentPage)
17✔
670
                        q.queueFile.Store(queueFile)
17✔
671
                        break
17✔
672
                }
673

674
                currentPage++
16✔
675
                if currentPage == q.maxPage {
20✔
676
                        currentPage = 0
4✔
677
                }
4✔
678
                nextQueueFilepath := filepath.Join(q.fileDir, fmt.Sprintf("%s.%d", queueFilename, currentPage))
16✔
679
                nextStat, err := os.Stat(nextQueueFilepath)
16✔
680
                if err != nil {
20✔
681
                        if !os.IsNotExist(err) {
4✔
NEW
682
                                panic(err)
×
683
                        }
684
                } else {
12✔
685
                        if stat.ModTime().After(nextStat.ModTime()) {
16✔
686
                                os.Remove(nextQueueFilepath)
4✔
687
                        }
4✔
688
                }
689
        }
690

691
        // release blocking
692
        q.initializeBlock <- struct{}{}
130✔
693
        close(q.initializeBlock)
130✔
694
}
695

696
// WaitInitialize blocks until the queue is fully initialized.
697
//
698
// Example:
699
//
700
//        q.WaitInitialize()
701
func (q *Queue[T]) WaitInitialize() {
130✔
702
        <-q.initializeBlock
130✔
703
}
130✔
704

705
// CloseQueue closes the queue and its associated file, signaling that no more data can be added.
706
//
707
// Returns:
708
//   - error: An error if the file closure fails.
709
//
710
// Example:
711
//
712
//        err := q.CloseQueue()
713
//        if err != nil {
714
//            log.Fatal(err)
715
//        }
716
func (q *Queue[T]) CloseQueue() error {
125✔
717
        close(q.queue)
125✔
718
        q.isQueueClosed.Store(true)
125✔
719
        queueFile := q.queueFile.Load().(*os.File)
125✔
720
        err := queueFile.Close()
125✔
721
        if err != nil {
125✔
722
                return err
×
723
        }
×
724
        return nil
125✔
725
}
726

727
// CloseIndex closes the index file and releases associated resources.
728
//
729
// Returns:
730
//   - error: An error if the index file could not be closed.
731
//
732
// Example:
733
//
734
//        err := queue.CloseIndex()
735
//        if err != nil {
736
//          log.Fatal(err)
737
//        }
738
func (q *Queue[T]) CloseIndex() error {
122✔
739
        q.isIndexClosed.Store(true)
122✔
740
        err := q.indexFile.Close()
122✔
741
        if err != nil {
122✔
742
                return err
×
743
        }
×
744
        return nil
122✔
745
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc