• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / ristretto / 6029454936

30 Aug 2023 04:50AM UTC coverage: 73.696% (+0.07%) from 73.63%
6029454936

push

web-flow
remove glog dependency (#350)

glog adds global flags so it is not a good library (adding to an
existing program causes a panic if a flag name conflicts)

12 of 12 new or added lines in 3 files covered. (100.0%)

2219 of 3011 relevant lines covered (73.7%)

2509947.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.17
/z/buffer.go
1
/*
2
 * Copyright 2020 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package z
18

19
import (
20
        "encoding/binary"
21
        "fmt"
22
        "io/ioutil"
23
        "log"
24
        "os"
25
        "sort"
26
        "sync/atomic"
27

28
        "github.com/pkg/errors"
29
)
30

31
// Defaults applied by the Buffer constructors in this file.
const (
	// defaultCapacity is the smallest capacity, in bytes, that a constructor
	// will create a Buffer with; smaller requests are raised to this value.
	defaultCapacity = 64

	// defaultTag is the allocation tag used when the caller supplies an empty one.
	defaultTag = "buffer"
)
35

36
// Buffer is equivalent of bytes.Buffer without the ability to read. It is NOT thread-safe.
37
//
38
// In UseCalloc mode, z.Calloc is used to allocate memory, which depending upon how the code is
39
// compiled could use jemalloc for allocations.
40
//
41
// In UseMmap mode, Buffer  uses file mmap to allocate memory. This allows us to store big data
42
// structures without using physical memory.
43
//
44
// MaxSize can be set to limit the memory usage.
45
type Buffer struct {
46
        padding       uint64     // number of starting bytes used for padding
47
        offset        uint64     // used length of the buffer
48
        buf           []byte     // backing slice for the buffer
49
        bufType       BufferType // type of the underlying buffer
50
        curSz         int        // capacity of the buffer
51
        maxSz         int        // causes a panic if the buffer grows beyond this size
52
        mmapFile      *MmapFile  // optional mmap backing for the buffer
53
        autoMmapAfter int        // Calloc falls back to an mmaped tmpfile after crossing this size
54
        autoMmapDir   string     // directory for autoMmap to create a tempfile in
55
        persistent    bool       // when enabled, Release will not delete the underlying mmap file
56
        tag           string     // used for jemalloc stats
57
}
58

59
func NewBuffer(capacity int, tag string) *Buffer {
2,023✔
60
        if capacity < defaultCapacity {
2,028✔
61
                capacity = defaultCapacity
5✔
62
        }
5✔
63
        if tag == "" {
3,026✔
64
                tag = defaultTag
1,003✔
65
        }
1,003✔
66
        return &Buffer{
2,023✔
67
                buf:     Calloc(capacity, tag),
2,023✔
68
                bufType: UseCalloc,
2,023✔
69
                curSz:   capacity,
2,023✔
70
                offset:  8,
2,023✔
71
                padding: 8,
2,023✔
72
                tag:     tag,
2,023✔
73
        }
2,023✔
74
}
75

76
// It is the caller's responsibility to set offset after this, because Buffer
77
// doesn't remember what it was.
78
func NewBufferPersistent(path string, capacity int) (*Buffer, error) {
3✔
79
        file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
3✔
80
        if err != nil {
3✔
81
                return nil, err
×
82
        }
×
83
        buffer, err := newBufferFile(file, capacity)
3✔
84
        if err != nil {
3✔
85
                return nil, err
×
86
        }
×
87
        buffer.persistent = true
3✔
88
        return buffer, nil
3✔
89
}
90

91
func NewBufferTmp(dir string, capacity int) (*Buffer, error) {
6✔
92
        if dir == "" {
12✔
93
                dir = tmpDir
6✔
94
        }
6✔
95
        file, err := ioutil.TempFile(dir, "buffer")
6✔
96
        if err != nil {
6✔
97
                return nil, err
×
98
        }
×
99
        return newBufferFile(file, capacity)
6✔
100
}
101

102
func newBufferFile(file *os.File, capacity int) (*Buffer, error) {
9✔
103
        if capacity < defaultCapacity {
12✔
104
                capacity = defaultCapacity
3✔
105
        }
3✔
106
        mmapFile, err := OpenMmapFileUsing(file, capacity, true)
9✔
107
        if err != nil && err != NewFile {
9✔
108
                return nil, err
×
109
        }
×
110
        buf := &Buffer{
9✔
111
                buf:      mmapFile.Data,
9✔
112
                bufType:  UseMmap,
9✔
113
                curSz:    len(mmapFile.Data),
9✔
114
                mmapFile: mmapFile,
9✔
115
                offset:   8,
9✔
116
                padding:  8,
9✔
117
        }
9✔
118
        return buf, nil
9✔
119
}
120

121
func NewBufferSlice(slice []byte) *Buffer {
×
122
        return &Buffer{
×
123
                offset:  uint64(len(slice)),
×
124
                buf:     slice,
×
125
                bufType: UseInvalid,
×
126
        }
×
127
}
×
128

129
func (b *Buffer) WithAutoMmap(threshold int, path string) *Buffer {
1✔
130
        if b.bufType != UseCalloc {
1✔
131
                panic("can only autoMmap with UseCalloc")
×
132
        }
133
        b.autoMmapAfter = threshold
1✔
134
        if path == "" {
2✔
135
                b.autoMmapDir = tmpDir
1✔
136
        } else {
1✔
137
                b.autoMmapDir = path
×
138
        }
×
139
        return b
1✔
140
}
141

142
func (b *Buffer) WithMaxSize(size int) *Buffer {
×
143
        b.maxSz = size
×
144
        return b
×
145
}
×
146

147
func (b *Buffer) IsEmpty() bool {
7✔
148
        return int(b.offset) == b.StartOffset()
7✔
149
}
7✔
150

151
// LenWithPadding would return the number of bytes written to the buffer so far
152
// plus the padding at the start of the buffer.
153
func (b *Buffer) LenWithPadding() int {
1✔
154
        return int(atomic.LoadUint64(&b.offset))
1✔
155
}
1✔
156

157
// LenNoPadding would return the number of bytes written to the buffer so far
158
// (without the padding).
159
func (b *Buffer) LenNoPadding() int {
×
160
        return int(atomic.LoadUint64(&b.offset) - b.padding)
×
161
}
×
162

163
// Bytes would return all the written bytes as a slice.
164
func (b *Buffer) Bytes() []byte {
73,511✔
165
        off := atomic.LoadUint64(&b.offset)
73,511✔
166
        return b.buf[b.padding:off]
73,511✔
167
}
73,511✔
168

169
// Grow would grow the buffer to have at least n more bytes. In case the buffer is at capacity, it
170
// would reallocate twice the size of current capacity + n, to ensure n bytes can be written to the
171
// buffer without further allocation. In UseMmap mode, this might result in underlying file
172
// expansion.
173
func (b *Buffer) Grow(n int) {
980,383✔
174
        if b.buf == nil {
980,383✔
175
                panic("z.Buffer needs to be initialized before using")
×
176
        }
177
        if b.maxSz > 0 && int(b.offset)+n > b.maxSz {
980,383✔
178
                err := fmt.Errorf(
×
179
                        "z.Buffer max size exceeded: %d offset: %d grow: %d", b.maxSz, b.offset, n)
×
180
                panic(err)
×
181
        }
182
        if int(b.offset)+n < b.curSz {
1,958,684✔
183
                return
978,301✔
184
        }
978,301✔
185

186
        // Calculate new capacity.
187
        growBy := b.curSz + n
2,082✔
188
        // Don't allocate more than 1GB at a time.
2,082✔
189
        if growBy > 1<<30 {
2,082✔
190
                growBy = 1 << 30
×
191
        }
×
192
        // Allocate at least n, even if it exceeds the 1GB limit above.
193
        if n > growBy {
2,082✔
194
                growBy = n
×
195
        }
×
196
        b.curSz += growBy
2,082✔
197

2,082✔
198
        switch b.bufType {
2,082✔
199
        case UseCalloc:
2,053✔
200
                // If autoMmap gets triggered, copy the slice over to an mmaped file.
2,053✔
201
                if b.autoMmapAfter > 0 && b.curSz > b.autoMmapAfter {
2,054✔
202
                        b.bufType = UseMmap
1✔
203
                        file, err := ioutil.TempFile(b.autoMmapDir, "")
1✔
204
                        if err != nil {
1✔
205
                                panic(err)
×
206
                        }
207
                        mmapFile, err := OpenMmapFileUsing(file, b.curSz, true)
1✔
208
                        if err != nil && err != NewFile {
1✔
209
                                panic(err)
×
210
                        }
211
                        assert(int(b.offset) == copy(mmapFile.Data, b.buf[:b.offset]))
1✔
212
                        Free(b.buf)
1✔
213
                        b.mmapFile = mmapFile
1✔
214
                        b.buf = mmapFile.Data
1✔
215
                        break
1✔
216
                }
217

218
                // Else, reallocate the slice.
219
                newBuf := Calloc(b.curSz, b.tag)
2,052✔
220
                assert(int(b.offset) == copy(newBuf, b.buf[:b.offset]))
2,052✔
221
                Free(b.buf)
2,052✔
222
                b.buf = newBuf
2,052✔
223

224
        case UseMmap:
29✔
225
                // Truncate and remap the underlying file.
29✔
226
                if err := b.mmapFile.Truncate(int64(b.curSz)); err != nil {
29✔
227
                        err = errors.Wrapf(err,
×
228
                                "while trying to truncate file: %s to size: %d", b.mmapFile.Fd.Name(), b.curSz)
×
229
                        panic(err)
×
230
                }
231
                b.buf = b.mmapFile.Data
29✔
232

233
        default:
×
234
                panic("can only use Grow on UseCalloc and UseMmap buffers")
×
235
        }
236
}
237

238
// Allocate is a way to get a slice of size n back from the buffer. This slice can be directly
239
// written to. Warning: Allocate is not thread-safe. The byte slice returned MUST be used before
240
// further calls to Buffer.
241
func (b *Buffer) Allocate(n int) []byte {
444,546✔
242
        b.Grow(n)
444,546✔
243
        off := b.offset
444,546✔
244
        b.offset += uint64(n)
444,546✔
245
        return b.buf[off:int(b.offset)]
444,546✔
246
}
444,546✔
247

248
// AllocateOffset works the same way as allocate, but instead of returning a byte slice, it returns
249
// the offset of the allocation.
250
func (b *Buffer) AllocateOffset(n int) int {
71,070✔
251
        b.Grow(n)
71,070✔
252
        b.offset += uint64(n)
71,070✔
253
        return int(b.offset) - n
71,070✔
254
}
71,070✔
255

256
func (b *Buffer) writeLen(sz int) {
222,273✔
257
        buf := b.Allocate(4)
222,273✔
258
        binary.BigEndian.PutUint32(buf, uint32(sz))
222,273✔
259
}
222,273✔
260

261
// SliceAllocate would encode the size provided into the buffer, followed by a call to Allocate,
262
// hence returning the slice of size sz. This can be used to allocate a lot of small buffers into
263
// this big buffer.
264
// Note that SliceAllocate should NOT be mixed with normal calls to Write.
265
func (b *Buffer) SliceAllocate(sz int) []byte {
222,273✔
266
        b.Grow(4 + sz)
222,273✔
267
        b.writeLen(sz)
222,273✔
268
        return b.Allocate(sz)
222,273✔
269
}
222,273✔
270

271
func (b *Buffer) StartOffset() int {
6,466✔
272
        return int(b.padding)
6,466✔
273
}
6,466✔
274

275
func (b *Buffer) WriteSlice(slice []byte) {
1✔
276
        dst := b.SliceAllocate(len(slice))
1✔
277
        assert(len(slice) == copy(dst, slice))
1✔
278
}
1✔
279

280
func (b *Buffer) SliceIterate(f func(slice []byte) error) error {
7✔
281
        if b.IsEmpty() {
7✔
282
                return nil
×
283
        }
×
284
        slice, next := []byte{}, b.StartOffset()
7✔
285
        for next >= 0 {
222,279✔
286
                slice, next = b.Slice(next)
222,272✔
287
                if len(slice) == 0 {
222,272✔
288
                        continue
×
289
                }
290
                if err := f(slice); err != nil {
222,272✔
291
                        return err
×
292
                }
×
293
        }
294
        return nil
7✔
295
}
296

297
// BufferType identifies the kind of memory backing a Buffer.
type BufferType int

// The buffer types, numbered from zero in this order.
const (
	UseCalloc BufferType = iota
	UseMmap
	UseInvalid
)

// String returns the name of the buffer type. Any value other than UseCalloc
// and UseMmap is reported as "UseInvalid".
func (t BufferType) String() string {
	if t == UseCalloc {
		return "UseCalloc"
	}
	if t == UseMmap {
		return "UseMmap"
	}
	return "UseInvalid"
}
315

316
type LessFunc func(a, b []byte) bool
317
type sortHelper struct {
318
        offsets []int
319
        b       *Buffer
320
        tmp     *Buffer
321
        less    LessFunc
322
        small   []int
323
}
324

325
func (s *sortHelper) sortSmall(start, end int) {
2,219✔
326
        s.tmp.Reset()
2,219✔
327
        s.small = s.small[:0]
2,219✔
328
        next := start
2,219✔
329
        for next >= 0 && next < end {
244,492✔
330
                s.small = append(s.small, next)
242,273✔
331
                _, next = s.b.Slice(next)
242,273✔
332
        }
242,273✔
333

334
        // We are sorting the slices pointed to by s.small offsets, but only moving the offsets around.
335
        sort.Slice(s.small, func(i, j int) bool {
2,288,861✔
336
                left, _ := s.b.Slice(s.small[i])
2,286,642✔
337
                right, _ := s.b.Slice(s.small[j])
2,286,642✔
338
                return s.less(left, right)
2,286,642✔
339
        })
2,286,642✔
340
        // Now we iterate over the s.small offsets and copy over the slices. The result is now in order.
341
        for _, off := range s.small {
244,492✔
342
                s.tmp.Write(rawSlice(s.b.buf[off:]))
242,273✔
343
        }
242,273✔
344
        assert(end-start == copy(s.b.buf[start:end], s.tmp.Bytes()))
2,219✔
345
}
346

347
func assert(b bool) {
29,060,368✔
348
        if !b {
29,060,368✔
349
                log.Fatalf("%+v", errors.Errorf("Assertion failure"))
×
350
        }
×
351
}
352
// check does nothing when err is nil. Otherwise it terminates the program
// through log.Fatalf, printing err in %+v form.
func check(err error) {
	if err == nil {
		return
	}
	log.Fatalf("%+v", err)
}
357
// check2 is the two-value form of check, for wrapping calls that return
// (value, error). The value is discarded; a non-nil err terminates the program
// through log.Fatalf, printing err in %+v form.
func check2(_ interface{}, err error) {
	if err == nil {
		return
	}
	log.Fatalf("%+v", err)
}
211✔
360

361
func (s *sortHelper) merge(left, right []byte, start, end int) {
211✔
362
        if len(left) == 0 || len(right) == 0 {
211✔
363
                return
×
364
        }
×
365
        s.tmp.Reset()
211✔
366
        check2(s.tmp.Write(left))
211✔
367
        left = s.tmp.Bytes()
211✔
368

211✔
369
        var ls, rs []byte
211✔
370

211✔
371
        copyLeft := func() {
630,795✔
372
                assert(len(ls) == copy(s.b.buf[start:], ls))
630,584✔
373
                left = left[len(ls):]
630,584✔
374
                start += len(ls)
630,584✔
375
        }
630,584✔
376
        copyRight := func() {
663,805✔
377
                assert(len(rs) == copy(s.b.buf[start:], rs))
663,594✔
378
                right = right[len(rs):]
663,594✔
379
                start += len(rs)
663,594✔
380
        }
663,594✔
381

382
        for start < end {
1,294,600✔
383
                if len(left) == 0 {
1,294,495✔
384
                        assert(len(right) == copy(s.b.buf[start:end], right))
106✔
385
                        return
106✔
386
                }
106✔
387
                if len(right) == 0 {
1,294,388✔
388
                        assert(len(left) == copy(s.b.buf[start:end], left))
105✔
389
                        return
105✔
390
                }
105✔
391
                ls = rawSlice(left)
1,294,178✔
392
                rs = rawSlice(right)
1,294,178✔
393

1,294,178✔
394
                // We skip the first 4 bytes in the rawSlice, because that stores the length.
1,294,178✔
395
                if s.less(ls[4:], rs[4:]) {
1,924,762✔
396
                        copyLeft()
630,584✔
397
                } else {
1,294,178✔
398
                        copyRight()
663,594✔
399
                }
663,594✔
400
        }
401
}
402

403
func (s *sortHelper) sort(lo, hi int) []byte {
2,430✔
404
        assert(lo <= hi)
2,430✔
405

2,430✔
406
        mid := lo + (hi-lo)/2
2,430✔
407
        loff, hoff := s.offsets[lo], s.offsets[hi]
2,430✔
408
        if lo == mid {
4,649✔
409
                // No need to sort, just return the buffer.
2,219✔
410
                return s.b.buf[loff:hoff]
2,219✔
411
        }
2,219✔
412

413
        // lo, mid would sort from [offset[lo], offset[mid]) .
414
        left := s.sort(lo, mid)
211✔
415
        // Typically we'd use mid+1, but here mid represents an offset in the buffer. Each offset
211✔
416
        // contains a thousand entries. So, if we do mid+1, we'd skip over those entries.
211✔
417
        right := s.sort(mid, hi)
211✔
418

211✔
419
        s.merge(left, right, loff, hoff)
211✔
420
        return s.b.buf[loff:hoff]
211✔
421
}
422

423
// SortSlice is like SortSliceBetween but sorting over the entire buffer.
424
func (b *Buffer) SortSlice(less func(left, right []byte) bool) {
6✔
425
        b.SortSliceBetween(b.StartOffset(), int(b.offset), less)
6✔
426
}
6✔
427
func (b *Buffer) SortSliceBetween(start, end int, less LessFunc) {
2,008✔
428
        if start >= end {
2,008✔
429
                return
×
430
        }
×
431
        if start == 0 {
2,008✔
432
                panic("start can never be zero")
×
433
        }
434

435
        var offsets []int
2,008✔
436
        next, count := start, 0
2,008✔
437
        for next >= 0 && next < end {
244,281✔
438
                if count%1024 == 0 {
244,492✔
439
                        offsets = append(offsets, next)
2,219✔
440
                }
2,219✔
441
                _, next = b.Slice(next)
242,273✔
442
                count++
242,273✔
443
        }
444
        assert(len(offsets) > 0)
2,008✔
445
        if offsets[len(offsets)-1] != end {
4,016✔
446
                offsets = append(offsets, end)
2,008✔
447
        }
2,008✔
448

449
        szTmp := int(float64((end-start)/2) * 1.1)
2,008✔
450
        s := &sortHelper{
2,008✔
451
                offsets: offsets,
2,008✔
452
                b:       b,
2,008✔
453
                less:    less,
2,008✔
454
                small:   make([]int, 0, 1024),
2,008✔
455
                tmp:     NewBuffer(szTmp, b.tag),
2,008✔
456
        }
2,008✔
457
        defer s.tmp.Release()
2,008✔
458

2,008✔
459
        left := offsets[0]
2,008✔
460
        for _, off := range offsets[1:] {
4,227✔
461
                s.sortSmall(left, off)
2,219✔
462
                left = off
2,219✔
463
        }
2,219✔
464
        s.sort(0, len(offsets)-1)
2,008✔
465
}
466

467
// rawSlice returns the length-prefixed slice at the start of buf: the 4-byte
// big-endian length followed by that many payload bytes.
func rawSlice(buf []byte) []byte {
	payload := int(binary.BigEndian.Uint32(buf))
	return buf[:4+payload]
}
2,830,629✔
471

472
// Slice would return the slice written at offset.
473
func (b *Buffer) Slice(offset int) ([]byte, int) {
5,320,102✔
474
        if offset >= int(b.offset) {
5,320,102✔
475
                return nil, -1
×
476
        }
×
477

478
        sz := binary.BigEndian.Uint32(b.buf[offset:])
5,320,102✔
479
        start := offset + 4
5,320,102✔
480
        next := start + int(sz)
5,320,102✔
481
        res := b.buf[start:next]
5,320,102✔
482
        if next >= int(b.offset) {
5,320,242✔
483
                next = -1
140✔
484
        }
140✔
485
        return res, next
5,320,102✔
486
}
487

488
// SliceOffsets is an expensive function. Use sparingly.
489
func (b *Buffer) SliceOffsets() []int {
×
490
        next := b.StartOffset()
×
491
        var offsets []int
×
492
        for next >= 0 {
×
493
                offsets = append(offsets, next)
×
494
                _, next = b.Slice(next)
×
495
        }
×
496
        return offsets
×
497
}
498

499
func (b *Buffer) Data(offset int) []byte {
2✔
500
        if offset > b.curSz {
2✔
501
                panic("offset beyond current size")
×
502
        }
503
        return b.buf[offset:b.curSz]
2✔
504
}
505

506
// Write would write p bytes to the buffer.
507
func (b *Buffer) Write(p []byte) (n int, err error) {
242,494✔
508
        n = len(p)
242,494✔
509
        b.Grow(n)
242,494✔
510
        assert(n == copy(b.buf[b.offset:], p))
242,494✔
511
        b.offset += uint64(n)
242,494✔
512
        return n, nil
242,494✔
513
}
242,494✔
514

515
// Reset would reset the buffer to be reused.
516
func (b *Buffer) Reset() {
2,438✔
517
        b.offset = uint64(b.StartOffset())
2,438✔
518
}
2,438✔
519

520
// Release would free up the memory allocated by the buffer. Once the usage of buffer is done, it is
521
// important to call Release, otherwise a memory leak can happen.
522
func (b *Buffer) Release() error {
2,032✔
523
        if b == nil {
2,032✔
524
                return nil
×
525
        }
×
526
        switch b.bufType {
2,032✔
527
        case UseCalloc:
2,022✔
528
                Free(b.buf)
2,022✔
529
        case UseMmap:
10✔
530
                if b.mmapFile == nil {
10✔
531
                        return nil
×
532
                }
×
533
                path := b.mmapFile.Fd.Name()
10✔
534
                if err := b.mmapFile.Close(-1); err != nil {
10✔
535
                        return errors.Wrapf(err, "while closing file: %s", path)
×
536
                }
×
537
                if !b.persistent {
17✔
538
                        if err := os.Remove(path); err != nil {
7✔
539
                                return errors.Wrapf(err, "while deleting file %s", path)
×
540
                        }
×
541
                }
542
        }
543
        return nil
2,032✔
544
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc