• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / badger / 4302114126

01 Mar 2023 09:52AM UTC coverage: 61.149% (+0.4%) from 60.713%
4302114126

Pull #1872

Aman Mangal
fix data race
Pull Request #1872: opt(stream): add option to directly copy over tables from lower levels (#1700)

425 of 425 new or added lines in 11 files covered. (100.0%)

8814 of 14414 relevant lines covered (61.15%)

451856.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.58
/level_handler.go
1
/*
2
 * Copyright 2017 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package badger
18

19
import (
20
        "fmt"
21
        "sort"
22
        "sync"
23

24
        "github.com/dgraph-io/badger/v4/table"
25
        "github.com/dgraph-io/badger/v4/y"
26
)
27

28
type levelHandler struct {
29
        // Guards tables, totalSize.
30
        sync.RWMutex
31

32
        // For level >= 1, tables are sorted by key ranges, which do not overlap.
33
        // For level 0, tables are sorted by time.
34
        // For level 0, newest table are at the back. Compact the oldest one first, which is at the front.
35
        tables         []*table.Table
36
        totalSize      int64
37
        totalStaleSize int64
38

39
        // The following are initialized once and const.
40
        level    int
41
        strLevel string
42
        db       *DB
43
}
44

45
func (s *levelHandler) isLastLevel() bool {
246✔
46
        return s.level == s.db.opt.MaxLevels-1
246✔
47
}
246✔
48

49
func (s *levelHandler) getTotalStaleSize() int64 {
2✔
50
        s.RLock()
2✔
51
        defer s.RUnlock()
2✔
52
        return s.totalStaleSize
2✔
53
}
2✔
54

55
func (s *levelHandler) getTotalSize() int64 {
846,469✔
56
        s.RLock()
846,469✔
57
        defer s.RUnlock()
846,469✔
58
        return s.totalSize
846,469✔
59
}
846,469✔
60

61
// initTables replaces s.tables with given tables. This is done during loading.
62
func (s *levelHandler) initTables(tables []*table.Table) {
2,093✔
63
        s.Lock()
2,093✔
64
        defer s.Unlock()
2,093✔
65

2,093✔
66
        s.tables = tables
2,093✔
67
        s.totalSize = 0
2,093✔
68
        s.totalStaleSize = 0
2,093✔
69
        for _, t := range tables {
2,177✔
70
                s.addSize(t)
84✔
71
        }
84✔
72

73
        if s.level == 0 {
2,392✔
74
                // Key range will overlap. Just sort by fileID in ascending order
299✔
75
                // because newer tables are at the end of level 0.
299✔
76
                sort.Slice(s.tables, func(i, j int) bool {
305✔
77
                        return s.tables[i].ID() < s.tables[j].ID()
6✔
78
                })
6✔
79
        } else {
1,794✔
80
                // Sort tables by keys.
1,794✔
81
                sort.Slice(s.tables, func(i, j int) bool {
1,880✔
82
                        return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
86✔
83
                })
86✔
84
        }
85
}
86

87
// deleteTables remove tables idx0, ..., idx1-1.
88
func (s *levelHandler) deleteTables(toDel []*table.Table) error {
142✔
89
        s.Lock() // s.Unlock() below
142✔
90

142✔
91
        toDelMap := make(map[uint64]struct{})
142✔
92
        for _, t := range toDel {
294✔
93
                toDelMap[t.ID()] = struct{}{}
152✔
94
        }
152✔
95

96
        // Make a copy as iterators might be keeping a slice of tables.
97
        var newTables []*table.Table
142✔
98
        for _, t := range s.tables {
1,059✔
99
                _, found := toDelMap[t.ID()]
917✔
100
                if !found {
1,682✔
101
                        newTables = append(newTables, t)
765✔
102
                        continue
765✔
103
                }
104
                s.subtractSize(t)
152✔
105
        }
106
        s.tables = newTables
142✔
107

142✔
108
        s.Unlock() // Unlock s _before_ we DecrRef our tables, which can be slow.
142✔
109

142✔
110
        return decrRefs(toDel)
142✔
111
}
112

113
// replaceTables will replace tables[left:right] with newTables. Note this EXCLUDES tables[right].
114
// You must call decr() to delete the old tables _after_ writing the update to the manifest.
115
func (s *levelHandler) replaceTables(toDel, toAdd []*table.Table) error {
155✔
116
        // Need to re-search the range of tables in this level to be replaced as other goroutines might
155✔
117
        // be changing it as well.  (They can't touch our tables, but if they add/remove other tables,
155✔
118
        // the indices get shifted around.)
155✔
119
        s.Lock() // We s.Unlock() below.
155✔
120

155✔
121
        toDelMap := make(map[uint64]struct{})
155✔
122
        for _, t := range toDel {
365✔
123
                toDelMap[t.ID()] = struct{}{}
210✔
124
        }
210✔
125
        var newTables []*table.Table
155✔
126
        for _, t := range s.tables {
2,195✔
127
                _, found := toDelMap[t.ID()]
2,040✔
128
                if !found {
3,870✔
129
                        newTables = append(newTables, t)
1,830✔
130
                        continue
1,830✔
131
                }
132
                s.subtractSize(t)
210✔
133
        }
134

135
        // Increase totalSize first.
136
        for _, t := range toAdd {
488✔
137
                s.addSize(t)
333✔
138
                t.IncrRef()
333✔
139
                newTables = append(newTables, t)
333✔
140
        }
333✔
141

142
        // Assign tables.
143
        s.tables = newTables
155✔
144
        sort.Slice(s.tables, func(i, j int) bool {
7,633✔
145
                return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
7,478✔
146
        })
7,478✔
147
        s.Unlock() // s.Unlock before we DecrRef tables -- that can be slow.
155✔
148
        return decrRefs(toDel)
155✔
149
}
150

151
// addTable adds toAdd table to levelHandler. Normally when we add tables to levelHandler, we sort
152
// tables based on table.Smallest. This is required for correctness of the system. But in case of
153
// stream writer this can be avoided. We can just add tables to levelHandler's table list
154
// and after all addTable calls, we can sort table list(check sortTable method).
155
// NOTE: levelHandler.sortTables() should be called after call addTable calls are done.
156
func (s *levelHandler) addTable(t *table.Table) {
83✔
157
        s.Lock()
83✔
158
        defer s.Unlock()
83✔
159

83✔
160
        s.addSize(t) // Increase totalSize first.
83✔
161
        t.IncrRef()
83✔
162
        s.tables = append(s.tables, t)
83✔
163
}
83✔
164

165
// sortTables sorts tables of levelHandler based on table.Smallest.
166
// Normally it should be called after all addTable calls.
167
func (s *levelHandler) sortTables() {
175✔
168
        s.RLock()
175✔
169
        defer s.RUnlock()
175✔
170

175✔
171
        sort.Slice(s.tables, func(i, j int) bool {
320✔
172
                return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
145✔
173
        })
145✔
174
}
175

176
func decrRefs(tables []*table.Table) error {
439✔
177
        for _, table := range tables {
1,121✔
178
                if err := table.DecrRef(); err != nil {
682✔
179
                        return err
×
180
                }
×
181
        }
182
        return nil
439✔
183
}
184

185
func newLevelHandler(db *DB, level int) *levelHandler {
2,226✔
186
        return &levelHandler{
2,226✔
187
                level:    level,
2,226✔
188
                strLevel: fmt.Sprintf("l%d", level),
2,226✔
189
                db:       db,
2,226✔
190
        }
2,226✔
191
}
2,226✔
192

193
// tryAddLevel0Table returns true if ok and no stalling.
194
func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
286✔
195
        y.AssertTrue(s.level == 0)
286✔
196
        // Need lock as we may be deleting the first table during a level 0 compaction.
286✔
197
        s.Lock()
286✔
198
        defer s.Unlock()
286✔
199
        // Stall (by returning false) if we are above the specified stall setting for L0.
286✔
200
        if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
286✔
201
                return false
×
202
        }
×
203

204
        s.tables = append(s.tables, t)
286✔
205
        t.IncrRef()
286✔
206
        s.addSize(t)
286✔
207

286✔
208
        return true
286✔
209
}
210

211
// This should be called while holding the lock on the level.
212
func (s *levelHandler) addSize(t *table.Table) {
786✔
213
        s.totalSize += t.Size()
786✔
214
        s.totalStaleSize += int64(t.StaleDataSize())
786✔
215
}
786✔
216

217
// This should be called while holding the lock on the level.
218
func (s *levelHandler) subtractSize(t *table.Table) {
362✔
219
        s.totalSize -= t.Size()
362✔
220
        s.totalStaleSize -= int64(t.StaleDataSize())
362✔
221
}
362✔
222
func (s *levelHandler) numTables() int {
119,582✔
223
        s.RLock()
119,582✔
224
        defer s.RUnlock()
119,582✔
225
        return len(s.tables)
119,582✔
226
}
119,582✔
227

228
func (s *levelHandler) close() error {
2,163✔
229
        s.RLock()
2,163✔
230
        defer s.RUnlock()
2,163✔
231
        var err error
2,163✔
232
        for _, t := range s.tables {
2,643✔
233
                if closeErr := t.Close(-1); closeErr != nil && err == nil {
480✔
234
                        err = closeErr
×
235
                }
×
236
        }
237
        return y.Wrap(err, "levelHandler.close")
2,163✔
238
}
239

240
// getTableForKey acquires a read-lock to access s.tables. It returns a list of tableHandlers.
241
func (s *levelHandler) getTableForKey(key []byte) ([]*table.Table, func() error) {
4,927,484✔
242
        s.RLock()
4,927,484✔
243
        defer s.RUnlock()
4,927,484✔
244

4,927,484✔
245
        if s.level == 0 {
5,631,429✔
246
                // For level 0, we need to check every table. Remember to make a copy as s.tables may change
703,945✔
247
                // once we exit this function, and we don't want to lock s.tables while seeking in tables.
703,945✔
248
                // CAUTION: Reverse the tables.
703,945✔
249
                out := make([]*table.Table, 0, len(s.tables))
703,945✔
250
                for i := len(s.tables) - 1; i >= 0; i-- {
765,677✔
251
                        out = append(out, s.tables[i])
61,732✔
252
                        s.tables[i].IncrRef()
61,732✔
253
                }
61,732✔
254
                return out, func() error {
1,407,890✔
255
                        for _, t := range out {
765,677✔
256
                                if err := t.DecrRef(); err != nil {
61,732✔
257
                                        return err
×
258
                                }
×
259
                        }
260
                        return nil
703,945✔
261
                }
262
        }
263
        // For level >= 1, we can do a binary search as key range does not overlap.
264
        idx := sort.Search(len(s.tables), func(i int) bool {
4,228,569✔
265
                return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
5,030✔
266
        })
5,030✔
267
        if idx >= len(s.tables) {
8,445,248✔
268
                // Given key is strictly > than every element we have.
4,221,709✔
269
                return nil, func() error { return nil }
8,443,418✔
270
        }
271
        tbl := s.tables[idx]
1,830✔
272
        tbl.IncrRef()
1,830✔
273
        return []*table.Table{tbl}, tbl.DecrRef
1,830✔
274
}
275

276
// get returns value for a given key or the key after that. If not found, return nil.
277
func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
4,927,484✔
278
        tables, decr := s.getTableForKey(key)
4,927,484✔
279
        keyNoTs := y.ParseKey(key)
4,927,484✔
280

4,927,484✔
281
        hash := y.Hash(keyNoTs)
4,927,484✔
282
        var maxVs y.ValueStruct
4,927,484✔
283
        for _, th := range tables {
4,991,046✔
284
                if th.DoesNotHave(hash) {
68,848✔
285
                        y.NumLSMBloomHitsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
5,286✔
286
                        continue
5,286✔
287
                }
288

289
                it := th.NewIterator(0)
58,276✔
290
                defer it.Close()
58,276✔
291

58,276✔
292
                y.NumLSMGetsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
58,276✔
293
                it.Seek(key)
58,276✔
294
                if !it.Valid() {
58,335✔
295
                        continue
59✔
296
                }
297
                if y.SameKey(key, it.Key()) {
116,296✔
298
                        if version := y.ParseTs(it.Key()); maxVs.Version < version {
116,151✔
299
                                maxVs = it.ValueCopy()
58,072✔
300
                                maxVs.Version = version
58,072✔
301
                        }
58,072✔
302
                }
303
        }
304
        return maxVs, decr()
4,927,484✔
305
}
306

307
// iterators returns an array of iterators, for merging.
308
// Note: This obtains references for the table handlers. Remember to close these iterators.
309
func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator {
286,706✔
310
        s.RLock()
286,706✔
311
        defer s.RUnlock()
286,706✔
312

286,706✔
313
        var topt int
286,706✔
314
        if opt.Reverse {
286,951✔
315
                topt = table.REVERSED
245✔
316
        }
245✔
317
        if s.level == 0 {
327,664✔
318
                // Remember to add in reverse order!
40,958✔
319
                // The newer table at the end of s.tables should be added first as it takes precedence.
40,958✔
320
                // Level 0 tables are not in key sorted order, so we need to consider them one by one.
40,958✔
321
                var out []*table.Table
40,958✔
322
                for _, t := range s.tables {
40,996✔
323
                        if opt.pickTable(t) {
76✔
324
                                out = append(out, t)
38✔
325
                        }
38✔
326
                }
327
                return iteratorsReversed(out, topt)
40,958✔
328
        }
329

330
        tables := opt.pickTables(s.tables)
245,748✔
331
        if len(tables) == 0 {
491,405✔
332
                return nil
245,657✔
333
        }
245,657✔
334
        return []y.Iterator{table.NewConcatIterator(tables, topt)}
91✔
335
}
336

337
func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table {
161✔
338
        if opt.Reverse {
161✔
339
                panic("Invalid option for getTables")
×
340
        }
341

342
        s.RLock()
161✔
343
        defer s.RUnlock()
161✔
344

161✔
345
        if s.level == 0 {
184✔
346
                var out []*table.Table
23✔
347
                for _, t := range s.tables {
25✔
348
                        if opt.pickTable(t) {
4✔
349
                                t.IncrRef()
2✔
350
                                out = append(out, t)
2✔
351
                        }
2✔
352
                }
353
                return out
23✔
354
        }
355

356
        tables := opt.pickTables(s.tables)
138✔
357
        for _, t := range tables {
163✔
358
                t.IncrRef()
25✔
359
        }
25✔
360
        return tables
138✔
361
}
362

363
// levelHandlerRLocked is an empty marker type. Functions that require the caller to
// already hold the level's read lock take a value of this type as a parameter, so the
// requirement is visible at every call site.
type levelHandlerRLocked struct{}
364

365
// overlappingTables returns the tables that intersect with key range. Returns a half-interval.
366
// This function should already have acquired a read lock, and this is so important the caller must
367
// pass an empty parameter declaring such.
368
func (s *levelHandler) overlappingTables(_ levelHandlerRLocked, kr keyRange) (int, int) {
3,420✔
369
        if len(kr.left) == 0 || len(kr.right) == 0 {
3,420✔
370
                return 0, 0
×
371
        }
×
372
        left := sort.Search(len(s.tables), func(i int) bool {
12,691✔
373
                return y.CompareKeys(kr.left, s.tables[i].Biggest()) <= 0
9,271✔
374
        })
9,271✔
375
        right := sort.Search(len(s.tables), func(i int) bool {
12,024✔
376
                return y.CompareKeys(kr.right, s.tables[i].Smallest()) < 0
8,604✔
377
        })
8,604✔
378
        return left, right
3,420✔
379
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc