• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freeeve / tinykvs / 21180415886

20 Jan 2026 05:06PM UTC coverage: 70.81% (-0.1%) from 70.948%
21180415886

push

github

freeeve
fix(compaction): add SSTable reference counting to prevent use-after-close

Add reference counting to SSTables to prevent concurrent readers from
encountering "file already closed" errors during compaction.

Changes:
- Add refs and markedForRemoval fields to SSTable struct
- Add IncRef/DecRef/MarkForRemoval methods for safe lifecycle management
- Update reader.Get to hold refs while accessing SSTables
- Update ScanPrefix/ScanRange scanners to track and release refs
- Replace direct Close+Remove with MarkForRemoval in compaction

Fixes TestConcurrentReadsDuringCompaction race condition.

50 of 53 new or added lines in 3 files covered. (94.34%)

760 existing lines in 12 files now uncovered.

5594 of 7900 relevant lines covered (70.81%)

405174.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.77
/batch_parallel.go
1
package tinykvs
2

3
import (
4
        "runtime"
5
        "sync"
6

7
        "github.com/freeeve/msgpck"
8
)
9

10
// KeyValue is a single entry for the bulk struct operations in this file:
// a key together with a pointer to the struct value to be stored under it.
type KeyValue[T any] struct {
	// Key is the raw key bytes for this entry.
	Key []byte
	// Value points to the struct that is encoded for Key.
	Value *T
}
15

16
// PutStructs writes multiple structs in parallel using cached encoder.
17
// Parallelizes encoding across CPU cores, then writes atomically.
18
func PutStructs[T any](s *Store, items []KeyValue[T]) error {
1✔
19
        if len(items) == 0 {
1✔
UNCOV
20
                return nil
×
UNCOV
21
        }
×
22

23
        batch := NewBatch()
1✔
24
        if err := batchPutStructsParallel(batch, items, runtime.NumCPU()); err != nil {
1✔
25
                return err
×
26
        }
×
27

28
        return s.WriteBatch(batch)
1✔
29
}
30

31
// batchPutStructsParallel encodes structs in parallel using cached encoder.
32
func batchPutStructsParallel[T any](b *Batch, items []KeyValue[T], numWorkers int) error {
1✔
33
        if len(items) == 0 {
1✔
UNCOV
34
                return nil
×
35
        }
×
36

37
        enc := msgpck.GetStructEncoder[T]()
1✔
38

1✔
39
        if numWorkers <= 1 || len(items) < numWorkers {
1✔
UNCOV
40
                return encodeItemsSequential(b, enc, items)
×
UNCOV
41
        }
×
42

43
        ops, err := encodeItemsParallel(enc, items, numWorkers)
1✔
44
        if err != nil {
1✔
UNCOV
45
                return err
×
UNCOV
46
        }
×
47

48
        b.ops = append(b.ops, ops...)
1✔
49
        return nil
1✔
50
}
51

52
// encodeItemsSequential encodes items one at a time for small batches.
UNCOV
53
func encodeItemsSequential[T any](b *Batch, enc *msgpck.StructEncoder[T], items []KeyValue[T]) error {
×
54
        for _, item := range items {
×
UNCOV
55
                data, err := enc.EncodeCopy(item.Value)
×
UNCOV
56
                if err != nil {
×
UNCOV
57
                        return err
×
UNCOV
58
                }
×
UNCOV
59
                b.Put(item.Key, MsgpackValue(data))
×
60
        }
UNCOV
61
        return nil
×
62
}
63

64
// encodeItemsParallel encodes items across multiple workers.
65
func encodeItemsParallel[T any](enc *msgpck.StructEncoder[T], items []KeyValue[T], numWorkers int) ([]batchOp, error) {
1✔
66
        results := make([]batchOp, len(items))
1✔
67
        var firstErr error
1✔
68
        var errOnce sync.Once
1✔
69

1✔
70
        chunkSize := (len(items) + numWorkers - 1) / numWorkers
1✔
71
        var wg sync.WaitGroup
1✔
72

1✔
73
        for w := 0; w < numWorkers; w++ {
5✔
74
                start := w * chunkSize
4✔
75
                end := start + chunkSize
4✔
76
                if end > len(items) {
4✔
77
                        end = len(items)
×
78
                }
×
79
                if start >= end {
4✔
UNCOV
80
                        break
×
81
                }
82

83
                wg.Add(1)
4✔
84
                go func(start, end int) {
8✔
85
                        defer wg.Done()
4✔
86
                        encodeChunk(enc, items, results, start, end, &firstErr, &errOnce)
4✔
87
                }(start, end)
4✔
88
        }
89

90
        wg.Wait()
1✔
91

1✔
92
        if firstErr != nil {
1✔
UNCOV
93
                return nil, firstErr
×
UNCOV
94
        }
×
95

96
        return results, nil
1✔
97
}
98

99
// encodeChunk encodes a range of items into the results slice.
100
func encodeChunk[T any](enc *msgpck.StructEncoder[T], items []KeyValue[T], results []batchOp, start, end int, firstErr *error, errOnce *sync.Once) {
4✔
101
        for i := start; i < end; i++ {
1,004✔
102
                item := items[i]
1,000✔
103
                data, err := enc.EncodeCopy(item.Value)
1,000✔
104
                if err != nil {
1,000✔
UNCOV
105
                        errOnce.Do(func() { *firstErr = err })
×
UNCOV
106
                        return
×
107
                }
108
                keyCopy := make([]byte, len(item.Key))
1,000✔
109
                copy(keyCopy, item.Key)
1,000✔
110
                results[i] = batchOp{key: keyCopy, value: MsgpackValue(data), delete: false}
1,000✔
111
        }
112
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc