• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / badger / 4229068887

21 Feb 2023 04:16AM UTC coverage: 42.43% (-18.8%) from 61.239%
4229068887

Pull #1866

Aman Mangal
combine memtables before flushing to L0
Pull Request #1866: combine memtables before flushing to L0

49 of 49 new or added lines in 1 file covered. (100.0%)

5919 of 13950 relevant lines covered (42.43%)

437472.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.17
/level_handler.go
1
/*
2
 * Copyright 2017 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package badger
18

19
import (
20
        "fmt"
21
        "sort"
22
        "sync"
23

24
        "github.com/dgraph-io/badger/v3/table"
25
        "github.com/dgraph-io/badger/v3/y"
26
)
27

28
type levelHandler struct {
29
        // Guards tables, totalSize.
30
        sync.RWMutex
31

32
        // For level >= 1, tables are sorted by key ranges, which do not overlap.
33
        // For level 0, tables are sorted by time.
34
        // For level 0, newest table are at the back. Compact the oldest one first, which is at the front.
35
        tables         []*table.Table
36
        totalSize      int64
37
        totalStaleSize int64
38

39
        // The following are initialized once and const.
40
        level    int
41
        strLevel string
42
        db       *DB
43
}
44

45
func (s *levelHandler) isLastLevel() bool {
×
46
        return s.level == s.db.opt.MaxLevels-1
×
47
}
×
48

49
func (s *levelHandler) getTotalStaleSize() int64 {
×
50
        s.RLock()
×
51
        defer s.RUnlock()
×
52
        return s.totalStaleSize
×
53
}
×
54

55
func (s *levelHandler) getTotalSize() int64 {
46,431✔
56
        s.RLock()
46,431✔
57
        defer s.RUnlock()
46,431✔
58
        return s.totalSize
46,431✔
59
}
46,431✔
60

61
// initTables replaces s.tables with given tables. This is done during loading.
62
func (s *levelHandler) initTables(tables []*table.Table) {
266✔
63
        s.Lock()
266✔
64
        defer s.Unlock()
266✔
65

266✔
66
        s.tables = tables
266✔
67
        s.totalSize = 0
266✔
68
        s.totalStaleSize = 0
266✔
69
        for _, t := range tables {
282✔
70
                s.addSize(t)
16✔
71
        }
16✔
72

73
        if s.level == 0 {
304✔
74
                // Key range will overlap. Just sort by fileID in ascending order
38✔
75
                // because newer tables are at the end of level 0.
38✔
76
                sort.Slice(s.tables, func(i, j int) bool {
38✔
77
                        return s.tables[i].ID() < s.tables[j].ID()
×
78
                })
×
79
        } else {
228✔
80
                // Sort tables by keys.
228✔
81
                sort.Slice(s.tables, func(i, j int) bool {
296✔
82
                        return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
68✔
83
                })
68✔
84
        }
85
}
86

87
// deleteTables remove tables idx0, ..., idx1-1.
88
func (s *levelHandler) deleteTables(toDel []*table.Table) error {
×
89
        s.Lock() // s.Unlock() below
×
90

×
91
        toDelMap := make(map[uint64]struct{})
×
92
        for _, t := range toDel {
×
93
                toDelMap[t.ID()] = struct{}{}
×
94
        }
×
95

96
        // Make a copy as iterators might be keeping a slice of tables.
97
        var newTables []*table.Table
×
98
        for _, t := range s.tables {
×
99
                _, found := toDelMap[t.ID()]
×
100
                if !found {
×
101
                        newTables = append(newTables, t)
×
102
                        continue
×
103
                }
104
                s.subtractSize(t)
×
105
        }
106
        s.tables = newTables
×
107

×
108
        s.Unlock() // Unlock s _before_ we DecrRef our tables, which can be slow.
×
109

×
110
        return decrRefs(toDel)
×
111
}
112

113
// replaceTables will replace tables[left:right] with newTables. Note this EXCLUDES tables[right].
114
// You must call decr() to delete the old tables _after_ writing the update to the manifest.
115
func (s *levelHandler) replaceTables(toDel, toAdd []*table.Table) error {
×
116
        // Need to re-search the range of tables in this level to be replaced as other goroutines might
×
117
        // be changing it as well.  (They can't touch our tables, but if they add/remove other tables,
×
118
        // the indices get shifted around.)
×
119
        s.Lock() // We s.Unlock() below.
×
120

×
121
        toDelMap := make(map[uint64]struct{})
×
122
        for _, t := range toDel {
×
123
                toDelMap[t.ID()] = struct{}{}
×
124
        }
×
125
        var newTables []*table.Table
×
126
        for _, t := range s.tables {
×
127
                _, found := toDelMap[t.ID()]
×
128
                if !found {
×
129
                        newTables = append(newTables, t)
×
130
                        continue
×
131
                }
132
                s.subtractSize(t)
×
133
        }
134

135
        // Increase totalSize first.
136
        for _, t := range toAdd {
×
137
                s.addSize(t)
×
138
                t.IncrRef()
×
139
                newTables = append(newTables, t)
×
140
        }
×
141

142
        // Assign tables.
143
        s.tables = newTables
×
144
        sort.Slice(s.tables, func(i, j int) bool {
×
145
                return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
×
146
        })
×
147
        s.Unlock() // s.Unlock before we DecrRef tables -- that can be slow.
×
148
        return decrRefs(toDel)
×
149
}
150

151
// addTable adds toAdd table to levelHandler. Normally when we add tables to levelHandler, we sort
152
// tables based on table.Smallest. This is required for correctness of the system. But in case of
153
// stream writer this can be avoided. We can just add tables to levelHandler's table list
154
// and after all addTable calls, we can sort table list(check sortTable method).
155
// NOTE: levelHandler.sortTables() should be called after call addTable calls are done.
156
func (s *levelHandler) addTable(t *table.Table) {
16✔
157
        s.Lock()
16✔
158
        defer s.Unlock()
16✔
159

16✔
160
        s.addSize(t) // Increase totalSize first.
16✔
161
        t.IncrRef()
16✔
162
        s.tables = append(s.tables, t)
16✔
163
}
16✔
164

165
// sortTables sorts tables of levelHandler based on table.Smallest.
166
// Normally it should be called after all addTable calls.
167
func (s *levelHandler) sortTables() {
7✔
168
        s.RLock()
7✔
169
        defer s.RUnlock()
7✔
170

7✔
171
        sort.Slice(s.tables, func(i, j int) bool {
64✔
172
                return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
57✔
173
        })
57✔
174
}
175

176
func decrRefs(tables []*table.Table) error {
×
177
        for _, table := range tables {
×
178
                if err := table.DecrRef(); err != nil {
×
179
                        return err
×
180
                }
×
181
        }
182
        return nil
×
183
}
184

185
func newLevelHandler(db *DB, level int) *levelHandler {
266✔
186
        return &levelHandler{
266✔
187
                level:    level,
266✔
188
                strLevel: fmt.Sprintf("l%d", level),
266✔
189
                db:       db,
266✔
190
        }
266✔
191
}
266✔
192

193
// tryAddLevel0Table returns true if ok and no stalling.
194
func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
35✔
195
        y.AssertTrue(s.level == 0)
35✔
196
        // Need lock as we may be deleting the first table during a level 0 compaction.
35✔
197
        s.Lock()
35✔
198
        defer s.Unlock()
35✔
199
        // Stall (by returning false) if we are above the specified stall setting for L0.
35✔
200
        if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
35✔
201
                return false
×
202
        }
×
203

204
        s.tables = append(s.tables, t)
35✔
205
        t.IncrRef()
35✔
206
        s.addSize(t)
35✔
207

35✔
208
        return true
35✔
209
}
210

211
// This should be called while holding the lock on the level.
212
func (s *levelHandler) addSize(t *table.Table) {
67✔
213
        s.totalSize += t.Size()
67✔
214
        s.totalStaleSize += int64(t.StaleDataSize())
67✔
215
}
67✔
216

217
// This should be called while holding the lock on the level.
218
func (s *levelHandler) subtractSize(t *table.Table) {
×
219
        s.totalSize -= t.Size()
×
220
        s.totalStaleSize -= int64(t.StaleDataSize())
×
221
}
×
222
func (s *levelHandler) numTables() int {
5,867✔
223
        s.RLock()
5,867✔
224
        defer s.RUnlock()
5,867✔
225
        return len(s.tables)
5,867✔
226
}
5,867✔
227

228
func (s *levelHandler) close() error {
238✔
229
        s.RLock()
238✔
230
        defer s.RUnlock()
238✔
231
        var err error
238✔
232
        for _, t := range s.tables {
289✔
233
                if closeErr := t.Close(-1); closeErr != nil && err == nil {
51✔
234
                        err = closeErr
×
235
                }
×
236
        }
237
        return y.Wrap(err, "levelHandler.close")
238✔
238
}
239

240
// getTableForKey acquires a read-lock to access s.tables. It returns a list of tableHandlers.
241
func (s *levelHandler) getTableForKey(key []byte) ([]*table.Table, func() error) {
4,200,455✔
242
        s.RLock()
4,200,455✔
243
        defer s.RUnlock()
4,200,455✔
244

4,200,455✔
245
        if s.level == 0 {
4,800,520✔
246
                // For level 0, we need to check every table. Remember to make a copy as s.tables may change
600,065✔
247
                // once we exit this function, and we don't want to lock s.tables while seeking in tables.
600,065✔
248
                // CAUTION: Reverse the tables.
600,065✔
249
                out := make([]*table.Table, 0, len(s.tables))
600,065✔
250
                for i := len(s.tables) - 1; i >= 0; i-- {
600,065✔
251
                        out = append(out, s.tables[i])
×
252
                        s.tables[i].IncrRef()
×
253
                }
×
254
                return out, func() error {
1,200,130✔
255
                        for _, t := range out {
600,065✔
256
                                if err := t.DecrRef(); err != nil {
×
257
                                        return err
×
258
                                }
×
259
                        }
260
                        return nil
600,065✔
261
                }
262
        }
263
        // For level >= 1, we can do a binary search as key range does not overlap.
264
        idx := sort.Search(len(s.tables), func(i int) bool {
3,600,390✔
265
                return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
×
266
        })
×
267
        if idx >= len(s.tables) {
7,200,780✔
268
                // Given key is strictly > than every element we have.
3,600,390✔
269
                return nil, func() error { return nil }
7,200,780✔
270
        }
271
        tbl := s.tables[idx]
×
272
        tbl.IncrRef()
×
273
        return []*table.Table{tbl}, tbl.DecrRef
×
274
}
275

276
// get returns value for a given key or the key after that. If not found, return nil.
277
func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
4,200,455✔
278
        tables, decr := s.getTableForKey(key)
4,200,455✔
279
        keyNoTs := y.ParseKey(key)
4,200,455✔
280

4,200,455✔
281
        hash := y.Hash(keyNoTs)
4,200,455✔
282
        var maxVs y.ValueStruct
4,200,455✔
283
        for _, th := range tables {
4,200,455✔
284
                if th.DoesNotHave(hash) {
×
285
                        y.NumLSMBloomHitsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
×
286
                        continue
×
287
                }
288

289
                it := th.NewIterator(0)
×
290
                defer it.Close()
×
291

×
292
                y.NumLSMGetsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
×
293
                it.Seek(key)
×
294
                if !it.Valid() {
×
295
                        continue
×
296
                }
297
                if y.SameKey(key, it.Key()) {
×
298
                        if version := y.ParseTs(it.Key()); maxVs.Version < version {
×
299
                                maxVs = it.ValueCopy()
×
300
                                maxVs.Version = version
×
301
                        }
×
302
                }
303
        }
304
        return maxVs, decr()
4,200,455✔
305
}
306

307
// appendIterators appends iterators to an array of iterators, for merging.
308
// Note: This obtains references for the table handlers. Remember to close these iterators.
309
func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
281,526✔
310
        s.RLock()
281,526✔
311
        defer s.RUnlock()
281,526✔
312

281,526✔
313
        var topt int
281,526✔
314
        if opt.Reverse {
281,526✔
315
                topt = table.REVERSED
×
316
        }
×
317
        if s.level == 0 {
321,744✔
318
                // Remember to add in reverse order!
40,218✔
319
                // The newer table at the end of s.tables should be added first as it takes precedence.
40,218✔
320
                // Level 0 tables are not in key sorted order, so we need to consider them one by one.
40,218✔
321
                var out []*table.Table
40,218✔
322
                for _, t := range s.tables {
40,262✔
323
                        if opt.pickTable(t) {
88✔
324
                                out = append(out, t)
44✔
325
                        }
44✔
326
                }
327
                return appendIteratorsReversed(iters, out, topt)
40,218✔
328
        }
329

330
        tables := opt.pickTables(s.tables)
241,308✔
331
        if len(tables) == 0 {
482,592✔
332
                return iters
241,284✔
333
        }
241,284✔
334
        return append(iters, table.NewConcatIterator(tables, topt))
24✔
335
}
336

337
// levelHandlerRLocked is an empty marker type. overlappingTables takes a value of it as a
// parameter so that callers explicitly declare they already hold the level's read lock.
type levelHandlerRLocked struct{}
338

339
// overlappingTables returns the tables that intersect with key range. Returns a half-interval.
340
// This function should already have acquired a read lock, and this is so important the caller must
341
// pass an empty parameter declaring such.
342
func (s *levelHandler) overlappingTables(_ levelHandlerRLocked, kr keyRange) (int, int) {
×
343
        if len(kr.left) == 0 || len(kr.right) == 0 {
×
344
                return 0, 0
×
345
        }
×
346
        left := sort.Search(len(s.tables), func(i int) bool {
×
347
                return y.CompareKeys(kr.left, s.tables[i].Biggest()) <= 0
×
348
        })
×
349
        right := sort.Search(len(s.tables), func(i int) bool {
×
350
                return y.CompareKeys(kr.right, s.tables[i].Smallest()) < 0
×
351
        })
×
352
        return left, right
×
353
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc