• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / badger / 4151149783

11 Feb 2023 12:06PM UTC coverage: 48.346% (-21.7%) from 70.055%
4151149783

push

GitHub
chore(changelog): update changelog (#1858)

5888 of 12179 relevant lines covered (48.35%)

501070.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.07
/iterator.go
1
/*
2
 * Copyright 2017 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package badger
18

19
import (
20
        "bytes"
21
        "fmt"
22
        "hash/crc32"
23
        "math"
24
        "sort"
25
        "sync"
26
        "sync/atomic"
27
        "time"
28

29
        "github.com/dgraph-io/badger/v3/table"
30
        "github.com/dgraph-io/ristretto/z"
31

32
        "github.com/dgraph-io/badger/v3/y"
33
)
34

35
// prefetchStatus records whether an Item's value has already been loaded by
// prefetchValue. The zero value means "not prefetched".
type prefetchStatus uint8

// prefetched marks an Item whose value (or the error hit while reading it) has
// been stored on the item by prefetchValue.
const prefetched prefetchStatus = 1
40

41
// Item is returned during iteration. Both the Key() and Value() output is only valid until
42
// iterator.Next() is called.
43
type Item struct {
44
        key       []byte
45
        vptr      []byte
46
        val       []byte
47
        version   uint64
48
        expiresAt uint64
49

50
        slice *y.Slice // Used only during prefetching.
51
        next  *Item
52
        txn   *Txn
53

54
        err      error
55
        wg       sync.WaitGroup
56
        status   prefetchStatus
57
        meta     byte // We need to store meta to know about bitValuePointer.
58
        userMeta byte
59
}
60

61
// String returns a string representation of Item
62
func (item *Item) String() string {
×
63
        return fmt.Sprintf("key=%q, version=%d, meta=%x", item.Key(), item.Version(), item.meta)
×
64
}
×
65

66
// Key returns the key.
67
//
68
// Key is only valid as long as item is valid, or transaction is valid.  If you need to use it
69
// outside its validity, please use KeyCopy.
70
func (item *Item) Key() []byte {
123,413,488✔
71
        return item.key
123,413,488✔
72
}
123,413,488✔
73

74
// KeyCopy returns a copy of the key of the item, writing it to dst slice.
75
// If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
76
// returned.
77
func (item *Item) KeyCopy(dst []byte) []byte {
13,000,000✔
78
        return y.SafeCopy(dst, item.key)
13,000,000✔
79
}
13,000,000✔
80

81
// Version returns the commit timestamp of the item.
82
func (item *Item) Version() uint64 {
13,000,000✔
83
        return item.version
13,000,000✔
84
}
13,000,000✔
85

86
// Value retrieves the value of the item from the value log.
87
//
88
// This method must be called within a transaction. Calling it outside a
89
// transaction is considered undefined behavior. If an iterator is being used,
90
// then Item.Value() is defined in the current iteration only, because items are
91
// reused.
92
//
93
// If you need to use a value outside a transaction, please use Item.ValueCopy
94
// instead, or copy it yourself. Value might change once discard or commit is called.
95
// Use ValueCopy if you want to do a Set after Get.
96
func (item *Item) Value(fn func(val []byte) error) error {
14,324,568✔
97
        item.wg.Wait()
14,324,568✔
98
        if item.status == prefetched {
15,049,008✔
99
                if item.err == nil && fn != nil {
1,448,880✔
100
                        if err := fn(item.val); err != nil {
724,440✔
101
                                return err
×
102
                        }
×
103
                }
104
                return item.err
724,440✔
105
        }
106
        buf, cb, err := item.yieldItemValue()
13,600,128✔
107
        defer runCallback(cb)
13,600,128✔
108
        if err != nil {
13,600,128✔
109
                return err
×
110
        }
×
111
        if fn != nil {
27,200,256✔
112
                return fn(buf)
13,600,128✔
113
        }
13,600,128✔
114
        return nil
×
115
}
116

117
// ValueCopy returns a copy of the value of the item from the value log, writing it to dst slice.
118
// If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
119
// returned. Tip: It might make sense to reuse the returned slice as dst argument for the next call.
120
//
121
// This function is useful in long running iterate/update transactions to avoid a write deadlock.
122
// See Github issue: https://github.com/dgraph-io/badger/issues/315
123
func (item *Item) ValueCopy(dst []byte) ([]byte, error) {
600,064✔
124
        item.wg.Wait()
600,064✔
125
        if item.status == prefetched {
600,064✔
126
                return y.SafeCopy(dst, item.val), item.err
×
127
        }
×
128
        buf, cb, err := item.yieldItemValue()
600,064✔
129
        defer runCallback(cb)
600,064✔
130
        return y.SafeCopy(dst, buf), err
600,064✔
131
}
132

133
func (item *Item) hasValue() bool {
18,918,832✔
134
        if item.meta == 0 && item.vptr == nil {
18,918,832✔
135
                // key not found
×
136
                return false
×
137
        }
×
138
        return true
18,918,832✔
139
}
140

141
// IsDeletedOrExpired returns true if item contains deleted or expired value.
142
func (item *Item) IsDeletedOrExpired() bool {
13,000,000✔
143
        return isDeletedOrExpired(item.meta, item.expiresAt)
13,000,000✔
144
}
13,000,000✔
145

146
// DiscardEarlierVersions returns whether the item was created with the
147
// option to discard earlier versions of a key when multiple are available.
148
func (item *Item) DiscardEarlierVersions() bool {
×
149
        return item.meta&bitDiscardEarlierVersions > 0
×
150
}
×
151

152
func (item *Item) yieldItemValue() ([]byte, func(), error) {
18,918,832✔
153
        key := item.Key() // No need to copy.
18,918,832✔
154
        if !item.hasValue() {
18,918,832✔
155
                return nil, nil, nil
×
156
        }
×
157

158
        if item.slice == nil {
19,318,960✔
159
                item.slice = new(y.Slice)
400,128✔
160
        }
400,128✔
161

162
        if (item.meta & bitValuePointer) == 0 {
37,837,663✔
163
                val := item.slice.Resize(len(item.vptr))
18,918,831✔
164
                copy(val, item.vptr)
18,918,831✔
165
                return val, nil, nil
18,918,831✔
166
        }
18,918,831✔
167

168
        var vp valuePointer
1✔
169
        vp.Decode(item.vptr)
1✔
170
        db := item.txn.db
1✔
171
        result, cb, err := db.vlog.Read(vp, item.slice)
1✔
172
        if err != nil {
1✔
173
                db.opt.Logger.Errorf("Unable to read: Key: %v, Version : %v, meta: %v, userMeta: %v"+
×
174
                        " Error: %v", key, item.version, item.meta, item.userMeta, err)
×
175
                var txn *Txn
×
176
                if db.opt.managedTxns {
×
177
                        txn = db.NewTransactionAt(math.MaxUint64, false)
×
178
                } else {
×
179
                        txn = db.NewTransaction(false)
×
180
                }
×
181
                defer txn.Discard()
×
182

×
183
                iopt := DefaultIteratorOptions
×
184
                iopt.AllVersions = true
×
185
                iopt.InternalAccess = true
×
186
                iopt.PrefetchValues = false
×
187

×
188
                it := txn.NewKeyIterator(item.Key(), iopt)
×
189
                defer it.Close()
×
190
                for it.Rewind(); it.Valid(); it.Next() {
×
191
                        item := it.Item()
×
192
                        var vp valuePointer
×
193
                        if item.meta&bitValuePointer > 0 {
×
194
                                vp.Decode(item.vptr)
×
195
                        }
×
196
                        db.opt.Logger.Errorf("Key: %v, Version : %v, meta: %v, userMeta: %v valuePointer: %+v",
×
197
                                item.Key(), item.version, item.meta, item.userMeta, vp)
×
198
                }
199
        }
200
        // Don't return error if we cannot read the value. Just log the error.
201
        return result, cb, nil
1✔
202
}
203

204
// runCallback invokes cb when it is non-nil; a nil callback is a no-op.
func runCallback(cb func()) {
	if cb == nil {
		return
	}
	cb()
}
209

210
func (item *Item) prefetchValue() {
4,718,640✔
211
        val, cb, err := item.yieldItemValue()
4,718,640✔
212
        defer runCallback(cb)
4,718,640✔
213

4,718,640✔
214
        item.err = err
4,718,640✔
215
        item.status = prefetched
4,718,640✔
216
        if val == nil {
4,718,640✔
217
                return
×
218
        }
×
219
        buf := item.slice.Resize(len(val))
4,718,640✔
220
        copy(buf, val)
4,718,640✔
221
        item.val = buf
4,718,640✔
222
}
223

224
// EstimatedSize returns the approximate size of the key-value pair.
225
//
226
// This can be called while iterating through a store to quickly estimate the
227
// size of a range of key-value pairs (without fetching the corresponding
228
// values).
229
func (item *Item) EstimatedSize() int64 {
×
230
        if !item.hasValue() {
×
231
                return 0
×
232
        }
×
233
        if (item.meta & bitValuePointer) == 0 {
×
234
                return int64(len(item.key) + len(item.vptr))
×
235
        }
×
236
        var vp valuePointer
×
237
        vp.Decode(item.vptr)
×
238
        return int64(vp.Len) // includes key length.
×
239
}
240

241
// KeySize returns the size of the key.
242
// Exact size of the key is key + 8 bytes of timestamp
243
func (item *Item) KeySize() int64 {
×
244
        return int64(len(item.key))
×
245
}
×
246

247
// ValueSize returns the approximate size of the value.
248
//
249
// This can be called to quickly estimate the size of a value without fetching
250
// it.
251
func (item *Item) ValueSize() int64 {
×
252
        if !item.hasValue() {
×
253
                return 0
×
254
        }
×
255
        if (item.meta & bitValuePointer) == 0 {
×
256
                return int64(len(item.vptr))
×
257
        }
×
258
        var vp valuePointer
×
259
        vp.Decode(item.vptr)
×
260

×
261
        klen := int64(len(item.key) + 8) // 8 bytes for timestamp.
×
262
        // 6 bytes are for the approximate length of the header. Since header is encoded in varint, we
×
263
        // cannot find the exact length of header without fetching it.
×
264
        return int64(vp.Len) - klen - 6 - crc32.Size
×
265
}
266

267
// UserMeta returns the userMeta set by the user. Typically, this byte, optionally set by the user
268
// is used to interpret the value.
269
func (item *Item) UserMeta() byte {
13,000,000✔
270
        return item.userMeta
13,000,000✔
271
}
13,000,000✔
272

273
// ExpiresAt returns a Unix time value indicating when the item will be
274
// considered expired. 0 indicates that the item will never expire.
275
func (item *Item) ExpiresAt() uint64 {
13,000,000✔
276
        return item.expiresAt
13,000,000✔
277
}
13,000,000✔
278

279
// TODO: Switch this to use linked list container in Go.
280
type list struct {
281
        head *Item
282
        tail *Item
283
}
284

285
func (l *list) push(i *Item) {
31,402,952✔
286
        i.next = nil
31,402,952✔
287
        if l.tail == nil {
58,165,776✔
288
                l.head = i
26,762,824✔
289
                l.tail = i
26,762,824✔
290
                return
26,762,824✔
291
        }
26,762,824✔
292
        l.tail.next = i
4,640,128✔
293
        l.tail = i
4,640,128✔
294
}
295

296
func (l *list) pop() *Item {
35,520,172✔
297
        if l.head == nil {
39,637,392✔
298
                return nil
4,117,220✔
299
        }
4,117,220✔
300
        i := l.head
31,402,952✔
301
        if l.head == l.tail {
58,165,776✔
302
                l.tail = nil
26,762,824✔
303
                l.head = nil
26,762,824✔
304
        } else {
31,402,952✔
305
                l.head = i.next
4,640,128✔
306
        }
4,640,128✔
307
        i.next = nil
31,402,952✔
308
        return i
31,402,952✔
309
}
310

311
// IteratorOptions is used to set options when iterating over Badger key-value
// stores.
//
// This package provides DefaultIteratorOptions which contains options that
// should work for most applications. Consider using that as a starting point
// before customizing it for your own needs.
type IteratorOptions struct {
	// PrefetchSize is the number of KV pairs to prefetch while iterating.
	// Valid only if PrefetchValues is true.
	PrefetchSize int
	// PrefetchValues indicates whether values should be fetched in the
	// background during iteration and stored on each Item.
	PrefetchValues bool
	// Reverse sets the direction of iteration: false is forward, true is backward.
	Reverse bool
	// AllVersions makes the iterator return every version of a key, including
	// deleted or expired ones.
	AllVersions bool
	// InternalAccess allows the iterator to return Badger's internal keys.
	InternalAccess bool

	// The options below narrow down the SSTables the iterator picks up. When
	// Prefix is set, only tables whose key range could contain it are used.
	prefixIsKey bool   // If set, use the prefix for bloom filter lookup.
	Prefix      []byte // Only iterate over this given prefix.
	SinceTs     uint64 // Only read data that has version > SinceTs.
}
335

336
func (opt *IteratorOptions) compareToPrefix(key []byte) int {
×
337
        // We should compare key without timestamp. For example key - a[TS] might be > "aa" prefix.
×
338
        key = y.ParseKey(key)
×
339
        if len(key) > len(opt.Prefix) {
×
340
                key = key[:len(opt.Prefix)]
×
341
        }
×
342
        return bytes.Compare(key, opt.Prefix)
×
343
}
344

345
func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
44✔
346
        // Ignore this table if its max version is less than the sinceTs.
44✔
347
        if t.MaxVersion() < opt.SinceTs {
44✔
348
                return false
×
349
        }
×
350
        if len(opt.Prefix) == 0 {
88✔
351
                return true
44✔
352
        }
44✔
353
        if opt.compareToPrefix(t.Smallest()) > 0 {
×
354
                return false
×
355
        }
×
356
        if opt.compareToPrefix(t.Biggest()) < 0 {
×
357
                return false
×
358
        }
×
359
        // Bloom filter lookup would only work if opt.Prefix does NOT have the read
360
        // timestamp as part of the key.
361
        if opt.prefixIsKey && t.DoesNotHave(y.Hash(opt.Prefix)) {
×
362
                return false
×
363
        }
×
364
        return true
×
365
}
366

367
// pickTables picks the necessary table for the iterator. This function also assumes
368
// that the tables are sorted in the right order.
369
func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
241,308✔
370
        filterTables := func(tables []*table.Table) []*table.Table {
241,656✔
371
                if opt.SinceTs > 0 {
348✔
372
                        tmp := tables[:0]
×
373
                        for _, t := range tables {
×
374
                                if t.MaxVersion() < opt.SinceTs {
×
375
                                        continue
×
376
                                }
377
                                tmp = append(tmp, t)
×
378
                        }
379
                        tables = tmp
×
380
                }
381
                return tables
348✔
382
        }
383

384
        if len(opt.Prefix) == 0 {
241,656✔
385
                out := make([]*table.Table, len(all))
348✔
386
                copy(out, all)
348✔
387
                return filterTables(out)
348✔
388
        }
348✔
389
        sIdx := sort.Search(len(all), func(i int) bool {
240,960✔
390
                // table.Biggest >= opt.prefix
×
391
                // if opt.Prefix < table.Biggest, then surely it is not in any of the preceding tables.
×
392
                return opt.compareToPrefix(all[i].Biggest()) >= 0
×
393
        })
×
394
        if sIdx == len(all) {
481,920✔
395
                // Not found.
240,960✔
396
                return []*table.Table{}
240,960✔
397
        }
240,960✔
398

399
        filtered := all[sIdx:]
×
400
        if !opt.prefixIsKey {
×
401
                eIdx := sort.Search(len(filtered), func(i int) bool {
×
402
                        return opt.compareToPrefix(filtered[i].Smallest()) > 0
×
403
                })
×
404
                out := make([]*table.Table, len(filtered[:eIdx]))
×
405
                copy(out, filtered[:eIdx])
×
406
                return filterTables(out)
×
407
        }
408

409
        // opt.prefixIsKey == true. This code is optimizing for opt.prefixIsKey part.
410
        var out []*table.Table
×
411
        hash := y.Hash(opt.Prefix)
×
412
        for _, t := range filtered {
×
413
                // When we encounter the first table whose smallest key is higher than opt.Prefix, we can
×
414
                // stop. This is an IMPORTANT optimization, just considering how often we call
×
415
                // NewKeyIterator.
×
416
                if opt.compareToPrefix(t.Smallest()) > 0 {
×
417
                        // if table.Smallest > opt.Prefix, then this and all tables after this can be ignored.
×
418
                        break
×
419
                }
420
                // opt.Prefix is actually the key. So, we can run bloom filter checks
421
                // as well.
422
                if t.DoesNotHave(hash) {
×
423
                        continue
×
424
                }
425
                out = append(out, t)
×
426
        }
427
        return filterTables(out)
×
428
}
429

430
// DefaultIteratorOptions contains default options when iterating over Badger key-value stores.
431
var DefaultIteratorOptions = IteratorOptions{
432
        PrefetchValues: true,
433
        PrefetchSize:   100,
434
        Reverse:        false,
435
        AllVersions:    false,
436
}
437

438
// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
439
type Iterator struct {
440
        iitr   y.Iterator
441
        txn    *Txn
442
        readTs uint64
443

444
        opt   IteratorOptions
445
        item  *Item
446
        data  list
447
        waste list
448

449
        lastKey []byte // Used to skip over multiple versions of the same key.
450

451
        closed  bool
452
        scanned int // Used to estimate the size of data scanned by iterator.
453

454
        // ThreadId is an optional value that can be set to identify which goroutine created
455
        // the iterator. It can be used, for example, to uniquely identify each of the
456
        // iterators created by the stream interface
457
        ThreadId int
458

459
        Alloc *z.Allocator
460
}
461

462
// NewIterator returns a new iterator. Depending upon the options, either only keys, or both
463
// key-value pairs would be fetched. The keys are returned in lexicographically sorted order.
464
// Using prefetch is recommended if you're doing a long running iteration, for performance.
465
//
466
// Multiple Iterators:
467
// For a read-only txn, multiple iterators can be running simultaneously.  However, for a read-write
468
// txn, iterators have the nuance of being a snapshot of the writes for the transaction at the time
469
// iterator was created. If writes are performed after an iterator is created, then that iterator
470
// will not be able to see those writes. Only writes performed before an iterator was created can be
471
// viewed.
472
func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
40,218✔
473
        if txn.discarded {
40,218✔
474
                panic("Transaction has already been discarded")
×
475
        }
476
        if txn.db.IsClosed() {
40,218✔
477
                panic(ErrDBClosed.Error())
×
478
        }
479

480
        // Keep track of the number of active iterators.
481
        atomic.AddInt32(&txn.numIterators, 1)
40,218✔
482

40,218✔
483
        // TODO: If Prefix is set, only pick those memtables which have keys with
40,218✔
484
        // the prefix.
40,218✔
485
        tables, decr := txn.db.getMemTables()
40,218✔
486
        defer decr()
40,218✔
487
        txn.db.vlog.incrIteratorCount()
40,218✔
488
        var iters []y.Iterator
40,218✔
489
        if itr := txn.newPendingWritesIterator(opt.Reverse); itr != nil {
40,218✔
490
                iters = append(iters, itr)
×
491
        }
×
492
        for i := 0; i < len(tables); i++ {
80,436✔
493
                iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
40,218✔
494
        }
40,218✔
495
        iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
40,218✔
496
        res := &Iterator{
40,218✔
497
                txn:    txn,
40,218✔
498
                iitr:   table.NewMergeIterator(iters, opt.Reverse),
40,218✔
499
                opt:    opt,
40,218✔
500
                readTs: txn.readTs,
40,218✔
501
        }
40,218✔
502
        return res
40,218✔
503
}
504

505
// NewKeyIterator is just like NewIterator, but allows the user to iterate over all versions of a
506
// single key. Internally, it sets the Prefix option in provided opt, and uses that prefix to
507
// additionally run bloom filter lookups before picking tables from the LSM tree.
508
func (txn *Txn) NewKeyIterator(key []byte, opt IteratorOptions) *Iterator {
40,000✔
509
        if len(opt.Prefix) > 0 {
40,000✔
510
                panic("opt.Prefix should be nil for NewKeyIterator.")
×
511
        }
512
        opt.Prefix = key // This key must be without the timestamp.
40,000✔
513
        opt.prefixIsKey = true
40,000✔
514
        opt.AllVersions = true
40,000✔
515
        return txn.NewIterator(opt)
40,000✔
516
}
517

518
func (it *Iterator) newItem() *Item {
17,718,728✔
519
        item := it.waste.pop()
17,718,728✔
520
        if item == nil {
21,715,264✔
521
                item = &Item{slice: new(y.Slice), txn: it.txn}
3,996,536✔
522
        }
3,996,536✔
523
        return item
17,718,728✔
524
}
525

526
// Item returns pointer to the current key-value pair.
527
// This item is only valid until it.Next() gets called.
528
func (it *Iterator) Item() *Item {
39,724,484✔
529
        tx := it.txn
39,724,484✔
530
        tx.addReadKey(it.item.Key())
39,724,484✔
531
        return it.item
39,724,484✔
532
}
39,724,484✔
533

534
// Valid returns false when iteration is done.
535
func (it *Iterator) Valid() bool {
39,764,656✔
536
        if it.item == nil {
39,764,688✔
537
                return false
32✔
538
        }
32✔
539
        if it.opt.prefixIsKey {
39,844,620✔
540
                return bytes.Equal(it.item.key, it.opt.Prefix)
79,996✔
541
        }
79,996✔
542
        return bytes.HasPrefix(it.item.key, it.opt.Prefix)
39,684,628✔
543
}
544

545
// ValidForPrefix returns false when iteration is done
546
// or when the current key is not prefixed by the specified prefix.
547
func (it *Iterator) ValidForPrefix(prefix []byte) bool {
×
548
        return it.Valid() && bytes.HasPrefix(it.item.key, prefix)
×
549
}
×
550

551
// Close would close the iterator. It is important to call this when you're done with iteration.
552
func (it *Iterator) Close() {
40,224✔
553
        if it.closed {
40,230✔
554
                return
6✔
555
        }
6✔
556
        it.closed = true
40,218✔
557
        if it.iitr == nil {
40,218✔
558
                atomic.AddInt32(&it.txn.numIterators, -1)
×
559
                return
×
560
        }
×
561

562
        it.iitr.Close()
40,218✔
563
        // It is important to wait for the fill goroutines to finish. Otherwise, we might leave zombie
40,218✔
564
        // goroutines behind, which are waiting to acquire file read locks after DB has been closed.
40,218✔
565
        waitFor := func(l list) {
120,654✔
566
                item := l.pop()
80,436✔
567
                for item != nil {
4,036,788✔
568
                        item.wg.Wait()
3,956,352✔
569
                        item = l.pop()
3,956,352✔
570
                }
3,956,352✔
571
        }
572
        waitFor(it.waste)
40,218✔
573
        waitFor(it.data)
40,218✔
574

40,218✔
575
        // TODO: We could handle this error.
40,218✔
576
        _ = it.txn.db.vlog.decrIteratorCount()
40,218✔
577
        atomic.AddInt32(&it.txn.numIterators, -1)
40,218✔
578
}
579

580
// Next would advance the iterator by one. Always check it.Valid() after a Next()
581
// to ensure you have access to a valid it.Item().
582
func (it *Iterator) Next() {
13,724,440✔
583
        if it.iitr == nil {
13,724,440✔
584
                return
×
585
        }
×
586
        // Reuse current item
587
        it.item.wg.Wait() // Just cleaner to wait before pushing to avoid doing ref counting.
13,724,440✔
588
        it.scanned += len(it.item.key) + len(it.item.val) + len(it.item.vptr) + 2
13,724,440✔
589
        it.waste.push(it.item)
13,724,440✔
590

13,724,440✔
591
        // Set next item to current
13,724,440✔
592
        it.item = it.data.pop()
13,724,440✔
593
        for it.iitr.Valid() {
27,446,632✔
594
                if it.parseItem() {
27,444,384✔
595
                        // parseItem calls one extra next.
13,722,192✔
596
                        // This is used to deal with the complexity of reverse iteration.
13,722,192✔
597
                        break
13,722,192✔
598
                }
599
        }
600
}
601

602
func isDeletedOrExpired(meta byte, expiresAt uint64) bool {
14,298,968✔
603
        if meta&bitDelete > 0 {
14,498,968✔
604
                return true
200,000✔
605
        }
200,000✔
606
        if expiresAt == 0 {
28,197,936✔
607
                return false
14,098,968✔
608
        }
14,098,968✔
609
        return expiresAt <= uint64(time.Now().Unix())
×
610
}
611

612
// parseItem is a complex function because it needs to handle both forward and reverse iteration
613
// implementation. We store keys such that their versions are sorted in descending order. This makes
614
// forward iteration efficient, but revese iteration complicated. This tradeoff is better because
615
// forward iteration is more common than reverse. It returns true, if either the iterator is invalid
616
// or it has pushed an item into it.data list, else it returns false.
617
//
618
// This function advances the iterator.
619
func (it *Iterator) parseItem() bool {
17,718,728✔
620
        mi := it.iitr
17,718,728✔
621
        key := mi.Key()
17,718,728✔
622

17,718,728✔
623
        setItem := func(item *Item) {
35,437,456✔
624
                if it.item == nil {
17,758,944✔
625
                        it.item = item
40,216✔
626
                } else {
17,718,728✔
627
                        it.data.push(item)
17,678,512✔
628
                }
17,678,512✔
629
        }
630

631
        isInternalKey := bytes.HasPrefix(key, badgerPrefix)
17,718,728✔
632
        // Skip badger keys.
17,718,728✔
633
        if !it.opt.InternalAccess && isInternalKey {
17,718,728✔
634
                mi.Next()
×
635
                return false
×
636
        }
×
637

638
        // Skip any versions which are beyond the readTs.
639
        version := y.ParseTs(key)
17,718,728✔
640
        // Ignore everything that is above the readTs and below or at the sinceTs.
17,718,728✔
641
        if version > it.readTs || (it.opt.SinceTs > 0 && version <= it.opt.SinceTs) {
17,718,728✔
642
                mi.Next()
×
643
                return false
×
644
        }
×
645

646
        // Skip banned keys only if it does not have badger internal prefix.
647
        if !isInternalKey && it.txn.db.isBanned(key) != nil {
17,718,728✔
648
                mi.Next()
×
649
                return false
×
650
        }
×
651

652
        if it.opt.AllVersions {
34,738,616✔
653
                // Return deleted or expired values also, otherwise user can't figure out
17,019,888✔
654
                // whether the key was deleted.
17,019,888✔
655
                item := it.newItem()
17,019,888✔
656
                it.fill(item)
17,019,888✔
657
                setItem(item)
17,019,888✔
658
                mi.Next()
17,019,888✔
659
                return true
17,019,888✔
660
        }
17,019,888✔
661

662
        // If iterating in forward direction, then just checking the last key against current key would
663
        // be sufficient.
664
        if !it.opt.Reverse {
1,397,680✔
665
                if y.SameKey(it.lastKey, key) {
698,840✔
666
                        mi.Next()
×
667
                        return false
×
668
                }
×
669
                // Only track in forward direction.
670
                // We should update lastKey as soon as we find a different key in our snapshot.
671
                // Consider keys: a 5, b 7 (del), b 5. When iterating, lastKey = a.
672
                // Then we see b 7, which is deleted. If we don't store lastKey = b, we'll then return b 5,
673
                // which is wrong. Therefore, update lastKey here.
674
                it.lastKey = y.SafeCopy(it.lastKey, mi.Key())
698,840✔
675
        }
676

677
FILL:
678
        // If deleted, advance and return.
679
        vs := mi.Value()
698,840✔
680
        if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
698,840✔
681
                mi.Next()
×
682
                return false
×
683
        }
×
684

685
        item := it.newItem()
698,840✔
686
        it.fill(item)
698,840✔
687
        // fill item based on current cursor position. All Next calls have returned, so reaching here
698,840✔
688
        // means no Next was called.
698,840✔
689

698,840✔
690
        mi.Next()                           // Advance but no fill item yet.
698,840✔
691
        if !it.opt.Reverse || !mi.Valid() { // Forward direction, or invalid.
1,397,680✔
692
                setItem(item)
698,840✔
693
                return true
698,840✔
694
        }
698,840✔
695

696
        // Reverse direction.
697
        nextTs := y.ParseTs(mi.Key())
×
698
        mik := y.ParseKey(mi.Key())
×
699
        if nextTs <= it.readTs && bytes.Equal(mik, item.key) {
×
700
                // This is a valid potential candidate.
×
701
                goto FILL
×
702
        }
703
        // Ignore the next candidate. Return the current one.
704
        setItem(item)
×
705
        return true
×
706
}
707

708
func (it *Iterator) fill(item *Item) {
17,718,728✔
709
        vs := it.iitr.Value()
17,718,728✔
710
        item.meta = vs.Meta
17,718,728✔
711
        item.userMeta = vs.UserMeta
17,718,728✔
712
        item.expiresAt = vs.ExpiresAt
17,718,728✔
713

17,718,728✔
714
        item.version = y.ParseTs(it.iitr.Key())
17,718,728✔
715
        item.key = y.SafeCopy(item.key, y.ParseKey(it.iitr.Key()))
17,718,728✔
716

17,718,728✔
717
        item.vptr = y.SafeCopy(item.vptr, vs.Value)
17,718,728✔
718
        item.val = nil
17,718,728✔
719
        if it.opt.PrefetchValues {
22,437,368✔
720
                item.wg.Add(1)
4,718,640✔
721
                go func() {
9,437,280✔
722
                        // FIXME we are not handling errors here.
4,718,640✔
723
                        item.prefetchValue()
4,718,640✔
724
                        item.wg.Done()
4,718,640✔
725
                }()
4,718,640✔
726
        }
727
}
728

729
func (it *Iterator) prefetch() {
40,216✔
730
        prefetchSize := 2
40,216✔
731
        if it.opt.PrefetchValues && it.opt.PrefetchSize > 1 {
80,386✔
732
                prefetchSize = it.opt.PrefetchSize
40,170✔
733
        }
40,170✔
734

735
        i := it.iitr
40,216✔
736
        var count int
40,216✔
737
        it.item = nil
40,216✔
738
        for i.Valid() {
4,036,752✔
739
                if !it.parseItem() {
3,996,536✔
740
                        continue
×
741
                }
742
                count++
3,996,536✔
743
                if count == prefetchSize {
4,036,348✔
744
                        break
39,812✔
745
                }
746
        }
747
}
748

749
// Seek would seek to the provided key if present. If absent, it would seek to the next
750
// smallest key greater than the provided key if iterating in the forward direction.
751
// Behavior would be reversed if iterating backwards.
752
func (it *Iterator) Seek(key []byte) {
40,216✔
753
        if it.iitr == nil {
40,216✔
754
                return
×
755
        }
×
756
        if len(key) > 0 {
40,260✔
757
                it.txn.addReadKey(key)
44✔
758
        }
44✔
759
        for i := it.data.pop(); i != nil; i = it.data.pop() {
40,216✔
760
                i.wg.Wait()
×
761
                it.waste.push(i)
×
762
        }
×
763

764
        it.lastKey = it.lastKey[:0]
40,216✔
765
        if len(key) == 0 {
80,388✔
766
                key = it.opt.Prefix
40,172✔
767
        }
40,172✔
768
        if len(key) == 0 {
40,228✔
769
                it.iitr.Rewind()
12✔
770
                it.prefetch()
12✔
771
                return
12✔
772
        }
12✔
773

774
        if !it.opt.Reverse {
80,408✔
775
                key = y.KeyWithTs(key, it.txn.readTs)
40,204✔
776
        } else {
40,204✔
777
                key = y.KeyWithTs(key, 0)
×
778
        }
×
779
        it.iitr.Seek(key)
40,204✔
780
        it.prefetch()
40,204✔
781
}
782

783
// Rewind would rewind the iterator cursor all the way to zero-th position, which would be the
784
// smallest key if iterating forward, and largest if iterating backward. It does not keep track of
785
// whether the cursor started with a Seek().
786
func (it *Iterator) Rewind() {
40,170✔
787
        it.Seek(nil)
40,170✔
788
}
40,170✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc