• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freeeve / tinykvs / 21180415886

20 Jan 2026 05:06PM UTC coverage: 70.81% (-0.1%) from 70.948%
21180415886

push

github

freeeve
fix(compaction): add SSTable reference counting to prevent use-after-close

Add reference counting to SSTables to prevent concurrent readers from
encountering "file already closed" errors during compaction.

Changes:
- Add refs and markedForRemoval fields to SSTable struct
- Add IncRef/DecRef/MarkForRemoval methods for safe lifecycle management
- Update reader.Get to hold refs while accessing SSTables
- Update ScanPrefix/ScanRange scanners to track and release refs
- Replace direct Close+Remove with MarkForRemoval in compaction

Fixes TestConcurrentReadsDuringCompaction race condition.

50 of 53 new or added lines in 3 files covered. (94.34%)

760 existing lines in 12 files now uncovered.

5594 of 7900 relevant lines covered (70.81%)

405174.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.46
/batch.go
1
package tinykvs
2

3
import (
	"errors"
	"reflect"

	"github.com/freeeve/msgpck"
)
8

9
// Batch accumulates multiple operations to be applied atomically.
10
type Batch struct {
11
        ops []batchOp
12
}
13

14
type batchOp struct {
15
        key    []byte
16
        value  Value
17
        delete bool
18
}
19

20
// NewBatch creates a new batch for atomic writes.
21
func NewBatch() *Batch {
8✔
22
        return &Batch{}
8✔
23
}
8✔
24

25
// Put adds a put operation to the batch.
26
func (b *Batch) Put(key []byte, value Value) {
18✔
27
        // Copy key to avoid issues if caller reuses buffer
18✔
28
        keyCopy := make([]byte, len(key))
18✔
29
        copy(keyCopy, key)
18✔
30
        b.ops = append(b.ops, batchOp{key: keyCopy, value: value, delete: false})
18✔
31
}
18✔
32

33
// Delete adds a delete operation to the batch.
34
func (b *Batch) Delete(key []byte) {
160✔
35
        keyCopy := make([]byte, len(key))
160✔
36
        copy(keyCopy, key)
160✔
37
        b.ops = append(b.ops, batchOp{key: keyCopy, delete: true})
160✔
38
}
160✔
39

40
// PutString adds a string put operation.
41
func (b *Batch) PutString(key []byte, value string) {
2✔
42
        b.Put(key, StringValue(value))
2✔
43
}
2✔
44

45
// PutInt64 adds an int64 put operation.
46
func (b *Batch) PutInt64(key []byte, value int64) {
1✔
47
        b.Put(key, Int64Value(value))
1✔
48
}
1✔
49

50
// PutBytes adds a bytes put operation.
51
func (b *Batch) PutBytes(key []byte, value []byte) {
3✔
52
        b.Put(key, BytesValue(value))
3✔
53
}
3✔
54

55
// PutMap adds a record put operation.
56
func (b *Batch) PutMap(key []byte, fields map[string]any) error {
3✔
57
        data, err := msgpck.MarshalCopy(fields)
3✔
58
        if err != nil {
3✔
UNCOV
59
                return err
×
UNCOV
60
        }
×
61
        b.Put(key, MsgpackValue(data))
3✔
62
        return nil
3✔
63
}
64

65
// PutJson adds a JSON string put operation.
66
func (b *Batch) PutJson(key []byte, data any) error {
2✔
67
        jsonBytes, err := EncodeJson(data)
2✔
68
        if err != nil {
2✔
UNCOV
69
                return err
×
UNCOV
70
        }
×
71
        b.Put(key, StringValue(string(jsonBytes)))
2✔
72
        return nil
2✔
73
}
74

75
// Len returns the number of operations in the batch.
76
func (b *Batch) Len() int {
2✔
77
        return len(b.ops)
2✔
78
}
2✔
79

80
// Reset clears the batch for reuse.
81
func (b *Batch) Reset() {
1✔
82
        b.ops = b.ops[:0]
1✔
83
}
1✔
84

85
// WriteBatch atomically applies all operations in the batch.
86
// All operations are written to wal together before applying to memtable.
87
func (s *Store) WriteBatch(batch *Batch) error {
9✔
88
        if batch == nil || len(batch.ops) == 0 {
11✔
89
                return nil
2✔
90
        }
2✔
91

92
        s.writeMu.Lock()
7✔
93
        defer s.writeMu.Unlock()
7✔
94

7✔
95
        if s.closed.Load() {
7✔
UNCOV
96
                return ErrStoreClosed
×
UNCOV
97
        }
×
98

99
        return s.writer.WriteBatch(batch.ops)
7✔
100
}
101

102
// PutIfNotExists stores a value only if the key doesn't exist.
103
// Returns ErrKeyExists if the key already exists.
104
func (s *Store) PutIfNotExists(key []byte, value Value) error {
2✔
105
        s.writeMu.Lock()
2✔
106
        defer s.writeMu.Unlock()
2✔
107

2✔
108
        if s.closed.Load() {
2✔
UNCOV
109
                return ErrStoreClosed
×
UNCOV
110
        }
×
111

112
        // Check if key exists
113
        _, err := s.reader.Get(key)
2✔
114
        if err == nil {
3✔
115
                return ErrKeyExists
1✔
116
        }
1✔
117
        if err != ErrKeyNotFound {
1✔
UNCOV
118
                return err
×
UNCOV
119
        }
×
120

121
        return s.writer.Put(key, value)
1✔
122
}
123

124
// PutIfEquals stores a value only if the current value equals expected.
125
// Returns ErrConditionFailed if values don't match, ErrKeyNotFound if key doesn't exist.
126
func (s *Store) PutIfEquals(key []byte, value Value, expected Value) error {
3✔
127
        s.writeMu.Lock()
3✔
128
        defer s.writeMu.Unlock()
3✔
129

3✔
130
        if s.closed.Load() {
3✔
UNCOV
131
                return ErrStoreClosed
×
UNCOV
132
        }
×
133

134
        // Get current value
135
        current, err := s.reader.Get(key)
3✔
136
        if err != nil {
4✔
137
                return err
1✔
138
        }
1✔
139

140
        // Compare values
141
        if !valuesEqual(current, expected) {
3✔
142
                return ErrConditionFailed
1✔
143
        }
1✔
144

145
        return s.writer.Put(key, value)
1✔
146
}
147

148
// Increment atomically adds delta to an int64 value and returns the new value.
149
// If key doesn't exist, it's treated as 0.
150
// Returns error if existing value is not an int64.
151
func (s *Store) Increment(key []byte, delta int64) (int64, error) {
4✔
152
        s.writeMu.Lock()
4✔
153
        defer s.writeMu.Unlock()
4✔
154

4✔
155
        if s.closed.Load() {
4✔
156
                return 0, ErrStoreClosed
×
UNCOV
157
        }
×
158

159
        // Get current value
160
        var current int64
4✔
161
        val, err := s.reader.Get(key)
4✔
162
        if err == nil {
7✔
163
                if val.Type != ValueTypeInt64 {
4✔
164
                        return 0, ErrTypeMismatch
1✔
165
                }
1✔
166
                current = val.Int64
2✔
167
        } else if err != ErrKeyNotFound {
1✔
UNCOV
168
                return 0, err
×
UNCOV
169
        }
×
170

171
        // Compute new value
172
        newVal := current + delta
3✔
173

3✔
174
        // Write new value
3✔
175
        if err := s.writer.Put(key, Int64Value(newVal)); err != nil {
3✔
UNCOV
176
                return 0, err
×
UNCOV
177
        }
×
178

179
        return newVal, nil
3✔
180
}
181

182
// DeleteRange deletes all keys in the range [start, end).
183
// This is more efficient than deleting keys one by one.
184
// Returns the number of keys deleted.
185
func (s *Store) DeleteRange(start, end []byte) (int64, error) {
2✔
186
        s.writeMu.Lock()
2✔
187
        defer s.writeMu.Unlock()
2✔
188

2✔
189
        if s.closed.Load() {
2✔
UNCOV
190
                return 0, ErrStoreClosed
×
UNCOV
191
        }
×
192

193
        // Collect keys in range
194
        var keys [][]byte
2✔
195
        err := s.reader.ScanPrefix(nil, func(key []byte, _ Value) bool {
211✔
196
                if CompareKeys(key, start) >= 0 && CompareKeys(key, end) < 0 {
313✔
197
                        keyCopy := make([]byte, len(key))
104✔
198
                        copy(keyCopy, key)
104✔
199
                        keys = append(keys, keyCopy)
104✔
200
                }
104✔
201
                // Stop if we've passed the end
202
                return CompareKeys(key, end) < 0
209✔
203
        })
204
        if err != nil {
2✔
UNCOV
205
                return 0, err
×
UNCOV
206
        }
×
207

208
        // Delete all keys in batch
209
        if len(keys) == 0 {
2✔
UNCOV
210
                return 0, nil
×
UNCOV
211
        }
×
212

213
        batch := &Batch{}
2✔
214
        for _, key := range keys {
106✔
215
                batch.Delete(key)
104✔
216
        }
104✔
217

218
        if err := s.writer.WriteBatch(batch.ops); err != nil {
2✔
UNCOV
219
                return 0, err
×
UNCOV
220
        }
×
221

222
        return int64(len(keys)), nil
2✔
223
}
224

225
// DeletePrefix deletes all keys with the given prefix.
226
// Returns the number of keys deleted.
227
func (s *Store) DeletePrefix(prefix []byte) (int64, error) {
2✔
228
        s.writeMu.Lock()
2✔
229
        defer s.writeMu.Unlock()
2✔
230

2✔
231
        if s.closed.Load() {
2✔
UNCOV
232
                return 0, ErrStoreClosed
×
UNCOV
233
        }
×
234

235
        // Collect keys with prefix
236
        var keys [][]byte
2✔
237
        // Use reader directly since we already hold the lock
2✔
238
        err := s.reader.ScanPrefix(prefix, func(key []byte, _ Value) bool {
57✔
239
                keyCopy := make([]byte, len(key))
55✔
240
                copy(keyCopy, key)
55✔
241
                keys = append(keys, keyCopy)
55✔
242
                return true
55✔
243
        })
55✔
244
        if err != nil {
2✔
UNCOV
245
                return 0, err
×
UNCOV
246
        }
×
247

248
        if len(keys) == 0 {
2✔
UNCOV
249
                return 0, nil
×
UNCOV
250
        }
×
251

252
        // Delete all keys in batch
253
        batch := &Batch{}
2✔
254
        for _, key := range keys {
57✔
255
                batch.Delete(key)
55✔
256
        }
55✔
257

258
        if err := s.writer.WriteBatch(batch.ops); err != nil {
2✔
UNCOV
259
                return 0, err
×
UNCOV
260
        }
×
261

262
        return int64(len(keys)), nil
2✔
263
}
264

265
// valuesEqual compares two values for equality.
266
func valuesEqual(a, b Value) bool {
14✔
267
        if a.Type != b.Type {
15✔
268
                return false
1✔
269
        }
1✔
270
        switch a.Type {
13✔
271
        case ValueTypeInt64:
4✔
272
                return a.Int64 == b.Int64
4✔
273
        case ValueTypeFloat64:
2✔
274
                return a.Float64 == b.Float64
2✔
275
        case ValueTypeBool:
2✔
276
                return a.Bool == b.Bool
2✔
277
        case ValueTypeString, ValueTypeBytes:
4✔
278
                return string(a.Bytes) == string(b.Bytes)
4✔
UNCOV
279
        case ValueTypeRecord:
×
UNCOV
280
                return reflect.DeepEqual(a.Record, b.Record)
×
281
        case ValueTypeTombstone:
1✔
282
                return true
1✔
283
        default:
×
284
                return false
×
285
        }
286
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc