• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nihiyama / ffq / 20678994611

03 Jan 2026 03:07PM UTC coverage: 90.603% (-1.0%) from 91.57%
20678994611

push

github

web-flow
Merge pull request #45 from nihiyama/feature/issue-40

Feature/issue 40

22 of 31 new or added lines in 1 file covered. (70.97%)

5 existing lines in 1 file now uncovered.

887 of 979 relevant lines covered (90.6%)

289.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.91
/simple_queue.go
1
// Package ffq provides a file-based FIFO queue implementation that supports generic types.
2
package ffq
3

4
import (
5
        "bufio"
6
        "bytes"
7
        "encoding/binary"
8
        "encoding/json"
9
        "errors"
10
        "fmt"
11
        "io"
12
        "os"
13
        "path/filepath"
14
        "sync"
15
        "sync/atomic"
16
        "time"
17
)
18

19
// Default on-disk naming used by the queue.
const (
	// indexFilename is the name of the file that stores the last processed index.
	indexFilename = "index"
	// queueFilename is the base name of every page file; the page number is
	// appended as a ".<n>" suffix.
	queueFilename = "queue"
	// queueFileDir is the directory used when no directory option is supplied.
	queueFileDir = "/tmp/ffq"
)

// QueueType selects the producer model a Queue is built for.
type QueueType string

// Supported producer models.
const (
	// SPSC indicates Single Producer Single Consumer mode.
	SPSC QueueType = "SPSC"
	// MPSC indicates Multiple Producer Single Consumer mode.
	MPSC QueueType = "MPSC"
)
33

34
// Queue represents a file-based FIFO queue with a generic type T.
35
// It supports operations such as enqueue, dequeue, bulk enqueue/dequeue, and manages the queue across multiple pages.
36
type Queue[T any] struct {
37
        size                  uint64                         // The maximum number of items in the queue.
38
        tail                  uint64                         // Current write index
39
        maxPage               uint64                         // Total number of pages available
40
        currentPage           uint64                         // Current page number used for writing
41
        name                  string                         // The name of the queue.
42
        fileDir               string                         // The directory where the queue files are stored.
43
        queueFile             atomic.Value                   // The file where the queue data is written.
44
        indexFile             *os.File                       // The file where the queue's index is stored.
45
        encoder               func(v any) ([]byte, error)    // Function to encode data before writing to the queue.
46
        decoder               func(data []byte, v any) error // Function to decode data when reading from the queue.
47
        enqueuer              func(item *T) error            // Internal function for enqueuing a single item
48
        bulkEnqueuer          func(items []*T) error         // Internal function for bulk enqueuing multiple items
49
        queue                 chan *Message[T]               // Internal message channel
50
        isQueueClosed         atomic.Bool                    // Flag indicating if the queue is closed
51
        isQueueClosedRecieved atomic.Bool                    // Flag indicating if a closed queue has been read from
52
        isIndexClosed         atomic.Bool                    // Flag indicating if the index file is closed
53
        initializeBlock       chan struct{}                  // A channel to block until the queue is fully initialized.
54
        mu                    sync.Mutex                     // Mutex for synchronizing multi-producer operations
55
}
56

57
// NewQueue creates a new Queue with the specified name and options.
58
//
59
// Parameters:
60
//   - name: The identifier for the queue. This is used to distinguish multiple queues.
61
//   - opts: A list of options to customize the queue (e.g., WithQueueSize, WithMaxPages, WithFileDir, WithEncoder, WithDecoder).
62
//
63
// Returns:
64
//   - *Queue[T]: A pointer to the newly created and partially initialized Queue.
65
//   - error: An error if the queue initialization fails (for example, if creating the file directory fails).
66
//
67
// Example:
68
//
69
//        // Create a queue with a buffer size of 100 items and up to 5 file pages.
70
//        q, err := ffq.NewQueue[string]("myQueue", ffq.WithQueueSize(100), ffq.WithMaxPages(5))
71
//        if err != nil {
72
//            log.Fatalf("Failed to create queue: %v", err)
73
//        }
74
//        // Wait for the background initialization to comple
75
func NewQueue[T any](name string, opts ...Option) (*Queue[T], error) {
137✔
76
        var err error
137✔
77

137✔
78
        // check options and set default settings
137✔
79
        var options options
137✔
80
        for _, opt := range opts {
840✔
81
                err := opt(&options)
703✔
82
                if err != nil {
703✔
83
                        return nil, err
×
84
                }
×
85
        }
86

87
        var fileDir = queueFileDir
137✔
88
        if options.fileDir != nil {
274✔
89
                fileDir = *options.fileDir
137✔
90
        }
137✔
91
        err = createQueueDir(fileDir)
137✔
92
        if err != nil {
137✔
93
                return nil, err
×
94
        }
×
95

96
        var size uint64 = 1024
137✔
97
        if options.size != nil {
267✔
98
                size = *options.size
130✔
99
        }
130✔
100

101
        var maxPage uint64 = 2
137✔
102
        if options.maxPage != nil {
267✔
103
                maxPage = *options.maxPage
130✔
104
        }
130✔
105

106
        var queueType QueueType = SPSC
137✔
107
        if options.queueType != nil {
252✔
108
                queueType = *options.queueType
115✔
109
        }
115✔
110

111
        var encoder func(v any) ([]byte, error) = json.Marshal
137✔
112
        if options.encoder != nil {
230✔
113
                encoder = *options.encoder
93✔
114
        }
93✔
115

116
        var decoder func(data []byte, v any) error = json.Unmarshal
137✔
117
        if options.decoder != nil {
235✔
118
                decoder = *options.decoder
98✔
119
        }
98✔
120

121
        queue := make(chan *Message[T], size)
137✔
122

137✔
123
        // open index file
137✔
124
        indexFilePath := filepath.Join(fileDir, indexFilename)
137✔
125
        index := readIndex(indexFilePath)
137✔
126

137✔
127
        var tail uint64 = 0
137✔
128
        if index != nil {
160✔
129
                tail = *index + 1
23✔
130
        }
23✔
131

132
        indexFile, err := openIndexFile(indexFilePath)
137✔
133
        if err != nil {
137✔
134
                return nil, err
×
135
        }
×
136

137
        q := Queue[T]{
137✔
138
                size:            size,
137✔
139
                tail:            tail,
137✔
140
                maxPage:         maxPage,
137✔
141
                currentPage:     0,
137✔
142
                name:            name,
137✔
143
                fileDir:         fileDir,
137✔
144
                indexFile:       indexFile,
137✔
145
                encoder:         encoder,
137✔
146
                decoder:         decoder,
137✔
147
                queue:           queue,
137✔
148
                initializeBlock: make(chan struct{}),
137✔
149
        }
137✔
150

137✔
151
        switch queueType {
137✔
152
        case SPSC:
130✔
153
                q.enqueuer = q.spEnqueue
130✔
154
                q.bulkEnqueuer = q.spBulkEnqueue
130✔
155
        case MPSC:
7✔
156
                q.enqueuer = q.mpEnqueue
7✔
157
                q.bulkEnqueuer = q.mpBulkEnqueue
7✔
158
        }
159

160
        q.isQueueClosed.Store(false)
137✔
161
        q.isQueueClosedRecieved.Store(false)
137✔
162
        q.isIndexClosed.Store(false)
137✔
163

137✔
164
        go func() {
274✔
165
                q.initialize()
137✔
166
        }()
137✔
167

168
        return &q, nil
137✔
169
}
170

171
func (q *Queue[T]) spStoreTail(tail uint64, nums uint64) uint64 {
1,112✔
172
        if tail+nums < q.size*q.maxPage {
2,204✔
173
                return atomic.AddUint64(&q.tail, nums)
1,092✔
174
        } else {
1,112✔
175
                return atomic.AddUint64(&q.tail, nums-(q.size*q.maxPage))
20✔
176
        }
20✔
177
}
178

179
func (q *Queue[T]) mpStoreTail(nums uint64) {
63✔
180
        if q.tail+nums < q.size*q.maxPage {
124✔
181
                q.tail += nums
61✔
182
        } else {
63✔
183
                q.tail += (nums - (q.size * q.maxPage))
2✔
184
        }
2✔
185
}
186

187
func (q *Queue[T]) enqueue(tail uint64, item *T) error {
936✔
188
        buf, err := q.encoder([]*T{item})
936✔
189
        if err != nil {
936✔
190
                return err
×
191
        }
×
192
        err = q.writeQueue(buf, tail)
936✔
193
        if err != nil {
936✔
194
                return err
×
195
        }
×
196

197
        m := &Message[T]{
936✔
198
                index: tail,
936✔
199
                name:  q.name,
936✔
200
                item:  item,
936✔
201
        }
936✔
202
        q.queue <- m
936✔
203

936✔
204
        return nil
936✔
205
}
206

207
func (q *Queue[T]) bulkEnqueue(tail uint64, items []*T) error {
80✔
208
        buf, err := q.encoder(items)
80✔
209
        if err != nil {
80✔
210
                return err
×
211
        }
×
212
        err = q.writeQueue(buf, tail)
80✔
213
        if err != nil {
80✔
214
                return err
×
215
        }
×
216

217
        for _, item := range items {
384✔
218
                m := &Message[T]{
304✔
219
                        index: tail,
304✔
220
                        name:  q.name,
304✔
221
                        item:  item,
304✔
222
                }
304✔
223
                q.queue <- m
304✔
224
                tail++
304✔
225
        }
304✔
226
        return nil
80✔
227
}
228

229
func (q *Queue[T]) spEnqueue(item *T) error {
888✔
230
        var err error
888✔
231

888✔
232
        tail := atomic.LoadUint64(&q.tail)
888✔
233
        err = q.enqueue(tail, item)
888✔
234
        if err != nil {
888✔
235
                return err
×
236
        }
×
237
        q.spStoreTail(tail, 1)
888✔
238

888✔
239
        return err
888✔
240
}
241

242
func (q *Queue[T]) mpEnqueue(item *T) error {
48✔
243
        var err error
48✔
244
        q.mu.Lock()
48✔
245
        defer q.mu.Unlock()
48✔
246

48✔
247
        err = q.enqueue(q.tail, item)
48✔
248
        if err != nil {
48✔
249
                return err
×
250
        }
×
251
        q.mpStoreTail(1)
48✔
252

48✔
253
        return err
48✔
254
}
255

256
// Enqueue adds a single item to the queue.
257
//
258
// Parameters:
259
//   - item: A pointer to the data item to be enqueued.
260
//     For example: &dataItem
261
//
262
// Returns:
263
//   - error: An error if the enqueue operation fails.
264
//
265
// Example:
266
//
267
//        data := Data{...}
268
//        if err := q.Enqueue(&data); err != nil {
269
//            log.Fatalf("Enqueue failed: %v", err)
270
//        }
271
func (q *Queue[T]) Enqueue(item *T) error {
936✔
272
        return q.enqueuer(item)
936✔
273
}
936✔
274

275
func (q *Queue[T]) spBulkEnqueue(items []*T) error {
52✔
276
        var err error
52✔
277

52✔
278
        itemLength := uint64(len(items))
52✔
279
        var itemIndex uint64
52✔
280
        tail := atomic.LoadUint64(&q.tail)
52✔
281

52✔
282
        for itemIndex < itemLength {
117✔
283
                batch := q.size - (tail % q.size)
65✔
284
                if itemIndex+batch > itemLength {
114✔
285
                        batch = itemLength - itemIndex
49✔
286
                }
49✔
287
                err = q.bulkEnqueue(tail, items[itemIndex:itemIndex+batch])
65✔
288
                if err != nil {
65✔
289
                        return err
×
290
                }
×
291
                itemIndex += batch
65✔
292
                tail = q.spStoreTail(tail, batch)
65✔
293
        }
294
        return nil
52✔
295
}
296

297
func (q *Queue[T]) mpBulkEnqueue(items []*T) error {
13✔
298
        var err error
13✔
299
        q.mu.Lock()
13✔
300
        defer q.mu.Unlock()
13✔
301

13✔
302
        itemLength := uint64(len(items))
13✔
303
        var itemIndex uint64
13✔
304

13✔
305
        for itemIndex < itemLength {
28✔
306
                batch := q.size - (q.tail % q.size)
15✔
307
                if itemIndex+batch > itemLength {
27✔
308
                        batch = itemLength - itemIndex
12✔
309
                }
12✔
310
                err = q.bulkEnqueue(q.tail, items[itemIndex:itemIndex+batch])
15✔
311
                if err != nil {
15✔
312
                        return err
×
313
                }
×
314
                itemIndex += batch
15✔
315
                q.mpStoreTail(batch)
15✔
316
        }
317
        return nil
13✔
318
}
319

320
// BulkEnqueue adds multiple items to the queue in a single operation.
321
//
322
// Parameters:
323
//   - items: A slice of pointers to data items to be enqueued.
324
//     For example: []*Data{&data1, &data2, ...}
325
//
326
// Returns:
327
//   - error: An error if the bulk enqueue operation fails.
328
//
329
// Example:
330
//
331
//        dataItems := []*Data{
332
//            { ... },
333
//            { ... },
334
//        }
335
//        if err := q.BulkEnqueue(dataItems); err != nil {
336
//            log.Fatalf("BulkEnqueue failed: %v", err)
337
//        }
338
func (q *Queue[T]) BulkEnqueue(items []*T) error {
65✔
339
        return q.bulkEnqueuer(items)
65✔
340
}
65✔
341

342
// Dequeue retrieves and returns a single message from the queue.
343
//
344
// Returns:
345
//   - *Message[T]: The dequeued message containing the data item and its associated index.
346
//   - error: An error if the dequeue operation fails or if the queue is closed.
347
//
348
// Example:
349
//
350
//        msg, err := q.Dequeue()
351
//        if err != nil {
352
//            log.Fatalf("Dequeue failed: %v", err)
353
//        }
354
//        fmt.Printf("Dequeued message: %+v\n", msg)
355
func (q *Queue[T]) Dequeue() (*Message[T], error) {
820✔
356
        m, ok := <-q.queue
820✔
357
        if !ok {
852✔
358
                q.isQueueClosedRecieved.Store(true)
32✔
359
                return nil, ErrQueueClose
32✔
360
        }
32✔
361
        return m, nil
788✔
362
}
363

364
// BulkDequeue retrieves multiple messages from the queue in a single operation.
365
// It waits for either the specified number of messages or until the lazy duration expires.
366
//
367
// Parameters:
368
//   - size: The maximum number of messages to dequeue at once.
369
//   - lazy: The duration to wait between dequeue operations before returning the collected messages.
370
//
371
// Returns:
372
//   - []*Message[T]: A slice of dequeued messages.
373
//   - error: An error if the dequeue operation fails or if the queue is closed.
374
//
375
// Example:
376
//
377
//        messages, err := q.BulkDequeue(10, 100*time.Millisecond)
378
//        if err != nil {
379
//            log.Fatalf("BulkDequeue failed: %v", err)
380
//        }
381
//        fmt.Printf("Bulk dequeued messages: %+v\n", messages)
382
func (q *Queue[T]) BulkDequeue(size uint64, lazy time.Duration) ([]*Message[T], error) {
18✔
383
        var err error
18✔
384
        ms := make([]*Message[T], 0, size)
18✔
385

18✔
386
        m, err := q.Dequeue()
18✔
387
        if err != nil {
21✔
388
                return nil, err
3✔
389
        }
3✔
390
        ms = append(ms, m)
15✔
391

15✔
392
        timer := time.After(lazy)
15✔
393
        for {
40✔
394
                select {
25✔
395
                case <-timer:
12✔
396
                        return ms, nil
12✔
397
                case m, ok := <-q.queue:
13✔
398
                        if !ok {
15✔
399
                                // return ErrQueueClose at next time
2✔
400
                                return ms, nil
2✔
401
                        }
2✔
402
                        ms = append(ms, m)
11✔
403
                        if uint64(len(ms)) == size {
12✔
404
                                return ms, nil
1✔
405
                        }
1✔
406
                }
407
        }
408
}
409

410
// FuncAfterDequeue retrieves a single item from the queue and applies the specified function to it.
411
// After processing, the function updates the index file accordingly.
412
//
413
// Parameters:
414
//   - f: A function that processes the dequeued item. The function should return an error if processing fails.
415
//
416
// Returns:
417
//   - error: An error if either the dequeue operation or the function application (or index update) fails.
418
//
419
// Example:
420
//
421
//        err := q.FuncAfterDequeue(func(data *Data) error {
422
//            fmt.Printf("Processing item: %+v\n", data)
423
//            return nil
424
//        })
425
//        if err != nil {
426
//            log.Fatalf("FuncAfterDequeue failed: %v", err)
427
//        }
428
func (q *Queue[T]) FuncAfterDequeue(f func(*T) error) error {
10✔
429
        m, err := q.Dequeue()
10✔
430
        if err != nil {
11✔
431
                return err
1✔
432
        }
1✔
433

434
        fErr := f(m.item)
9✔
435
        if fErr != nil {
10✔
436
                err = errors.Join(err, fErr)
1✔
437
        }
1✔
438
        iErr := q.writeIndex(m.index)
9✔
439
        if iErr != nil {
9✔
440
                err = errors.Join(err, iErr)
×
441
        }
×
442
        return err
9✔
443
}
444

445
// FuncAfterBulkDequeue retrieves multiple items from the queue as a batch,
446
// applies the specified function to the batch, and then updates the index file based on the last item processed.
447
//
448
// Parameters:
449
//   - size: The maximum number of items to dequeue in the batch.
450
//   - lazy: The duration to wait between dequeue operations before processing the batch.
451
//   - f: A function that processes the batch of dequeued items. The function should return an error if processing fails.
452
//
453
// Returns:
454
//   - int: The actual number of items processed.
455
//   - error: An error if either the batch dequeue operation, the function application, or the index update fails.
456
//
457
// Example:
458
//
459
//        count, err := q.FuncAfterBulkDequeue(10, 100*time.Millisecond, func(data []*Data) error {
460
//            fmt.Printf("Processing batch of %d items\n", len(data))
461
//            return nil
462
//        })
463
//        if err != nil {
464
//            log.Fatalf("FuncAfterBulkDequeue failed: %v", err)
465
//        }
466
//        fmt.Printf("Processed %d items\n", count)
467
func (q *Queue[T]) FuncAfterBulkDequeue(size uint64, lazy time.Duration, f func([]*T) error) (int, error) {
16✔
468
        var err error
16✔
469
        items := make([]*T, 0, size)
16✔
470

16✔
471
        m, err := q.Dequeue()
16✔
472
        if err != nil {
19✔
473
                return 0, err
3✔
474
        }
3✔
475
        lastM := m
13✔
476
        items = append(items, m.item)
13✔
477

13✔
478
        timer := time.After(lazy)
13✔
479
LOOP:
13✔
480
        for {
38✔
481
                select {
25✔
482
                case <-timer:
10✔
483
                        break LOOP
10✔
484
                case m, ok := <-q.queue:
15✔
485
                        if !ok {
16✔
486
                                // return ErrQueueClose at next time
1✔
487
                                break LOOP
1✔
488
                        }
489
                        items = append(items, m.item)
14✔
490
                        lastM = m
14✔
491
                        if uint64(len(items)) == size {
16✔
492
                                break LOOP
2✔
493
                        }
494
                }
495
        }
496
        fErr := f(items)
13✔
497
        if fErr != nil {
14✔
498
                err = errors.Join(err, fErr)
1✔
499
        }
1✔
500
        iErr := q.writeIndex(lastM.index)
13✔
501
        if iErr != nil {
13✔
502
                err = errors.Join(err, iErr)
×
503
        }
×
504
        return len(items), err
13✔
505
}
506

507
func (q *Queue[T]) writeQueue(b []byte, tail uint64) error {
1,016✔
508
        var err error
1,016✔
509

1,016✔
510
        // file rotation
1,016✔
511
        currentPage := atomic.LoadUint64(&q.currentPage)
1,016✔
512
        if (tail / q.size) != currentPage {
1,073✔
513
                err = q.rotateFile()
57✔
514
                if err != nil {
57✔
515
                        return err
×
516
                }
×
517
        }
518

519
        buf := queueBufPool.Get().(*bytes.Buffer)
1,016✔
520
        buf.Reset()
1,016✔
521
        buf.Grow(len(b) + 1)
1,016✔
522
        defer queueBufPool.Put(buf)
1,016✔
523

1,016✔
524
        _, err = buf.Write(b)
1,016✔
525
        if err != nil {
1,016✔
526
                return err
×
527
        }
×
528
        // add LF
529
        _, err = buf.Write([]byte{'\n'})
1,016✔
530
        if err != nil {
1,016✔
531
                return err
×
532
        }
×
533

534
        queueFile := q.queueFile.Load().(*os.File)
1,016✔
535
        _, err = buf.WriteTo(queueFile)
1,016✔
536
        if err != nil {
1,016✔
537
                return err
×
538
        }
×
539

540
        return nil
1,016✔
541
}
542

543
func (q *Queue[T]) rotateFile() error {
57✔
544
        queueFile := q.queueFile.Load().(*os.File)
57✔
545
        queueFile.Close()
57✔
546
        currentPage := atomic.LoadUint64(&q.currentPage)
57✔
547
        currentPage++
57✔
548
        if currentPage == q.maxPage {
75✔
549
                currentPage = 0
18✔
550
        }
18✔
551
        atomic.StoreUint64(&q.currentPage, currentPage)
57✔
552
        newQueueFilepath := filepath.Join(q.fileDir, fmt.Sprintf("%s.%d", queueFilename, currentPage))
57✔
553
        newQueueFile, err := os.OpenFile(newQueueFilepath, fCreateFlag, 0644)
57✔
554
        if err != nil {
57✔
555
                return err
×
556
        }
×
557
        q.queueFile.Store(newQueueFile)
57✔
558
        return nil
57✔
559
}
560

561
// UpdateIndex updates the index information of the specified message in the index file.
562
// This is used to record the position up to which the queue has been processed.
563
//
564
// Parameters:
565
//   - m: The message whose index is to be updated.
566
//     The index value contained in the message will be written to the index file.
567
//
568
// Returns:
569
//   - error: An error if updating the index fails.
570
//
571
// Example:
572
//
573
//        if err := q.UpdateIndex(message); err != nil {
574
//            log.Fatalf("UpdateIndex failed: %v", err)
575
//        }
576
func (q *Queue[T]) UpdateIndex(m *Message[T]) error {
201✔
577
        return q.writeIndex(m.index)
201✔
578
}
201✔
579

580
func (q *Queue[T]) writeIndex(index uint64) error {
761✔
581
        var err error
761✔
582
        _, err = q.indexFile.Seek(0, io.SeekStart)
761✔
583
        if err != nil {
761✔
584
                return err
×
585
        }
×
586

587
        buf := indexBufPool.Get().(*[8]byte)
761✔
588

761✔
589
        // uint64 size is 8
761✔
590
        // | -- index(8) -- |
761✔
591
        binary.LittleEndian.PutUint64((*buf)[0:8], index)
761✔
592

761✔
593
        defer indexBufPool.Put(buf)
761✔
594

761✔
595
        _, err = q.indexFile.Write((*buf)[:])
761✔
596
        if err != nil {
761✔
597
                return err
×
598
        }
×
599
        return nil
761✔
600
}
601

602
// Length returns the current number of messages stored in the internal buffer of the queue.
603
// This reflects the number of unprocessed messages.
604
//
605
// Returns:
606
//   - uint64: The number of messages currently in the queue.
607
//
608
// Example:
609
//
610
//        currentLength := q.Length()
611
//        fmt.Printf("Current queue length: %d\n", currentLength)
612
func (q *Queue[T]) Length() uint64 {
9,026✔
613
        return uint64(len(q.queue))
9,026✔
614
}
9,026✔
615

616
// countJSONArrayItems returns the number of top-level elements in the JSON
// array encoded in b without decoding the elements into concrete values.
// It returns an error if b does not start with a JSON array, if an element
// is malformed, or if the array is not terminated.
func countJSONArrayItems(b []byte) (uint64, error) {
	decoder := json.NewDecoder(bytes.NewReader(b))

	opening, err := decoder.Token()
	if err != nil {
		return 0, err
	}
	if delim, isDelim := opening.(json.Delim); !isDelim || delim != '[' {
		return 0, fmt.Errorf("expected JSON array")
	}

	var count uint64
	for ; decoder.More(); count++ {
		// RawMessage skips over the element without interpreting it.
		var element json.RawMessage
		if err := decoder.Decode(&element); err != nil {
			return 0, err
		}
	}

	// Consume the closing bracket so a truncated array is reported.
	_, err = decoder.Token()
	if err != nil {
		return 0, err
	}
	return count, nil
}
641

642
func (q *Queue[T]) initialize() {
137✔
643
        var queueFile *os.File
137✔
644

137✔
645
        tail := atomic.LoadUint64(&q.tail)
137✔
646
        startTail := tail
137✔
647
        currentPage := tail / q.size
137✔
648

137✔
649
        for {
290✔
650
                queueFilepath := filepath.Join(q.fileDir, fmt.Sprintf("%s.%d", queueFilename, currentPage))
153✔
651
                stat, err := os.Stat(queueFilepath)
153✔
652
                if err != nil {
266✔
653
                        if os.IsNotExist(err) {
226✔
654
                                queueFile, err = os.OpenFile(queueFilepath, fCreateFlag, 0644)
113✔
655
                                if err != nil {
113✔
656
                                        panic(fmt.Sprintf("could not create file, %s, %v", queueFilepath, err))
×
657
                                }
658
                                atomic.StoreUint64(&q.currentPage, currentPage)
113✔
659
                                q.queueFile.Store(queueFile)
113✔
660
                                break
113✔
661
                        } else {
×
662
                                panic(err)
×
663
                        }
664
                }
665

666
                // read queue file and set queue
667
                queueFile, err = os.OpenFile(queueFilepath, fOpenFlag, 0644)
40✔
668
                if err != nil {
40✔
669
                        panic(fmt.Sprintf("could not open file, %s, %v", queueFilepath, err))
×
670
                }
671
                reader := bufio.NewReader(queueFile)
40✔
672
                var itemNums uint64 = 0
40✔
673
                for {
298✔
674
                        b, err := reader.ReadBytes('\n')
258✔
675
                        if err != nil {
301✔
676
                                if err == io.EOF {
86✔
677
                                        if len(b) == 0 {
83✔
678
                                                break
40✔
679
                                        }
680
                                } else {
×
681
                                        panic(fmt.Sprintf("could not read file, %v", err))
×
682
                                }
683
                        }
684
                        batchStart := itemNums + (currentPage * q.size)
218✔
685
                        if batchStart < startTail {
323✔
686
                                count, err := countJSONArrayItems(b)
105✔
687
                                if err != nil {
105✔
NEW
688
                                        panic(fmt.Sprintf("could not count items, %s, %v", string(b), err))
×
689
                                }
690
                                if batchStart+count <= startTail {
203✔
691
                                        itemNums += count
98✔
692
                                        continue
98✔
693
                                }
694
                        }
695
                        var items []*T
120✔
696
                        err = q.decoder(b, &items)
120✔
697
                        if err != nil {
120✔
698
                                panic(fmt.Sprintf("could not UnMarshal data, %s, %v", string(b), err))
×
699
                        }
700
                        for _, item := range items {
294✔
701
                                if uint64(itemNums)+(currentPage*q.size) < startTail {
189✔
702
                                        itemNums++
15✔
703
                                        continue
15✔
704
                                }
705
                                m := &Message[T]{
159✔
706
                                        index: tail,
159✔
707
                                        name:  q.name,
159✔
708
                                        item:  item,
159✔
709
                                }
159✔
710
                                q.queue <- m
159✔
711
                                tail = q.spStoreTail(tail, 1)
159✔
712
                                if tail == 0 {
163✔
713
                                        // reset 0 page
4✔
714
                                        startTail = 0
4✔
715
                                }
4✔
716
                                itemNums++
159✔
717
                        }
718
                }
719

720
                // check next page
721
                if (tail / q.size) == currentPage {
64✔
722
                        atomic.StoreUint64(&q.currentPage, currentPage)
24✔
723
                        q.queueFile.Store(queueFile)
24✔
724
                        break
24✔
725
                }
726

727
                currentPage++
16✔
728
                if currentPage == q.maxPage {
20✔
729
                        currentPage = 0
4✔
730
                }
4✔
731
                nextQueueFilepath := filepath.Join(q.fileDir, fmt.Sprintf("%s.%d", queueFilename, currentPage))
16✔
732
                nextStat, err := os.Stat(nextQueueFilepath)
16✔
733
                if err != nil {
20✔
734
                        if !os.IsNotExist(err) {
4✔
735
                                panic(err)
×
736
                        }
737
                } else {
12✔
738
                        if stat.ModTime().After(nextStat.ModTime()) {
16✔
739
                                os.Remove(nextQueueFilepath)
4✔
740
                        }
4✔
741
                }
742
        }
743

744
        // release blocking
745
        q.initializeBlock <- struct{}{}
137✔
746
        close(q.initializeBlock)
137✔
747
}
748

749
// WaitInitialize blocks until the queue has completed its background initialization process.
750
// Since the queue may load existing data from files and perform recovery in the background,
751
// this method should be called to ensure the queue is fully initialized before starting operations.
752
//
753
// Example:
754
//
755
//        // Immediately after creating the queue, wait for initialization to complete.
756
//        q.WaitInitialize()
757
//        fmt.Println("Queue initialization complete")
758
func (q *Queue[T]) WaitInitialize() {
137✔
759
        <-q.initializeBlock
137✔
760
}
137✔
761

762
// CloseQueue closes the queue, disallowing any further enqueues,
763
// and closes the file associated with the queue data.
764
// After calling CloseQueue, further attempts to enqueue will result in errors.
765
//
766
// Returns:
767
//   - error: An error if closing the queue or its associated file fails.
768
//
769
// Example:
770
//
771
//        if err := q.CloseQueue(); err != nil {
772
//            log.Fatalf("CloseQueue failed: %v", err)
773
//        }
774
func (q *Queue[T]) CloseQueue() error {
132✔
775
        close(q.queue)
132✔
776
        q.isQueueClosed.Store(true)
132✔
777
        queueFile := q.queueFile.Load().(*os.File)
132✔
778
        err := queueFile.Close()
132✔
779
        if err != nil {
132✔
780
                return err
×
781
        }
×
782
        return nil
132✔
783
}
784

785
// CloseIndex closes the index file and releases its associated resources.
786
// This should be called when the queue is no longer needed to ensure that all file handles are properly released.
787
//
788
// Returns:
789
//   - error: An error if closing the index file fails.
790
//
791
// Example:
792
//
793
//        if err := q.CloseIndex(); err != nil {
794
//            log.Fatalf("CloseIndex failed: %v", err)
795
//        }
796
func (q *Queue[T]) CloseIndex() error {
129✔
797
        q.isIndexClosed.Store(true)
129✔
798
        err := q.indexFile.Close()
129✔
799
        if err != nil {
129✔
800
                return err
×
801
        }
×
802
        return nil
129✔
803
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc