• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / badger / 4229068887

21 Feb 2023 04:16AM UTC coverage: 42.43% (-18.8%) from 61.239%
4229068887

Pull #1866

Aman Mangal
combine memtables before flushing to L0
Pull Request #1866: combine memtables before flushing to L0

49 of 49 new or added lines in 1 file covered. (100.0%)

5919 of 13950 relevant lines covered (42.43%)

437472.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.07
/iterator.go
1
/*
2
 * Copyright 2017 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package badger
18

19
import (
20
        "bytes"
21
        "fmt"
22
        "hash/crc32"
23
        "math"
24
        "sort"
25
        "sync"
26
        "sync/atomic"
27
        "time"
28

29
        "github.com/dgraph-io/badger/v3/table"
30
        "github.com/dgraph-io/badger/v3/y"
31
        "github.com/dgraph-io/ristretto/z"
32
)
33

34
// prefetchStatus records whether an Item's value has already been loaded by a
// background prefetch. The zero value means "not prefetched".
type prefetchStatus uint8

// prefetched marks an Item whose val and err fields were set by prefetchValue.
const prefetched prefetchStatus = 1
39

40
// Item is returned during iteration. Both the Key() and Value() output is only valid until
41
// iterator.Next() is called.
42
type Item struct {
43
        key       []byte
44
        vptr      []byte
45
        val       []byte
46
        version   uint64
47
        expiresAt uint64
48

49
        slice *y.Slice // Used only during prefetching.
50
        next  *Item
51
        txn   *Txn
52

53
        err      error
54
        wg       sync.WaitGroup
55
        status   prefetchStatus
56
        meta     byte // We need to store meta to know about bitValuePointer.
57
        userMeta byte
58
}
59

60
// String returns a string representation of Item
61
func (item *Item) String() string {
×
62
        return fmt.Sprintf("key=%q, version=%d, meta=%x", item.Key(), item.Version(), item.meta)
×
63
}
×
64

65
// Key returns the key.
66
//
67
// Key is only valid as long as item is valid, or transaction is valid.  If you need to use it
68
// outside its validity, please use KeyCopy.
69
func (item *Item) Key() []byte {
123,413,488✔
70
        return item.key
123,413,488✔
71
}
123,413,488✔
72

73
// KeyCopy returns a copy of the key of the item, writing it to dst slice.
74
// If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
75
// returned.
76
func (item *Item) KeyCopy(dst []byte) []byte {
13,000,000✔
77
        return y.SafeCopy(dst, item.key)
13,000,000✔
78
}
13,000,000✔
79

80
// Version returns the commit timestamp of the item.
81
func (item *Item) Version() uint64 {
13,000,000✔
82
        return item.version
13,000,000✔
83
}
13,000,000✔
84

85
// Value retrieves the value of the item from the value log.
86
//
87
// This method must be called within a transaction. Calling it outside a
88
// transaction is considered undefined behavior. If an iterator is being used,
89
// then Item.Value() is defined in the current iteration only, because items are
90
// reused.
91
//
92
// If you need to use a value outside a transaction, please use Item.ValueCopy
93
// instead, or copy it yourself. Value might change once discard or commit is called.
94
// Use ValueCopy if you want to do a Set after Get.
95
func (item *Item) Value(fn func(val []byte) error) error {
14,324,568✔
96
        item.wg.Wait()
14,324,568✔
97
        if item.status == prefetched {
15,049,008✔
98
                if item.err == nil && fn != nil {
1,448,880✔
99
                        if err := fn(item.val); err != nil {
724,440✔
100
                                return err
×
101
                        }
×
102
                }
103
                return item.err
724,440✔
104
        }
105
        buf, cb, err := item.yieldItemValue()
13,600,128✔
106
        defer runCallback(cb)
13,600,128✔
107
        if err != nil {
13,600,128✔
108
                return err
×
109
        }
×
110
        if fn != nil {
27,200,256✔
111
                return fn(buf)
13,600,128✔
112
        }
13,600,128✔
113
        return nil
×
114
}
115

116
// ValueCopy returns a copy of the value of the item from the value log, writing it to dst slice.
117
// If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
118
// returned. Tip: It might make sense to reuse the returned slice as dst argument for the next call.
119
//
120
// This function is useful in long running iterate/update transactions to avoid a write deadlock.
121
// See Github issue: https://github.com/dgraph-io/badger/issues/315
122
func (item *Item) ValueCopy(dst []byte) ([]byte, error) {
600,064✔
123
        item.wg.Wait()
600,064✔
124
        if item.status == prefetched {
600,064✔
125
                return y.SafeCopy(dst, item.val), item.err
×
126
        }
×
127
        buf, cb, err := item.yieldItemValue()
600,064✔
128
        defer runCallback(cb)
600,064✔
129
        return y.SafeCopy(dst, buf), err
600,064✔
130
}
131

132
func (item *Item) hasValue() bool {
18,918,832✔
133
        if item.meta == 0 && item.vptr == nil {
18,918,832✔
134
                // key not found
×
135
                return false
×
136
        }
×
137
        return true
18,918,832✔
138
}
139

140
// IsDeletedOrExpired returns true if item contains deleted or expired value.
141
func (item *Item) IsDeletedOrExpired() bool {
13,000,000✔
142
        return isDeletedOrExpired(item.meta, item.expiresAt)
13,000,000✔
143
}
13,000,000✔
144

145
// DiscardEarlierVersions returns whether the item was created with the
146
// option to discard earlier versions of a key when multiple are available.
147
func (item *Item) DiscardEarlierVersions() bool {
×
148
        return item.meta&bitDiscardEarlierVersions > 0
×
149
}
×
150

151
func (item *Item) yieldItemValue() ([]byte, func(), error) {
18,918,832✔
152
        key := item.Key() // No need to copy.
18,918,832✔
153
        if !item.hasValue() {
18,918,832✔
154
                return nil, nil, nil
×
155
        }
×
156

157
        if item.slice == nil {
19,318,960✔
158
                item.slice = new(y.Slice)
400,128✔
159
        }
400,128✔
160

161
        if (item.meta & bitValuePointer) == 0 {
37,837,663✔
162
                val := item.slice.Resize(len(item.vptr))
18,918,831✔
163
                copy(val, item.vptr)
18,918,831✔
164
                return val, nil, nil
18,918,831✔
165
        }
18,918,831✔
166

167
        var vp valuePointer
1✔
168
        vp.Decode(item.vptr)
1✔
169
        db := item.txn.db
1✔
170
        result, cb, err := db.vlog.Read(vp, item.slice)
1✔
171
        if err != nil {
1✔
172
                db.opt.Logger.Errorf("Unable to read: Key: %v, Version : %v, meta: %v, userMeta: %v"+
×
173
                        " Error: %v", key, item.version, item.meta, item.userMeta, err)
×
174
                var txn *Txn
×
175
                if db.opt.managedTxns {
×
176
                        txn = db.NewTransactionAt(math.MaxUint64, false)
×
177
                } else {
×
178
                        txn = db.NewTransaction(false)
×
179
                }
×
180
                defer txn.Discard()
×
181

×
182
                iopt := DefaultIteratorOptions
×
183
                iopt.AllVersions = true
×
184
                iopt.InternalAccess = true
×
185
                iopt.PrefetchValues = false
×
186

×
187
                it := txn.NewKeyIterator(item.Key(), iopt)
×
188
                defer it.Close()
×
189
                for it.Rewind(); it.Valid(); it.Next() {
×
190
                        item := it.Item()
×
191
                        var vp valuePointer
×
192
                        if item.meta&bitValuePointer > 0 {
×
193
                                vp.Decode(item.vptr)
×
194
                        }
×
195
                        db.opt.Logger.Errorf("Key: %v, Version : %v, meta: %v, userMeta: %v valuePointer: %+v",
×
196
                                item.Key(), item.version, item.meta, item.userMeta, vp)
×
197
                }
198
        }
199
        // Don't return error if we cannot read the value. Just log the error.
200
        return result, cb, nil
1✔
201
}
202

203
// runCallback invokes cb if it is non-nil. This lets callers write
// `defer runCallback(cb)` without checking cb first.
func runCallback(cb func()) {
	if cb == nil {
		return
	}
	cb()
}
208

209
func (item *Item) prefetchValue() {
4,718,640✔
210
        val, cb, err := item.yieldItemValue()
4,718,640✔
211
        defer runCallback(cb)
4,718,640✔
212

4,718,640✔
213
        item.err = err
4,718,640✔
214
        item.status = prefetched
4,718,640✔
215
        if val == nil {
4,718,640✔
216
                return
×
217
        }
×
218
        buf := item.slice.Resize(len(val))
4,718,640✔
219
        copy(buf, val)
4,718,640✔
220
        item.val = buf
4,718,640✔
221
}
222

223
// EstimatedSize returns the approximate size of the key-value pair.
224
//
225
// This can be called while iterating through a store to quickly estimate the
226
// size of a range of key-value pairs (without fetching the corresponding
227
// values).
228
func (item *Item) EstimatedSize() int64 {
×
229
        if !item.hasValue() {
×
230
                return 0
×
231
        }
×
232
        if (item.meta & bitValuePointer) == 0 {
×
233
                return int64(len(item.key) + len(item.vptr))
×
234
        }
×
235
        var vp valuePointer
×
236
        vp.Decode(item.vptr)
×
237
        return int64(vp.Len) // includes key length.
×
238
}
239

240
// KeySize returns the size of the key.
241
// Exact size of the key is key + 8 bytes of timestamp
242
func (item *Item) KeySize() int64 {
×
243
        return int64(len(item.key))
×
244
}
×
245

246
// ValueSize returns the approximate size of the value.
247
//
248
// This can be called to quickly estimate the size of a value without fetching
249
// it.
250
func (item *Item) ValueSize() int64 {
×
251
        if !item.hasValue() {
×
252
                return 0
×
253
        }
×
254
        if (item.meta & bitValuePointer) == 0 {
×
255
                return int64(len(item.vptr))
×
256
        }
×
257
        var vp valuePointer
×
258
        vp.Decode(item.vptr)
×
259

×
260
        klen := int64(len(item.key) + 8) // 8 bytes for timestamp.
×
261
        // 6 bytes are for the approximate length of the header. Since header is encoded in varint, we
×
262
        // cannot find the exact length of header without fetching it.
×
263
        return int64(vp.Len) - klen - 6 - crc32.Size
×
264
}
265

266
// UserMeta returns the userMeta set by the user. Typically, this byte, optionally set by the user
267
// is used to interpret the value.
268
func (item *Item) UserMeta() byte {
13,000,000✔
269
        return item.userMeta
13,000,000✔
270
}
13,000,000✔
271

272
// ExpiresAt returns a Unix time value indicating when the item will be
273
// considered expired. 0 indicates that the item will never expire.
274
func (item *Item) ExpiresAt() uint64 {
13,000,000✔
275
        return item.expiresAt
13,000,000✔
276
}
13,000,000✔
277

278
// TODO: Switch this to use linked list container in Go.
279
type list struct {
280
        head *Item
281
        tail *Item
282
}
283

284
func (l *list) push(i *Item) {
31,402,952✔
285
        i.next = nil
31,402,952✔
286
        if l.tail == nil {
58,165,776✔
287
                l.head = i
26,762,824✔
288
                l.tail = i
26,762,824✔
289
                return
26,762,824✔
290
        }
26,762,824✔
291
        l.tail.next = i
4,640,128✔
292
        l.tail = i
4,640,128✔
293
}
294

295
func (l *list) pop() *Item {
35,520,172✔
296
        if l.head == nil {
39,637,392✔
297
                return nil
4,117,220✔
298
        }
4,117,220✔
299
        i := l.head
31,402,952✔
300
        if l.head == l.tail {
58,165,776✔
301
                l.tail = nil
26,762,824✔
302
                l.head = nil
26,762,824✔
303
        } else {
31,402,952✔
304
                l.head = i.next
4,640,128✔
305
        }
4,640,128✔
306
        i.next = nil
31,402,952✔
307
        return i
31,402,952✔
308
}
309

310
// IteratorOptions configures iteration over Badger key-value stores.
//
// DefaultIteratorOptions holds settings that should work for most
// applications; start from it before customizing for your own needs.
type IteratorOptions struct {
	// PrefetchSize is the number of KV pairs to prefetch while iterating.
	// It only applies when PrefetchValues is true.
	PrefetchSize int
	// PrefetchValues indicates whether values should be prefetched during
	// iteration and stored on the items.
	PrefetchValues bool
	// Reverse sets the direction of iteration: false is forward, true is backward.
	Reverse bool
	// AllVersions fetches all valid versions of the same key.
	AllVersions bool
	// InternalAccess allows access to internal badger keys.
	InternalAccess bool

	// The options below narrow down the SSTables the iterator picks up. If
	// Prefix is specified, only tables that could contain this prefix, judged
	// by their key ranges, are picked.

	// prefixIsKey, when set, means Prefix is a whole key, so it is also used
	// for bloom filter lookups.
	prefixIsKey bool
	// Prefix restricts iteration to keys with this prefix.
	Prefix []byte
	// SinceTs restricts iteration to data with version > SinceTs.
	SinceTs uint64
}
334

335
func (opt *IteratorOptions) compareToPrefix(key []byte) int {
×
336
        // We should compare key without timestamp. For example key - a[TS] might be > "aa" prefix.
×
337
        key = y.ParseKey(key)
×
338
        if len(key) > len(opt.Prefix) {
×
339
                key = key[:len(opt.Prefix)]
×
340
        }
×
341
        return bytes.Compare(key, opt.Prefix)
×
342
}
343

344
func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
44✔
345
        // Ignore this table if its max version is less than the sinceTs.
44✔
346
        if t.MaxVersion() < opt.SinceTs {
44✔
347
                return false
×
348
        }
×
349
        if len(opt.Prefix) == 0 {
88✔
350
                return true
44✔
351
        }
44✔
352
        if opt.compareToPrefix(t.Smallest()) > 0 {
×
353
                return false
×
354
        }
×
355
        if opt.compareToPrefix(t.Biggest()) < 0 {
×
356
                return false
×
357
        }
×
358
        // Bloom filter lookup would only work if opt.Prefix does NOT have the read
359
        // timestamp as part of the key.
360
        if opt.prefixIsKey && t.DoesNotHave(y.Hash(opt.Prefix)) {
×
361
                return false
×
362
        }
×
363
        return true
×
364
}
365

366
// pickTables picks the necessary table for the iterator. This function also assumes
367
// that the tables are sorted in the right order.
368
func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
241,308✔
369
        filterTables := func(tables []*table.Table) []*table.Table {
241,656✔
370
                if opt.SinceTs > 0 {
348✔
371
                        tmp := tables[:0]
×
372
                        for _, t := range tables {
×
373
                                if t.MaxVersion() < opt.SinceTs {
×
374
                                        continue
×
375
                                }
376
                                tmp = append(tmp, t)
×
377
                        }
378
                        tables = tmp
×
379
                }
380
                return tables
348✔
381
        }
382

383
        if len(opt.Prefix) == 0 {
241,656✔
384
                out := make([]*table.Table, len(all))
348✔
385
                copy(out, all)
348✔
386
                return filterTables(out)
348✔
387
        }
348✔
388
        sIdx := sort.Search(len(all), func(i int) bool {
240,960✔
389
                // table.Biggest >= opt.prefix
×
390
                // if opt.Prefix < table.Biggest, then surely it is not in any of the preceding tables.
×
391
                return opt.compareToPrefix(all[i].Biggest()) >= 0
×
392
        })
×
393
        if sIdx == len(all) {
481,920✔
394
                // Not found.
240,960✔
395
                return []*table.Table{}
240,960✔
396
        }
240,960✔
397

398
        filtered := all[sIdx:]
×
399
        if !opt.prefixIsKey {
×
400
                eIdx := sort.Search(len(filtered), func(i int) bool {
×
401
                        return opt.compareToPrefix(filtered[i].Smallest()) > 0
×
402
                })
×
403
                out := make([]*table.Table, len(filtered[:eIdx]))
×
404
                copy(out, filtered[:eIdx])
×
405
                return filterTables(out)
×
406
        }
407

408
        // opt.prefixIsKey == true. This code is optimizing for opt.prefixIsKey part.
409
        var out []*table.Table
×
410
        hash := y.Hash(opt.Prefix)
×
411
        for _, t := range filtered {
×
412
                // When we encounter the first table whose smallest key is higher than opt.Prefix, we can
×
413
                // stop. This is an IMPORTANT optimization, just considering how often we call
×
414
                // NewKeyIterator.
×
415
                if opt.compareToPrefix(t.Smallest()) > 0 {
×
416
                        // if table.Smallest > opt.Prefix, then this and all tables after this can be ignored.
×
417
                        break
×
418
                }
419
                // opt.Prefix is actually the key. So, we can run bloom filter checks
420
                // as well.
421
                if t.DoesNotHave(hash) {
×
422
                        continue
×
423
                }
424
                out = append(out, t)
×
425
        }
426
        return filterTables(out)
×
427
}
428

429
// DefaultIteratorOptions contains default options when iterating over Badger key-value stores.
430
var DefaultIteratorOptions = IteratorOptions{
431
        PrefetchValues: true,
432
        PrefetchSize:   100,
433
        Reverse:        false,
434
        AllVersions:    false,
435
}
436

437
// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
438
type Iterator struct {
439
        iitr   y.Iterator
440
        txn    *Txn
441
        readTs uint64
442

443
        opt   IteratorOptions
444
        item  *Item
445
        data  list
446
        waste list
447

448
        lastKey []byte // Used to skip over multiple versions of the same key.
449

450
        closed  bool
451
        scanned int // Used to estimate the size of data scanned by iterator.
452

453
        // ThreadId is an optional value that can be set to identify which goroutine created
454
        // the iterator. It can be used, for example, to uniquely identify each of the
455
        // iterators created by the stream interface
456
        ThreadId int
457

458
        Alloc *z.Allocator
459
}
460

461
// NewIterator returns a new iterator. Depending upon the options, either only keys, or both
462
// key-value pairs would be fetched. The keys are returned in lexicographically sorted order.
463
// Using prefetch is recommended if you're doing a long running iteration, for performance.
464
//
465
// Multiple Iterators:
466
// For a read-only txn, multiple iterators can be running simultaneously.  However, for a read-write
467
// txn, iterators have the nuance of being a snapshot of the writes for the transaction at the time
468
// iterator was created. If writes are performed after an iterator is created, then that iterator
469
// will not be able to see those writes. Only writes performed before an iterator was created can be
470
// viewed.
471
func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
40,218✔
472
        if txn.discarded {
40,218✔
473
                panic("Transaction has already been discarded")
×
474
        }
475
        if txn.db.IsClosed() {
40,218✔
476
                panic(ErrDBClosed.Error())
×
477
        }
478

479
        // Keep track of the number of active iterators.
480
        atomic.AddInt32(&txn.numIterators, 1)
40,218✔
481

40,218✔
482
        // TODO: If Prefix is set, only pick those memtables which have keys with
40,218✔
483
        // the prefix.
40,218✔
484
        tables, decr := txn.db.getMemTables()
40,218✔
485
        defer decr()
40,218✔
486
        txn.db.vlog.incrIteratorCount()
40,218✔
487
        var iters []y.Iterator
40,218✔
488
        if itr := txn.newPendingWritesIterator(opt.Reverse); itr != nil {
40,218✔
489
                iters = append(iters, itr)
×
490
        }
×
491
        for i := 0; i < len(tables); i++ {
80,436✔
492
                iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
40,218✔
493
        }
40,218✔
494
        iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
40,218✔
495
        res := &Iterator{
40,218✔
496
                txn:    txn,
40,218✔
497
                iitr:   table.NewMergeIterator(iters, opt.Reverse),
40,218✔
498
                opt:    opt,
40,218✔
499
                readTs: txn.readTs,
40,218✔
500
        }
40,218✔
501
        return res
40,218✔
502
}
503

504
// NewKeyIterator is just like NewIterator, but allows the user to iterate over all versions of a
505
// single key. Internally, it sets the Prefix option in provided opt, and uses that prefix to
506
// additionally run bloom filter lookups before picking tables from the LSM tree.
507
func (txn *Txn) NewKeyIterator(key []byte, opt IteratorOptions) *Iterator {
40,000✔
508
        if len(opt.Prefix) > 0 {
40,000✔
509
                panic("opt.Prefix should be nil for NewKeyIterator.")
×
510
        }
511
        opt.Prefix = key // This key must be without the timestamp.
40,000✔
512
        opt.prefixIsKey = true
40,000✔
513
        opt.AllVersions = true
40,000✔
514
        return txn.NewIterator(opt)
40,000✔
515
}
516

517
func (it *Iterator) newItem() *Item {
17,718,728✔
518
        item := it.waste.pop()
17,718,728✔
519
        if item == nil {
21,715,264✔
520
                item = &Item{slice: new(y.Slice), txn: it.txn}
3,996,536✔
521
        }
3,996,536✔
522
        return item
17,718,728✔
523
}
524

525
// Item returns pointer to the current key-value pair.
526
// This item is only valid until it.Next() gets called.
527
func (it *Iterator) Item() *Item {
39,724,484✔
528
        tx := it.txn
39,724,484✔
529
        tx.addReadKey(it.item.Key())
39,724,484✔
530
        return it.item
39,724,484✔
531
}
39,724,484✔
532

533
// Valid returns false when iteration is done.
534
func (it *Iterator) Valid() bool {
39,764,656✔
535
        if it.item == nil {
39,764,688✔
536
                return false
32✔
537
        }
32✔
538
        if it.opt.prefixIsKey {
39,844,620✔
539
                return bytes.Equal(it.item.key, it.opt.Prefix)
79,996✔
540
        }
79,996✔
541
        return bytes.HasPrefix(it.item.key, it.opt.Prefix)
39,684,628✔
542
}
543

544
// ValidForPrefix returns false when iteration is done
545
// or when the current key is not prefixed by the specified prefix.
546
func (it *Iterator) ValidForPrefix(prefix []byte) bool {
×
547
        return it.Valid() && bytes.HasPrefix(it.item.key, prefix)
×
548
}
×
549

550
// Close would close the iterator. It is important to call this when you're done with iteration.
551
func (it *Iterator) Close() {
40,224✔
552
        if it.closed {
40,230✔
553
                return
6✔
554
        }
6✔
555
        it.closed = true
40,218✔
556
        if it.iitr == nil {
40,218✔
557
                atomic.AddInt32(&it.txn.numIterators, -1)
×
558
                return
×
559
        }
×
560

561
        it.iitr.Close()
40,218✔
562
        // It is important to wait for the fill goroutines to finish. Otherwise, we might leave zombie
40,218✔
563
        // goroutines behind, which are waiting to acquire file read locks after DB has been closed.
40,218✔
564
        waitFor := func(l list) {
120,654✔
565
                item := l.pop()
80,436✔
566
                for item != nil {
4,036,788✔
567
                        item.wg.Wait()
3,956,352✔
568
                        item = l.pop()
3,956,352✔
569
                }
3,956,352✔
570
        }
571
        waitFor(it.waste)
40,218✔
572
        waitFor(it.data)
40,218✔
573

40,218✔
574
        // TODO: We could handle this error.
40,218✔
575
        _ = it.txn.db.vlog.decrIteratorCount()
40,218✔
576
        atomic.AddInt32(&it.txn.numIterators, -1)
40,218✔
577
}
578

579
// Next would advance the iterator by one. Always check it.Valid() after a Next()
580
// to ensure you have access to a valid it.Item().
581
func (it *Iterator) Next() {
13,724,440✔
582
        if it.iitr == nil {
13,724,440✔
583
                return
×
584
        }
×
585
        // Reuse current item
586
        it.item.wg.Wait() // Just cleaner to wait before pushing to avoid doing ref counting.
13,724,440✔
587
        it.scanned += len(it.item.key) + len(it.item.val) + len(it.item.vptr) + 2
13,724,440✔
588
        it.waste.push(it.item)
13,724,440✔
589

13,724,440✔
590
        // Set next item to current
13,724,440✔
591
        it.item = it.data.pop()
13,724,440✔
592
        for it.iitr.Valid() {
27,446,632✔
593
                if it.parseItem() {
27,444,384✔
594
                        // parseItem calls one extra next.
13,722,192✔
595
                        // This is used to deal with the complexity of reverse iteration.
13,722,192✔
596
                        break
13,722,192✔
597
                }
598
        }
599
}
600

601
func isDeletedOrExpired(meta byte, expiresAt uint64) bool {
14,298,968✔
602
        if meta&bitDelete > 0 {
14,498,968✔
603
                return true
200,000✔
604
        }
200,000✔
605
        if expiresAt == 0 {
28,197,936✔
606
                return false
14,098,968✔
607
        }
14,098,968✔
608
        return expiresAt <= uint64(time.Now().Unix())
×
609
}
610

611
// parseItem is a complex function because it needs to handle both forward and reverse iteration
612
// implementation. We store keys such that their versions are sorted in descending order. This makes
613
// forward iteration efficient, but revese iteration complicated. This tradeoff is better because
614
// forward iteration is more common than reverse. It returns true, if either the iterator is invalid
615
// or it has pushed an item into it.data list, else it returns false.
616
//
617
// This function advances the iterator.
618
func (it *Iterator) parseItem() bool {
17,718,728✔
619
        mi := it.iitr
17,718,728✔
620
        key := mi.Key()
17,718,728✔
621

17,718,728✔
622
        setItem := func(item *Item) {
35,437,456✔
623
                if it.item == nil {
17,758,944✔
624
                        it.item = item
40,216✔
625
                } else {
17,718,728✔
626
                        it.data.push(item)
17,678,512✔
627
                }
17,678,512✔
628
        }
629

630
        isInternalKey := bytes.HasPrefix(key, badgerPrefix)
17,718,728✔
631
        // Skip badger keys.
17,718,728✔
632
        if !it.opt.InternalAccess && isInternalKey {
17,718,728✔
633
                mi.Next()
×
634
                return false
×
635
        }
×
636

637
        // Skip any versions which are beyond the readTs.
638
        version := y.ParseTs(key)
17,718,728✔
639
        // Ignore everything that is above the readTs and below or at the sinceTs.
17,718,728✔
640
        if version > it.readTs || (it.opt.SinceTs > 0 && version <= it.opt.SinceTs) {
17,718,728✔
641
                mi.Next()
×
642
                return false
×
643
        }
×
644

645
        // Skip banned keys only if it does not have badger internal prefix.
646
        if !isInternalKey && it.txn.db.isBanned(key) != nil {
17,718,728✔
647
                mi.Next()
×
648
                return false
×
649
        }
×
650

651
        if it.opt.AllVersions {
34,738,616✔
652
                // Return deleted or expired values also, otherwise user can't figure out
17,019,888✔
653
                // whether the key was deleted.
17,019,888✔
654
                item := it.newItem()
17,019,888✔
655
                it.fill(item)
17,019,888✔
656
                setItem(item)
17,019,888✔
657
                mi.Next()
17,019,888✔
658
                return true
17,019,888✔
659
        }
17,019,888✔
660

661
        // If iterating in forward direction, then just checking the last key against current key would
662
        // be sufficient.
663
        if !it.opt.Reverse {
1,397,680✔
664
                if y.SameKey(it.lastKey, key) {
698,840✔
665
                        mi.Next()
×
666
                        return false
×
667
                }
×
668
                // Only track in forward direction.
669
                // We should update lastKey as soon as we find a different key in our snapshot.
670
                // Consider keys: a 5, b 7 (del), b 5. When iterating, lastKey = a.
671
                // Then we see b 7, which is deleted. If we don't store lastKey = b, we'll then return b 5,
672
                // which is wrong. Therefore, update lastKey here.
673
                it.lastKey = y.SafeCopy(it.lastKey, mi.Key())
698,840✔
674
        }
675

676
FILL:
677
        // If deleted, advance and return.
678
        vs := mi.Value()
698,840✔
679
        if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
698,840✔
680
                mi.Next()
×
681
                return false
×
682
        }
×
683

684
        item := it.newItem()
698,840✔
685
        it.fill(item)
698,840✔
686
        // fill item based on current cursor position. All Next calls have returned, so reaching here
698,840✔
687
        // means no Next was called.
698,840✔
688

698,840✔
689
        mi.Next()                           // Advance but no fill item yet.
698,840✔
690
        if !it.opt.Reverse || !mi.Valid() { // Forward direction, or invalid.
1,397,680✔
691
                setItem(item)
698,840✔
692
                return true
698,840✔
693
        }
698,840✔
694

695
        // Reverse direction.
696
        nextTs := y.ParseTs(mi.Key())
×
697
        mik := y.ParseKey(mi.Key())
×
698
        if nextTs <= it.readTs && bytes.Equal(mik, item.key) {
×
699
                // This is a valid potential candidate.
×
700
                goto FILL
×
701
        }
702
        // Ignore the next candidate. Return the current one.
703
        setItem(item)
×
704
        return true
×
705
}
706

707
func (it *Iterator) fill(item *Item) {
17,718,728✔
708
        vs := it.iitr.Value()
17,718,728✔
709
        item.meta = vs.Meta
17,718,728✔
710
        item.userMeta = vs.UserMeta
17,718,728✔
711
        item.expiresAt = vs.ExpiresAt
17,718,728✔
712

17,718,728✔
713
        item.version = y.ParseTs(it.iitr.Key())
17,718,728✔
714
        item.key = y.SafeCopy(item.key, y.ParseKey(it.iitr.Key()))
17,718,728✔
715

17,718,728✔
716
        item.vptr = y.SafeCopy(item.vptr, vs.Value)
17,718,728✔
717
        item.val = nil
17,718,728✔
718
        if it.opt.PrefetchValues {
22,437,368✔
719
                item.wg.Add(1)
4,718,640✔
720
                go func() {
9,437,280✔
721
                        // FIXME we are not handling errors here.
4,718,640✔
722
                        item.prefetchValue()
4,718,640✔
723
                        item.wg.Done()
4,718,640✔
724
                }()
4,718,640✔
725
        }
726
}
727

728
func (it *Iterator) prefetch() {
40,216✔
729
        prefetchSize := 2
40,216✔
730
        if it.opt.PrefetchValues && it.opt.PrefetchSize > 1 {
80,386✔
731
                prefetchSize = it.opt.PrefetchSize
40,170✔
732
        }
40,170✔
733

734
        i := it.iitr
40,216✔
735
        var count int
40,216✔
736
        it.item = nil
40,216✔
737
        for i.Valid() {
4,036,752✔
738
                if !it.parseItem() {
3,996,536✔
739
                        continue
×
740
                }
741
                count++
3,996,536✔
742
                if count == prefetchSize {
4,036,348✔
743
                        break
39,812✔
744
                }
745
        }
746
}
747

748
// Seek would seek to the provided key if present. If absent, it would seek to the next
749
// smallest key greater than the provided key if iterating in the forward direction.
750
// Behavior would be reversed if iterating backwards.
751
func (it *Iterator) Seek(key []byte) {
40,216✔
752
        if it.iitr == nil {
40,216✔
753
                return
×
754
        }
×
755
        if len(key) > 0 {
40,260✔
756
                it.txn.addReadKey(key)
44✔
757
        }
44✔
758
        for i := it.data.pop(); i != nil; i = it.data.pop() {
40,216✔
759
                i.wg.Wait()
×
760
                it.waste.push(i)
×
761
        }
×
762

763
        it.lastKey = it.lastKey[:0]
40,216✔
764
        if len(key) == 0 {
80,388✔
765
                key = it.opt.Prefix
40,172✔
766
        }
40,172✔
767
        if len(key) == 0 {
40,228✔
768
                it.iitr.Rewind()
12✔
769
                it.prefetch()
12✔
770
                return
12✔
771
        }
12✔
772

773
        if !it.opt.Reverse {
80,408✔
774
                key = y.KeyWithTs(key, it.txn.readTs)
40,204✔
775
        } else {
40,204✔
776
                key = y.KeyWithTs(key, 0)
×
777
        }
×
778
        it.iitr.Seek(key)
40,204✔
779
        it.prefetch()
40,204✔
780
}
781

782
// Rewind would rewind the iterator cursor all the way to zero-th position, which would be the
783
// smallest key if iterating forward, and largest if iterating backward. It does not keep track of
784
// whether the cursor started with a Seek().
785
func (it *Iterator) Rewind() {
40,170✔
786
        it.Seek(nil)
40,170✔
787
}
40,170✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc