• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

smallnest / ringbuffer / 21159004936

20 Jan 2026 04:01AM UTC coverage: 89.286% (-0.9%) from 90.179%
21159004936

push

github

chaoyuepan
#21 fix panic

14 of 19 new or added lines in 2 files covered. (73.68%)

3 existing lines in 1 file now uncovered.

600 of 672 relevant lines covered (89.29%)

47044.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.71
/ring_buffer.go
1
// Copyright 2019 smallnest. All rights reserved.
2
// Use of this source code is governed by a MIT-style
3
// license that can be found in the LICENSE file.
4

5
package ringbuffer
6

7
import (
8
        "context"
9
        "errors"
10
        "io"
11
        "sync"
12
        "time"
13
        "unsafe"
14
)
15

16
var (
	// ErrTooMuchDataToWrite is returned when the data to write does not fit
	// into the space currently free in the buffer. The part that fits is
	// still written.
	ErrTooMuchDataToWrite = errors.New("too much data to write")

	// ErrIsFull is returned when the buffer is full and not blocking.
	ErrIsFull = errors.New("ringbuffer is full")

	// ErrIsEmpty is returned when the buffer is empty and not blocking.
	ErrIsEmpty = errors.New("ringbuffer is empty")

	// ErrIsNotEmpty is returned when the buffer is not empty and not blocking.
	ErrIsNotEmpty = errors.New("ringbuffer is not empty")

	// ErrAcquireLock is returned when the lock is not acquired on Try operations.
	ErrAcquireLock = errors.New("unable to acquire lock")

	// ErrWriteOnClosed is returned when writing to a closed ringbuffer.
	ErrWriteOnClosed = errors.New("write on closed ringbuffer")

	// ErrReaderClosed is returned when the reader side has closed the ringbuffer.
	ErrReaderClosed = errors.New("reader closed")

	// ErrReset is returned when Reset() is called, causing pending operations to abort.
	ErrReset = errors.New("reset called")
)
41

42
// RingBuffer is a circular buffer that implements the io.ReadWriter interface.
// It operates like a buffered pipe, where data is written to a RingBuffer
// and can be read back from another goroutine.
// It is safe to concurrently read and write RingBuffer.
type RingBuffer struct {
	buf       []byte        // backing storage
	size      int           // len(buf), fixed by the constructors
	r         int           // next position to read
	w         int           // next position to write
	isFull    bool          // distinguishes "full" from "empty" when r == w
	err       error         // sticky error recorded by setErr; io.EOF marks a closed writer
	block     bool          // blocking mode, see SetBlocking
	overwrite bool          // when true, overwrite old data when buffer is full
	rTimeout  time.Duration // Applies to writes (waits for the read condition)
	wTimeout  time.Duration // Applies to read (wait for the write condition)
	mu        sync.Mutex    // guards the fields above
	wg        sync.WaitGroup // counts Read calls in progress
	readCond  *sync.Cond // Signaled when data has been read.
	writeCond *sync.Cond // Signaled when data has been written.
}
62

63
// New returns a new RingBuffer whose buffer has the given size.
64
func New(size int) *RingBuffer {
175✔
65
        return &RingBuffer{
175✔
66
                buf:  make([]byte, size),
175✔
67
                size: size,
175✔
68
        }
175✔
69
}
175✔
70

71
// NewBuffer returns a new RingBuffer whose buffer is provided.
72
func NewBuffer(b []byte) *RingBuffer {
×
73
        return &RingBuffer{
×
74
                buf:  b,
×
75
                size: len(b),
×
76
        }
×
77
}
×
78

79
// SetBlocking sets the blocking mode of the ring buffer.
80
// If block is true, Read and Write will block when there is no data to read or no space to write.
81
// If block is false, Read and Write will return ErrIsEmpty or ErrIsFull immediately.
82
// By default, the ring buffer is not blocking.
83
// This setting should be called before any Read or Write operation or after a Reset.
84
func (r *RingBuffer) SetBlocking(block bool) *RingBuffer {
252✔
85
        r.block = block
252✔
86
        if block {
504✔
87
                r.readCond = sync.NewCond(&r.mu)
252✔
88
                r.writeCond = sync.NewCond(&r.mu)
252✔
89
        }
252✔
90
        return r
252✔
91
}
92

93
// SetOverwrite sets the overwrite mode of the ring buffer.
// If overwrite is true, Write operations will overwrite the oldest data when the buffer is full,
// similar to a traditional circular buffer. The read pointer will advance to skip overwritten data.
// If overwrite is false (default), Write will return ErrIsFull or block (if blocking mode is enabled).
// The flag is stored without taking the mutex; presumably it is meant to be
// set during setup, before concurrent use (TODO confirm).
// The receiver is returned to allow call chaining.
func (r *RingBuffer) SetOverwrite(overwrite bool) *RingBuffer {
	r.overwrite = overwrite
	return r
}
9✔
101

102
// WithCancel sets a context to cancel the ring buffer.
103
// When the context is canceled, the ring buffer will be closed with the context error.
104
// A goroutine will be started and run until the provided context is canceled.
105
func (r *RingBuffer) WithCancel(ctx context.Context) *RingBuffer {
×
106
        go func() {
×
NEW
107
                <-ctx.Done()
×
NEW
108
                r.CloseWithError(ctx.Err())
×
109
        }()
×
110
        return r
×
111
}
112

113
// WithTimeout will set a blocking read/write timeout.
114
// If no reads or writes occur within the timeout,
115
// the ringbuffer will be closed and context.DeadlineExceeded will be returned.
116
// A timeout of 0 or less will disable timeouts (default).
117
func (r *RingBuffer) WithTimeout(d time.Duration) *RingBuffer {
28✔
118
        r.mu.Lock()
28✔
119
        r.rTimeout = d
28✔
120
        r.wTimeout = d
28✔
121
        r.mu.Unlock()
28✔
122
        return r
28✔
123
}
28✔
124

125
// WithReadTimeout will set a blocking read timeout.
126
// Reads refers to any call that reads data from the buffer.
127
// If no writes occur within the timeout,
128
// the ringbuffer will be closed and context.DeadlineExceeded will be returned.
129
// A timeout of 0 or less will disable timeouts (default).
130
func (r *RingBuffer) WithReadTimeout(d time.Duration) *RingBuffer {
7✔
131
        r.mu.Lock()
7✔
132
        // Read operations wait for writes to complete,
7✔
133
        // therefore we set the wTimeout.
7✔
134
        r.wTimeout = d
7✔
135
        r.mu.Unlock()
7✔
136
        return r
7✔
137
}
7✔
138

139
// WithWriteTimeout will set a blocking write timeout.
140
// Write refers to any call that writes data into the buffer.
141
// If no reads occur within the timeout,
142
// the ringbuffer will be closed and context.DeadlineExceeded will be returned.
143
// A timeout of 0 or less will disable timeouts (default).
144
func (r *RingBuffer) WithWriteTimeout(d time.Duration) *RingBuffer {
7✔
145
        r.mu.Lock()
7✔
146
        // Write operations wait for reads to complete,
7✔
147
        // therefore we set the rTimeout.
7✔
148
        r.rTimeout = d
7✔
149
        r.mu.Unlock()
7✔
150
        return r
7✔
151
}
7✔
152

153
func (r *RingBuffer) setErr(err error, locked bool) error {
22,863✔
154
        if !locked {
23,050✔
155
                r.mu.Lock()
187✔
156
                defer r.mu.Unlock()
187✔
157
        }
187✔
158
        if r.err != nil && r.err != io.EOF {
23,009✔
159
                return r.err
146✔
160
        }
146✔
161

162
        switch err {
22,717✔
163
        // Internal errors are transient
164
        case nil, ErrIsEmpty, ErrIsFull, ErrAcquireLock, ErrTooMuchDataToWrite, ErrIsNotEmpty:
22,470✔
165
                return err
22,470✔
166
        default:
247✔
167
                r.err = err
247✔
168
                if r.block {
486✔
169
                        r.readCond.Broadcast()
239✔
170
                        r.writeCond.Broadcast()
239✔
171
                }
239✔
172
        }
173
        return err
247✔
174
}
175

176
func (r *RingBuffer) readErr(locked bool) error {
1,333,434✔
177
        if !locked {
1,333,442✔
178
                r.mu.Lock()
8✔
179
                defer r.mu.Unlock()
8✔
180
        }
8✔
181
        if r.err != nil {
1,333,699✔
182
                if r.err == io.EOF {
489✔
183
                        if r.w == r.r && !r.isFull {
338✔
184
                                return io.EOF
114✔
185
                        }
114✔
186
                        return nil
110✔
187
                }
188
                return r.err
41✔
189
        }
190
        return nil
1,333,169✔
191
}
192

193
// Read reads up to len(p) bytes into p. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered.
// Even if Read returns n < len(p), it may use all of p as scratch space during the call.
// If some data is available but not len(p) bytes, Read conventionally returns what is available instead of waiting for more.
// When Read encounters an error or end-of-file condition after successfully reading n > 0 bytes, it returns the number of bytes read.
// It may return the (non-nil) error from the same call or return the error (and n == 0) from a subsequent call.
// Callers should always process the n > 0 bytes returned before considering the error err.
// Doing so correctly handles I/O errors that happen after reading some bytes and also both of the allowed EOF behaviors.
//
// A zero-length p returns 0 together with the current read-side error state.
// On an empty buffer, non-blocking mode returns ErrIsEmpty, while blocking
// mode waits for a write and returns context.DeadlineExceeded if that wait
// times out.
func (r *RingBuffer) Read(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, r.readErr(false)
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	if err := r.readErr(true); err != nil {
		return 0, err
	}

	// Register this Read as in progress for the lifetime of the call.
	r.wg.Add(1)
	defer r.wg.Done()
	n, err = r.read(p)
	// Blocking mode: keep waiting for writers while the buffer stays empty.
	for err == ErrIsEmpty && r.block {
		if !r.waitWrite() {
			return 0, context.DeadlineExceeded
		}
		// The wake-up may be caused by a close or failure rather than data.
		if err = r.readErr(true); err != nil {
			break
		}
		n, err = r.read(p)
	}
	// Space was freed; wake writers waiting on the read condition.
	if r.block && n > 0 {
		r.readCond.Broadcast()
	}
	return n, err
}
228

229
// TryRead read up to len(p) bytes into p like Read, but it is never blocking.
230
// If it does not succeed to acquire the lock, it returns ErrAcquireLock.
231
func (r *RingBuffer) TryRead(p []byte) (n int, err error) {
65,278✔
232
        ok := r.mu.TryLock()
65,278✔
233
        if !ok {
112,469✔
234
                return 0, ErrAcquireLock
47,191✔
235
        }
47,191✔
236
        defer r.mu.Unlock()
18,087✔
237
        if err := r.readErr(true); err != nil {
18,091✔
238
                return 0, err
4✔
239
        }
4✔
240
        if len(p) == 0 {
18,086✔
241
                return 0, r.readErr(true)
3✔
242
        }
3✔
243

244
        n, err = r.read(p)
18,080✔
245
        if r.block && n > 0 {
32,893✔
246
                r.readCond.Broadcast()
14,813✔
247
        }
14,813✔
248
        return n, err
18,080✔
249
}
250

251
// copyFromBuffer copies data from the ring buffer to dst without modifying the read pointer.
252
// Returns the number of bytes copied. Does not check for errors.
253
func (r *RingBuffer) copyFromBuffer(dst []byte) int {
32,680✔
254
        if r.w == r.r && !r.isFull {
32,680✔
255
                return 0
×
256
        }
×
257

258
        var n int
32,680✔
259
        if r.w > r.r {
39,995✔
260
                n = r.w - r.r
7,315✔
261
                if n > len(dst) {
12,380✔
262
                        n = len(dst)
5,065✔
263
                }
5,065✔
264
                copy(dst, r.buf[r.r:r.r+n])
7,315✔
265
                return n
7,315✔
266
        }
267

268
        n = r.size - r.r + r.w
25,365✔
269
        if n > len(dst) {
30,493✔
270
                n = len(dst)
5,128✔
271
        }
5,128✔
272

273
        if r.r+n <= r.size {
29,146✔
274
                copy(dst, r.buf[r.r:r.r+n])
3,781✔
275
        } else {
25,365✔
276
                c1 := r.size - r.r
21,584✔
277
                copy(dst, r.buf[r.r:r.size])
21,584✔
278
                c2 := n - c1
21,584✔
279
                copy(dst[c1:], r.buf[0:c2])
21,584✔
280
        }
21,584✔
281
        return n
25,365✔
282
}
283

284
func (r *RingBuffer) read(p []byte) (n int, err error) {
42,954✔
285
        if r.w == r.r && !r.isFull {
53,232✔
286
                return 0, ErrIsEmpty
10,278✔
287
        }
10,278✔
288

289
        n = r.copyFromBuffer(p)
32,676✔
290
        if n == 0 {
32,676✔
291
                return 0, ErrIsEmpty
×
292
        }
×
293

294
        r.r = (r.r + n) % r.size
32,676✔
295
        r.isFull = false
32,676✔
296

32,676✔
297
        return n, r.readErr(true)
32,676✔
298
}
299

300
// waitRead blocks on the read condition until it is signaled.
// Returns true if a read may have happened.
// Returns false if waited longer than rTimeout.
// Must be called when locked and returns locked.
//
// When false is returned, context.DeadlineExceeded has been recorded through
// setErr.
func (r *RingBuffer) waitRead() (ok bool) {
	// No timeout configured: plain wait.
	if r.rTimeout <= 0 {
		r.readCond.Wait()
		return true
	}
	start := time.Now()
	// The timer is armed right here (the AfterFunc call is evaluated when the
	// defer statement runs); only Stop is deferred. When it fires it
	// broadcasts on readCond so the Wait below returns.
	defer time.AfterFunc(r.rTimeout, r.readCond.Broadcast).Stop()

	r.readCond.Wait()
	// Any wake-up that arrives at or after the deadline counts as a timeout,
	// whichever broadcast caused it.
	if time.Since(start) >= r.rTimeout {
		r.setErr(context.DeadlineExceeded, true) //nolint errcheck
		return false
	}
	return true
}
318

319
// ReadByte reads and returns the next byte from the input or ErrIsEmpty.
320
func (r *RingBuffer) ReadByte() (b byte, err error) {
15,290✔
321
        r.mu.Lock()
15,290✔
322
        defer r.mu.Unlock()
15,290✔
323
        if err = r.readErr(true); err != nil {
15,293✔
324
                return 0, err
3✔
325
        }
3✔
326
        for r.w == r.r && !r.isFull {
25,444✔
327
                if r.block {
20,313✔
328
                        if !r.waitWrite() {
10,164✔
329
                                return 0, context.DeadlineExceeded
8✔
330
                        }
8✔
331
                        err = r.readErr(true)
10,148✔
332
                        if err != nil {
10,151✔
333
                                return 0, err
3✔
334
                        }
3✔
335
                        continue
10,145✔
336
                }
337
                return 0, ErrIsEmpty
1✔
338
        }
339
        b = r.buf[r.r]
15,275✔
340
        r.r++
15,275✔
341
        if r.r == r.size {
15,276✔
342
                r.r = 0
1✔
343
        }
1✔
344

345
        r.isFull = false
15,275✔
346
        return b, r.readErr(true)
15,275✔
347
}
348

349
// checkWriteErr checks if the buffer has an error that should prevent writes.
350
// Returns the appropriate error to return (nil if write can proceed).
351
// Must be called with mutex held.
352
func (r *RingBuffer) checkWriteErr() error {
19,964✔
353
        if r.err == nil {
39,909✔
354
                return nil
19,945✔
355
        }
19,945✔
356
        if r.err == io.EOF {
24✔
357
                return ErrWriteOnClosed
5✔
358
        }
5✔
359
        return r.err
14✔
360
}
361

362
// Write writes len(p) bytes from p to the underlying buf.
363
// It returns the number of bytes written from p (0 <= n <= len(p))
364
// and any error encountered that caused the write to stop early.
365
// If blocking n < len(p) will be returned only if an error occurred.
366
// Write returns a non-nil error if it returns n < len(p).
367
// Write will not modify the slice data, even temporarily.
368
func (r *RingBuffer) Write(p []byte) (n int, err error) {
6,151✔
369
        if len(p) == 0 {
6,159✔
370
                return 0, r.setErr(nil, false)
8✔
371
        }
8✔
372
        r.mu.Lock()
6,143✔
373
        defer r.mu.Unlock()
6,143✔
374
        if err := r.checkWriteErr(); err != nil {
6,155✔
375
                return 0, err
12✔
376
        }
12✔
377
        wrote := 0
6,131✔
378
        for len(p) > 0 {
20,385✔
379
                n, err = r.write(p)
14,254✔
380
                wrote += n
14,254✔
381
                if !r.block || err == nil {
20,363✔
382
                        break
6,109✔
383
                }
384
                err = r.setErr(err, true)
8,145✔
385
                if r.block && (err == ErrIsFull || err == ErrTooMuchDataToWrite) {
16,268✔
386
                        r.writeCond.Broadcast()
8,123✔
387
                        r.waitRead()
8,123✔
388
                        p = p[n:]
8,123✔
389
                        err = nil
8,123✔
390
                        continue
8,123✔
391
                }
392
                break
22✔
393
        }
394
        if r.block && wrote > 0 {
12,206✔
395
                r.writeCond.Broadcast()
6,075✔
396
        }
6,075✔
397

398
        return wrote, r.setErr(err, true)
6,131✔
399
}
400

401
// waitWrite will wait for a write event.
// Returns true if a write may have happened.
// Returns false if waited longer than wTimeout.
// Must be called when locked and returns locked.
//
// When false is returned, context.DeadlineExceeded has been recorded through
// setErr.
func (r *RingBuffer) waitWrite() (ok bool) {
	// No timeout configured: plain wait.
	if r.wTimeout <= 0 {
		r.writeCond.Wait()
		return true
	}

	start := time.Now()
	// The timer is armed right here (the AfterFunc call is evaluated when the
	// defer statement runs); only Stop is deferred. When it fires it
	// broadcasts on writeCond so the Wait below returns.
	defer time.AfterFunc(r.wTimeout, r.writeCond.Broadcast).Stop()

	r.writeCond.Wait()
	// Any wake-up that arrives at or after the deadline counts as a timeout,
	// whichever broadcast caused it.
	if time.Since(start) >= r.wTimeout {
		r.setErr(context.DeadlineExceeded, true) //nolint errcheck
		return false
	}
	return true
}
421

422
// ReadFrom will fulfill the write side of the ringbuffer.
423
// This will do writes directly into the buffer,
424
// therefore avoiding a mem-copy when using the Write.
425
//
426
// ReadFrom will not automatically close the buffer even after returning.
427
// For that call CloseWriter().
428
//
429
// ReadFrom reads data from r until EOF or error.
430
// The return value n is the number of bytes read.
431
// Any error except EOF encountered during the read is also returned,
432
// and the error will cause the Read side to fail as well.
433
// ReadFrom only available in blocking mode.
434
func (r *RingBuffer) ReadFrom(rd io.Reader) (n int64, err error) {
125✔
435
        if !r.block {
125✔
436
                return 0, errors.New("RingBuffer: ReadFrom only available in blocking mode")
×
437
        }
×
438
        zeroReads := 0
125✔
439
        r.mu.Lock()
125✔
440
        defer r.mu.Unlock()
125✔
441
        for {
1,034,000✔
442
                if err = r.readErr(true); err != nil {
1,033,881✔
443
                        return n, err
6✔
444
                }
6✔
445
                if r.isFull {
1,137,837✔
446
                        // Wait for a read
103,968✔
447
                        if !r.waitRead() {
103,976✔
448
                                return 0, context.DeadlineExceeded
8✔
449
                        }
8✔
450
                        continue
103,960✔
451
                }
452

453
                // Calculate available space to read into
454
                var toRead []byte
929,901✔
455
                if r.w >= r.r {
1,847,283✔
456
                        // After reader, read until end of buffer
917,382✔
457
                        toRead = r.buf[r.w:]
917,382✔
458
                } else {
929,901✔
459
                        // Before reader, read until reader.
12,519✔
460
                        if r.w >= r.size || r.r > r.size {
12,519✔
NEW
461
                                // Pointers are corrupted, return error to prevent panic
×
NEW
462
                                return n, errors.New("RingBuffer: internal state corrupted")
×
NEW
463
                        }
×
464
                        toRead = r.buf[r.w:r.r]
12,519✔
465
                }
466

467
                nr, rerr := rd.Read(toRead)
929,901✔
468
                if rerr != nil && rerr != io.EOF {
929,911✔
469
                        err = r.setErr(rerr, true)
10✔
470
                        break
10✔
471
                }
472
                if nr == 0 && rerr == nil {
1,021,425✔
473
                        zeroReads++
91,534✔
474
                        if zeroReads >= 100 {
91,534✔
475
                                err = r.setErr(io.ErrNoProgress, true)
×
476
                        }
×
477
                        continue
91,534✔
478
                }
479
                zeroReads = 0
838,357✔
480

838,357✔
481
                // Update write pointer with proper wrap-around using modulo
838,357✔
482
                r.w = (r.w + nr) % r.size
838,357✔
483
                r.isFull = r.r == r.w && nr > 0
838,357✔
484
                n += int64(nr)
838,357✔
485
                r.writeCond.Broadcast()
838,357✔
486
                if rerr == io.EOF {
838,458✔
487
                        // We do not close.
101✔
488
                        break
101✔
489
                }
490
        }
491
        return n, err
111✔
492
}
493

494
// WriteTo writes data to w until there's no more data to write or
495
// when an error occurs. The return value n is the number of bytes
496
// written. Any error encountered during the write is also returned.
497
//
498
// If a non-nil error is returned the write side will also see the error.
499
func (r *RingBuffer) WriteTo(w io.Writer) (n int64, err error) {
129✔
500
        if !r.block {
129✔
501
                return 0, errors.New("RingBuffer: WriteTo only available in blocking mode")
×
502
        }
×
503
        r.mu.Lock()
129✔
504
        defer r.mu.Unlock()
129✔
505

129✔
506
        // Don't write more than half, to unblock reads earlier.
129✔
507
        maxWrite := len(r.buf) / 2
129✔
508
        // But write at least 8K if possible
129✔
509
        if maxWrite < 8<<10 {
258✔
510
                maxWrite = len(r.buf)
129✔
511
        }
129✔
512
        for {
183,290✔
513
                if err = r.readErr(true); err != nil {
183,272✔
514
                        break
111✔
515
                }
516
                if r.r == r.w && !r.isFull {
274,582✔
517
                        // Wait for a write to make space
91,532✔
518
                        if !r.waitWrite() {
91,540✔
519
                                return 0, context.DeadlineExceeded
8✔
520
                        }
8✔
521
                        continue
91,524✔
522
                }
523

524
                var toWrite []byte
91,518✔
525
                if r.r >= r.w {
183,036✔
526
                        // After writer, we can write until end of buffer
91,518✔
527
                        toWrite = r.buf[r.r:]
91,518✔
528
                } else {
91,518✔
UNCOV
529
                        // Before reader, we can read until writer.
×
UNCOV
530
                        toWrite = r.buf[r.r:r.w]
×
UNCOV
531
                }
×
532
                if len(toWrite) > maxWrite {
91,518✔
533
                        toWrite = toWrite[:maxWrite]
×
534
                }
×
535
                // Unlock while reading
536
                r.mu.Unlock()
91,518✔
537
                nr, werr := w.Write(toWrite)
91,518✔
538
                r.mu.Lock()
91,518✔
539
                if werr != nil {
91,528✔
540
                        n += int64(nr)
10✔
541
                        err = r.setErr(werr, true)
10✔
542
                        break
10✔
543
                }
544
                if nr != len(toWrite) {
91,508✔
545
                        err = r.setErr(io.ErrShortWrite, true)
×
546
                        break
×
547
                }
548
                r.r += nr
91,508✔
549
                if r.r == r.size {
183,016✔
550
                        r.r = 0
91,508✔
551
                }
91,508✔
552
                r.isFull = false
91,508✔
553
                n += int64(nr)
91,508✔
554
                r.readCond.Broadcast()
91,508✔
555
        }
556
        if err == io.EOF {
221✔
557
                err = nil
100✔
558
        }
100✔
559
        return n, err
121✔
560
}
561

562
// Copy will pipe all data from the reader to the writer through the ringbuffer.
563
// The ringbuffer will switch to blocking mode.
564
// Reads and writes will be done async.
565
// No internal mem-copies are used for the transfer.
566
//
567
// Calling CloseWithError will cancel the transfer and make the function return when
568
// any ongoing reads or writes have finished.
569
//
570
// Calling Read or Write functions concurrently with running this will lead to unpredictable results.
571
func (r *RingBuffer) Copy(dst io.Writer, src io.Reader) (written int64, err error) {
110✔
572
        r.SetBlocking(true)
110✔
573
        var wg sync.WaitGroup
110✔
574
        wg.Add(1)
110✔
575
        go func() {
220✔
576
                defer wg.Done()
110✔
577
                _, _ = r.ReadFrom(src) //nolint errcheck
110✔
578
                r.CloseWriter()
110✔
579
        }()
110✔
580
        defer wg.Wait()
110✔
581
        return r.WriteTo(dst)
110✔
582
}
583

584
// TryWrite writes len(p) bytes from p to the underlying buf like Write, but it is not blocking.
585
// If it does not succeed to acquire the lock, it returns ErrAcquireLock.
586
func (r *RingBuffer) TryWrite(p []byte) (n int, err error) {
53,004✔
587
        if len(p) == 0 {
53,006✔
588
                return 0, r.setErr(nil, false)
2✔
589
        }
2✔
590
        ok := r.mu.TryLock()
53,002✔
591
        if !ok {
97,769✔
592
                return 0, ErrAcquireLock
44,767✔
593
        }
44,767✔
594
        defer r.mu.Unlock()
8,235✔
595
        if err := r.checkWriteErr(); err != nil {
8,237✔
596
                return 0, err
2✔
597
        }
2✔
598

599
        n, err = r.write(p)
8,233✔
600
        if r.block && n > 0 {
10,955✔
601
                r.writeCond.Broadcast()
2,722✔
602
        }
2,722✔
603
        return n, r.setErr(err, true)
8,233✔
604
}
605

606
// write copies as much of p as fits into the buffer and advances the write
// pointer. It returns the number of bytes stored; err is ErrIsFull when the
// buffer is full (and overwrite mode is off) and ErrTooMuchDataToWrite when
// only a prefix of p fitted. It does not consult r.err and takes no lock, so
// callers are expected to hold the mutex and check the error state first.
func (r *RingBuffer) write(p []byte) (n int, err error) {
	// In overwrite mode, we always allow writing by discarding old data
	if r.overwrite && r.isFull && len(p) > 0 {
		// Write will overwrite old data
		// First, advance read pointer to make room
		needed := len(p)
		// Discard 'needed' bytes by advancing read pointer
		r.r = (r.r + needed) % r.size
		r.isFull = false
	}

	if r.isFull {
		return 0, ErrIsFull
	}

	// Free space between the write pointer and the read pointer, following
	// the ring around the end of the backing array.
	var avail int
	if r.w >= r.r {
		avail = r.size - r.w + r.r
	} else {
		avail = r.r - r.w
	}

	// Truncate p to what fits and remember that data was left over.
	if len(p) > avail {
		err = ErrTooMuchDataToWrite
		p = p[:avail]
	}
	n = len(p)

	if r.w >= r.r {
		// c1 is the contiguous room up to the end of the backing array.
		c1 := r.size - r.w
		if c1 >= n {
			copy(r.buf[r.w:], p)
			r.w += n
		} else {
			// Split the copy: tail of the array first, then the front.
			copy(r.buf[r.w:], p[:c1])
			c2 := n - c1
			copy(r.buf[0:], p[c1:])
			r.w = c2
		}
	} else {
		copy(r.buf[r.w:], p)
		r.w += n
	}

	if r.w == r.size {
		r.w = 0
	}
	// NOTE(review): this also marks the buffer full when nothing was written
	// (n == 0) on an empty buffer; the visible callers never pass an empty p.
	if r.w == r.r {
		r.isFull = true
	}

	return n, err
}
659

660
// WriteByte writes one byte into buffer, and returns ErrIsFull if the buffer is full.
// In blocking mode it waits for a read to free space instead, and returns
// context.DeadlineExceeded if that wait times out.
// Writing to a closed or failed buffer returns the error from checkWriteErr.
func (r *RingBuffer) WriteByte(c byte) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if err := r.checkWriteErr(); err != nil {
		return err
	}
	err := r.writeByte(c)
	// Blocking mode: retry after every read wake-up while the buffer is full.
	for err == ErrIsFull && r.block {
		if !r.waitRead() {
			return context.DeadlineExceeded
		}
		// setErr records a non-transient failure and surfaces a sticky one.
		err = r.setErr(r.writeByte(c), true)
	}
	// New data is available; wake readers waiting on the write condition.
	if r.block && err == nil {
		r.writeCond.Broadcast()
	}
	return err
}
679

680
// TryWriteByte writes one byte into buffer without blocking.
681
// If it does not succeed to acquire the lock, it returns ErrAcquireLock.
682
func (r *RingBuffer) TryWriteByte(c byte) error {
3,002✔
683
        ok := r.mu.TryLock()
3,002✔
684
        if !ok {
3,437✔
685
                return ErrAcquireLock
435✔
686
        }
435✔
687
        defer r.mu.Unlock()
2,567✔
688
        if err := r.checkWriteErr(); err != nil {
2,569✔
689
                return err
2✔
690
        }
2✔
691

692
        err := r.writeByte(c)
2,565✔
693
        if err == nil && r.block {
4,548✔
694
                r.writeCond.Broadcast()
1,983✔
695
        }
1,983✔
696
        return err
2,565✔
697
}
698

699
func (r *RingBuffer) writeByte(c byte) error {
5,583✔
700
        if r.err != nil {
5,585✔
701
                return r.err
2✔
702
        }
2✔
703
        if r.w == r.r && r.isFull {
6,175✔
704
                // In overwrite mode, discard the oldest byte and write the new one
594✔
705
                if r.overwrite {
595✔
706
                        r.r++
1✔
707
                        if r.r == r.size {
1✔
708
                                r.r = 0
×
709
                        }
×
710
                } else {
593✔
711
                        return ErrIsFull
593✔
712
                }
593✔
713
        }
714
        r.buf[r.w] = c
4,988✔
715
        r.w++
4,988✔
716

4,988✔
717
        if r.w == r.size {
4,990✔
718
                r.w = 0
2✔
719
        }
2✔
720
        if r.w == r.r {
4,991✔
721
                r.isFull = true
3✔
722
        }
3✔
723

724
        return nil
4,988✔
725
}
726

727
// Length returns the number of bytes that can be read without blocking.
728
func (r *RingBuffer) Length() int {
27✔
729
        r.mu.Lock()
27✔
730
        defer r.mu.Unlock()
27✔
731

27✔
732
        if r.w == r.r {
44✔
733
                if r.isFull {
25✔
734
                        return r.size
8✔
735
                }
8✔
736
                return 0
9✔
737
        }
738

739
        if r.w > r.r {
19✔
740
                return r.w - r.r
9✔
741
        }
9✔
742

743
        return r.size - r.r + r.w
1✔
744
}
745

746
// Capacity returns the size of the underlying buffer.
// The value is read without taking the mutex.
func (r *RingBuffer) Capacity() int {
	return r.size
}
×
750

751
// Free returns the number of bytes that can be written without blocking.
752
func (r *RingBuffer) Free() int {
25✔
753
        r.mu.Lock()
25✔
754
        defer r.mu.Unlock()
25✔
755

25✔
756
        if r.w == r.r {
43✔
757
                if r.isFull {
28✔
758
                        return 0
10✔
759
                }
10✔
760
                return r.size
8✔
761
        }
762

763
        if r.w < r.r {
8✔
764
                return r.r - r.w
1✔
765
        }
1✔
766

767
        return r.size - r.w + r.r
6✔
768
}
769

770
// WriteString writes the contents of the string s to buffer, which accepts a slice of bytes.
771
func (r *RingBuffer) WriteString(s string) (n int, err error) {
3,009✔
772
        x := (*[2]uintptr)(unsafe.Pointer(&s))
3,009✔
773
        h := [3]uintptr{x[0], x[1], x[1]}
3,009✔
774
        buf := *(*[]byte)(unsafe.Pointer(&h))
3,009✔
775
        return r.Write(buf)
3,009✔
776
}
3,009✔
777

778
// Bytes returns all available read bytes.
779
// It does not move the read pointer and only copy the available data.
780
// If the dst is big enough, it will be used as destination,
781
// otherwise a new buffer will be allocated.
782
func (r *RingBuffer) Bytes(dst []byte) []byte {
19✔
783
        r.mu.Lock()
19✔
784
        defer r.mu.Unlock()
19✔
785
        getDst := func(n int) []byte {
38✔
786
                if cap(dst) < n {
36✔
787
                        return make([]byte, n)
17✔
788
                }
17✔
789
                return dst[:n]
2✔
790
        }
791

792
        if r.w == r.r {
27✔
793
                if r.isFull {
16✔
794
                        buf := getDst(r.size)
8✔
795
                        copy(buf, r.buf[r.r:])
8✔
796
                        copy(buf[r.size-r.r:], r.buf[:r.w])
8✔
797
                        return buf
8✔
798
                }
8✔
799
                return nil
×
800
        }
801

802
        if r.w > r.r {
18✔
803
                buf := getDst(r.w - r.r)
7✔
804
                copy(buf, r.buf[r.r:r.w])
7✔
805
                return buf
7✔
806
        }
7✔
807

808
        n := r.size - r.r + r.w
4✔
809
        buf := getDst(n)
4✔
810

4✔
811
        if r.r+n < r.size {
4✔
812
                copy(buf, r.buf[r.r:r.r+n])
×
813
        } else {
4✔
814
                c1 := r.size - r.r
4✔
815
                copy(buf, r.buf[r.r:r.size])
4✔
816
                c2 := n - c1
4✔
817
                copy(buf[c1:], r.buf[0:c2])
4✔
818
        }
4✔
819

820
        return buf
4✔
821
}
822

823
// IsFull returns true when the ringbuffer is full.
824
func (r *RingBuffer) IsFull() bool {
15✔
825
        r.mu.Lock()
15✔
826
        defer r.mu.Unlock()
15✔
827

15✔
828
        return r.isFull
15✔
829
}
15✔
830

831
// IsEmpty returns true when the ringbuffer is empty.
832
func (r *RingBuffer) IsEmpty() bool {
15✔
833
        r.mu.Lock()
15✔
834
        defer r.mu.Unlock()
15✔
835

15✔
836
        return !r.isFull && r.w == r.r
15✔
837
}
15✔
838

839
// CloseWithError closes the writer; reads will return
840
// no bytes and the error err, or EOF if err is nil.
841
//
842
// CloseWithError never overwrites the previous error if it exists
843
// and always returns nil.
844
func (r *RingBuffer) CloseWithError(err error) {
24✔
845
        if err == nil {
27✔
846
                err = io.EOF
3✔
847
        }
3✔
848
        r.setErr(err, false) //nolint errcheck
24✔
849
}
850

851
// CloseWriter closes the writer.
852
// Reads will return any remaining bytes and io.EOF.
853
func (r *RingBuffer) CloseWriter() {
114✔
854
        r.setErr(io.EOF, false) //nolint errcheck
114✔
855
}
114✔
856

857
// Flush waits for the buffer to be empty and fully read.
858
// If not blocking ErrIsNotEmpty will be returned if the buffer still contains data.
859
func (r *RingBuffer) Flush() error {
6✔
860
        r.mu.Lock()
6✔
861
        defer r.mu.Unlock()
6✔
862
        for r.w != r.r || r.isFull {
12✔
863
                err := r.readErr(true)
6✔
864
                if err != nil {
7✔
865
                        if err == io.EOF {
1✔
866
                                err = nil
×
867
                        }
×
868
                        return err
1✔
869
                }
870
                if !r.block {
5✔
871
                        return ErrIsNotEmpty
×
872
                }
×
873
                if !r.waitRead() {
5✔
874
                        return context.DeadlineExceeded
×
875
                }
×
876
        }
877

878
        err := r.readErr(true)
5✔
879
        if err == io.EOF {
5✔
880
                return nil
×
881
        }
×
882
        return err
5✔
883
}
884

885
// Reset the read pointer and writer pointer to zero.
886
func (r *RingBuffer) Reset() {
89✔
887
        r.mu.Lock()
89✔
888
        defer r.mu.Unlock()
89✔
889

89✔
890
        // Set error so any readers/writers will return immediately.
89✔
891
        r.setErr(ErrReset, true) //nolint errcheck
89✔
892
        if r.block {
171✔
893
                r.readCond.Broadcast()
82✔
894
                r.writeCond.Broadcast()
82✔
895
        }
82✔
896

897
        // Unlock the mutex so readers/writers can finish.
898
        r.mu.Unlock()
89✔
899
        r.wg.Wait()
89✔
900
        r.mu.Lock()
89✔
901
        r.r = 0
89✔
902
        r.w = 0
89✔
903
        r.err = nil
89✔
904
        r.isFull = false
89✔
905
}
906

907
// WriteCloser returns a WriteCloser that writes to the ring buffer.
908
// When the returned WriteCloser is closed, it will wait for all data to be read before returning.
909
func (r *RingBuffer) WriteCloser() io.WriteCloser {
×
910
        return &writeCloser{RingBuffer: r}
×
911
}
×
912

913
type writeCloser struct {
914
        *RingBuffer
915
}
916

917
// Close provides a close method for the WriteCloser.
918
func (wc *writeCloser) Close() error {
×
919
        wc.CloseWriter()
×
920
        return wc.Flush()
×
921
}
×
922

923
// ReadCloser returns a io.ReadCloser that reads to the ring buffer.
924
// When the returned ReadCloser is closed, ErrReaderClosed will be returned on any writes done afterwards.
925
func (r *RingBuffer) ReadCloser() io.ReadCloser {
×
926
        return &readCloser{RingBuffer: r}
×
927
}
×
928

929
type readCloser struct {
930
        *RingBuffer
931
}
932

933
// Close provides a close method for the ReadCloser.
934
func (rc *readCloser) Close() error {
×
935
        rc.CloseWithError(ErrReaderClosed)
×
936
        err := rc.readErr(false)
×
937
        if err == ErrReaderClosed {
×
938
                err = nil
×
939
        }
×
940
        return err
×
941
}
942

943
// Peek reads up to len(p) bytes into p without moving the read pointer.
944
func (r *RingBuffer) Peek(p []byte) (n int, err error) {
4✔
945
        if len(p) == 0 {
4✔
946
                return 0, r.readErr(false)
×
947
        }
×
948

949
        r.mu.Lock()
4✔
950
        defer r.mu.Unlock()
4✔
951
        if err := r.readErr(true); err != nil {
4✔
952
                return 0, err
×
953
        }
×
954

955
        return r.peek(p)
4✔
956
}
957

958
func (r *RingBuffer) peek(p []byte) (n int, err error) {
4✔
959
        n = r.copyFromBuffer(p)
4✔
960
        if n == 0 {
4✔
961
                return 0, ErrIsEmpty
×
962
        }
×
963
        return n, r.readErr(true)
4✔
964
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc