• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freeeve / tinykvs / 21180415886

20 Jan 2026 05:06PM UTC coverage: 70.81% (-0.1%) from 70.948%
21180415886

push

github

freeeve
fix(compaction): add SSTable reference counting to prevent use-after-close

Add reference counting to SSTables to prevent concurrent readers from
encountering "file already closed" errors during compaction.

Changes:
- Add refs and markedForRemoval fields to SSTable struct
- Add IncRef/DecRef/MarkForRemoval methods for safe lifecycle management
- Update reader.Get to hold refs while accessing SSTables
- Update ScanPrefix/ScanRange scanners to track and release refs
- Replace direct Close+Remove with MarkForRemoval in compaction

Fixes TestConcurrentReadsDuringCompaction race condition.

50 of 53 new or added lines in 3 files covered. (94.34%)

760 existing lines in 12 files now uncovered.

5594 of 7900 relevant lines covered (70.81%)

405174.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.34
/reader.go
1
package tinykvs
2

3
import (
	"bytes"
	"fmt"
	"os"
	"sync"
	"sync/atomic"
)
9

10
// ScanStats collects counters accumulated while a scan operation runs.
type ScanStats struct {
	BlocksLoaded   int64 // SSTable blocks touched, whether cached or read from disk
	BlocksCacheHit int64 // blocks satisfied by the cache
	BlocksDiskRead int64 // blocks that required a disk read
	KeysExamined   int64 // every key looked at, duplicates and tombstones included
	TablesChecked  int64 // SSTables considered as scan candidates
	TablesAdded    int64 // SSTables that actually contributed matching entries
}

// ScanProgress receives periodic stats snapshots while a scan runs.
// Returning false aborts the scan early.
type ScanProgress func(stats ScanStats) bool
23

24
// reader coordinates lookups across memtable and SSTables.
25
type reader struct {
26
        mu sync.RWMutex
27

28
        memtable   *memtable
29
        immutables []*memtable  // Recently flushed, waiting for SSTable write
30
        levels     [][]*SSTable // levels[0] = L0, levels[1] = L1, etc.
31
        cache      *lruCache
32
        opts       Options
33
}
34

35
// newReader creates a new reader.
36
func newReader(memtable *memtable, levels [][]*SSTable, cache *lruCache, opts Options) *reader {
209✔
37
        // Make a deep copy of levels to avoid sharing with Store
209✔
38
        // Store and reader use different mutexes, so sharing would cause races
209✔
39
        levelsCopy := make([][]*SSTable, len(levels))
209✔
40
        for i, level := range levels {
1,616✔
41
                levelsCopy[i] = make([]*SSTable, len(level))
1,407✔
42
                copy(levelsCopy[i], level)
1,407✔
43
        }
1,407✔
44
        return &reader{
209✔
45
                memtable: memtable,
209✔
46
                levels:   levelsCopy,
209✔
47
                cache:    cache,
209✔
48
                opts:     opts,
209✔
49
        }
209✔
50
}
51

52
// Get looks up a key, checking memtable first, then SSTables.
53
func (r *reader) Get(key []byte) (Value, error) {
16,530✔
54
        r.mu.RLock()
16,530✔
55
        memtable := r.memtable
16,530✔
56
        immutables := copyImmutables(r.immutables)
16,530✔
57
        levels := copyLevels(r.levels)
16,530✔
58
        cache := r.cache
16,530✔
59
        verify := r.opts.VerifyChecksums
16,530✔
60
        r.mu.RUnlock()
16,530✔
61

16,530✔
62
        if val, found, err := getFromMemtables(key, memtable, immutables); found || err != nil {
17,585✔
63
                return val, err
1,055✔
64
        }
1,055✔
65

66
        // Increment refs on all SSTables we're about to use
67
        incRefLevels(levels)
15,475✔
68
        defer decRefLevels(levels)
15,475✔
69

15,475✔
70
        return getFromSSTables(key, levels, cache, verify)
15,475✔
71
}
72

73
// getFromMemtables checks the active memtable and immutables for a key.
74
func getFromMemtables(key []byte, memtable *memtable, immutables []*memtable) (Value, bool, error) {
16,530✔
75
        if entry, found := memtable.Get(key); found {
17,583✔
76
                if entry.Value.IsTombstone() {
1,055✔
77
                        return Value{}, true, ErrKeyNotFound
2✔
78
                }
2✔
79
                return entry.Value, true, nil
1,051✔
80
        }
81

82
        for i := len(immutables) - 1; i >= 0; i-- {
15,525✔
83
                if entry, found := immutables[i].Get(key); found {
50✔
84
                        if entry.Value.IsTombstone() {
3✔
85
                                return Value{}, true, ErrKeyNotFound
1✔
86
                        }
1✔
87
                        return entry.Value, true, nil
1✔
88
                }
89
        }
90

91
        return Value{}, false, nil
15,475✔
92
}
93

94
// getFromSSTables searches SSTables level by level for a key.
95
func getFromSSTables(key []byte, levels [][]*SSTable, cache *lruCache, verify bool) (Value, error) {
15,475✔
96
        for level := 0; level < len(levels); level++ {
45,307✔
97
                tables := levels[level]
29,832✔
98
                if level == 0 {
45,307✔
99
                        if val, found, err := getFromL0(key, tables, cache, verify); found || err != nil {
17,854✔
100
                                return val, err
2,379✔
101
                        }
2,379✔
102
                } else {
14,357✔
103
                        if val, found, err := getFromSortedLevel(key, tables, cache, verify); found || err != nil {
27,165✔
104
                                return val, err
12,808✔
105
                        }
12,808✔
106
                }
107
        }
108
        return Value{}, ErrKeyNotFound
288✔
109
}
110

111
// getFromL0 checks all L0 tables (newest first, may have overlapping keys).
112
func getFromL0(key []byte, tables []*SSTable, cache *lruCache, verify bool) (Value, bool, error) {
15,475✔
113
        for i := len(tables) - 1; i >= 0; i-- {
22,691✔
114
                entry, found, err := tables[i].Get(key, cache, verify)
7,216✔
115
                if err != nil {
7,216✔
UNCOV
116
                        return Value{}, false, err
×
UNCOV
117
                }
×
118
                if found {
9,595✔
119
                        if entry.Value.IsTombstone() {
2,400✔
120
                                return Value{}, true, ErrKeyNotFound
21✔
121
                        }
21✔
122
                        return entry.Value, true, nil
2,358✔
123
                }
124
        }
125
        return Value{}, false, nil
13,096✔
126
}
127

128
// getFromSortedLevel checks a sorted level (L1+) using binary search.
129
func getFromSortedLevel(key []byte, tables []*SSTable, cache *lruCache, verify bool) (Value, bool, error) {
14,357✔
130
        idx := findTableForKey(tables, key)
14,357✔
131
        if idx < 0 {
15,905✔
132
                return Value{}, false, nil
1,548✔
133
        }
1,548✔
134
        entry, found, err := tables[idx].Get(key, cache, verify)
12,809✔
135
        if err != nil {
12,809✔
UNCOV
136
                return Value{}, false, err
×
UNCOV
137
        }
×
138
        if found {
25,617✔
139
                if entry.Value.IsTombstone() {
13,160✔
140
                        return Value{}, true, ErrKeyNotFound
352✔
141
                }
352✔
142
                return entry.Value, true, nil
12,456✔
143
        }
144
        return Value{}, false, nil
1✔
145
}
146

147
// findTableForKey finds the SSTable that may contain the key (for L1+).
148
// Returns the index of the table, or -1 if not found.
149
func findTableForKey(tables []*SSTable, key []byte) int {
14,357✔
150
        lo, hi := 0, len(tables)-1
14,357✔
151

14,357✔
152
        for lo <= hi {
49,835✔
153
                mid := (lo + hi) / 2
35,478✔
154
                minKey := tables[mid].MinKey()
35,478✔
155
                maxKey := tables[mid].MaxKey()
35,478✔
156

35,478✔
157
                // Check if key is in range [MinKey, MaxKey]
35,478✔
158
                if CompareKeys(key, minKey) >= 0 && CompareKeys(key, maxKey) <= 0 {
48,287✔
159
                        return mid
12,809✔
160
                }
12,809✔
161

162
                if CompareKeys(key, minKey) < 0 {
33,165✔
163
                        hi = mid - 1
10,496✔
164
                } else {
22,669✔
165
                        lo = mid + 1
12,173✔
166
                }
12,173✔
167
        }
168

169
        return -1
1,548✔
170
}
171

172
// findTableForPrefix finds the first SSTable that may contain keys with the given prefix (for L1+).
// Returns the index of the first matching table, or -1 if no table could contain the prefix.
//
// The level must be sorted by key range. The search is conservative: it
// may return a table that turns out to hold no matches, but it never
// skips a table that could match.
func findTableForPrefix(tables []*SSTable, prefix []byte) int {
	if len(tables) == 0 {
		return -1
	}

	// Binary search for the leftmost table whose range could hold the prefix.
	lo, hi := 0, len(tables)-1
	result := -1

	for lo <= hi {
		mid := (lo + hi) / 2
		maxKey := tables[mid].MaxKey()

		// prefixBeforeOrAtMax reports whether prefix <= maxKey when compared
		// over the prefix's length. When maxKey is shorter than the prefix it
		// is left true, which keeps the search conservative (extra candidate
		// rather than a missed one).
		prefixBeforeOrAtMax := true
		if len(maxKey) >= len(prefix) {
			for i := 0; i < len(prefix); i++ {
				if prefix[i] < maxKey[i] {
					break
				} else if prefix[i] > maxKey[i] {
					prefixBeforeOrAtMax = false
					break
				}
			}
		}

		if prefixBeforeOrAtMax {
			// This table might hold matches: remember it, then keep looking
			// left for an even earlier candidate.
			if hasKeyInRange(prefix, tables[mid].MinKey(), maxKey) {
				result = mid
			}
			hi = mid - 1
		} else {
			// Prefix sorts after this table's maxKey, look right.
			lo = mid + 1
		}
	}

	return result
}
214

215
// Setmemtable updates the active memtable.
216
func (r *reader) Setmemtable(mt *memtable) {
2,054✔
217
        r.mu.Lock()
2,054✔
218
        defer r.mu.Unlock()
2,054✔
219
        r.memtable = mt
2,054✔
220
}
2,054✔
221

222
// AddImmutable adds an immutable memtable.
223
func (r *reader) AddImmutable(mt *memtable) {
2,056✔
224
        r.mu.Lock()
2,056✔
225
        defer r.mu.Unlock()
2,056✔
226
        r.immutables = append(r.immutables, mt)
2,056✔
227
}
2,056✔
228

229
// RemoveImmutable removes an immutable memtable after it's been flushed.
230
func (r *reader) RemoveImmutable(mt *memtable) {
2,401✔
231
        r.mu.Lock()
2,401✔
232
        defer r.mu.Unlock()
2,401✔
233
        for i, imm := range r.immutables {
4,793✔
234
                if imm == mt {
4,446✔
235
                        r.immutables = append(r.immutables[:i], r.immutables[i+1:]...)
2,054✔
236
                        return
2,054✔
237
                }
2,054✔
238
        }
239
}
240

241
// SetLevels updates the SSTable levels.
242
func (r *reader) SetLevels(levels [][]*SSTable) {
967✔
243
        r.mu.Lock()
967✔
244
        defer r.mu.Unlock()
967✔
245
        r.levels = levels
967✔
246
}
967✔
247

248
// AddSSTable adds an SSTable to a level.
249
func (r *reader) AddSSTable(level int, sst *SSTable) {
2,402✔
250
        r.mu.Lock()
2,402✔
251
        defer r.mu.Unlock()
2,402✔
252

2,402✔
253
        // Ensure the level exists
2,402✔
254
        for len(r.levels) <= level {
2,408✔
255
                r.levels = append(r.levels, nil)
6✔
256
        }
6✔
257

258
        r.levels[level] = append(r.levels[level], sst)
2,402✔
259
}
260

261
// GetLevels returns a copy of the current levels.
262
func (r *reader) GetLevels() [][]*SSTable {
5,546✔
263
        r.mu.RLock()
5,546✔
264
        defer r.mu.RUnlock()
5,546✔
265

5,546✔
266
        levels := make([][]*SSTable, len(r.levels))
5,546✔
267
        for i, level := range r.levels {
39,712✔
268
                levels[i] = make([]*SSTable, len(level))
34,166✔
269
                copy(levels[i], level)
34,166✔
270
        }
34,166✔
271
        return levels
5,546✔
272
}
273

274
// ScanPrefix iterates over all keys with the given prefix in sorted order.
275
// Keys are deduplicated (newest version wins) and tombstones are skipped.
276
// Return false from the callback to stop iteration early.
277
// Keys and values passed to the callback are copies owned by the caller.
278
// Note: Streams entries directly without buffering for constant memory usage.
279
func (r *reader) ScanPrefix(prefix []byte, fn func(key []byte, value Value) bool) error {
97✔
280
        _, err := r.ScanPrefixWithStats(prefix, fn, nil)
97✔
281
        return err
97✔
282
}
97✔
283

284
// ScanPrefixWithStats is like ScanPrefix but also returns scan statistics.
285
// The progress callback (if non-nil) is called periodically during the scan
286
// with current stats. Return false from progress to stop the scan early.
287
// Note: This streams entries directly to the callback without buffering,
288
// so it uses constant memory regardless of result size.
289
func (r *reader) ScanPrefixWithStats(prefix []byte, fn func(key []byte, value Value) bool, progress ScanProgress) (ScanStats, error) {
98✔
290
        scanner := r.setupPrefixScanner(prefix)
98✔
291
        defer scanner.close()
98✔
292

98✔
293
        return r.scanPrefixLoop(scanner, prefix, fn, progress)
98✔
294
}
98✔
295

296
// setupPrefixScanner creates and initializes a prefix scanner with all sources.
297
func (r *reader) setupPrefixScanner(prefix []byte) *prefixScanner {
98✔
298
        r.mu.RLock()
98✔
299
        memtable := r.memtable
98✔
300
        immutables := copyImmutables(r.immutables)
98✔
301
        levels := copyLevels(r.levels)
98✔
302
        cache := r.cache
98✔
303
        verify := r.opts.VerifyChecksums
98✔
304
        r.mu.RUnlock()
98✔
305

98✔
306
        // Increment refs on all SSTables to prevent them from being closed during scan
98✔
307
        incRefLevels(levels)
98✔
308

98✔
309
        scanner := newPrefixScanner(prefix, cache, verify)
98✔
310
        scanner.refTables = levels // Store for cleanup
98✔
311

98✔
312
        scanner.addmemtable(memtable, 0)
98✔
313

98✔
314
        for i := len(immutables) - 1; i >= 0; i-- {
99✔
315
                scanner.addmemtable(immutables[i], len(immutables)-i)
1✔
316
        }
1✔
317

318
        baseIdx := len(immutables) + 1
98✔
319
        sortedTables := r.collectPrefixTables(levels, prefix, scanner, &baseIdx)
98✔
320

98✔
321
        if len(sortedTables) > 0 {
105✔
322
                scanner.queueSortedTables(sortedTables, baseIdx)
7✔
323
        }
7✔
324

325
        scanner.init()
98✔
326
        return scanner
98✔
327
}
328

329
// collectPrefixTables adds L0 tables to scanner and returns L1+ tables for lazy loading.
330
func (r *reader) collectPrefixTables(levels [][]*SSTable, prefix []byte, scanner *prefixScanner, baseIdx *int) []*SSTable {
98✔
331
        var sortedTables []*SSTable
98✔
332

98✔
333
        for level := 0; level < len(levels); level++ {
784✔
334
                tables := levels[level]
686✔
335
                if level == 0 {
784✔
336
                        for i := len(tables) - 1; i >= 0; i-- {
181✔
337
                                scanner.addSSTable(tables[i], *baseIdx)
83✔
338
                                *baseIdx++
83✔
339
                        }
83✔
340
                } else {
588✔
341
                        startIdx := findTableForPrefix(tables, prefix)
588✔
342
                        if startIdx >= 0 {
595✔
343
                                for i := startIdx; i < len(tables); i++ {
26✔
344
                                        if hasKeyInRange(prefix, tables[i].MinKey(), tables[i].MaxKey()) {
36✔
345
                                                sortedTables = append(sortedTables, tables[i])
17✔
346
                                        } else {
19✔
347
                                                break
2✔
348
                                        }
349
                                }
350
                        }
351
                }
352
        }
353
        return sortedTables
98✔
354
}
355

356
// scanPrefixLoop drains the scanner in key order, deduplicates key
// versions (the heap yields the newest version of a key first, so later
// duplicates are skipped), skips tombstones, and stops at the first key
// past the prefix or when fn or progress asks to stop.
func (r *reader) scanPrefixLoop(scanner *prefixScanner, prefix []byte, fn func(key []byte, value Value) bool, progress ScanProgress) (ScanStats, error) {
	var lastKey []byte
	var progressCount int64

	for scanner.next() {
		entry := scanner.entry()

		progressCount++
		// Report progress every 10k entries examined.
		if progress != nil && progressCount%10000 == 0 {
			if !progress(scanner.stats) {
				break
			}
		}

		// Same key as the one just emitted: an older version from another
		// source. The newest version came off the heap first, so skip it.
		if lastKey != nil && CompareKeys(entry.Key, lastKey) == 0 {
			continue
		}

		// Remember the key in a private copy; entry buffers may be reused
		// by the underlying sources.
		lastKey = make([]byte, len(entry.Key))
		copy(lastKey, entry.Key)

		// Tombstone: the key is deleted; record it as seen (above) so older
		// versions are suppressed, but do not emit it.
		if entry.Value.IsTombstone() {
			continue
		}

		// Sources are positioned at the prefix start, so the first key that
		// no longer matches means the prefix range is exhausted.
		if !hasPrefix(entry.Key, prefix) {
			break
		}

		// Hand the callback copies it can own beyond this iteration.
		keyCopy := make([]byte, len(entry.Key))
		copy(keyCopy, entry.Key)

		if !fn(keyCopy, copyValue(entry.Value)) {
			break
		}
	}

	return scanner.stats, nil
}
396

397
// copyValue creates a deep copy of a Value.
398
func copyValue(v Value) Value {
2,498✔
399
        result := Value{
2,498✔
400
                Type:    v.Type,
2,498✔
401
                Int64:   v.Int64,
2,498✔
402
                Float64: v.Float64,
2,498✔
403
                Bool:    v.Bool,
2,498✔
404
        }
2,498✔
405
        if v.Bytes != nil {
4,045✔
406
                result.Bytes = make([]byte, len(v.Bytes))
1,547✔
407
                copy(result.Bytes, v.Bytes)
1,547✔
408
        }
1,547✔
409
        if v.Pointer != nil {
2,498✔
UNCOV
410
                result.Pointer = &dataPointer{
×
UNCOV
411
                        FileID:      v.Pointer.FileID,
×
UNCOV
412
                        BlockOffset: v.Pointer.BlockOffset,
×
UNCOV
413
                        DataOffset:  v.Pointer.DataOffset,
×
UNCOV
414
                        Length:      v.Pointer.Length,
×
UNCOV
415
                }
×
UNCOV
416
        }
×
417
        if v.Record != nil {
2,501✔
418
                result.Record = make(map[string]any, len(v.Record))
3✔
419
                for k, val := range v.Record {
9✔
420
                        result.Record[k] = val
6✔
421
                }
6✔
422
        }
423
        return result
2,498✔
424
}
425

426
// copyLevels creates a deep copy of the levels slice.
427
// This is needed to prevent race conditions when readers snapshot state
428
// while writers are modifying the levels.
429
func copyLevels(levels [][]*SSTable) [][]*SSTable {
16,638✔
430
        result := make([][]*SSTable, len(levels))
16,638✔
431
        for i, level := range levels {
122,045✔
432
                result[i] = make([]*SSTable, len(level))
105,407✔
433
                copy(result[i], level)
105,407✔
434
        }
105,407✔
435
        return result
16,638✔
436
}
437

438
// copyImmutables creates a copy of the immutables slice.
439
func copyImmutables(immutables []*memtable) []*memtable {
16,638✔
440
        result := make([]*memtable, len(immutables))
16,638✔
441
        copy(result, immutables)
16,638✔
442
        return result
16,638✔
443
}
16,638✔
444

445
// incRefLevels increments reference counts on all SSTables in the levels.
446
func incRefLevels(levels [][]*SSTable) {
15,583✔
447
        for _, level := range levels {
113,624✔
448
                for _, sst := range level {
488,635✔
449
                        sst.IncRef()
390,594✔
450
                }
390,594✔
451
        }
452
}
453

454
// decRefLevels decrements reference counts on all SSTables in the levels.
455
func decRefLevels(levels [][]*SSTable) {
15,583✔
456
        for _, level := range levels {
113,624✔
457
                for _, sst := range level {
488,635✔
458
                        sst.DecRef()
390,594✔
459
                }
390,594✔
460
        }
461
}
462

463
// hasPrefix reports whether key starts with prefix. A zero-length
// prefix matches every key. Delegates to the standard library instead
// of the previous hand-rolled byte loop.
func hasPrefix(key, prefix []byte) bool {
	return bytes.HasPrefix(key, prefix)
}
475

476
// hasKeyInRange reports whether a key starting with prefix might exist
// in [minKey, maxKey]. The check is conservative: false positives are
// allowed, false negatives are not.
//
// Both halves of the original branch-per-length logic reduce to a
// lexicographic comparison over the common length:
//   - prefix vs maxKey: if the prefix sorts strictly after maxKey over
//     their common length, no key with that prefix can be <= maxKey.
//   - minKey vs prefix: if minKey sorts strictly after the prefix over
//     their common length, every key >= minKey is past the prefix range.
func hasKeyInRange(prefix, minKey, maxKey []byte) bool {
	// prefix must not sort after maxKey (over the shared length).
	n := len(prefix)
	if len(maxKey) < n {
		n = len(maxKey)
	}
	for i := 0; i < n; i++ {
		if prefix[i] < maxKey[i] {
			break
		}
		if prefix[i] > maxKey[i] {
			return false
		}
	}

	// minKey must not sort after prefix (over the shared length).
	m := len(prefix)
	if len(minKey) < m {
		m = len(minKey)
	}
	for i := 0; i < m; i++ {
		if minKey[i] < prefix[i] {
			return true
		}
		if minKey[i] > prefix[i] {
			return false
		}
	}
	// Ties over the shared length are conservative matches.
	return true
}
520

521
// prefixScanner merges entries from memtables and SSTables for prefix scanning.
522
type prefixScanner struct {
523
        prefix  []byte
524
        cache   *lruCache
525
        verify  bool
526
        heap    prefixHeap
527
        current Entry
528
        stats   ScanStats // Tracks blocks loaded and keys examined
529

530
        // Lazy loading for L1+ tables (sorted, non-overlapping)
531
        // Only one L1+ table per level needs to be in the heap at a time
532
        pendingTables []*SSTable // Tables waiting to be added (sorted by minKey)
533
        pendingIdx    int        // Next table to add from pendingTables
534
        basePriority  int        // Priority for pending tables
535

536
        // Reference counting: all SSTables we hold refs on
537
        refTables [][]*SSTable
538
}
539

540
type prefixHeapEntry struct {
541
        entry    Entry
542
        priority int // lower = newer/higher priority
543
        source   prefixSource
544
}
545

546
type prefixSource interface {
547
        next() bool
548
        entry() Entry
549
        close()
550
}
551

552
type prefixHeap []prefixHeapEntry
553

554
func (h prefixHeap) less(i, j int) bool {
7,477✔
555
        cmp := CompareKeys(h[i].entry.Key, h[j].entry.Key)
7,477✔
556
        if cmp != 0 {
14,030✔
557
                return cmp < 0
6,553✔
558
        }
6,553✔
559
        return h[i].priority < h[j].priority
924✔
560
}
561

562
func (h *prefixHeap) push(x prefixHeapEntry) {
2,740✔
563
        *h = append(*h, x)
2,740✔
564
        h.up(len(*h) - 1)
2,740✔
565
}
2,740✔
566

567
func (h *prefixHeap) pop() prefixHeapEntry {
2,838✔
568
        old := *h
2,838✔
569
        n := len(old) - 1
2,838✔
570
        old[0], old[n] = old[n], old[0]
2,838✔
571
        h.down(0, n)
2,838✔
572
        x := old[n]
2,838✔
573
        *h = old[:n]
2,838✔
574
        return x
2,838✔
575
}
2,838✔
576

577
func (h prefixHeap) up(j int) {
2,740✔
578
        for {
8,191✔
579
                i := (j - 1) / 2
5,451✔
580
                if i == j || !h.less(j, i) {
8,191✔
581
                        break
2,740✔
582
                }
583
                h[i], h[j] = h[j], h[i]
2,711✔
584
                j = i
2,711✔
585
        }
586
}
587

588
func (h prefixHeap) down(i, n int) {
2,859✔
589
        for {
7,561✔
590
                j1 := 2*i + 1
4,702✔
591
                if j1 >= n || j1 < 0 {
7,036✔
592
                        break
2,334✔
593
                }
594
                j := j1
2,368✔
595
                if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
2,826✔
596
                        j = j2
458✔
597
                }
458✔
598
                if !h.less(j, i) {
2,893✔
599
                        break
525✔
600
                }
601
                h[i], h[j] = h[j], h[i]
1,843✔
602
                i = j
1,843✔
603
        }
604
}
605

606
func (h *prefixHeap) init() {
98✔
607
        n := len(*h)
98✔
608
        for i := n/2 - 1; i >= 0; i-- {
119✔
609
                h.down(i, n)
21✔
610
        }
21✔
611
}
612

613
func newPrefixScanner(prefix []byte, cache *lruCache, verify bool) *prefixScanner {
98✔
614
        return &prefixScanner{
98✔
615
                prefix: prefix,
98✔
616
                cache:  cache,
98✔
617
                verify: verify,
98✔
618
                heap:   make(prefixHeap, 0, 8),
98✔
619
        }
98✔
620
}
98✔
621

622
// queueSortedTables queues L1+ tables for lazy loading.
623
// Tables must be sorted by minKey and non-overlapping.
624
// Only the first table is added to the heap immediately; others are loaded on demand.
625
func (s *prefixScanner) queueSortedTables(tables []*SSTable, basePriority int) {
7✔
626
        if len(tables) == 0 {
7✔
UNCOV
627
                return
×
628
        }
×
629

630
        s.pendingTables = tables
7✔
631
        s.pendingIdx = 0
7✔
632
        s.basePriority = basePriority
7✔
633

7✔
634
        // Add only the first table immediately
7✔
635
        s.addNextPendingTable()
7✔
636
}
637

638
// addNextPendingTable moves the next viable pending L1+ table into the
// heap. Tables whose range cannot hold the prefix, or that seek to no
// matching entry, are skipped. At most one table is added per call:
// pending tables are sorted and non-overlapping, so only one needs to
// be live in the heap at a time.
func (s *prefixScanner) addNextPendingTable() {
	for s.pendingIdx < len(s.pendingTables) {
		sst := s.pendingTables[s.pendingIdx]
		s.pendingIdx++

		// Cheap range check first, before touching any blocks.
		if !hasKeyInRange(s.prefix, sst.MinKey(), sst.MaxKey()) {
			continue
		}

		atomic.AddInt64(&s.stats.TablesChecked, 1)

		src := &sstablePrefixSource{
			sst:    sst,
			prefix: s.prefix,
			cache:  s.cache,
			verify: s.verify,
			stats:  &s.stats,
		}
		src.seekToPrefix()
		if src.valid {
			atomic.AddInt64(&s.stats.TablesAdded, 1)
			s.heap.push(prefixHeapEntry{
				entry:    src.entry(),
				priority: s.basePriority,
				source:   src,
			})
			return // Only add one table at a time
		}
		// No matching entries in this table: release its resources and
		// try the next pending table.
		src.close()
	}
}
671

672
func (s *prefixScanner) addmemtable(mt *memtable, priority int) {
99✔
673
        src := &memtablePrefixSource{mt: mt, prefix: s.prefix}
99✔
674
        src.seekToPrefix()
99✔
675
        if src.valid {
145✔
676
                s.heap = append(s.heap, prefixHeapEntry{
46✔
677
                        entry:    src.entry(),
46✔
678
                        priority: priority,
46✔
679
                        source:   src,
46✔
680
                })
46✔
681
        } else {
99✔
682
                // No matching entries, close the iterator to release lock
53✔
683
                src.close()
53✔
684
        }
53✔
685
}
686

687
func (s *prefixScanner) addSSTable(sst *SSTable, priority int) {
83✔
688
        atomic.AddInt64(&s.stats.TablesChecked, 1)
83✔
689

83✔
690
        src := &sstablePrefixSource{
83✔
691
                sst:    sst,
83✔
692
                prefix: s.prefix,
83✔
693
                cache:  s.cache,
83✔
694
                verify: s.verify,
83✔
695
                stats:  &s.stats,
83✔
696
        }
83✔
697
        src.seekToPrefix()
83✔
698
        if src.valid {
150✔
699
                atomic.AddInt64(&s.stats.TablesAdded, 1)
67✔
700
                entry := src.entry()
67✔
701
                // Debug: ALWAYS log initial entry for debugging
67✔
702
                if len(s.prefix) > 0 && !hasPrefix(entry.Key, s.prefix) {
67✔
UNCOV
703
                        fmt.Fprintf(os.Stderr, "[DEBUG] SSTable %d (priority %d) adding NON-MATCHING initial key %x (prefix %x)\n",
×
UNCOV
704
                                sst.ID, priority, entry.Key, s.prefix)
×
705
                        fmt.Fprintf(os.Stderr, "[DEBUG]   minKey=%x maxKey=%x blockIdx=%d entryIdx=%d\n",
×
706
                                sst.MinKey(), sst.MaxKey(), src.blockIdx, src.entryIdx)
×
707
                }
×
708
                s.heap = append(s.heap, prefixHeapEntry{
67✔
709
                        entry:    entry,
67✔
710
                        priority: priority,
67✔
711
                        source:   src,
67✔
712
                })
67✔
713
        } else {
16✔
714
                // No matching entries, close any resources
16✔
715
                src.close()
16✔
716
        }
16✔
717
}
718

719
func (s *prefixScanner) init() {
98✔
720
        s.heap.init()
98✔
721
}
98✔
722

723
// debugPushCount counts heap pushes for the debug traces in next().
// NOTE(review): package-level and incremented without synchronization —
// concurrent scans race on it; remove together with the debug logging.
var debugPushCount int
724

725
// next advances the scanner to the next entry in merged key order across all
// sources, returning false when every source is exhausted. Duplicate keys are
// NOT collapsed here; the heap's priority tie-break ensures the newest version
// pops first, and the caller dedupes by comparing against the previous key.
func (s *prefixScanner) next() bool {
	if len(s.heap) == 0 {
		return false
	}

	// Pop the smallest (key, priority) entry and remember it as current.
	he := s.heap.pop()
	s.current = he.entry
	s.stats.KeysExamined++

	// Debug: verify the popped entry matches our prefix
	if len(s.prefix) > 0 && !hasPrefix(he.entry.Key, s.prefix) {
		fmt.Fprintf(os.Stderr, "[DEBUG] POPPED non-matching key %x (prefix %x) at priority %d\n",
			he.entry.Key, s.prefix, he.priority)
	}

	if he.source.next() {
		// Source has more data: push its next entry back under the same
		// priority so the merge continues.
		newEntry := he.source.entry()
		debugPushCount++
		// Debug: check if source is pushing non-matching key
		if len(s.prefix) > 0 && !hasPrefix(newEntry.Key, s.prefix) {
			fmt.Fprintf(os.Stderr, "[DEBUG] Push %d: non-matching key %x after %x (prefix %x)\n",
				debugPushCount, newEntry.Key, he.entry.Key, s.prefix)
		}
		// Debug: check if key ordering is violated (new key < old key is bad for same source)
		if len(s.prefix) > 0 && CompareKeys(newEntry.Key, he.entry.Key) < 0 {
			fmt.Fprintf(os.Stderr, "[DEBUG] Push %d: KEY ORDER VIOLATION! new=%x < old=%x\n",
				debugPushCount, newEntry.Key, he.entry.Key)
		}
		s.heap.push(prefixHeapEntry{
			entry:    newEntry,
			priority: he.priority,
			source:   he.source,
		})
	} else {
		// Source exhausted, close it to release any held locks
		he.source.close()

		// If this was a L1+ table source, try to add the next pending table
		// This implements lazy loading - we only add L1+ tables as needed
		if s.pendingIdx < len(s.pendingTables) {
			s.addNextPendingTable()
		}
	}

	return true
}
771

772
func (s *prefixScanner) entry() Entry {
2,838✔
773
        return s.current
2,838✔
774
}
2,838✔
775

776
// close releases every source still on the heap, then drops the SSTable
// references acquired when the scan started so compaction can finish
// removing (and closing) superseded tables. Sources already exhausted were
// closed eagerly in next().
func (s *prefixScanner) close() {
	for _, he := range s.heap {
		he.source.close()
	}
	// Decrement refs on all SSTables we were holding
	decRefLevels(s.refTables)
}
783

784
// memtablePrefixSource wraps a memtable iterator for prefix scanning.
type memtablePrefixSource struct {
	mt     *memtable         // memtable to scan
	prefix []byte            // key prefix being matched
	iter   *memtableIterator // created by seekToPrefix, closed by close()
	valid  bool              // true while the current key carries the prefix
}
791

792
// seekToPrefix positions the iterator at the first key >= prefix and marks
// the source valid only if that key actually starts with the prefix.
// If Seek fails, valid keeps its false zero value.
func (s *memtablePrefixSource) seekToPrefix() {
	s.iter = s.mt.Iterator()
	if s.iter.Seek(s.prefix) {
		s.valid = hasPrefix(s.iter.Key(), s.prefix)
	}
}
798

799
func (s *memtablePrefixSource) next() bool {
356✔
800
        if !s.iter.Next() {
391✔
801
                s.valid = false
35✔
802
                return false
35✔
803
        }
35✔
804
        s.valid = hasPrefix(s.iter.Key(), s.prefix)
321✔
805
        return s.valid
321✔
806
}
807

808
func (s *memtablePrefixSource) entry() Entry {
358✔
809
        return s.iter.Entry()
358✔
810
}
358✔
811

812
func (s *memtablePrefixSource) close() {
99✔
813
        if s.iter != nil {
198✔
814
                s.iter.Close()
99✔
815
        }
99✔
816
}
817

818
// sstablePrefixSource wraps an SSTable for prefix scanning. It walks the
// table block-by-block starting at the first block that can contain the
// prefix, tracking (blockIdx, entryIdx) as its cursor.
type sstablePrefixSource struct {
	sst       *SSTable
	prefix    []byte
	cache     *lruCache // optional block cache; nil disables caching
	verify    bool      // verify block checksums on decode
	blockIdx  int       // index into sst.Index.Entries of the current block
	entryIdx  int       // index into block.Entries of the current entry
	block     *Block    // currently loaded block (nil before first load)
	valid     bool      // true while the cursor points at a prefix match
	fromCache bool      // True if current block came from cache (don't release it)
	stats     *ScanStats // Shared stats counter (may be nil)
}
831

832
// seekToPrefix positions the cursor at the first entry in the SSTable whose
// key starts with the prefix, setting valid accordingly. Any failure
// (missing index, prefix outside the table's key range, I/O error) simply
// leaves the source invalid rather than returning an error.
func (s *sstablePrefixSource) seekToPrefix() {
	// Index may be lazily loaded; bail out invalid if it cannot be read.
	if err := s.sst.ensureIndex(); err != nil {
		s.valid = false
		return
	}

	// Cheap rejection: prefix cannot match if it falls outside [MinKey, MaxKey].
	if !hasKeyInRange(s.prefix, s.sst.Index.MinKey, s.sst.Index.MaxKey) {
		s.valid = false
		return
	}

	if !s.findFirstMatchingBlock() {
		return
	}

	if err := s.loadBlock(); err != nil {
		s.valid = false
		return
	}

	// Found an entry >= prefix within this block (valid already set).
	if s.findFirstMatchingEntry() {
		return
	}

	// All keys in this block were < prefix; continue into later blocks.
	s.searchNextBlocks()
}
858

859
// findFirstMatchingBlock locates the starting block for prefix search.
// Returns false if no block could contain the prefix.
func (s *sstablePrefixSource) findFirstMatchingBlock() bool {
	s.blockIdx = s.sst.Index.Search(s.prefix)
	if s.blockIdx < 0 {
		// Search found no block whose first key <= prefix; the prefix can
		// only match if it is a prefix of the table's very first key.
		if hasPrefix(s.sst.Index.MinKey, s.prefix) {
			s.blockIdx = 0
			return true
		}
		s.valid = false
		return false
	}
	return true
}
873

874
// findFirstMatchingEntry scans the current block for the first entry >= prefix.
// Returns true if such an entry exists in this block (valid is set to whether
// it actually carries the prefix); false means every key in the block is
// < prefix and the caller should continue to the next block.
func (s *sstablePrefixSource) findFirstMatchingEntry() bool {
	for s.entryIdx = 0; s.entryIdx < len(s.block.Entries); s.entryIdx++ {
		if CompareKeys(s.block.Entries[s.entryIdx].Key, s.prefix) >= 0 {
			s.valid = hasPrefix(s.block.Entries[s.entryIdx].Key, s.prefix)
			return true
		}
	}
	return false
}
885

886
// canSkipRemainingBlocks checks if the block's first key indicates no prefix matches exist.
887
func (s *sstablePrefixSource) canSkipRemainingBlocks(blockFirstKey []byte) bool {
2✔
888
        return CompareKeys(blockFirstKey, s.prefix) > 0 && !hasPrefix(blockFirstKey, s.prefix)
2✔
889
}
2✔
890

891
// searchNextBlocks continues searching subsequent blocks for prefix matches.
// It advances block-by-block until it finds an entry >= prefix (setting valid
// to whether it matches), runs past the last block, or reaches a block that
// starts beyond the prefix range.
func (s *sstablePrefixSource) searchNextBlocks() {
	for {
		// Drop the previous block before moving on; cached blocks are owned
		// by the cache and must not be released here.
		if s.block != nil && !s.fromCache {
			s.block.Release()
		}
		s.block = nil
		s.blockIdx++

		if s.blockIdx >= len(s.sst.Index.Entries) {
			s.valid = false
			return
		}

		// Check the index's first-key before paying for a block load.
		blockFirstKey := s.sst.Index.Entries[s.blockIdx].Key
		if s.canSkipRemainingBlocks(blockFirstKey) {
			s.valid = false
			return
		}

		if err := s.loadBlock(); err != nil {
			s.valid = false
			return
		}

		if len(s.block.Entries) == 0 {
			continue
		}

		for s.entryIdx = 0; s.entryIdx < len(s.block.Entries); s.entryIdx++ {
			key := s.block.Entries[s.entryIdx].Key
			if CompareKeys(key, s.prefix) >= 0 {
				s.valid = hasPrefix(key, s.prefix)
				return
			}
		}
	}
}
929

930
// loadBlock loads the block at s.blockIdx into s.block, consulting the block
// cache first and falling back to a disk read + decode. fromCache tracks
// ownership: cached blocks (including freshly Put ones) belong to the cache
// and must not be released by this source.
func (s *sstablePrefixSource) loadBlock() error {
	if s.blockIdx >= len(s.sst.Index.Entries) {
		return ErrKeyNotFound
	}

	// Release previous block if we owned it
	if s.block != nil && !s.fromCache {
		s.block.Release()
		s.block = nil
	}

	ie := s.sst.Index.Entries[s.blockIdx]
	cacheKey := cacheKey{FileID: s.sst.ID, BlockOffset: ie.BlockOffset}

	// Try cache first
	if s.cache != nil {
		if cached, found := s.cache.Get(cacheKey); found {
			s.block = cached
			s.fromCache = true
			if s.stats != nil {
				atomic.AddInt64(&s.stats.BlocksLoaded, 1)
				atomic.AddInt64(&s.stats.BlocksCacheHit, 1)
			}
			return nil
		}
	}

	// Read from disk
	blockData := make([]byte, ie.BlockSize)
	if _, err := s.sst.file.ReadAt(blockData, int64(ie.BlockOffset)); err != nil {
		return err
	}

	block, err := DecodeBlock(blockData, s.verify)
	if err != nil {
		return err
	}

	if s.cache != nil {
		s.cache.Put(cacheKey, block)
		s.fromCache = true // Now in cache, don't release
	} else {
		s.fromCache = false // Not cached, we own it
	}
	s.block = block
	if s.stats != nil {
		atomic.AddInt64(&s.stats.BlocksLoaded, 1)
		atomic.AddInt64(&s.stats.BlocksDiskRead, 1)
	}
	return nil
}
981

982
// next advances the cursor to the following entry, crossing into the next
// block when the current one is exhausted. Returns true only while the new
// entry still carries the prefix.
func (s *sstablePrefixSource) next() bool {
	s.entryIdx++

	// Try next entry in current block
	if s.block != nil && s.entryIdx < len(s.block.Entries) {
		key := s.block.Entries[s.entryIdx].Key
		s.valid = hasPrefix(key, s.prefix)
		// Debug: if returning true but key doesn't match, that's a bug
		if s.valid && len(s.prefix) > 0 && len(key) > 0 && key[0] != s.prefix[0] {
			fmt.Fprintf(os.Stderr, "[BUG] next() in-block: returning true, key=%x prefix=%x sst=%d block=%d entry=%d\n",
				key, s.prefix, s.sst.ID, s.blockIdx, s.entryIdx)
		}
		return s.valid
	}

	// Move to next block
	s.blockIdx++
	s.entryIdx = 0

	if s.blockIdx >= len(s.sst.Index.Entries) {
		s.valid = false
		return false
	}

	if err := s.loadBlock(); err != nil {
		s.valid = false
		return false
	}

	if len(s.block.Entries) == 0 {
		s.valid = false
		return false
	}

	// Only the first entry of the new block is checked; since keys are
	// sorted, a non-matching first key ends the prefix run.
	key := s.block.Entries[0].Key
	s.valid = hasPrefix(key, s.prefix)
	// Debug: if returning true but key doesn't match, that's a bug
	if s.valid && len(s.prefix) > 0 && len(key) > 0 && key[0] != s.prefix[0] {
		fmt.Fprintf(os.Stderr, "[BUG] next() new-block: returning true, key=%x prefix=%x sst=%d block=%d\n",
			key, s.prefix, s.sst.ID, s.blockIdx)
	}
	return s.valid
}
1025

1026
// entry materializes the entry at the current cursor position, returning the
// zero Entry if the cursor is not on a valid match.
func (s *sstablePrefixSource) entry() Entry {
	if !s.valid || s.block == nil || s.entryIdx >= len(s.block.Entries) {
		return Entry{}
	}
	be := s.block.Entries[s.entryIdx]
	// Decode value from block entry
	val, _, _ := DecodeValue(be.Value)
	// IMPORTANT: Copy the key since the block may be released later
	// NOTE(review): only the key is copied here. If DecodeValue returns a
	// view into the block's buffer, Value could alias memory freed by
	// block.Release() — confirm DecodeValue copies (or callers consume the
	// value before the block is released).
	keyCopy := make([]byte, len(be.Key))
	copy(keyCopy, be.Key)
	return Entry{
		Key:   keyCopy,
		Value: val,
	}
}
1041

1042
func (s *sstablePrefixSource) close() {
100✔
1043
        // Release block if we own it (not from cache)
100✔
1044
        if s.block != nil && !s.fromCache {
100✔
UNCOV
1045
                s.block.Release()
×
UNCOV
1046
                s.block = nil
×
UNCOV
1047
        }
×
1048
}
1049

1050
// ScanRange iterates over all keys in [start, end) in sorted order.
1051
// Keys are deduplicated (newest version wins) and tombstones are skipped.
1052
// Return false from the callback to stop iteration early.
1053
func (r *reader) ScanRange(start, end []byte, fn func(key []byte, value Value) bool) error {
10✔
1054
        r.mu.RLock()
10✔
1055
        memtable := r.memtable
10✔
1056
        immutables := copyImmutables(r.immutables)
10✔
1057
        levels := copyLevels(r.levels)
10✔
1058
        cache := r.cache
10✔
1059
        verify := r.opts.VerifyChecksums
10✔
1060
        r.mu.RUnlock()
10✔
1061

10✔
1062
        // Increment refs on all SSTables to prevent them from being closed during scan
10✔
1063
        incRefLevels(levels)
10✔
1064

10✔
1065
        scanner := r.setupRangeScanner(start, end, memtable, immutables, levels, cache, verify)
10✔
1066
        scanner.refTables = levels // Store for cleanup
10✔
1067
        defer scanner.close()
10✔
1068

10✔
1069
        return runRangeScan(scanner, end, fn)
10✔
1070
}
10✔
1071

1072
// setupRangeScanner creates and configures a range scanner with all sources:
// the active memtable (priority 0, newest), immutable memtables (newest
// first), then overlapping SSTables level by level. Lower priority wins on
// duplicate keys.
func (r *reader) setupRangeScanner(start, end []byte, memtable *memtable, immutables []*memtable, levels [][]*SSTable, cache *lruCache, verify bool) *rangeScanner {
	scanner := newRangeScanner(start, end, cache, verify)

	scanner.addMemtable(memtable, 0)

	// Immutable memtables, newest (highest index) first.
	for i := len(immutables) - 1; i >= 0; i-- {
		scanner.addMemtable(immutables[i], len(immutables)-i)
	}

	// SSTable priorities start after all memtable priorities.
	baseIdx := len(immutables) + 1
	addRangeTables(scanner, levels, start, end, &baseIdx)

	scanner.init()
	return scanner
}
1088

1089
// addRangeTables adds SSTable sources that overlap with the range.
1090
func addRangeTables(scanner *rangeScanner, levels [][]*SSTable, start, end []byte, baseIdx *int) {
10✔
1091
        for level := 0; level < len(levels); level++ {
80✔
1092
                tables := levels[level]
70✔
1093
                if level == 0 {
80✔
1094
                        addL0RangeTables(scanner, tables, start, end, baseIdx)
10✔
1095
                } else {
70✔
1096
                        addSortedRangeTables(scanner, tables, start, end, baseIdx)
60✔
1097
                }
60✔
1098
        }
1099
}
1100

1101
// addL0RangeTables adds overlapping L0 tables (newest first).
1102
func addL0RangeTables(scanner *rangeScanner, tables []*SSTable, start, end []byte, baseIdx *int) {
10✔
1103
        for i := len(tables) - 1; i >= 0; i-- {
21✔
1104
                if rangeOverlaps(start, end, tables[i].MinKey(), tables[i].MaxKey()) {
19✔
1105
                        scanner.addSSTable(tables[i], *baseIdx)
8✔
1106
                        *baseIdx++
8✔
1107
                }
8✔
1108
        }
1109
}
1110

1111
// addSortedRangeTables adds overlapping tables from sorted levels (L1+).
1112
func addSortedRangeTables(scanner *rangeScanner, tables []*SSTable, start, end []byte, baseIdx *int) {
60✔
1113
        for _, t := range tables {
60✔
1114
                if rangeOverlaps(start, end, t.MinKey(), t.MaxKey()) {
×
UNCOV
1115
                        scanner.addSSTable(t, *baseIdx)
×
UNCOV
1116
                        *baseIdx++
×
UNCOV
1117
                }
×
1118
        }
1119
}
1120

1121
// runRangeScan iterates through scanner results and calls fn for each
// matching entry. Duplicates are dropped (the heap's priority tie-break
// guarantees the newest version of a key arrives first), tombstones are
// skipped, and iteration stops at end or when fn returns false.
func runRangeScan(scanner *rangeScanner, end []byte, fn func(key []byte, value Value) bool) error {
	var lastKey []byte
	for scanner.next() {
		entry := scanner.entry()

		// Same key as the previous one: an older version — skip it.
		if lastKey != nil && CompareKeys(entry.Key, lastKey) == 0 {
			continue
		}
		lastKey = entry.Key

		// Deleted key: note this still suppresses older live versions above,
		// since lastKey was already recorded.
		if entry.Value.IsTombstone() {
			continue
		}

		// Defensive end-bound check; sources already filter on end.
		if CompareKeys(entry.Key, end) >= 0 {
			break
		}

		if !fn(entry.Key, entry.Value) {
			break
		}
	}
	return nil
}
1146

1147
// rangeOverlaps returns true if the half-open scan range [start, end)
// overlaps the inclusive key range [minKey, maxKey].
// Overlap requires start <= maxKey (maxKey itself can fall inside the scan,
// so start == maxKey still overlaps) AND end > minKey (end is exclusive).
func rangeOverlaps(start, end, minKey, maxKey []byte) bool {
	if CompareKeys(start, maxKey) > 0 {
		return false // scan begins after the table's last key
	}
	if CompareKeys(end, minKey) <= 0 {
		return false // scan ends at or before the table's first key
	}
	return true
}
1159

1160
// rangeScanner merges entries from memtables and SSTables for range scanning
// using a min-heap keyed on (entry key, source priority).
type rangeScanner struct {
	start   []byte     // inclusive lower bound of the scan
	end     []byte     // exclusive upper bound of the scan
	cache   *lruCache  // optional shared block cache
	verify  bool       // verify block checksums on decode
	heap    rangeHeap  // merge heap over all active sources
	current Entry      // entry returned by the last next()
	stats   ScanStats  // shared counters, updated atomically by sources

	// Reference counting: all SSTables we hold refs on
	refTables [][]*SSTable
}
1173

1174
// rangeHeapEntry pairs a source's current entry with the source itself so the
// merge heap can re-pull from the right place after each pop.
type rangeHeapEntry struct {
	entry    Entry
	priority int // lower = newer/higher priority
	source   rangeSource
}
1179

1180
// rangeSource is a positioned, forward-only stream of entries within the scan
// range; implemented by memtableRangeSource and sstableRangeSource.
type rangeSource interface {
	next() bool   // advance; false when exhausted or out of range
	entry() Entry // entry at the current position
	close()       // release iterators/blocks
}
1185

1186
// rangeHeap is a hand-rolled binary min-heap of sources ordered by less().
type rangeHeap []rangeHeapEntry
1187

1188
func (h rangeHeap) less(i, j int) bool {
175✔
1189
        cmp := CompareKeys(h[i].entry.Key, h[j].entry.Key)
175✔
1190
        if cmp != 0 {
350✔
1191
                return cmp < 0
175✔
1192
        }
175✔
UNCOV
1193
        return h[i].priority < h[j].priority
×
1194
}
1195

1196
// push appends x and sifts it up to restore the heap invariant.
func (h *rangeHeap) push(x rangeHeapEntry) {
	*h = append(*h, x)
	h.up(len(*h) - 1)
}
312✔
1200

1201
// pop removes and returns the minimum element (standard heap pop: swap root
// with the last element, sift the new root down, then truncate).
func (h *rangeHeap) pop() rangeHeapEntry {
	old := *h
	n := len(old) - 1
	old[0], old[n] = old[n], old[0]
	h.down(0, n)
	x := old[n]
	*h = old[:n]
	return x
}
322✔
1210

1211
// up sifts the element at index j toward the root while it is smaller than
// its parent (mirrors container/heap's unexported up).
func (h rangeHeap) up(j int) {
	for {
		i := (j - 1) / 2 // parent
		if i == j || !h.less(j, i) {
			break
		}
		h[i], h[j] = h[j], h[i]
		j = i
	}
}
1221

1222
// down sifts the element at index i toward the leaves, considering only the
// first n elements (mirrors container/heap's unexported down).
func (h rangeHeap) down(i, n int) {
	for {
		j1 := 2*i + 1 // left child; j1 < 0 guards int overflow
		if j1 >= n || j1 < 0 {
			break
		}
		j := j1
		// Pick the smaller of the two children.
		if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
			j = j2
		}
		if !h.less(j, i) {
			break
		}
		h[i], h[j] = h[j], h[i]
		i = j
	}
}
1239

1240
// init heapifies the slice in O(n) by sifting down every internal node.
func (h *rangeHeap) init() {
	n := len(*h)
	for i := n/2 - 1; i >= 0; i-- {
		h.down(i, n)
	}
}
1246

1247
// newRangeScanner builds an empty scanner for [start, end); sources are
// attached via addMemtable/addSSTable and the heap sealed with init().
func newRangeScanner(start, end []byte, cache *lruCache, verify bool) *rangeScanner {
	return &rangeScanner{
		start:  start,
		end:    end,
		cache:  cache,
		verify: verify,
		heap:   make(rangeHeap, 0, 8), // small pre-size; most scans touch few sources
	}
}
10✔
1256

1257
// addMemtable registers mt as a scan source if it holds at least one key in
// range; otherwise its iterator is closed immediately to release the lock.
func (s *rangeScanner) addMemtable(mt *memtable, priority int) {
	src := &memtableRangeSource{mt: mt, start: s.start, end: s.end}
	src.seekToStart()
	if src.valid {
		// Appended unsorted; init() must run before next().
		s.heap = append(s.heap, rangeHeapEntry{
			entry:    src.entry(),
			priority: priority,
			source:   src,
		})
	} else {
		src.close()
	}
}
1270

1271
// addSSTable registers sst as a scan source if it holds at least one key in
// range; otherwise its resources are released immediately.
func (s *rangeScanner) addSSTable(sst *SSTable, priority int) {
	src := &sstableRangeSource{
		sst:    sst,
		start:  s.start,
		end:    s.end,
		cache:  s.cache,
		verify: s.verify,
		stats:  &s.stats,
	}
	src.seekToStart()
	if src.valid {
		// Appended unsorted; init() must run before next().
		s.heap = append(s.heap, rangeHeapEntry{
			entry:    src.entry(),
			priority: priority,
			source:   src,
		})
	} else {
		src.close()
	}
}
1291

1292
// init establishes the merge-heap invariant after all sources are added.
// Must be called before next().
func (s *rangeScanner) init() {
	s.heap.init()
}
10✔
1295

1296
// next advances to the next entry in merged key order, returning false when
// all sources are exhausted. Duplicates are not collapsed here; the heap's
// priority tie-break ensures the newest version pops first and the caller
// (runRangeScan) dedupes.
func (s *rangeScanner) next() bool {
	if len(s.heap) == 0 {
		return false
	}

	he := s.heap.pop()
	s.current = he.entry
	s.stats.KeysExamined++

	if he.source.next() {
		// Source has more in-range data: push its next entry back.
		s.heap.push(rangeHeapEntry{
			entry:    he.source.entry(),
			priority: he.priority,
			source:   he.source,
		})
	} else {
		// Exhausted: close eagerly to release iterators/blocks.
		he.source.close()
	}

	return true
}
1317

1318
func (s *rangeScanner) entry() Entry {
322✔
1319
        return s.current
322✔
1320
}
322✔
1321

1322
// close releases every source still on the heap, then drops the SSTable
// references acquired when the scan started so compaction can finish
// removing (and closing) superseded tables.
func (s *rangeScanner) close() {
	for _, he := range s.heap {
		he.source.close()
	}
	// Decrement refs on all SSTables we were holding
	decRefLevels(s.refTables)
}
1329

1330
// memtableRangeSource wraps a memtable iterator for range scanning.
type memtableRangeSource struct {
	mt    *memtable         // memtable to scan
	start []byte            // inclusive lower bound
	end   []byte            // exclusive upper bound
	iter  *memtableIterator // created by seekToStart, closed by close()
	valid bool              // true while the current key is < end
}
1338

1339
// seekToStart positions the iterator at the first key >= start and marks the
// source valid only if that key is still below end. If Seek fails, valid
// keeps its false zero value.
func (s *memtableRangeSource) seekToStart() {
	s.iter = s.mt.Iterator()
	if s.iter.Seek(s.start) {
		key := s.iter.Key()
		s.valid = CompareKeys(key, s.end) < 0
	}
}
1346

1347
func (s *memtableRangeSource) next() bool {
7✔
1348
        if !s.iter.Next() {
7✔
1349
                s.valid = false
×
1350
                return false
×
1351
        }
×
1352
        key := s.iter.Key()
7✔
1353
        s.valid = CompareKeys(key, s.end) < 0
7✔
1354
        return s.valid
7✔
1355
}
1356

1357
func (s *memtableRangeSource) entry() Entry {
7✔
1358
        return s.iter.Entry()
7✔
1359
}
7✔
1360

1361
func (s *memtableRangeSource) close() {
10✔
1362
        if s.iter != nil {
20✔
1363
                s.iter.Close()
10✔
1364
        }
10✔
1365
}
1366

1367
// sstableRangeSource wraps an SSTable for range scanning. It walks the table
// block-by-block starting at the first block that can contain start, tracking
// (blockIdx, entryIdx) as its cursor.
type sstableRangeSource struct {
	sst       *SSTable
	start     []byte    // inclusive lower bound
	end       []byte    // exclusive upper bound
	cache     *lruCache // optional block cache; nil disables caching
	verify    bool      // verify block checksums on decode
	blockIdx  int       // index into sst.Index.Entries of the current block
	entryIdx  int       // index into block.Entries of the current entry
	block     *Block    // currently loaded block (nil before first load)
	valid     bool      // true while the cursor points inside the range
	fromCache bool      // current block is cache-owned; don't release it
	stats     *ScanStats // shared stats counter (may be nil)
}
1381

1382
// seekToStart positions the cursor at the first entry with key >= start,
// setting valid to whether that entry is still below end. Any failure
// (missing index, no overlap, I/O error) leaves the source invalid rather
// than returning an error.
func (s *sstableRangeSource) seekToStart() {
	// Ensure index is loaded for lazy-loaded SSTables
	if err := s.sst.ensureIndex(); err != nil {
		s.valid = false
		return
	}

	// Check if SSTable might contain keys in range
	if !rangeOverlaps(s.start, s.end, s.sst.Index.MinKey, s.sst.Index.MaxKey) {
		s.valid = false
		return
	}

	// Find starting block using index
	s.blockIdx = s.sst.Index.Search(s.start)
	if s.blockIdx < 0 {
		// start precedes the table's first block; begin at block 0.
		s.blockIdx = 0
	}

	// Load the block
	if err := s.loadBlock(); err != nil {
		s.valid = false
		return
	}

	// Find first entry >= start in block
	for s.entryIdx = 0; s.entryIdx < len(s.block.Entries); s.entryIdx++ {
		key := s.block.Entries[s.entryIdx].Key
		if CompareKeys(key, s.start) >= 0 {
			s.valid = CompareKeys(key, s.end) < 0
			return
		}
	}

	// Not found in this block, try next block
	// Release old block and clear it so we load the new block
	if s.block != nil && !s.fromCache {
		s.block.Release()
	}
	s.block = nil
	s.blockIdx++
	s.entryIdx = 0

	// Load and check the next block directly (don't call next() which would increment blockIdx again)
	if s.blockIdx >= len(s.sst.Index.Entries) {
		s.valid = false
		return
	}

	if err := s.loadBlock(); err != nil {
		s.valid = false
		return
	}

	// NOTE(review): unlike sstablePrefixSource.searchNextBlocks, only this
	// single next block is probed; an empty block here ends the seek rather
	// than continuing. Fine if empty blocks never occur in practice — confirm.
	if len(s.block.Entries) == 0 {
		s.valid = false
		return
	}

	// Check if first entry is in range. The next block's keys are all > the
	// previous block's, hence >= start by construction.
	key := s.block.Entries[0].Key
	s.valid = CompareKeys(key, s.end) < 0
}
1445

1446
// loadBlock loads the block at s.blockIdx into s.block, consulting the block
// cache first and falling back to a disk read + decode. fromCache tracks
// ownership: cached blocks (including freshly Put ones) belong to the cache
// and must not be released by this source.
// NOTE(review): duplicated with sstablePrefixSource.loadBlock — a shared
// helper would keep the two in sync.
func (s *sstableRangeSource) loadBlock() error {
	if s.blockIdx >= len(s.sst.Index.Entries) {
		return ErrKeyNotFound
	}

	// Release previous block if we owned it
	if s.block != nil && !s.fromCache {
		s.block.Release()
		s.block = nil
	}

	ie := s.sst.Index.Entries[s.blockIdx]
	cacheKey := cacheKey{FileID: s.sst.ID, BlockOffset: ie.BlockOffset}

	// Try cache first
	if s.cache != nil {
		if cached, found := s.cache.Get(cacheKey); found {
			s.block = cached
			s.fromCache = true
			if s.stats != nil {
				atomic.AddInt64(&s.stats.BlocksLoaded, 1)
				atomic.AddInt64(&s.stats.BlocksCacheHit, 1)
			}
			return nil
		}
	}

	// Read from disk
	blockData := make([]byte, ie.BlockSize)
	if _, err := s.sst.file.ReadAt(blockData, int64(ie.BlockOffset)); err != nil {
		return err
	}

	block, err := DecodeBlock(blockData, s.verify)
	if err != nil {
		return err
	}

	if s.cache != nil {
		s.cache.Put(cacheKey, block)
		s.fromCache = true // now cache-owned, don't release
	} else {
		s.fromCache = false // not cached, we own it
	}
	s.block = block
	if s.stats != nil {
		atomic.AddInt64(&s.stats.BlocksLoaded, 1)
		atomic.AddInt64(&s.stats.BlocksDiskRead, 1)
	}
	return nil
}
1497

1498
// next advances the cursor to the following entry, crossing into the next
// block when the current one is exhausted. Returns true only while the new
// entry's key is still below the exclusive end bound.
func (s *sstableRangeSource) next() bool {
	s.entryIdx++

	// Try next entry in current block
	if s.block != nil && s.entryIdx < len(s.block.Entries) {
		key := s.block.Entries[s.entryIdx].Key
		s.valid = CompareKeys(key, s.end) < 0
		return s.valid
	}

	// Move to next block
	s.blockIdx++
	s.entryIdx = 0

	if s.blockIdx >= len(s.sst.Index.Entries) {
		s.valid = false
		return false
	}

	if err := s.loadBlock(); err != nil {
		s.valid = false
		return false
	}

	if len(s.block.Entries) == 0 {
		s.valid = false
		return false
	}

	// Keys are sorted across blocks, so only the end bound needs checking.
	key := s.block.Entries[0].Key
	s.valid = CompareKeys(key, s.end) < 0
	return s.valid
}
1531

1532
// entry materializes the entry at the current cursor position, returning the
// zero Entry if the cursor is not on a valid in-range entry.
func (s *sstableRangeSource) entry() Entry {
	if !s.valid || s.block == nil || s.entryIdx >= len(s.block.Entries) {
		return Entry{}
	}
	be := s.block.Entries[s.entryIdx]
	val, _, _ := DecodeValue(be.Value)
	// IMPORTANT: Copy the key since the block may be released later
	// NOTE(review): only the key is copied; if DecodeValue returns a view
	// into the block's buffer, Value may alias released memory — confirm
	// DecodeValue copies.
	keyCopy := make([]byte, len(be.Key))
	copy(keyCopy, be.Key)
	return Entry{
		Key:   keyCopy,
		Value: val,
	}
}
1546

1547
func (s *sstableRangeSource) close() {
8✔
1548
        if s.block != nil && !s.fromCache {
8✔
UNCOV
1549
                s.block.Release()
×
UNCOV
1550
                s.block = nil
×
UNCOV
1551
        }
×
1552
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc