• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / badger / 4229068887

21 Feb 2023 04:16AM UTC coverage: 42.43% (-18.8%) from 61.239%
4229068887

Pull #1866

Aman Mangal
combine memtables before flushing to L0
Pull Request #1866: combine memtables before flushing to L0

49 of 49 new or added lines in 1 file covered. (100.0%)

5919 of 13950 relevant lines covered (42.43%)

437472.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.16
/batch.go
1
/*
2
 * Copyright 2018 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package badger
18

19
import (
20
        "sync"
21
        "sync/atomic"
22

23
        "github.com/pkg/errors"
24

25
        "github.com/dgraph-io/badger/v3/pb"
26
        "github.com/dgraph-io/badger/v3/y"
27
        "github.com/dgraph-io/ristretto/z"
28
)
29

30
// WriteBatch holds the necessary info to perform batched writes.
31
type WriteBatch struct {
32
        sync.Mutex
33
        txn      *Txn
34
        db       *DB
35
        throttle *y.Throttle
36
        err      atomic.Value
37

38
        isManaged bool
39
        commitTs  uint64
40
        finished  bool
41
}
42

43
// NewWriteBatch creates a new WriteBatch. This provides a way to conveniently do a lot of writes,
44
// batching them up as tightly as possible in a single transaction and using callbacks to avoid
45
// waiting for them to commit, thus achieving good performance. This API hides away the logic of
46
// creating and committing transactions. Due to the nature of SSI guaratees provided by Badger,
47
// blind writes can never encounter transaction conflicts (ErrConflict).
48
func (db *DB) NewWriteBatch() *WriteBatch {
4✔
49
        if db.opt.managedTxns {
4✔
50
                panic("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead")
×
51
        }
52
        return db.newWriteBatch(false)
4✔
53
}
54

55
func (db *DB) newWriteBatch(isManaged bool) *WriteBatch {
5✔
56
        return &WriteBatch{
5✔
57
                db:        db,
5✔
58
                isManaged: isManaged,
5✔
59
                txn:       db.newTransaction(true, isManaged),
5✔
60
                throttle:  y.NewThrottle(16),
5✔
61
        }
5✔
62
}
5✔
63

64
// SetMaxPendingTxns sets a limit on maximum number of pending transactions while writing batches.
65
// This function should be called before using WriteBatch. Default value of MaxPendingTxns is
66
// 16 to minimise memory usage.
67
func (wb *WriteBatch) SetMaxPendingTxns(max int) {
×
68
        wb.throttle = y.NewThrottle(max)
×
69
}
×
70

71
// Cancel function must be called if there's a chance that Flush might not get
72
// called. If neither Flush or Cancel is called, the transaction oracle would
73
// never get a chance to clear out the row commit timestamp map, thus causing an
74
// unbounded memory consumption. Typically, you can call Cancel as a defer
75
// statement right after NewWriteBatch is called.
76
//
77
// Note that any committed writes would still go through despite calling Cancel.
78
func (wb *WriteBatch) Cancel() {
×
79
        wb.Lock()
×
80
        defer wb.Unlock()
×
81
        wb.finished = true
×
82
        if err := wb.throttle.Finish(); err != nil {
×
83
                wb.db.opt.Errorf("WatchBatch.Cancel error while finishing: %v", err)
×
84
        }
×
85
        wb.txn.Discard()
×
86
}
87

88
func (wb *WriteBatch) callback(err error) {
33✔
89
        // sync.WaitGroup is thread-safe, so it doesn't need to be run inside wb.Lock.
33✔
90
        defer wb.throttle.Done(err)
33✔
91
        if err == nil {
66✔
92
                return
33✔
93
        }
33✔
94
        if err := wb.Error(); err != nil {
×
95
                return
×
96
        }
×
97
        wb.err.Store(err)
×
98
}
99

100
func (wb *WriteBatch) writeKV(kv *pb.KV) error {
×
101
        e := Entry{Key: kv.Key, Value: kv.Value}
×
102
        if len(kv.UserMeta) > 0 {
×
103
                e.UserMeta = kv.UserMeta[0]
×
104
        }
×
105
        y.AssertTrue(kv.Version != 0)
×
106
        e.version = kv.Version
×
107
        return wb.handleEntry(&e)
×
108
}
109

110
func (wb *WriteBatch) Write(buf *z.Buffer) error {
×
111
        wb.Lock()
×
112
        defer wb.Unlock()
×
113

×
114
        err := buf.SliceIterate(func(s []byte) error {
×
115
                kv := &pb.KV{}
×
116
                if err := kv.Unmarshal(s); err != nil {
×
117
                        return err
×
118
                }
×
119
                return wb.writeKV(kv)
×
120
        })
121
        return err
×
122
}
123

124
func (wb *WriteBatch) WriteList(kvList *pb.KVList) error {
×
125
        wb.Lock()
×
126
        defer wb.Unlock()
×
127
        for _, kv := range kvList.Kv {
×
128
                if err := wb.writeKV(kv); err != nil {
×
129
                        return err
×
130
                }
×
131
        }
132
        return nil
×
133
}
134

135
// SetEntryAt is the equivalent of Txn.SetEntry but it also allows setting version for the entry.
136
// SetEntryAt can be used only in managed mode.
137
func (wb *WriteBatch) SetEntryAt(e *Entry, ts uint64) error {
×
138
        if !wb.db.opt.managedTxns {
×
139
                return errors.New("SetEntryAt can only be used in managed mode. Use SetEntry instead")
×
140
        }
×
141
        e.version = ts
×
142
        return wb.SetEntry(e)
×
143
}
144

145
// Should be called with lock acquired.
146
func (wb *WriteBatch) handleEntry(e *Entry) error {
3,040,000✔
147
        if err := wb.txn.SetEntry(e); err != ErrTxnTooBig {
6,079,972✔
148
                return err
3,039,972✔
149
        }
3,039,972✔
150
        // Txn has reached it's zenith. Commit now.
151
        if cerr := wb.commit(); cerr != nil {
28✔
152
                return cerr
×
153
        }
×
154
        // This time the error must not be ErrTxnTooBig, otherwise, we make the
155
        // error permanent.
156
        if err := wb.txn.SetEntry(e); err != nil {
28✔
157
                wb.err.Store(err)
×
158
                return err
×
159
        }
×
160
        return nil
28✔
161
}
162

163
// SetEntry is the equivalent of Txn.SetEntry.
164
func (wb *WriteBatch) SetEntry(e *Entry) error {
3,040,000✔
165
        wb.Lock()
3,040,000✔
166
        defer wb.Unlock()
3,040,000✔
167
        return wb.handleEntry(e)
3,040,000✔
168
}
3,040,000✔
169

170
// Set is equivalent of Txn.Set().
171
func (wb *WriteBatch) Set(k, v []byte) error {
40,000✔
172
        e := &Entry{Key: k, Value: v}
40,000✔
173
        return wb.SetEntry(e)
40,000✔
174
}
40,000✔
175

176
// DeleteAt is equivalent of Txn.Delete but accepts a delete timestamp.
177
func (wb *WriteBatch) DeleteAt(k []byte, ts uint64) error {
×
178
        e := Entry{Key: k, meta: bitDelete, version: ts}
×
179
        return wb.SetEntry(&e)
×
180
}
×
181

182
// Delete is equivalent of Txn.Delete.
183
func (wb *WriteBatch) Delete(k []byte) error {
×
184
        wb.Lock()
×
185
        defer wb.Unlock()
×
186

×
187
        if err := wb.txn.Delete(k); err != ErrTxnTooBig {
×
188
                return err
×
189
        }
×
190
        if err := wb.commit(); err != nil {
×
191
                return err
×
192
        }
×
193
        if err := wb.txn.Delete(k); err != nil {
×
194
                wb.err.Store(err)
×
195
                return err
×
196
        }
×
197
        return nil
×
198
}
199

200
// Caller to commit must hold a write lock.
201
func (wb *WriteBatch) commit() error {
33✔
202
        if err := wb.Error(); err != nil {
33✔
203
                return err
×
204
        }
×
205
        if wb.finished {
33✔
206
                return y.ErrCommitAfterFinish
×
207
        }
×
208
        if err := wb.throttle.Do(); err != nil {
33✔
209
                wb.err.Store(err)
×
210
                return err
×
211
        }
×
212
        wb.txn.CommitWith(wb.callback)
33✔
213
        wb.txn = wb.db.newTransaction(true, wb.isManaged)
33✔
214
        wb.txn.commitTs = wb.commitTs
33✔
215
        return wb.Error()
33✔
216
}
217

218
// Flush must be called at the end to ensure that any pending writes get committed to Badger. Flush
219
// returns any error stored by WriteBatch.
220
func (wb *WriteBatch) Flush() error {
5✔
221
        wb.Lock()
5✔
222
        err := wb.commit()
5✔
223
        if err != nil {
5✔
224
                wb.Unlock()
×
225
                return err
×
226
        }
×
227
        wb.finished = true
5✔
228
        wb.txn.Discard()
5✔
229
        wb.Unlock()
5✔
230

5✔
231
        if err := wb.throttle.Finish(); err != nil {
5✔
232
                if wb.Error() != nil {
×
233
                        return errors.Errorf("wb.err: %s err: %s", wb.Error(), err)
×
234
                }
×
235
                return err
×
236
        }
237

238
        return wb.Error()
5✔
239
}
240

241
// Error returns any errors encountered so far. No commits would be run once an error is detected.
242
func (wb *WriteBatch) Error() error {
71✔
243
        // If the interface conversion fails, the err will be nil.
71✔
244
        err, _ := wb.err.Load().(error)
71✔
245
        return err
71✔
246
}
71✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc