• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freeeve / tinykvs / 21186884278

20 Jan 2026 08:54PM UTC coverage: 71.178% (-0.1%) from 71.295%
21186884278

push

github

freeeve
refactor(writer): extract helpers from flushmemtable

43 of 55 new or added lines in 1 file covered. (78.18%)

273 existing lines in 4 files now uncovered.

5717 of 8032 relevant lines covered (71.18%)

407987.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/reader.go
1
package tinykvs
2

3
import (
4
        "fmt"
5
        "os"
6
        "sync"
7
        "sync/atomic"
8
)
9

10
// ScanStats tracks statistics during scan operations.
// A copy of the scanner's running stats is passed to ScanProgress callbacks
// and returned from ScanPrefixWithStats.
11
type ScanStats struct {
12
        BlocksLoaded   int64 // Number of SSTable blocks accessed (disk + cache)
13
        BlocksCacheHit int64 // Number of blocks served from cache
14
        BlocksDiskRead int64 // Number of blocks read from disk
15
        KeysExamined   int64 // Total keys examined (including duplicates and tombstones)
16
        TablesChecked  int64 // Number of SSTables checked for prefix
17
        TablesAdded    int64 // Number of SSTables added to scanner (had matching entries)
18
}
19

20
// ScanProgress is called periodically during scan operations to report progress.
// scanPrefixLoop invokes it once every 10000 entries taken from the scanner.
21
// Return false to stop the scan early.
22
type ScanProgress func(stats ScanStats) bool
23

24
// reader coordinates lookups across memtable and SSTables.
// Readers snapshot the fields below under mu.RLock; the setters take mu.Lock.
25
type reader struct {
26
        mu sync.RWMutex // guards every field below
27

28
        memtable   *memtable    // active memtable, consulted first by Get
29
        immutables []*memtable  // Recently flushed, waiting for SSTable write
30
        levels     [][]*SSTable // levels[0] = L0, levels[1] = L1, etc.
31
        cache      *lruCache    // passed through to SSTable lookups and scans
32
        opts       Options      // only VerifyChecksums is read in this file
33
}
34

35
// newReader creates a new reader.
36
func newReader(memtable *memtable, levels [][]*SSTable, cache *lruCache, opts Options) *reader {
209✔
37
        // Make a deep copy of levels to avoid sharing with Store
209✔
38
        // Store and reader use different mutexes, so sharing would cause races
209✔
39
        levelsCopy := make([][]*SSTable, len(levels))
209✔
40
        for i, level := range levels {
1,616✔
41
                levelsCopy[i] = make([]*SSTable, len(level))
1,407✔
42
                copy(levelsCopy[i], level)
1,407✔
43
        }
1,407✔
44
        return &reader{
209✔
45
                memtable: memtable,
209✔
46
                levels:   levelsCopy,
209✔
47
                cache:    cache,
209✔
48
                opts:     opts,
209✔
49
        }
209✔
50
}
51

52
// Get looks up a key, checking memtable first, then SSTables.
53
func (r *reader) Get(key []byte) (Value, error) {
16,530✔
54
        r.mu.RLock()
16,530✔
55
        memtable := r.memtable
16,530✔
56
        immutables := copyImmutables(r.immutables)
16,530✔
57
        levels := copyLevels(r.levels)
16,530✔
58
        cache := r.cache
16,530✔
59
        verify := r.opts.VerifyChecksums
16,530✔
60
        r.mu.RUnlock()
16,530✔
61

16,530✔
62
        if val, found, err := getFromMemtables(key, memtable, immutables); found || err != nil {
17,593✔
63
                return val, err
1,063✔
64
        }
1,063✔
65

66
        // Increment refs on all SSTables we're about to use
67
        incRefLevels(levels)
15,467✔
68
        defer decRefLevels(levels)
15,467✔
69

15,467✔
70
        return getFromSSTables(key, levels, cache, verify)
15,467✔
71
}
72

73
// getFromMemtables checks the active memtable and immutables for a key.
74
func getFromMemtables(key []byte, memtable *memtable, immutables []*memtable) (Value, bool, error) {
16,530✔
75
        if entry, found := memtable.Get(key); found {
17,591✔
76
                if entry.Value.IsTombstone() {
1,063✔
77
                        return Value{}, true, ErrKeyNotFound
2✔
78
                }
2✔
79
                return entry.Value, true, nil
1,059✔
80
        }
81

82
        for i := len(immutables) - 1; i >= 0; i-- {
15,494✔
83
                if entry, found := immutables[i].Get(key); found {
27✔
84
                        if entry.Value.IsTombstone() {
3✔
85
                                return Value{}, true, ErrKeyNotFound
1✔
86
                        }
1✔
87
                        return entry.Value, true, nil
1✔
88
                }
89
        }
90

91
        return Value{}, false, nil
15,467✔
92
}
93

94
// getFromSSTables searches SSTables level by level for a key.
95
func getFromSSTables(key []byte, levels [][]*SSTable, cache *lruCache, verify bool) (Value, error) {
15,467✔
96
        for level := 0; level < len(levels); level++ {
45,381✔
97
                tables := levels[level]
29,914✔
98
                if level == 0 {
45,381✔
99
                        if val, found, err := getFromL0(key, tables, cache, verify); found || err != nil {
18,083✔
100
                                return val, err
2,616✔
101
                        }
2,616✔
102
                } else {
14,447✔
103
                        if val, found, err := getFromSortedLevel(key, tables, cache, verify); found || err != nil {
27,018✔
104
                                return val, err
12,571✔
105
                        }
12,571✔
106
                }
107
        }
108
        return Value{}, ErrKeyNotFound
280✔
109
}
110

111
// getFromL0 checks all L0 tables (newest first, may have overlapping keys).
112
func getFromL0(key []byte, tables []*SSTable, cache *lruCache, verify bool) (Value, bool, error) {
15,467✔
113
        for i := len(tables) - 1; i >= 0; i-- {
25,183✔
114
                entry, found, err := tables[i].Get(key, cache, verify)
9,716✔
115
                if err != nil {
9,716✔
116
                        return Value{}, false, err
×
117
                }
×
118
                if found {
12,332✔
119
                        if entry.Value.IsTombstone() {
2,637✔
120
                                return Value{}, true, ErrKeyNotFound
21✔
121
                        }
21✔
122
                        return entry.Value, true, nil
2,595✔
123
                }
124
        }
125
        return Value{}, false, nil
12,851✔
126
}
127

128
// getFromSortedLevel checks a sorted level (L1+) using binary search.
129
func getFromSortedLevel(key []byte, tables []*SSTable, cache *lruCache, verify bool) (Value, bool, error) {
14,447✔
130
        idx := findTableForKey(tables, key)
14,447✔
131
        if idx < 0 {
16,322✔
132
                return Value{}, false, nil
1,875✔
133
        }
1,875✔
134
        entry, found, err := tables[idx].Get(key, cache, verify)
12,572✔
135
        if err != nil {
12,572✔
136
                return Value{}, false, err
×
137
        }
×
138
        if found {
25,143✔
139
                if entry.Value.IsTombstone() {
12,923✔
140
                        return Value{}, true, ErrKeyNotFound
352✔
141
                }
352✔
142
                return entry.Value, true, nil
12,219✔
143
        }
144
        return Value{}, false, nil
1✔
145
}
146

147
// findTableForKey finds the SSTable that may contain the key (for L1+).
148
// Returns the index of the table, or -1 if not found.
149
func findTableForKey(tables []*SSTable, key []byte) int {
14,447✔
150
        lo, hi := 0, len(tables)-1
14,447✔
151

14,447✔
152
        for lo <= hi {
49,558✔
153
                mid := (lo + hi) / 2
35,111✔
154
                minKey := tables[mid].MinKey()
35,111✔
155
                maxKey := tables[mid].MaxKey()
35,111✔
156

35,111✔
157
                // Check if key is in range [MinKey, MaxKey]
35,111✔
158
                if CompareKeys(key, minKey) >= 0 && CompareKeys(key, maxKey) <= 0 {
47,683✔
159
                        return mid
12,572✔
160
                }
12,572✔
161

162
                if CompareKeys(key, minKey) < 0 {
33,307✔
163
                        hi = mid - 1
10,768✔
164
                } else {
22,539✔
165
                        lo = mid + 1
11,771✔
166
                }
11,771✔
167
        }
168

169
        return -1
1,875✔
170
}
171

172
// findTableForPrefix finds the first SSTable that may contain keys with the given prefix (for L1+).
173
// Returns the index of the first matching table, or -1 if no table could contain the prefix.
174
func findTableForPrefix(tables []*SSTable, prefix []byte) int {
588✔
175
        if len(tables) == 0 {
1,169✔
176
                return -1
581✔
177
        }
581✔
178

179
        // Binary search to find the first table where prefix might exist
180
        lo, hi := 0, len(tables)-1
7✔
181
        result := -1
7✔
182

7✔
183
        for lo <= hi {
18✔
184
                mid := (lo + hi) / 2
11✔
185
                maxKey := tables[mid].MaxKey()
11✔
186

11✔
187
                // If prefix <= maxKey (prefix-wise), this table or an earlier one might contain matches
11✔
188
                prefixBeforeOrAtMax := true
11✔
189
                if len(maxKey) >= len(prefix) {
22✔
190
                        for i := 0; i < len(prefix); i++ {
54✔
191
                                if prefix[i] < maxKey[i] {
45✔
192
                                        break
2✔
193
                                } else if prefix[i] > maxKey[i] {
42✔
194
                                        prefixBeforeOrAtMax = false
1✔
195
                                        break
1✔
196
                                }
197
                        }
198
                }
199

200
                if prefixBeforeOrAtMax {
21✔
201
                        // This table might contain matches, but check if there's an earlier one
10✔
202
                        if hasKeyInRange(prefix, tables[mid].MinKey(), maxKey) {
19✔
203
                                result = mid
9✔
204
                        }
9✔
205
                        hi = mid - 1
10✔
206
                } else {
1✔
207
                        // Prefix is after this table's maxKey, look right
1✔
208
                        lo = mid + 1
1✔
209
                }
1✔
210
        }
211

212
        return result
7✔
213
}
214

215
// Setmemtable updates the active memtable.
216
func (r *reader) Setmemtable(mt *memtable) {
2,054✔
217
        r.mu.Lock()
2,054✔
218
        defer r.mu.Unlock()
2,054✔
219
        r.memtable = mt
2,054✔
220
}
2,054✔
221

222
// AddImmutable adds an immutable memtable.
223
func (r *reader) AddImmutable(mt *memtable) {
2,056✔
224
        r.mu.Lock()
2,056✔
225
        defer r.mu.Unlock()
2,056✔
226
        r.immutables = append(r.immutables, mt)
2,056✔
227
}
2,056✔
228

229
// RemoveImmutable removes an immutable memtable after it's been flushed.
230
func (r *reader) RemoveImmutable(mt *memtable) {
2,403✔
231
        r.mu.Lock()
2,403✔
232
        defer r.mu.Unlock()
2,403✔
233
        for i, imm := range r.immutables {
4,791✔
234
                if imm == mt {
4,442✔
235
                        r.immutables = append(r.immutables[:i], r.immutables[i+1:]...)
2,054✔
236
                        return
2,054✔
237
                }
2,054✔
238
        }
239
}
240

241
// SetLevels updates the SSTable levels.
242
func (r *reader) SetLevels(levels [][]*SSTable) {
951✔
243
        r.mu.Lock()
951✔
244
        defer r.mu.Unlock()
951✔
245
        r.levels = levels
951✔
246
}
951✔
247

248
// AddSSTable adds an SSTable to a level.
249
func (r *reader) AddSSTable(level int, sst *SSTable) {
2,404✔
250
        r.mu.Lock()
2,404✔
251
        defer r.mu.Unlock()
2,404✔
252

2,404✔
253
        // Ensure the level exists
2,404✔
254
        for len(r.levels) <= level {
2,410✔
255
                r.levels = append(r.levels, nil)
6✔
256
        }
6✔
257

258
        r.levels[level] = append(r.levels[level], sst)
2,404✔
259
}
260

261
// GetLevels returns a copy of the current levels.
262
func (r *reader) GetLevels() [][]*SSTable {
5,470✔
263
        r.mu.RLock()
5,470✔
264
        defer r.mu.RUnlock()
5,470✔
265

5,470✔
266
        levels := make([][]*SSTable, len(r.levels))
5,470✔
267
        for i, level := range r.levels {
39,286✔
268
                levels[i] = make([]*SSTable, len(level))
33,816✔
269
                copy(levels[i], level)
33,816✔
270
        }
33,816✔
271
        return levels
5,470✔
272
}
273

274
// ScanPrefix iterates over all keys with the given prefix in sorted order.
275
// Keys are deduplicated (newest version wins) and tombstones are skipped.
276
// Return false from the callback to stop iteration early.
277
// Keys and values passed to the callback are copies owned by the caller.
278
// Note: Streams entries directly without buffering for constant memory usage.
279
func (r *reader) ScanPrefix(prefix []byte, fn func(key []byte, value Value) bool) error {
97✔
280
        _, err := r.ScanPrefixWithStats(prefix, fn, nil)
97✔
281
        return err
97✔
282
}
97✔
283

284
// ScanPrefixWithStats is like ScanPrefix but also returns scan statistics.
285
// The progress callback (if non-nil) is called periodically during the scan
286
// with current stats. Return false from progress to stop the scan early.
287
// Note: This streams entries directly to the callback without buffering,
288
// so it uses constant memory regardless of result size.
289
func (r *reader) ScanPrefixWithStats(prefix []byte, fn func(key []byte, value Value) bool, progress ScanProgress) (ScanStats, error) {
98✔
290
        scanner := r.setupPrefixScanner(prefix)
98✔
291
        defer scanner.close()
98✔
292

98✔
293
        return r.scanPrefixLoop(scanner, prefix, fn, progress)
98✔
294
}
98✔
295

296
// setupPrefixScanner creates and initializes a prefix scanner with all sources.
297
func (r *reader) setupPrefixScanner(prefix []byte) *prefixScanner {
98✔
298
        r.mu.RLock()
98✔
299
        memtable := r.memtable
98✔
300
        immutables := copyImmutables(r.immutables)
98✔
301
        levels := copyLevels(r.levels)
98✔
302
        cache := r.cache
98✔
303
        verify := r.opts.VerifyChecksums
98✔
304
        r.mu.RUnlock()
98✔
305

98✔
306
        // Increment refs on all SSTables to prevent them from being closed during scan
98✔
307
        incRefLevels(levels)
98✔
308

98✔
309
        scanner := newPrefixScanner(prefix, cache, verify)
98✔
310
        scanner.refTables = levels // Store for cleanup
98✔
311

98✔
312
        scanner.addmemtable(memtable, 0)
98✔
313

98✔
314
        for i := len(immutables) - 1; i >= 0; i-- {
99✔
315
                scanner.addmemtable(immutables[i], len(immutables)-i)
1✔
316
        }
1✔
317

318
        baseIdx := len(immutables) + 1
98✔
319
        sortedTables := r.collectPrefixTables(levels, prefix, scanner, &baseIdx)
98✔
320

98✔
321
        if len(sortedTables) > 0 {
105✔
322
                scanner.queueSortedTables(sortedTables, baseIdx)
7✔
323
        }
7✔
324

325
        scanner.init()
98✔
326
        return scanner
98✔
327
}
328

329
// collectPrefixTables adds L0 tables to scanner and returns L1+ tables for lazy loading.
330
func (r *reader) collectPrefixTables(levels [][]*SSTable, prefix []byte, scanner *prefixScanner, baseIdx *int) []*SSTable {
98✔
331
        var sortedTables []*SSTable
98✔
332

98✔
333
        for level := 0; level < len(levels); level++ {
784✔
334
                tables := levels[level]
686✔
335
                if level == 0 {
784✔
336
                        addL0Tables(tables, scanner, baseIdx)
98✔
337
                } else {
686✔
338
                        sortedTables = appendMatchingTables(sortedTables, tables, prefix)
588✔
339
                }
588✔
340
        }
341
        return sortedTables
98✔
342
}
343

344
// addL0Tables adds L0 tables to scanner in reverse order (newest first).
345
func addL0Tables(tables []*SSTable, scanner *prefixScanner, baseIdx *int) {
98✔
346
        for i := len(tables) - 1; i >= 0; i-- {
184✔
347
                scanner.addSSTable(tables[i], *baseIdx)
86✔
348
                *baseIdx++
86✔
349
        }
86✔
350
}
351

352
// appendMatchingTables appends tables that may contain the prefix to sortedTables.
353
func appendMatchingTables(sortedTables, tables []*SSTable, prefix []byte) []*SSTable {
588✔
354
        startIdx := findTableForPrefix(tables, prefix)
588✔
355
        if startIdx < 0 {
1,169✔
356
                return sortedTables
581✔
357
        }
581✔
358
        for i := startIdx; i < len(tables); i++ {
25✔
359
                if !hasKeyInRange(prefix, tables[i].MinKey(), tables[i].MaxKey()) {
20✔
360
                        break
2✔
361
                }
362
                sortedTables = append(sortedTables, tables[i])
16✔
363
        }
364
        return sortedTables
7✔
365
}
366

367
// scanPrefixLoop iterates through entries, calling fn for each matching key.
368
func (r *reader) scanPrefixLoop(scanner *prefixScanner, prefix []byte, fn func(key []byte, value Value) bool, progress ScanProgress) (ScanStats, error) {
98✔
369
        var lastKey []byte
98✔
370
        var progressCount int64
98✔
371

98✔
372
        for scanner.next() {
2,946✔
373
                entry := scanner.entry()
2,848✔
374

2,848✔
375
                progressCount++
2,848✔
376
                if progress != nil && progressCount%10000 == 0 {
2,848✔
UNCOV
377
                        if !progress(scanner.stats) {
×
UNCOV
378
                                break
×
379
                        }
380
                }
381

382
                if lastKey != nil && CompareKeys(entry.Key, lastKey) == 0 {
3,167✔
383
                        continue
319✔
384
                }
385

386
                lastKey = make([]byte, len(entry.Key))
2,529✔
387
                copy(lastKey, entry.Key)
2,529✔
388

2,529✔
389
                if entry.Value.IsTombstone() {
2,560✔
390
                        continue
31✔
391
                }
392

393
                if !hasPrefix(entry.Key, prefix) {
2,498✔
UNCOV
394
                        break
×
395
                }
396

397
                keyCopy := make([]byte, len(entry.Key))
2,498✔
398
                copy(keyCopy, entry.Key)
2,498✔
399

2,498✔
400
                if !fn(keyCopy, copyValue(entry.Value)) {
2,505✔
401
                        break
7✔
402
                }
403
        }
404

405
        return scanner.stats, nil
98✔
406
}
407

408
// copyValue creates a deep copy of a Value.
409
func copyValue(v Value) Value {
2,498✔
410
        result := Value{
2,498✔
411
                Type:    v.Type,
2,498✔
412
                Int64:   v.Int64,
2,498✔
413
                Float64: v.Float64,
2,498✔
414
                Bool:    v.Bool,
2,498✔
415
        }
2,498✔
416
        if v.Bytes != nil {
4,045✔
417
                result.Bytes = make([]byte, len(v.Bytes))
1,547✔
418
                copy(result.Bytes, v.Bytes)
1,547✔
419
        }
1,547✔
420
        if v.Pointer != nil {
2,498✔
UNCOV
421
                result.Pointer = &dataPointer{
×
UNCOV
422
                        FileID:      v.Pointer.FileID,
×
UNCOV
423
                        BlockOffset: v.Pointer.BlockOffset,
×
UNCOV
424
                        DataOffset:  v.Pointer.DataOffset,
×
UNCOV
425
                        Length:      v.Pointer.Length,
×
UNCOV
426
                }
×
UNCOV
427
        }
×
428
        if v.Record != nil {
2,501✔
429
                result.Record = make(map[string]any, len(v.Record))
3✔
430
                for k, val := range v.Record {
9✔
431
                        result.Record[k] = val
6✔
432
                }
6✔
433
        }
434
        return result
2,498✔
435
}
436

437
// copyLevels creates a deep copy of the levels slice.
438
// This is needed to prevent race conditions when readers snapshot state
439
// while writers are modifying the levels.
440
func copyLevels(levels [][]*SSTable) [][]*SSTable {
16,638✔
441
        result := make([][]*SSTable, len(levels))
16,638✔
442
        for i, level := range levels {
121,945✔
443
                result[i] = make([]*SSTable, len(level))
105,307✔
444
                copy(result[i], level)
105,307✔
445
        }
105,307✔
446
        return result
16,638✔
447
}
448

449
// copyImmutables creates a copy of the immutables slice.
450
func copyImmutables(immutables []*memtable) []*memtable {
16,638✔
451
        result := make([]*memtable, len(immutables))
16,638✔
452
        copy(result, immutables)
16,638✔
453
        return result
16,638✔
454
}
16,638✔
455

456
// incRefLevels increments reference counts on all SSTables in the levels.
457
func incRefLevels(levels [][]*SSTable) {
15,575✔
458
        for _, level := range levels {
113,460✔
459
                for _, sst := range level {
481,783✔
460
                        sst.IncRef()
383,898✔
461
                }
383,898✔
462
        }
463
}
464

465
// decRefLevels decrements reference counts on all SSTables in the levels.
466
func decRefLevels(levels [][]*SSTable) {
15,575✔
467
        for _, level := range levels {
113,460✔
468
                for _, sst := range level {
481,783✔
469
                        sst.DecRef()
383,898✔
470
                }
383,898✔
471
        }
472
}
473

474
// hasPrefix reports whether key begins with prefix. An empty prefix matches
// every key.
func hasPrefix(key, prefix []byte) bool {
	if len(prefix) > len(key) {
		return false
	}
	for i, b := range prefix {
		if key[i] != b {
			return false
		}
	}
	return true
}
486

487
// hasKeyInRange returns true if a key with the given prefix might exist in [minKey, maxKey].
488
func hasKeyInRange(prefix, minKey, maxKey []byte) bool {
156✔
489
        // Prefix is before or equal to maxKey AND prefix+\xff... is >= minKey
156✔
490
        // Simplified: prefix <= maxKey (prefix-wise) AND minKey prefix-matches or is < prefix
156✔
491
        if !isPrefixNotAfter(prefix, maxKey) {
168✔
492
                return false
12✔
493
        }
12✔
494
        return couldHavePrefixMatch(prefix, minKey)
144✔
495
}
496

497
// isPrefixNotAfter compares prefix and key over their shared leading bytes
// (the shorter of the two lengths). The first differing byte decides: true
// when the prefix byte is smaller, false when it is larger. If all shared
// bytes are equal the result is true, whichever slice is longer.
func isPrefixNotAfter(prefix, key []byte) bool {
	shared := len(key)
	if len(prefix) <= shared {
		shared = len(prefix)
	}
	for i := 0; i < shared; i++ {
		if prefix[i] == key[i] {
			continue
		}
		return prefix[i] < key[i]
	}
	return true // equal over the shared bytes
}
513

514
// couldHavePrefixMatch reports whether keys at or after minKey could still
// start with prefix. It compares the two over their shared leading bytes.
// At the first differing byte, a smaller minKey byte means matches may follow
// (true) and a larger one means minKey is already past the prefix (false).
// If all shared bytes are equal the result is true.
func couldHavePrefixMatch(prefix, minKey []byte) bool {
	shared := len(minKey)
	if len(prefix) <= shared {
		shared = len(prefix)
	}
	for i := 0; i < shared; i++ {
		if minKey[i] == prefix[i] {
			continue
		}
		return minKey[i] < prefix[i]
	}
	return true // minKey starts with prefix, or is itself a prefix of prefix
}
530

531
// prefixScanner merges entries from memtables and SSTables for prefix scanning.
// Sources are registered with addmemtable, addSSTable and queueSortedTables,
// then init() must be called before the first next().
532
type prefixScanner struct {
533
        prefix  []byte    // prefix every source is positioned on
534
        cache   *lruCache // handed to each SSTable source
535
        verify  bool      // handed to each SSTable source
536
        heap    prefixHeap // one entry per live source, ordered by key then priority
537
        current Entry     // entry most recently popped by next()
538
        stats   ScanStats // Tracks blocks loaded and keys examined
539

540
        // Lazy loading for L1+ tables (sorted, non-overlapping)
541
        // Only one L1+ table per level needs to be in the heap at a time
542
        pendingTables []*SSTable // Tables waiting to be added (sorted by minKey)
543
        pendingIdx    int        // Next table to add from pendingTables
544
        basePriority  int        // Priority for pending tables
545

546
        // Reference counting: all SSTables we hold refs on
547
        refTables [][]*SSTable
548
}
549

550
// prefixHeapEntry is one element of prefixHeap: the entry a source is
// currently positioned on, together with that source and its priority.
type prefixHeapEntry struct {
551
        entry    Entry // entry the source is currently positioned on
552
        priority int // lower = newer/higher priority
553
        source   prefixSource // advanced by next() after this entry is popped
554
}
555

556
// prefixSource is the iterator interface the scanner drives. In this file it
// is satisfied by memtablePrefixSource and sstablePrefixSource.
type prefixSource interface {
557
        next() bool // advance; the scanner treats false as "source exhausted"
558
        entry() Entry // entry at the current position
559
        close() // release whatever the source holds
560
}
561

562
// prefixHeap is a binary min-heap of prefixHeapEntry values, ordered by key
// and then by priority (see less). It has its own push/pop/up/down/init
// methods.
type prefixHeap []prefixHeapEntry
563

564
func (h prefixHeap) less(i, j int) bool {
8,129✔
565
        cmp := CompareKeys(h[i].entry.Key, h[j].entry.Key)
8,129✔
566
        if cmp != 0 {
15,329✔
567
                return cmp < 0
7,200✔
568
        }
7,200✔
569
        return h[i].priority < h[j].priority
929✔
570
}
571

572
func (h *prefixHeap) push(x prefixHeapEntry) {
2,747✔
573
        *h = append(*h, x)
2,747✔
574
        h.up(len(*h) - 1)
2,747✔
575
}
2,747✔
576

577
func (h *prefixHeap) pop() prefixHeapEntry {
2,848✔
578
        old := *h
2,848✔
579
        n := len(old) - 1
2,848✔
580
        old[0], old[n] = old[n], old[0]
2,848✔
581
        h.down(0, n)
2,848✔
582
        x := old[n]
2,848✔
583
        *h = old[:n]
2,848✔
584
        return x
2,848✔
585
}
2,848✔
586

587
func (h prefixHeap) up(j int) {
2,747✔
588
        for {
8,291✔
589
                i := (j - 1) / 2
5,544✔
590
                if i == j || !h.less(j, i) {
8,291✔
591
                        break
2,747✔
592
                }
593
                h[i], h[j] = h[j], h[i]
2,797✔
594
                j = i
2,797✔
595
        }
596
}
597

598
func (h prefixHeap) down(i, n int) {
2,870✔
599
        for {
7,679✔
600
                j1 := 2*i + 1
4,809✔
601
                if j1 >= n || j1 < 0 {
6,797✔
602
                        break
1,988✔
603
                }
604
                j := j1
2,821✔
605
                if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
3,733✔
606
                        j = j2
912✔
607
                }
912✔
608
                if !h.less(j, i) {
3,703✔
609
                        break
882✔
610
                }
611
                h[i], h[j] = h[j], h[i]
1,939✔
612
                i = j
1,939✔
613
        }
614
}
615

616
func (h *prefixHeap) init() {
98✔
617
        n := len(*h)
98✔
618
        for i := n/2 - 1; i >= 0; i-- {
120✔
619
                h.down(i, n)
22✔
620
        }
22✔
621
}
622

623
func newPrefixScanner(prefix []byte, cache *lruCache, verify bool) *prefixScanner {
98✔
624
        return &prefixScanner{
98✔
625
                prefix: prefix,
98✔
626
                cache:  cache,
98✔
627
                verify: verify,
98✔
628
                heap:   make(prefixHeap, 0, 8),
98✔
629
        }
98✔
630
}
98✔
631

632
// queueSortedTables queues L1+ tables for lazy loading.
633
// Tables must be sorted by minKey and non-overlapping.
634
// Only the first table is added to the heap immediately; others are loaded on demand.
635
func (s *prefixScanner) queueSortedTables(tables []*SSTable, basePriority int) {
7✔
636
        if len(tables) == 0 {
7✔
UNCOV
637
                return
×
UNCOV
638
        }
×
639

640
        s.pendingTables = tables
7✔
641
        s.pendingIdx = 0
7✔
642
        s.basePriority = basePriority
7✔
643

7✔
644
        // Add only the first table immediately
7✔
645
        s.addNextPendingTable()
7✔
646
}
647

648
// addNextPendingTable adds the next pending L1+ table to the heap.
649
func (s *prefixScanner) addNextPendingTable() {
16✔
650
        for s.pendingIdx < len(s.pendingTables) {
32✔
651
                sst := s.pendingTables[s.pendingIdx]
16✔
652
                s.pendingIdx++
16✔
653

16✔
654
                // Check if this table could have matching keys
16✔
655
                if !hasKeyInRange(s.prefix, sst.MinKey(), sst.MaxKey()) {
16✔
UNCOV
656
                        continue
×
657
                }
658

659
                atomic.AddInt64(&s.stats.TablesChecked, 1)
16✔
660

16✔
661
                src := &sstablePrefixSource{
16✔
662
                        sst:    sst,
16✔
663
                        prefix: s.prefix,
16✔
664
                        cache:  s.cache,
16✔
665
                        verify: s.verify,
16✔
666
                        stats:  &s.stats,
16✔
667
                }
16✔
668
                src.seekToPrefix()
16✔
669
                if src.valid {
32✔
670
                        atomic.AddInt64(&s.stats.TablesAdded, 1)
16✔
671
                        s.heap.push(prefixHeapEntry{
16✔
672
                                entry:    src.entry(),
16✔
673
                                priority: s.basePriority,
16✔
674
                                source:   src,
16✔
675
                        })
16✔
676
                        return // Only add one table at a time
16✔
677
                }
16✔
UNCOV
678
                src.close()
×
679
        }
680
}
681

682
func (s *prefixScanner) addmemtable(mt *memtable, priority int) {
99✔
683
        src := &memtablePrefixSource{mt: mt, prefix: s.prefix}
99✔
684
        src.seekToPrefix()
99✔
685
        if src.valid {
145✔
686
                s.heap = append(s.heap, prefixHeapEntry{
46✔
687
                        entry:    src.entry(),
46✔
688
                        priority: priority,
46✔
689
                        source:   src,
46✔
690
                })
46✔
691
        } else {
99✔
692
                // No matching entries, close the iterator to release lock
53✔
693
                src.close()
53✔
694
        }
53✔
695
}
696

697
func (s *prefixScanner) addSSTable(sst *SSTable, priority int) {
86✔
698
        atomic.AddInt64(&s.stats.TablesChecked, 1)
86✔
699

86✔
700
        src := &sstablePrefixSource{
86✔
701
                sst:    sst,
86✔
702
                prefix: s.prefix,
86✔
703
                cache:  s.cache,
86✔
704
                verify: s.verify,
86✔
705
                stats:  &s.stats,
86✔
706
        }
86✔
707
        src.seekToPrefix()
86✔
708
        if src.valid {
156✔
709
                atomic.AddInt64(&s.stats.TablesAdded, 1)
70✔
710
                entry := src.entry()
70✔
711
                // Debug: ALWAYS log initial entry for debugging
70✔
712
                if len(s.prefix) > 0 && !hasPrefix(entry.Key, s.prefix) {
70✔
UNCOV
713
                        fmt.Fprintf(os.Stderr, "[DEBUG] SSTable %d (priority %d) adding NON-MATCHING initial key %x (prefix %x)\n",
×
UNCOV
714
                                sst.ID, priority, entry.Key, s.prefix)
×
UNCOV
715
                        fmt.Fprintf(os.Stderr, "[DEBUG]   minKey=%x maxKey=%x blockIdx=%d entryIdx=%d\n",
×
UNCOV
716
                                sst.MinKey(), sst.MaxKey(), src.blockIdx, src.entryIdx)
×
UNCOV
717
                }
×
718
                s.heap = append(s.heap, prefixHeapEntry{
70✔
719
                        entry:    entry,
70✔
720
                        priority: priority,
70✔
721
                        source:   src,
70✔
722
                })
70✔
723
        } else {
16✔
724
                // No matching entries, close any resources
16✔
725
                src.close()
16✔
726
        }
16✔
727
}
728

729
func (s *prefixScanner) init() {
98✔
730
        s.heap.init()
98✔
731
}
98✔
732

733
// debugPushCount counts the entries re-pushed by prefixScanner.next and is
// only used in its debug output.
// NOTE(review): it is package-level and incremented with a plain ++ with no
// synchronization visible here; concurrent scans would race on it — confirm.
var debugPushCount int
734

735
func (s *prefixScanner) next() bool {
2,939✔
736
        if len(s.heap) == 0 {
3,030✔
737
                return false
91✔
738
        }
91✔
739

740
        he := s.heap.pop()
2,848✔
741
        s.current = he.entry
2,848✔
742
        s.stats.KeysExamined++
2,848✔
743

2,848✔
744
        // Debug: verify the popped entry matches our prefix
2,848✔
745
        if len(s.prefix) > 0 && !hasPrefix(he.entry.Key, s.prefix) {
2,848✔
746
                fmt.Fprintf(os.Stderr, "[DEBUG] POPPED non-matching key %x (prefix %x) at priority %d\n",
×
UNCOV
747
                        he.entry.Key, s.prefix, he.priority)
×
UNCOV
748
        }
×
749

750
        if he.source.next() {
5,579✔
751
                newEntry := he.source.entry()
2,731✔
752
                debugPushCount++
2,731✔
753
                // Debug: check if source is pushing non-matching key
2,731✔
754
                if len(s.prefix) > 0 && !hasPrefix(newEntry.Key, s.prefix) {
2,731✔
UNCOV
755
                        fmt.Fprintf(os.Stderr, "[DEBUG] Push %d: non-matching key %x after %x (prefix %x)\n",
×
UNCOV
756
                                debugPushCount, newEntry.Key, he.entry.Key, s.prefix)
×
UNCOV
757
                }
×
758
                // Debug: check if key ordering is violated (new key < old key is bad for same source)
759
                if len(s.prefix) > 0 && CompareKeys(newEntry.Key, he.entry.Key) < 0 {
2,731✔
UNCOV
760
                        fmt.Fprintf(os.Stderr, "[DEBUG] Push %d: KEY ORDER VIOLATION! new=%x < old=%x\n",
×
UNCOV
761
                                debugPushCount, newEntry.Key, he.entry.Key)
×
UNCOV
762
                }
×
763
                s.heap.push(prefixHeapEntry{
2,731✔
764
                        entry:    newEntry,
2,731✔
765
                        priority: he.priority,
2,731✔
766
                        source:   he.source,
2,731✔
767
                })
2,731✔
768
        } else {
117✔
769
                // Source exhausted, close it to release any held locks
117✔
770
                he.source.close()
117✔
771

117✔
772
                // If this was a L1+ table source, try to add the next pending table
117✔
773
                // This implements lazy loading - we only add L1+ tables as needed
117✔
774
                if s.pendingIdx < len(s.pendingTables) {
126✔
775
                        s.addNextPendingTable()
9✔
776
                }
9✔
777
        }
778

779
        return true
2,848✔
780
}
781

782
func (s *prefixScanner) entry() Entry {
2,848✔
783
        return s.current
2,848✔
784
}
2,848✔
785

786
func (s *prefixScanner) close() {
98✔
787
        for _, he := range s.heap {
113✔
788
                he.source.close()
15✔
789
        }
15✔
790
        // Decrement refs on all SSTables we were holding
791
        decRefLevels(s.refTables)
98✔
792
}
793

794
// memtablePrefixSource wraps a memtable iterator for prefix scanning.
795
type memtablePrefixSource struct {
796
        mt     *memtable
797
        prefix []byte
798
        iter   *memtableIterator
799
        valid  bool
800
}
801

802
func (s *memtablePrefixSource) seekToPrefix() {
99✔
803
        s.iter = s.mt.Iterator()
99✔
804
        if s.iter.Seek(s.prefix) {
149✔
805
                s.valid = hasPrefix(s.iter.Key(), s.prefix)
50✔
806
        }
50✔
807
}
808

809
func (s *memtablePrefixSource) next() bool {
356✔
810
        if !s.iter.Next() {
391✔
811
                s.valid = false
35✔
812
                return false
35✔
813
        }
35✔
814
        s.valid = hasPrefix(s.iter.Key(), s.prefix)
321✔
815
        return s.valid
321✔
816
}
817

818
func (s *memtablePrefixSource) entry() Entry {
358✔
819
        return s.iter.Entry()
358✔
820
}
358✔
821

822
func (s *memtablePrefixSource) close() {
99✔
823
        if s.iter != nil {
198✔
824
                s.iter.Close()
99✔
825
        }
99✔
826
}
827

828
// sstablePrefixSource wraps an SSTable for prefix scanning.
829
type sstablePrefixSource struct {
830
        sst       *SSTable
831
        prefix    []byte
832
        cache     *lruCache
833
        verify    bool
834
        blockIdx  int
835
        entryIdx  int
836
        block     *Block
837
        valid     bool
838
        fromCache bool       // True if current block came from cache (don't release it)
839
        stats     *ScanStats // Shared stats counter (may be nil)
840
}
841

842
func (s *sstablePrefixSource) seekToPrefix() {
102✔
843
        if err := s.sst.ensureIndex(); err != nil {
102✔
844
                s.valid = false
×
UNCOV
845
                return
×
UNCOV
846
        }
×
847

848
        if !hasKeyInRange(s.prefix, s.sst.Index.MinKey, s.sst.Index.MaxKey) {
115✔
849
                s.valid = false
13✔
850
                return
13✔
851
        }
13✔
852

853
        if !s.findFirstMatchingBlock() {
89✔
UNCOV
854
                return
×
UNCOV
855
        }
×
856

857
        if err := s.loadBlock(); err != nil {
89✔
UNCOV
858
                s.valid = false
×
UNCOV
859
                return
×
UNCOV
860
        }
×
861

862
        if s.findFirstMatchingEntry() {
176✔
863
                return
87✔
864
        }
87✔
865

866
        s.searchNextBlocks()
2✔
867
}
868

869
// findFirstMatchingBlock locates the starting block for prefix search.
870
// Returns false if no block could contain the prefix.
871
func (s *sstablePrefixSource) findFirstMatchingBlock() bool {
89✔
872
        s.blockIdx = s.sst.Index.Search(s.prefix)
89✔
873
        if s.blockIdx < 0 {
159✔
874
                if hasPrefix(s.sst.Index.MinKey, s.prefix) {
140✔
875
                        s.blockIdx = 0
70✔
876
                        return true
70✔
877
                }
70✔
UNCOV
878
                s.valid = false
×
UNCOV
879
                return false
×
880
        }
881
        return true
19✔
882
}
883

884
// findFirstMatchingEntry scans the current block for the first entry >= prefix.
885
// Returns true if found (sets s.valid), false if should continue to next block.
886
func (s *sstablePrefixSource) findFirstMatchingEntry() bool {
89✔
887
        for s.entryIdx = 0; s.entryIdx < len(s.block.Entries); s.entryIdx++ {
362✔
888
                if CompareKeys(s.block.Entries[s.entryIdx].Key, s.prefix) >= 0 {
360✔
889
                        s.valid = hasPrefix(s.block.Entries[s.entryIdx].Key, s.prefix)
87✔
890
                        return true
87✔
891
                }
87✔
892
        }
893
        return false
2✔
894
}
895

896
// canSkipRemainingBlocks checks if the block's first key indicates no prefix matches exist.
897
func (s *sstablePrefixSource) canSkipRemainingBlocks(blockFirstKey []byte) bool {
2✔
898
        return CompareKeys(blockFirstKey, s.prefix) > 0 && !hasPrefix(blockFirstKey, s.prefix)
2✔
899
}
2✔
900

901
// searchNextBlocks continues searching subsequent blocks for prefix matches.
902
func (s *sstablePrefixSource) searchNextBlocks() {
2✔
903
        for {
4✔
904
                if s.block != nil && !s.fromCache {
2✔
UNCOV
905
                        s.block.Release()
×
UNCOV
906
                }
×
907
                s.block = nil
2✔
908
                s.blockIdx++
2✔
909

2✔
910
                if s.blockIdx >= len(s.sst.Index.Entries) {
2✔
911
                        s.valid = false
×
912
                        return
×
913
                }
×
914

915
                blockFirstKey := s.sst.Index.Entries[s.blockIdx].Key
2✔
916
                if s.canSkipRemainingBlocks(blockFirstKey) {
3✔
917
                        s.valid = false
1✔
918
                        return
1✔
919
                }
1✔
920

921
                if err := s.loadBlock(); err != nil {
1✔
UNCOV
922
                        s.valid = false
×
UNCOV
923
                        return
×
UNCOV
924
                }
×
925

926
                if len(s.block.Entries) == 0 {
1✔
UNCOV
927
                        continue
×
928
                }
929

930
                for s.entryIdx = 0; s.entryIdx < len(s.block.Entries); s.entryIdx++ {
2✔
931
                        key := s.block.Entries[s.entryIdx].Key
1✔
932
                        if CompareKeys(key, s.prefix) >= 0 {
2✔
933
                                s.valid = hasPrefix(key, s.prefix)
1✔
934
                                return
1✔
935
                        }
1✔
936
                }
937
        }
938
}
939

940
func (s *sstablePrefixSource) loadBlock() error {
172✔
941
        if s.blockIdx >= len(s.sst.Index.Entries) {
172✔
UNCOV
942
                return ErrKeyNotFound
×
UNCOV
943
        }
×
944

945
        // Release previous block if we owned it
946
        if s.block != nil && !s.fromCache {
172✔
UNCOV
947
                s.block.Release()
×
UNCOV
948
                s.block = nil
×
UNCOV
949
        }
×
950

951
        ie := s.sst.Index.Entries[s.blockIdx]
172✔
952
        cacheKey := cacheKey{FileID: s.sst.ID, BlockOffset: ie.BlockOffset}
172✔
953

172✔
954
        // Try cache first
172✔
955
        if s.cache != nil {
344✔
956
                if cached, found := s.cache.Get(cacheKey); found {
187✔
957
                        s.block = cached
15✔
958
                        s.fromCache = true
15✔
959
                        if s.stats != nil {
30✔
960
                                atomic.AddInt64(&s.stats.BlocksLoaded, 1)
15✔
961
                                atomic.AddInt64(&s.stats.BlocksCacheHit, 1)
15✔
962
                        }
15✔
963
                        return nil
15✔
964
                }
965
        }
966

967
        // Read from disk
968
        blockData := make([]byte, ie.BlockSize)
157✔
969
        if _, err := s.sst.file.ReadAt(blockData, int64(ie.BlockOffset)); err != nil {
157✔
UNCOV
970
                return err
×
971
        }
×
972

973
        block, err := DecodeBlock(blockData, s.verify)
157✔
974
        if err != nil {
157✔
UNCOV
975
                return err
×
UNCOV
976
        }
×
977

978
        if s.cache != nil {
314✔
979
                s.cache.Put(cacheKey, block)
157✔
980
                s.fromCache = true // Now in cache, don't release
157✔
981
        } else {
157✔
UNCOV
982
                s.fromCache = false // Not cached, we own it
×
UNCOV
983
        }
×
984
        s.block = block
157✔
985
        if s.stats != nil {
314✔
986
                atomic.AddInt64(&s.stats.BlocksLoaded, 1)
157✔
987
                atomic.AddInt64(&s.stats.BlocksDiskRead, 1)
157✔
988
        }
157✔
989
        return nil
157✔
990
}
991

992
func (s *sstablePrefixSource) next() bool {
2,492✔
993
        s.entryIdx++
2,492✔
994

2,492✔
995
        // Try next entry in current block
2,492✔
996
        if s.block != nil && s.entryIdx < len(s.block.Entries) {
4,846✔
997
                key := s.block.Entries[s.entryIdx].Key
2,354✔
998
                s.valid = hasPrefix(key, s.prefix)
2,354✔
999
                // Debug: if returning true but key doesn't match, that's a bug
2,354✔
1000
                if s.valid && len(s.prefix) > 0 && len(key) > 0 && key[0] != s.prefix[0] {
2,354✔
UNCOV
1001
                        fmt.Fprintf(os.Stderr, "[BUG] next() in-block: returning true, key=%x prefix=%x sst=%d block=%d entry=%d\n",
×
UNCOV
1002
                                key, s.prefix, s.sst.ID, s.blockIdx, s.entryIdx)
×
UNCOV
1003
                }
×
1004
                return s.valid
2,354✔
1005
        }
1006

1007
        // Move to next block
1008
        s.blockIdx++
138✔
1009
        s.entryIdx = 0
138✔
1010

138✔
1011
        if s.blockIdx >= len(s.sst.Index.Entries) {
194✔
1012
                s.valid = false
56✔
1013
                return false
56✔
1014
        }
56✔
1015

1016
        if err := s.loadBlock(); err != nil {
82✔
UNCOV
1017
                s.valid = false
×
UNCOV
1018
                return false
×
1019
        }
×
1020

1021
        if len(s.block.Entries) == 0 {
82✔
UNCOV
1022
                s.valid = false
×
UNCOV
1023
                return false
×
UNCOV
1024
        }
×
1025

1026
        key := s.block.Entries[0].Key
82✔
1027
        s.valid = hasPrefix(key, s.prefix)
82✔
1028
        // Debug: if returning true but key doesn't match, that's a bug
82✔
1029
        if s.valid && len(s.prefix) > 0 && len(key) > 0 && key[0] != s.prefix[0] {
82✔
UNCOV
1030
                fmt.Fprintf(os.Stderr, "[BUG] next() new-block: returning true, key=%x prefix=%x sst=%d block=%d\n",
×
UNCOV
1031
                        key, s.prefix, s.sst.ID, s.blockIdx)
×
UNCOV
1032
        }
×
1033
        return s.valid
82✔
1034
}
1035

1036
func (s *sstablePrefixSource) entry() Entry {
2,505✔
1037
        if !s.valid || s.block == nil || s.entryIdx >= len(s.block.Entries) {
2,505✔
UNCOV
1038
                return Entry{}
×
UNCOV
1039
        }
×
1040
        be := s.block.Entries[s.entryIdx]
2,505✔
1041
        // Decode value from block entry
2,505✔
1042
        val, _, _ := DecodeValue(be.Value)
2,505✔
1043
        // IMPORTANT: Copy the key since the block may be released later
2,505✔
1044
        keyCopy := make([]byte, len(be.Key))
2,505✔
1045
        copy(keyCopy, be.Key)
2,505✔
1046
        return Entry{
2,505✔
1047
                Key:   keyCopy,
2,505✔
1048
                Value: val,
2,505✔
1049
        }
2,505✔
1050
}
1051

1052
func (s *sstablePrefixSource) close() {
102✔
1053
        // Release block if we own it (not from cache)
102✔
1054
        if s.block != nil && !s.fromCache {
102✔
UNCOV
1055
                s.block.Release()
×
UNCOV
1056
                s.block = nil
×
UNCOV
1057
        }
×
1058
}
1059

1060
// ScanRange iterates over all keys in [start, end) in sorted order.
1061
// Keys are deduplicated (newest version wins) and tombstones are skipped.
1062
// Return false from the callback to stop iteration early.
1063
func (r *reader) ScanRange(start, end []byte, fn func(key []byte, value Value) bool) error {
10✔
1064
        r.mu.RLock()
10✔
1065
        memtable := r.memtable
10✔
1066
        immutables := copyImmutables(r.immutables)
10✔
1067
        levels := copyLevels(r.levels)
10✔
1068
        cache := r.cache
10✔
1069
        verify := r.opts.VerifyChecksums
10✔
1070
        r.mu.RUnlock()
10✔
1071

10✔
1072
        // Increment refs on all SSTables to prevent them from being closed during scan
10✔
1073
        incRefLevels(levels)
10✔
1074

10✔
1075
        scanner := r.setupRangeScanner(start, end, memtable, immutables, levels, cache, verify)
10✔
1076
        scanner.refTables = levels // Store for cleanup
10✔
1077
        defer scanner.close()
10✔
1078

10✔
1079
        return runRangeScan(scanner, end, fn)
10✔
1080
}
10✔
1081

1082
// setupRangeScanner creates and configures a range scanner with all sources.
1083
func (r *reader) setupRangeScanner(start, end []byte, memtable *memtable, immutables []*memtable, levels [][]*SSTable, cache *lruCache, verify bool) *rangeScanner {
10✔
1084
        scanner := newRangeScanner(start, end, cache, verify)
10✔
1085

10✔
1086
        scanner.addMemtable(memtable, 0)
10✔
1087

10✔
1088
        for i := len(immutables) - 1; i >= 0; i-- {
10✔
UNCOV
1089
                scanner.addMemtable(immutables[i], len(immutables)-i)
×
UNCOV
1090
        }
×
1091

1092
        baseIdx := len(immutables) + 1
10✔
1093
        addRangeTables(scanner, levels, start, end, &baseIdx)
10✔
1094

10✔
1095
        scanner.init()
10✔
1096
        return scanner
10✔
1097
}
1098

1099
// addRangeTables adds SSTable sources that overlap with the range.
1100
func addRangeTables(scanner *rangeScanner, levels [][]*SSTable, start, end []byte, baseIdx *int) {
10✔
1101
        for level := 0; level < len(levels); level++ {
80✔
1102
                tables := levels[level]
70✔
1103
                if level == 0 {
80✔
1104
                        addL0RangeTables(scanner, tables, start, end, baseIdx)
10✔
1105
                } else {
70✔
1106
                        addSortedRangeTables(scanner, tables, start, end, baseIdx)
60✔
1107
                }
60✔
1108
        }
1109
}
1110

1111
// addL0RangeTables adds overlapping L0 tables (newest first).
1112
func addL0RangeTables(scanner *rangeScanner, tables []*SSTable, start, end []byte, baseIdx *int) {
10✔
1113
        for i := len(tables) - 1; i >= 0; i-- {
21✔
1114
                if rangeOverlaps(start, end, tables[i].MinKey(), tables[i].MaxKey()) {
19✔
1115
                        scanner.addSSTable(tables[i], *baseIdx)
8✔
1116
                        *baseIdx++
8✔
1117
                }
8✔
1118
        }
1119
}
1120

1121
// addSortedRangeTables adds overlapping tables from sorted levels (L1+).
1122
func addSortedRangeTables(scanner *rangeScanner, tables []*SSTable, start, end []byte, baseIdx *int) {
60✔
1123
        for _, t := range tables {
60✔
UNCOV
1124
                if rangeOverlaps(start, end, t.MinKey(), t.MaxKey()) {
×
UNCOV
1125
                        scanner.addSSTable(t, *baseIdx)
×
UNCOV
1126
                        *baseIdx++
×
1127
                }
×
1128
        }
1129
}
1130

1131
// runRangeScan iterates through scanner results and calls fn for each matching entry.
1132
func runRangeScan(scanner *rangeScanner, end []byte, fn func(key []byte, value Value) bool) error {
10✔
1133
        var lastKey []byte
10✔
1134
        for scanner.next() {
332✔
1135
                entry := scanner.entry()
322✔
1136

322✔
1137
                if lastKey != nil && CompareKeys(entry.Key, lastKey) == 0 {
322✔
UNCOV
1138
                        continue
×
1139
                }
1140
                lastKey = entry.Key
322✔
1141

322✔
1142
                if entry.Value.IsTombstone() {
322✔
UNCOV
1143
                        continue
×
1144
                }
1145

1146
                if CompareKeys(entry.Key, end) >= 0 {
322✔
UNCOV
1147
                        break
×
1148
                }
1149

1150
                if !fn(entry.Key, entry.Value) {
323✔
1151
                        break
1✔
1152
                }
1153
        }
1154
        return nil
10✔
1155
}
1156

1157
// rangeOverlaps returns true if [start, end) overlaps with [minKey, maxKey].
1158
func rangeOverlaps(start, end, minKey, maxKey []byte) bool {
28✔
1159
        // Range overlaps if start < maxKey AND end > minKey
28✔
1160
        // i.e., NOT (start >= maxKey OR end <= minKey)
28✔
1161
        if CompareKeys(start, maxKey) > 0 {
31✔
1162
                return false
3✔
1163
        }
3✔
1164
        if CompareKeys(end, minKey) <= 0 {
28✔
1165
                return false
3✔
1166
        }
3✔
1167
        return true
22✔
1168
}
1169

1170
// rangeScanner merges entries from memtables and SSTables for range scanning.
1171
type rangeScanner struct {
1172
        start   []byte
1173
        end     []byte
1174
        cache   *lruCache
1175
        verify  bool
1176
        heap    rangeHeap
1177
        current Entry
1178
        stats   ScanStats
1179

1180
        // Reference counting: all SSTables we hold refs on
1181
        refTables [][]*SSTable
1182
}
1183

1184
type rangeHeapEntry struct {
1185
        entry    Entry
1186
        priority int // lower = newer/higher priority
1187
        source   rangeSource
1188
}
1189

1190
type rangeSource interface {
1191
        next() bool
1192
        entry() Entry
1193
        close()
1194
}
1195

1196
// rangeHeap is a binary min-heap of rangeHeapEntry ordered by its less method.
type rangeHeap []rangeHeapEntry
1197

1198
func (h rangeHeap) less(i, j int) bool {
175✔
1199
        cmp := CompareKeys(h[i].entry.Key, h[j].entry.Key)
175✔
1200
        if cmp != 0 {
350✔
1201
                return cmp < 0
175✔
1202
        }
175✔
UNCOV
1203
        return h[i].priority < h[j].priority
×
1204
}
1205

1206
func (h *rangeHeap) push(x rangeHeapEntry) {
312✔
1207
        *h = append(*h, x)
312✔
1208
        h.up(len(*h) - 1)
312✔
1209
}
312✔
1210

1211
func (h *rangeHeap) pop() rangeHeapEntry {
322✔
1212
        old := *h
322✔
1213
        n := len(old) - 1
322✔
1214
        old[0], old[n] = old[n], old[0]
322✔
1215
        h.down(0, n)
322✔
1216
        x := old[n]
322✔
1217
        *h = old[:n]
322✔
1218
        return x
322✔
1219
}
322✔
1220

1221
func (h rangeHeap) up(j int) {
312✔
1222
        for {
746✔
1223
                i := (j - 1) / 2
434✔
1224
                if i == j || !h.less(j, i) {
746✔
1225
                        break
312✔
1226
                }
1227
                h[i], h[j] = h[j], h[i]
122✔
1228
                j = i
122✔
1229
        }
1230
}
1231

1232
func (h rangeHeap) down(i, n int) {
324✔
1233
        for {
651✔
1234
                j1 := 2*i + 1
327✔
1235
                if j1 >= n || j1 < 0 {
602✔
1236
                        break
275✔
1237
                }
1238
                j := j1
52✔
1239
                if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
53✔
1240
                        j = j2
1✔
1241
                }
1✔
1242
                if !h.less(j, i) {
101✔
1243
                        break
49✔
1244
                }
1245
                h[i], h[j] = h[j], h[i]
3✔
1246
                i = j
3✔
1247
        }
1248
}
1249

1250
func (h *rangeHeap) init() {
10✔
1251
        n := len(*h)
10✔
1252
        for i := n/2 - 1; i >= 0; i-- {
12✔
1253
                h.down(i, n)
2✔
1254
        }
2✔
1255
}
1256

1257
func newRangeScanner(start, end []byte, cache *lruCache, verify bool) *rangeScanner {
10✔
1258
        return &rangeScanner{
10✔
1259
                start:  start,
10✔
1260
                end:    end,
10✔
1261
                cache:  cache,
10✔
1262
                verify: verify,
10✔
1263
                heap:   make(rangeHeap, 0, 8),
10✔
1264
        }
10✔
1265
}
10✔
1266

1267
func (s *rangeScanner) addMemtable(mt *memtable, priority int) {
10✔
1268
        src := &memtableRangeSource{mt: mt, start: s.start, end: s.end}
10✔
1269
        src.seekToStart()
10✔
1270
        if src.valid {
13✔
1271
                s.heap = append(s.heap, rangeHeapEntry{
3✔
1272
                        entry:    src.entry(),
3✔
1273
                        priority: priority,
3✔
1274
                        source:   src,
3✔
1275
                })
3✔
1276
        } else {
10✔
1277
                src.close()
7✔
1278
        }
7✔
1279
}
1280

1281
func (s *rangeScanner) addSSTable(sst *SSTable, priority int) {
8✔
1282
        src := &sstableRangeSource{
8✔
1283
                sst:    sst,
8✔
1284
                start:  s.start,
8✔
1285
                end:    s.end,
8✔
1286
                cache:  s.cache,
8✔
1287
                verify: s.verify,
8✔
1288
                stats:  &s.stats,
8✔
1289
        }
8✔
1290
        src.seekToStart()
8✔
1291
        if src.valid {
16✔
1292
                s.heap = append(s.heap, rangeHeapEntry{
8✔
1293
                        entry:    src.entry(),
8✔
1294
                        priority: priority,
8✔
1295
                        source:   src,
8✔
1296
                })
8✔
1297
        } else {
8✔
UNCOV
1298
                src.close()
×
UNCOV
1299
        }
×
1300
}
1301

1302
func (s *rangeScanner) init() {
10✔
1303
        s.heap.init()
10✔
1304
}
10✔
1305

1306
func (s *rangeScanner) next() bool {
331✔
1307
        if len(s.heap) == 0 {
340✔
1308
                return false
9✔
1309
        }
9✔
1310

1311
        he := s.heap.pop()
322✔
1312
        s.current = he.entry
322✔
1313
        s.stats.KeysExamined++
322✔
1314

322✔
1315
        if he.source.next() {
634✔
1316
                s.heap.push(rangeHeapEntry{
312✔
1317
                        entry:    he.source.entry(),
312✔
1318
                        priority: he.priority,
312✔
1319
                        source:   he.source,
312✔
1320
                })
312✔
1321
        } else {
322✔
1322
                he.source.close()
10✔
1323
        }
10✔
1324

1325
        return true
322✔
1326
}
1327

1328
func (s *rangeScanner) entry() Entry {
322✔
1329
        return s.current
322✔
1330
}
322✔
1331

1332
func (s *rangeScanner) close() {
10✔
1333
        for _, he := range s.heap {
11✔
1334
                he.source.close()
1✔
1335
        }
1✔
1336
        // Decrement refs on all SSTables we were holding
1337
        decRefLevels(s.refTables)
10✔
1338
}
1339

1340
// memtableRangeSource wraps a memtable iterator for range scanning.
1341
type memtableRangeSource struct {
1342
        mt    *memtable
1343
        start []byte
1344
        end   []byte
1345
        iter  *memtableIterator
1346
        valid bool
1347
}
1348

1349
func (s *memtableRangeSource) seekToStart() {
10✔
1350
        s.iter = s.mt.Iterator()
10✔
1351
        if s.iter.Seek(s.start) {
13✔
1352
                key := s.iter.Key()
3✔
1353
                s.valid = CompareKeys(key, s.end) < 0
3✔
1354
        }
3✔
1355
}
1356

1357
func (s *memtableRangeSource) next() bool {
7✔
1358
        if !s.iter.Next() {
7✔
UNCOV
1359
                s.valid = false
×
UNCOV
1360
                return false
×
UNCOV
1361
        }
×
1362
        key := s.iter.Key()
7✔
1363
        s.valid = CompareKeys(key, s.end) < 0
7✔
1364
        return s.valid
7✔
1365
}
1366

1367
func (s *memtableRangeSource) entry() Entry {
7✔
1368
        return s.iter.Entry()
7✔
1369
}
7✔
1370

1371
func (s *memtableRangeSource) close() {
10✔
1372
        if s.iter != nil {
20✔
1373
                s.iter.Close()
10✔
1374
        }
10✔
1375
}
1376

1377
// sstableRangeSource wraps an SSTable for range scanning.
1378
type sstableRangeSource struct {
1379
        sst       *SSTable
1380
        start     []byte
1381
        end       []byte
1382
        cache     *lruCache
1383
        verify    bool
1384
        blockIdx  int
1385
        entryIdx  int
1386
        block     *Block
1387
        valid     bool
1388
        fromCache bool
1389
        stats     *ScanStats
1390
}
1391

1392
func (s *sstableRangeSource) seekToStart() {
8✔
1393
        // Ensure index is loaded for lazy-loaded SSTables
8✔
1394
        if err := s.sst.ensureIndex(); err != nil {
8✔
UNCOV
1395
                s.valid = false
×
UNCOV
1396
                return
×
UNCOV
1397
        }
×
1398

1399
        // Check if SSTable might contain keys in range
1400
        if !rangeOverlaps(s.start, s.end, s.sst.Index.MinKey, s.sst.Index.MaxKey) {
8✔
UNCOV
1401
                s.valid = false
×
1402
                return
×
1403
        }
×
1404

1405
        // Find starting block using index
1406
        s.blockIdx = s.sst.Index.Search(s.start)
8✔
1407
        if s.blockIdx < 0 {
12✔
1408
                s.blockIdx = 0
4✔
1409
        }
4✔
1410

1411
        // Load the block
1412
        if err := s.loadBlock(); err != nil {
8✔
UNCOV
1413
                s.valid = false
×
UNCOV
1414
                return
×
UNCOV
1415
        }
×
1416

1417
        // Find first entry >= start in block
1418
        for s.entryIdx = 0; s.entryIdx < len(s.block.Entries); s.entryIdx++ {
91✔
1419
                key := s.block.Entries[s.entryIdx].Key
83✔
1420
                if CompareKeys(key, s.start) >= 0 {
91✔
1421
                        s.valid = CompareKeys(key, s.end) < 0
8✔
1422
                        return
8✔
1423
                }
8✔
1424
        }
1425

1426
        // Not found in this block, try next block
1427
        // Release old block and clear it so we load the new block
1428
        if s.block != nil && !s.fromCache {
×
UNCOV
1429
                s.block.Release()
×
1430
        }
×
1431
        s.block = nil
×
1432
        s.blockIdx++
×
1433
        s.entryIdx = 0
×
UNCOV
1434

×
1435
        // Load and check the next block directly (don't call next() which would increment blockIdx again)
×
1436
        if s.blockIdx >= len(s.sst.Index.Entries) {
×
1437
                s.valid = false
×
1438
                return
×
UNCOV
1439
        }
×
1440

1441
        if err := s.loadBlock(); err != nil {
×
1442
                s.valid = false
×
UNCOV
1443
                return
×
UNCOV
1444
        }
×
1445

UNCOV
1446
        if len(s.block.Entries) == 0 {
×
1447
                s.valid = false
×
1448
                return
×
UNCOV
1449
        }
×
1450

1451
        // Check if first entry is in range
1452
        key := s.block.Entries[0].Key
×
1453
        s.valid = CompareKeys(key, s.end) < 0
×
1454
}
1455

1456
func (s *sstableRangeSource) loadBlock() error {
8✔
1457
        if s.blockIdx >= len(s.sst.Index.Entries) {
8✔
UNCOV
1458
                return ErrKeyNotFound
×
UNCOV
1459
        }
×
1460

1461
        // Release previous block if we owned it
1462
        if s.block != nil && !s.fromCache {
8✔
UNCOV
1463
                s.block.Release()
×
UNCOV
1464
                s.block = nil
×
UNCOV
1465
        }
×
1466

1467
        ie := s.sst.Index.Entries[s.blockIdx]
8✔
1468
        cacheKey := cacheKey{FileID: s.sst.ID, BlockOffset: ie.BlockOffset}
8✔
1469

8✔
1470
        // Try cache first
8✔
1471
        if s.cache != nil {
16✔
1472
                if cached, found := s.cache.Get(cacheKey); found {
12✔
1473
                        s.block = cached
4✔
1474
                        s.fromCache = true
4✔
1475
                        if s.stats != nil {
8✔
1476
                                atomic.AddInt64(&s.stats.BlocksLoaded, 1)
4✔
1477
                                atomic.AddInt64(&s.stats.BlocksCacheHit, 1)
4✔
1478
                        }
4✔
1479
                        return nil
4✔
1480
                }
1481
        }
1482

1483
        // Read from disk
1484
        blockData := make([]byte, ie.BlockSize)
4✔
1485
        if _, err := s.sst.file.ReadAt(blockData, int64(ie.BlockOffset)); err != nil {
4✔
UNCOV
1486
                return err
×
1487
        }
×
1488

1489
        block, err := DecodeBlock(blockData, s.verify)
4✔
1490
        if err != nil {
4✔
UNCOV
1491
                return err
×
UNCOV
1492
        }
×
1493

1494
        if s.cache != nil {
8✔
1495
                s.cache.Put(cacheKey, block)
4✔
1496
                s.fromCache = true
4✔
1497
        } else {
4✔
UNCOV
1498
                s.fromCache = false
×
UNCOV
1499
        }
×
1500
        s.block = block
4✔
1501
        if s.stats != nil {
8✔
1502
                atomic.AddInt64(&s.stats.BlocksLoaded, 1)
4✔
1503
                atomic.AddInt64(&s.stats.BlocksDiskRead, 1)
4✔
1504
        }
4✔
1505
        return nil
4✔
1506
}
1507

1508
func (s *sstableRangeSource) next() bool {
315✔
1509
        s.entryIdx++
315✔
1510

315✔
1511
        // Try next entry in current block
315✔
1512
        if s.block != nil && s.entryIdx < len(s.block.Entries) {
625✔
1513
                key := s.block.Entries[s.entryIdx].Key
310✔
1514
                s.valid = CompareKeys(key, s.end) < 0
310✔
1515
                return s.valid
310✔
1516
        }
310✔
1517

1518
        // Move to next block
1519
        s.blockIdx++
5✔
1520
        s.entryIdx = 0
5✔
1521

5✔
1522
        if s.blockIdx >= len(s.sst.Index.Entries) {
10✔
1523
                s.valid = false
5✔
1524
                return false
5✔
1525
        }
5✔
1526

1527
        if err := s.loadBlock(); err != nil {
×
1528
                s.valid = false
×
UNCOV
1529
                return false
×
UNCOV
1530
        }
×
1531

UNCOV
1532
        if len(s.block.Entries) == 0 {
×
1533
                s.valid = false
×
1534
                return false
×
UNCOV
1535
        }
×
1536

UNCOV
1537
        key := s.block.Entries[0].Key
×
UNCOV
1538
        s.valid = CompareKeys(key, s.end) < 0
×
UNCOV
1539
        return s.valid
×
1540
}
1541

1542
func (s *sstableRangeSource) entry() Entry {
316✔
1543
        if !s.valid || s.block == nil || s.entryIdx >= len(s.block.Entries) {
316✔
UNCOV
1544
                return Entry{}
×
UNCOV
1545
        }
×
1546
        be := s.block.Entries[s.entryIdx]
316✔
1547
        val, _, _ := DecodeValue(be.Value)
316✔
1548
        // IMPORTANT: Copy the key since the block may be released later
316✔
1549
        keyCopy := make([]byte, len(be.Key))
316✔
1550
        copy(keyCopy, be.Key)
316✔
1551
        return Entry{
316✔
1552
                Key:   keyCopy,
316✔
1553
                Value: val,
316✔
1554
        }
316✔
1555
}
1556

1557
func (s *sstableRangeSource) close() {
8✔
1558
        if s.block != nil && !s.fromCache {
8✔
UNCOV
1559
                s.block.Release()
×
UNCOV
1560
                s.block = nil
×
UNCOV
1561
        }
×
1562
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc