• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freeeve / tinykvs / 21051114331

16 Jan 2026 12:23AM UTC coverage: 81.3% (+0.1%) from 81.16%
21051114331

push

github

freeeve
Fix gofmt -s and add go.dev/goreportcard badges

2326 of 2861 relevant lines covered (81.3%)

536.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.41
/reader.go
1
package tinykvs
2

3
import "sync"
4

5
// Reader coordinates lookups across memtable and SSTables.
6
type Reader struct {
7
        mu sync.RWMutex
8

9
        memtable   *Memtable
10
        immutables []*Memtable  // Recently flushed, waiting for SSTable write
11
        levels     [][]*SSTable // levels[0] = L0, levels[1] = L1, etc.
12
        cache      *LRUCache
13
        opts       Options
14
}
15

16
// NewReader creates a new reader.
17
func NewReader(memtable *Memtable, levels [][]*SSTable, cache *LRUCache, opts Options) *Reader {
30✔
18
        // Make a deep copy of levels to avoid sharing with Store
30✔
19
        // Store and Reader use different mutexes, so sharing would cause races
30✔
20
        levelsCopy := make([][]*SSTable, len(levels))
30✔
21
        for i, level := range levels {
226✔
22
                levelsCopy[i] = make([]*SSTable, len(level))
196✔
23
                copy(levelsCopy[i], level)
196✔
24
        }
196✔
25
        return &Reader{
30✔
26
                memtable: memtable,
30✔
27
                levels:   levelsCopy,
30✔
28
                cache:    cache,
30✔
29
                opts:     opts,
30✔
30
        }
30✔
31
}
32

33
// Get looks up a key, checking memtable first, then SSTables.
34
func (r *Reader) Get(key []byte) (Value, error) {
1,523✔
35
        r.mu.RLock()
1,523✔
36
        defer r.mu.RUnlock()
1,523✔
37

1,523✔
38
        // 1. Check active memtable
1,523✔
39
        if entry, found := r.memtable.Get(key); found {
1,537✔
40
                if entry.Value.IsTombstone() {
16✔
41
                        return Value{}, ErrKeyNotFound
2✔
42
                }
2✔
43
                return entry.Value, nil
12✔
44
        }
45

46
        // 2. Check immutable memtables (newest first)
47
        for i := len(r.immutables) - 1; i >= 0; i-- {
1,511✔
48
                if entry, found := r.immutables[i].Get(key); found {
4✔
49
                        if entry.Value.IsTombstone() {
3✔
50
                                return Value{}, ErrKeyNotFound
1✔
51
                        }
1✔
52
                        return entry.Value, nil
1✔
53
                }
54
        }
55

56
        // 3. Check SSTables level by level (L0 to Lmax)
57
        for level := 0; level < len(r.levels); level++ {
3,947✔
58
                tables := r.levels[level]
2,440✔
59

2,440✔
60
                if level == 0 {
3,947✔
61
                        // L0: Check all tables (newest first, may have overlapping keys)
1,507✔
62
                        for i := len(tables) - 1; i >= 0; i-- {
2,647✔
63
                                entry, found, err := tables[i].Get(key, r.cache, r.opts.VerifyChecksums)
1,140✔
64
                                if err != nil {
1,140✔
65
                                        return Value{}, err
×
66
                                }
×
67
                                if found {
2,259✔
68
                                        if entry.Value.IsTombstone() {
1,120✔
69
                                                return Value{}, ErrKeyNotFound
1✔
70
                                        }
1✔
71
                                        return entry.Value, nil
1,118✔
72
                                }
73
                        }
74
                } else {
933✔
75
                        // L1+: Tables are sorted and non-overlapping, binary search
933✔
76
                        idx := r.findTableForKey(tables, key)
933✔
77
                        if idx >= 0 {
1,213✔
78
                                entry, found, err := tables[idx].Get(key, r.cache, r.opts.VerifyChecksums)
280✔
79
                                if err != nil {
280✔
80
                                        return Value{}, err
×
81
                                }
×
82
                                if found {
559✔
83
                                        if entry.Value.IsTombstone() {
306✔
84
                                                return Value{}, ErrKeyNotFound
27✔
85
                                        }
27✔
86
                                        return entry.Value, nil
252✔
87
                                }
88
                        }
89
                }
90
        }
91

92
        return Value{}, ErrKeyNotFound
109✔
93
}
94

95
// findTableForKey finds the SSTable that may contain the key (for L1+).
96
// Returns the index of the table, or -1 if not found.
97
func (r *Reader) findTableForKey(tables []*SSTable, key []byte) int {
933✔
98
        lo, hi := 0, len(tables)-1
933✔
99

933✔
100
        for lo <= hi {
1,389✔
101
                mid := (lo + hi) / 2
456✔
102
                minKey := tables[mid].MinKey()
456✔
103
                maxKey := tables[mid].MaxKey()
456✔
104

456✔
105
                // Check if key is in range [MinKey, MaxKey]
456✔
106
                if CompareKeys(key, minKey) >= 0 && CompareKeys(key, maxKey) <= 0 {
736✔
107
                        return mid
280✔
108
                }
280✔
109

110
                if CompareKeys(key, minKey) < 0 {
331✔
111
                        hi = mid - 1
155✔
112
                } else {
176✔
113
                        lo = mid + 1
21✔
114
                }
21✔
115
        }
116

117
        return -1
653✔
118
}
119

120
// SetMemtable updates the active memtable.
121
func (r *Reader) SetMemtable(mt *Memtable) {
50✔
122
        r.mu.Lock()
50✔
123
        defer r.mu.Unlock()
50✔
124
        r.memtable = mt
50✔
125
}
50✔
126

127
// AddImmutable adds an immutable memtable.
128
func (r *Reader) AddImmutable(mt *Memtable) {
52✔
129
        r.mu.Lock()
52✔
130
        defer r.mu.Unlock()
52✔
131
        r.immutables = append(r.immutables, mt)
52✔
132
}
52✔
133

134
// RemoveImmutable removes an immutable memtable after it's been flushed.
135
func (r *Reader) RemoveImmutable(mt *Memtable) {
61✔
136
        r.mu.Lock()
61✔
137
        defer r.mu.Unlock()
61✔
138
        for i, imm := range r.immutables {
123✔
139
                if imm == mt {
112✔
140
                        r.immutables = append(r.immutables[:i], r.immutables[i+1:]...)
50✔
141
                        return
50✔
142
                }
50✔
143
        }
144
}
145

146
// SetLevels updates the SSTable levels.
147
func (r *Reader) SetLevels(levels [][]*SSTable) {
20✔
148
        r.mu.Lock()
20✔
149
        defer r.mu.Unlock()
20✔
150
        r.levels = levels
20✔
151
}
20✔
152

153
// AddSSTable adds an SSTable to a level.
154
func (r *Reader) AddSSTable(level int, sst *SSTable) {
62✔
155
        r.mu.Lock()
62✔
156
        defer r.mu.Unlock()
62✔
157

62✔
158
        // Ensure the level exists
62✔
159
        for len(r.levels) <= level {
68✔
160
                r.levels = append(r.levels, nil)
6✔
161
        }
6✔
162

163
        r.levels[level] = append(r.levels[level], sst)
62✔
164
}
165

166
// GetLevels returns a copy of the current levels.
167
func (r *Reader) GetLevels() [][]*SSTable {
98✔
168
        r.mu.RLock()
98✔
169
        defer r.mu.RUnlock()
98✔
170

98✔
171
        levels := make([][]*SSTable, len(r.levels))
98✔
172
        for i, level := range r.levels {
783✔
173
                levels[i] = make([]*SSTable, len(level))
685✔
174
                copy(levels[i], level)
685✔
175
        }
685✔
176
        return levels
98✔
177
}
178

179
// ScanPrefix iterates over all keys with the given prefix in sorted order.
180
// Keys are deduplicated (newest version wins) and tombstones are skipped.
181
// Return false from the callback to stop iteration early.
182
func (r *Reader) ScanPrefix(prefix []byte, fn func(key []byte, value Value) bool) error {
6✔
183
        r.mu.RLock()
6✔
184
        defer r.mu.RUnlock()
6✔
185

6✔
186
        // Build a prefix scanner with all sources
6✔
187
        scanner := newPrefixScanner(prefix, r.cache, r.opts.VerifyChecksums)
6✔
188

6✔
189
        // Add memtable (highest priority - index 0)
6✔
190
        scanner.addMemtable(r.memtable, 0)
6✔
191

6✔
192
        // Add immutable memtables (newest first)
6✔
193
        for i := len(r.immutables) - 1; i >= 0; i-- {
7✔
194
                scanner.addMemtable(r.immutables[i], len(r.immutables)-i)
1✔
195
        }
1✔
196

197
        // Add SSTable levels
198
        baseIdx := len(r.immutables) + 1
6✔
199
        for level := 0; level < len(r.levels); level++ {
48✔
200
                tables := r.levels[level]
42✔
201
                if level == 0 {
48✔
202
                        // L0: add all tables (newest first)
6✔
203
                        for i := len(tables) - 1; i >= 0; i-- {
7✔
204
                                scanner.addSSTable(tables[i], baseIdx)
1✔
205
                                baseIdx++
1✔
206
                        }
1✔
207
                } else {
36✔
208
                        // L1+: add tables that may contain prefix
36✔
209
                        for _, t := range tables {
37✔
210
                                if hasKeyInRange(prefix, t.MinKey(), t.MaxKey()) {
2✔
211
                                        scanner.addSSTable(t, baseIdx)
1✔
212
                                        baseIdx++
1✔
213
                                }
1✔
214
                        }
215
                }
216
        }
217

218
        scanner.init()
6✔
219

6✔
220
        // Iterate and call callback
6✔
221
        var lastKey []byte
6✔
222
        for scanner.next() {
166✔
223
                entry := scanner.entry()
160✔
224

160✔
225
                // Skip duplicates (newer version already seen)
160✔
226
                if lastKey != nil && CompareKeys(entry.Key, lastKey) == 0 {
210✔
227
                        continue
50✔
228
                }
229
                lastKey = entry.Key
110✔
230

110✔
231
                // Skip tombstones
110✔
232
                if entry.Value.IsTombstone() {
110✔
233
                        continue
×
234
                }
235

236
                // Check prefix still matches
237
                if !hasPrefix(entry.Key, prefix) {
110✔
238
                        break
×
239
                }
240

241
                if !fn(entry.Key, entry.Value) {
111✔
242
                        break
1✔
243
                }
244
        }
245

246
        scanner.close()
6✔
247
        return nil
6✔
248
}
249

250
// hasPrefix reports whether key begins with the bytes of prefix.
// An empty prefix matches every key.
func hasPrefix(key, prefix []byte) bool {
	if len(prefix) > len(key) {
		return false
	}
	for i, b := range prefix {
		if key[i] != b {
			return false
		}
	}
	return true
}
262

263
// hasKeyInRange reports whether some key starting with prefix could lie in
// the closed range [minKey, maxKey], using bytewise lexicographic order.
//
// Keys that start with prefix form a contiguous run beginning at prefix
// itself, so the run intersects the range exactly when:
//   - prefix <= maxKey (the run does not start after the range), and
//   - minKey < prefix or minKey starts with prefix (the run does not end
//     before the range).
func hasKeyInRange(prefix, minKey, maxKey []byte) bool {
	// Upper bound: reject when prefix sorts strictly after maxKey.
	common := len(prefix)
	if len(maxKey) < common {
		common = len(maxKey)
	}
	i := 0
	for i < common && prefix[i] == maxKey[i] {
		i++
	}
	if i < common {
		if prefix[i] > maxKey[i] {
			return false
		}
	} else if len(maxKey) < len(prefix) {
		// maxKey is a strict prefix of prefix, so prefix (and every key
		// extending it) sorts after maxKey.
		return false
	}

	// Lower bound: the first differing byte decides. If there is none,
	// either minKey starts with prefix or minKey is a prefix of prefix
	// (hence smaller); both allow a match.
	common = len(prefix)
	if len(minKey) < common {
		common = len(minKey)
	}
	for j := 0; j < common; j++ {
		if minKey[j] != prefix[j] {
			return minKey[j] < prefix[j]
		}
	}
	return true
}
307

308
// prefixScanner merges entries from memtables and SSTables for prefix scanning.
309
type prefixScanner struct {
310
        prefix  []byte
311
        cache   *LRUCache
312
        verify  bool
313
        heap    prefixHeap
314
        current Entry
315
}
316

317
type prefixHeapEntry struct {
318
        entry    Entry
319
        priority int // lower = newer/higher priority
320
        source   prefixSource
321
}
322

323
type prefixSource interface {
324
        next() bool
325
        entry() Entry
326
        close()
327
}
328

329
// prefixHeap is a binary min-heap of source heads, ordered by key and then by
// priority (see less).
type prefixHeap []prefixHeapEntry
330

331
func (h prefixHeap) less(i, j int) bool {
487✔
332
        cmp := CompareKeys(h[i].entry.Key, h[j].entry.Key)
487✔
333
        if cmp != 0 {
924✔
334
                return cmp < 0
437✔
335
        }
437✔
336
        return h[i].priority < h[j].priority
50✔
337
}
338

339
func (h *prefixHeap) push(x prefixHeapEntry) {
153✔
340
        *h = append(*h, x)
153✔
341
        h.up(len(*h) - 1)
153✔
342
}
153✔
343

344
func (h *prefixHeap) pop() prefixHeapEntry {
160✔
345
        old := *h
160✔
346
        n := len(old) - 1
160✔
347
        old[0], old[n] = old[n], old[0]
160✔
348
        h.down(0, n)
160✔
349
        x := old[n]
160✔
350
        *h = old[:n]
160✔
351
        return x
160✔
352
}
160✔
353

354
func (h prefixHeap) up(j int) {
153✔
355
        for {
457✔
356
                i := (j - 1) / 2
304✔
357
                if i == j || !h.less(j, i) {
457✔
358
                        break
153✔
359
                }
360
                h[i], h[j] = h[j], h[i]
151✔
361
                j = i
151✔
362
        }
363
}
364

365
func (h prefixHeap) down(i, n int) {
162✔
366
        for {
438✔
367
                j1 := 2*i + 1
276✔
368
                if j1 >= n || j1 < 0 {
425✔
369
                        break
149✔
370
                }
371
                j := j1
127✔
372
                if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
127✔
373
                        j = j2
×
374
                }
×
375
                if !h.less(j, i) {
140✔
376
                        break
13✔
377
                }
378
                h[i], h[j] = h[j], h[i]
114✔
379
                i = j
114✔
380
        }
381
}
382

383
func (h *prefixHeap) init() {
6✔
384
        n := len(*h)
6✔
385
        for i := n/2 - 1; i >= 0; i-- {
8✔
386
                h.down(i, n)
2✔
387
        }
2✔
388
}
389

390
func newPrefixScanner(prefix []byte, cache *LRUCache, verify bool) *prefixScanner {
6✔
391
        return &prefixScanner{
6✔
392
                prefix: prefix,
6✔
393
                cache:  cache,
6✔
394
                verify: verify,
6✔
395
                heap:   make(prefixHeap, 0, 8),
6✔
396
        }
6✔
397
}
6✔
398

399
func (s *prefixScanner) addMemtable(mt *Memtable, priority int) {
7✔
400
        src := &memtablePrefixSource{mt: mt, prefix: s.prefix}
7✔
401
        src.seekToPrefix()
7✔
402
        if src.valid {
13✔
403
                s.heap = append(s.heap, prefixHeapEntry{
6✔
404
                        entry:    src.entry(),
6✔
405
                        priority: priority,
6✔
406
                        source:   src,
6✔
407
                })
6✔
408
        }
6✔
409
}
410

411
func (s *prefixScanner) addSSTable(sst *SSTable, priority int) {
2✔
412
        src := &sstablePrefixSource{
2✔
413
                sst:    sst,
2✔
414
                prefix: s.prefix,
2✔
415
                cache:  s.cache,
2✔
416
                verify: s.verify,
2✔
417
        }
2✔
418
        src.seekToPrefix()
2✔
419
        if src.valid {
4✔
420
                s.heap = append(s.heap, prefixHeapEntry{
2✔
421
                        entry:    src.entry(),
2✔
422
                        priority: priority,
2✔
423
                        source:   src,
2✔
424
                })
2✔
425
        }
2✔
426
}
427

428
func (s *prefixScanner) init() {
6✔
429
        s.heap.init()
6✔
430
}
6✔
431

432
func (s *prefixScanner) next() bool {
165✔
433
        if len(s.heap) == 0 {
170✔
434
                return false
5✔
435
        }
5✔
436

437
        he := s.heap.pop()
160✔
438
        s.current = he.entry
160✔
439

160✔
440
        if he.source.next() {
313✔
441
                s.heap.push(prefixHeapEntry{
153✔
442
                        entry:    he.source.entry(),
153✔
443
                        priority: he.priority,
153✔
444
                        source:   he.source,
153✔
445
                })
153✔
446
        }
153✔
447

448
        return true
160✔
449
}
450

451
func (s *prefixScanner) entry() Entry {
160✔
452
        return s.current
160✔
453
}
160✔
454

455
func (s *prefixScanner) close() {
6✔
456
        for _, he := range s.heap {
7✔
457
                he.source.close()
1✔
458
        }
1✔
459
}
460

461
// memtablePrefixSource wraps a memtable iterator for prefix scanning.
462
type memtablePrefixSource struct {
463
        mt     *Memtable
464
        prefix []byte
465
        iter   *MemtableIterator
466
        valid  bool
467
}
468

469
func (s *memtablePrefixSource) seekToPrefix() {
7✔
470
        s.iter = s.mt.Iterator()
7✔
471
        if s.iter.Seek(s.prefix) {
14✔
472
                s.valid = hasPrefix(s.iter.Key(), s.prefix)
7✔
473
        }
7✔
474
}
475

476
func (s *memtablePrefixSource) next() bool {
60✔
477
        if !s.iter.Next() {
63✔
478
                s.valid = false
3✔
479
                return false
3✔
480
        }
3✔
481
        s.valid = hasPrefix(s.iter.Key(), s.prefix)
57✔
482
        return s.valid
57✔
483
}
484

485
func (s *memtablePrefixSource) entry() Entry {
61✔
486
        return s.iter.Entry()
61✔
487
}
61✔
488

489
func (s *memtablePrefixSource) close() {
1✔
490
        if s.iter != nil {
2✔
491
                s.iter.Close()
1✔
492
        }
1✔
493
}
494

495
// sstablePrefixSource wraps an SSTable for prefix scanning.
496
type sstablePrefixSource struct {
497
        sst      *SSTable
498
        prefix   []byte
499
        cache    *LRUCache
500
        verify   bool
501
        blockIdx int
502
        entryIdx int
503
        block    *Block
504
        valid    bool
505
}
506

507
func (s *sstablePrefixSource) seekToPrefix() {
2✔
508
        // Check if SSTable might contain keys with this prefix
2✔
509
        if !hasKeyInRange(s.prefix, s.sst.Index.MinKey, s.sst.Index.MaxKey) {
2✔
510
                s.valid = false
×
511
                return
×
512
        }
×
513

514
        // Find starting block using index
515
        s.blockIdx = s.sst.Index.Search(s.prefix)
2✔
516
        if s.blockIdx < 0 {
4✔
517
                // Prefix is before all keys - start at block 0 if minKey has prefix
2✔
518
                if hasPrefix(s.sst.Index.MinKey, s.prefix) {
4✔
519
                        s.blockIdx = 0
2✔
520
                } else {
2✔
521
                        s.valid = false
×
522
                        return
×
523
                }
×
524
        }
525

526
        // Load the block
527
        if err := s.loadBlock(); err != nil {
2✔
528
                s.valid = false
×
529
                return
×
530
        }
×
531

532
        // Find first entry >= prefix in block
533
        for s.entryIdx = 0; s.entryIdx < len(s.block.Entries); s.entryIdx++ {
4✔
534
                if CompareKeys(s.block.Entries[s.entryIdx].Key, s.prefix) >= 0 {
4✔
535
                        s.valid = hasPrefix(s.block.Entries[s.entryIdx].Key, s.prefix)
2✔
536
                        return
2✔
537
                }
2✔
538
        }
539

540
        // Not found in this block, try next
541
        s.blockIdx++
×
542
        s.entryIdx = -1
×
543
        s.valid = s.next()
×
544
}
545

546
func (s *sstablePrefixSource) loadBlock() error {
2✔
547
        if s.blockIdx >= len(s.sst.Index.Entries) {
2✔
548
                return ErrKeyNotFound
×
549
        }
×
550

551
        ie := s.sst.Index.Entries[s.blockIdx]
2✔
552
        cacheKey := CacheKey{FileID: s.sst.ID, BlockOffset: ie.BlockOffset}
2✔
553

2✔
554
        // Try cache first
2✔
555
        if s.cache != nil {
4✔
556
                if cached, found := s.cache.Get(cacheKey); found {
3✔
557
                        s.block = cached
1✔
558
                        return nil
1✔
559
                }
1✔
560
        }
561

562
        // Read from disk
563
        blockData := make([]byte, ie.BlockSize)
1✔
564
        if _, err := s.sst.file.ReadAt(blockData, int64(ie.BlockOffset)); err != nil {
1✔
565
                return err
×
566
        }
×
567

568
        block, err := DecodeBlock(blockData, s.verify)
1✔
569
        if err != nil {
1✔
570
                return err
×
571
        }
×
572

573
        if s.cache != nil {
2✔
574
                s.cache.Put(cacheKey, block)
1✔
575
        }
1✔
576
        s.block = block
1✔
577
        return nil
1✔
578
}
579

580
func (s *sstablePrefixSource) next() bool {
100✔
581
        s.entryIdx++
100✔
582

100✔
583
        // Try next entry in current block
100✔
584
        if s.block != nil && s.entryIdx < len(s.block.Entries) {
198✔
585
                s.valid = hasPrefix(s.block.Entries[s.entryIdx].Key, s.prefix)
98✔
586
                return s.valid
98✔
587
        }
98✔
588

589
        // Move to next block
590
        s.blockIdx++
2✔
591
        s.entryIdx = 0
2✔
592

2✔
593
        if s.blockIdx >= len(s.sst.Index.Entries) {
4✔
594
                s.valid = false
2✔
595
                return false
2✔
596
        }
2✔
597

598
        if err := s.loadBlock(); err != nil {
×
599
                s.valid = false
×
600
                return false
×
601
        }
×
602

603
        if len(s.block.Entries) == 0 {
×
604
                s.valid = false
×
605
                return false
×
606
        }
×
607

608
        s.valid = hasPrefix(s.block.Entries[0].Key, s.prefix)
×
609
        return s.valid
×
610
}
611

612
func (s *sstablePrefixSource) entry() Entry {
100✔
613
        if !s.valid || s.block == nil || s.entryIdx >= len(s.block.Entries) {
100✔
614
                return Entry{}
×
615
        }
×
616
        be := s.block.Entries[s.entryIdx]
100✔
617
        // Decode value from block entry
100✔
618
        val, _, _ := DecodeValue(be.Value)
100✔
619
        return Entry{
100✔
620
                Key:   be.Key,
100✔
621
                Value: val,
100✔
622
        }
100✔
623
}
624

625
func (s *sstablePrefixSource) close() {
×
626
        // Nothing to close
×
627
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc