• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 16727297811

04 Aug 2025 03:23PM UTC coverage: 78.515% (+0.02%) from 78.495%
16727297811

push

github

bmerry
Wire up asyncio.ChunkRingbuffer to discard_oldest

5573 of 7098 relevant lines covered (78.52%)

90901.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.46
/include/spead2/common_ringbuffer.h
1
/* Copyright 2015, 2021, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 *
20
 * Ring buffer.
21
 */
22

23
#ifndef SPEAD2_COMMON_RINGBUFFER_H
24
#define SPEAD2_COMMON_RINGBUFFER_H
25

26
#include <mutex>
#include <condition_variable>
#include <type_traits>
#include <memory>
#include <stdexcept>
#include <utility>
#include <optional>
#include <cassert>
#include <climits>
#include <iostream>
#include <new>
#include <thread>
#include <spead2/common_logging.h>
#include <spead2/common_semaphore.h>
#include <spead2/common_storage.h>
40

41
namespace spead2
42
{
43

44
/// Thrown when attempting to do a non-blocking push to a full ringbuffer
class ringbuffer_full : public std::runtime_error
{
public:
    ringbuffer_full()
        : std::runtime_error("ring buffer is full")
    {
    }
};
50

51
/// Thrown when attempting to do a non-blocking pop from an empty ringbuffer
class ringbuffer_empty : public std::runtime_error
{
public:
    ringbuffer_empty()
        : std::runtime_error("ring buffer is empty")
    {
    }
};
57

58
/**
 * Thrown when attempting to do a pop from an empty ringbuffer that has been
 * stopped, or a push to a ringbuffer that has been stopped.
 */
class ringbuffer_stopped : public std::runtime_error
{
public:
    ringbuffer_stopped()
        : std::runtime_error("ring buffer has been stopped")
    {
    }
};
67

68
namespace detail
69
{
70

71
/// Sentinel counterpart to @ref ringbuffer_iterator; marks the end of iteration.
class ringbuffer_sentinel {};
73

74
/**
 * Basic iterator for @ref ringbuffer as well as ringbuffer-like classes: they
 * must provide @c pop which either returns when data is available or throws
 * @ref ringbuffer_stopped.
 *
 * This does not fully implement the iterator concept; it is suitable only for
 * range-based for loops.
 */
template<typename Ringbuffer>
class ringbuffer_iterator
{
private:
    /// Element type, deduced from what the ringbuffer's pop() returns
    using value_type = decltype(std::declval<Ringbuffer>().pop());

    Ringbuffer &owner;                 ///< Ringbuffer being iterated
    std::optional<value_type> current; // nullopt once ring has stopped

public:
    /// Pops the first value eagerly (see the constructor definition).
    explicit ringbuffer_iterator(Ringbuffer &owner);
    /// Compares unequal to the sentinel while a value is held.
    bool operator!=(const ringbuffer_sentinel &);
    ringbuffer_iterator &operator++();
    /// Returns an rvalue reference: each element is meant to be moved from once.
    value_type &&operator*();
};
97

98
/// Construct from a ringbuffer and immediately fetch the first element.
template<typename Ringbuffer>
ringbuffer_iterator<Ringbuffer>::ringbuffer_iterator(Ringbuffer &owner)
    : owner(owner)
{
    ++*this;  // Load the first value
}
104

105
/// True while a value is held, i.e. the ring has not yet stopped.
template<typename Ringbuffer>
bool ringbuffer_iterator<Ringbuffer>::operator!=(const ringbuffer_sentinel &)
{
    return bool(current);
}
110

111
/// Advance: pop the next element; on @ref ringbuffer_stopped, become the end
/// iterator (current stays nullopt).
template<typename Ringbuffer>
auto ringbuffer_iterator<Ringbuffer>::operator++() -> ringbuffer_iterator &
{
    /* Clear it first, so that we can reclaim the memory before making
     * space available in the ringbuffer, which might cause another
     * thread to allocate more memory.
     */
    current = std::nullopt;
    try
    {
        current = owner.pop();
    }
    catch (ringbuffer_stopped &)
    {
        // Leave current empty: operator!= will now report end-of-range.
    }
    return *this;
}
128

129
/// Dereference. Returns an rvalue reference so the element can be moved from;
/// dereferencing twice without ++ would observe a moved-from value.
template<typename Ringbuffer>
auto ringbuffer_iterator<Ringbuffer>::operator*() -> value_type &&
{
    return std::move(*current);
}
134

135
} // namespace detail
136

137
/**
 * Internal base class for @ref ringbuffer that is independent of the semaphore
 * type.
 */
template<typename T>
class ringbuffer_base
{
private:
    typedef detail::storage<T> storage_type;
    std::unique_ptr<storage_type[]> storage;  ///< Raw slots (constructed lazily)
    const std::size_t cap;  ///< Number of slots (one more than the usable capacity; see constructor)

    /// Mutex held when reading from the head (needed for safe multi-consumer)
    mutable std::mutex head_mutex;
    std::size_t head = 0;   ///< first slot with data
    /**
     * Position in the queue which the receiver should treat as "please stop",
     * or an invalid position if not yet stopped. Unlike @ref stopped, this is
     * updated with @ref head_mutex rather than @ref tail_mutex held. Once
     * set, it is guaranteed to always equal @ref tail.
     */
    std::size_t stop_position = SIZE_MAX;

    /// Mutex held when writing to the tail (needed for safe multi-producer)
    mutable std::mutex tail_mutex;
    std::size_t tail = 0;   ///< first slot without data
    bool stopped = false;   ///< Whether stop has been called
    std::size_t producers = 0;  ///< Number of producers registered with @ref add_producer

    /// Increments @a idx, wrapping around (assumes idx < cap on entry).
    std::size_t next(std::size_t idx)
    {
        idx++;
        if (idx == cap)
            idx = 0;
        return idx;
    }

protected:
    explicit ringbuffer_base(std::size_t cap);
    ~ringbuffer_base();

    /**
     * Check whether we're stopped and throw an appropriate error.
     *
     * @throw ringbuffer_stopped if the ringbuffer is empty and stopped
     * @throw ringbuffer_empty otherwise
     */
    void throw_empty_or_stopped();

    /**
     * Check whether we're stopped and throw an appropriate error.
     *
     * @throw ringbuffer_stopped if the ringbuffer is stopped
     * @throw ringbuffer_full otherwise
     */
    void throw_full_or_stopped();

    /// Implementation of pushing functions, which doesn't touch semaphores
    template<typename... Args>
    void emplace_internal(Args&&... args);

    /// Implementation of popping functions, which doesn't touch semaphores
    T pop_internal();

    /// Discard the oldest item
    void discard_oldest_internal();

    /**
     * Implementation of stopping, without the semaphores.
     *
     * @param remove_producer If true, remove a producer and only stop if none are left
     * @returns whether the ringbuffer was stopped (i.e., this is the first call)
     */
    bool stop_internal(bool remove_producer = false);

public:
    /// Maximum number of items that can be held at once
    std::size_t capacity() const;

    /**
     * Return the number of items currently in the ringbuffer.
     *
     * This should only be used for metrics, not for control flow, as
     * the result could be out of date by the time it is returned.
     */
    std::size_t size() const;

    /**
     * Register a new producer. Producers only need to call this if they
     * want to call @ref ringbuffer::remove_producer.
     */
    void add_producer();
};
231

232
/// Called after a failed non-blocking pop to raise the right exception
/// (stopped-and-drained vs merely empty).
template<typename T>
void ringbuffer_base<T>::throw_empty_or_stopped()
{
    std::lock_guard<std::mutex> lock(head_mutex);
    if (head == stop_position)
        throw ringbuffer_stopped();
    else
        throw ringbuffer_empty();
}
241

242
template<typename T>
std::size_t ringbuffer_base<T>::capacity() const
{
    // cap counts slots including the one reserved to distinguish empty from
    // full (see the constructor), hence the -1.
    return cap - 1;
}
247

248
template<typename T>
std::size_t ringbuffer_base<T>::size() const
{
    // Take both mutexes for a consistent head/tail snapshot. This is the only
    // place in this header where both are held simultaneously, so the
    // head-then-tail nesting cannot deadlock against other functions.
    std::lock_guard<std::mutex> head_lock(head_mutex);
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
    if (head <= tail)
        return tail - head;
    else
        return tail + cap - head;  // tail has wrapped around past the end
}
258

259
/// Called after a failed non-blocking push to raise the right exception
/// (stopped vs merely full).
template<typename T>
void ringbuffer_base<T>::throw_full_or_stopped()
{
    std::lock_guard<std::mutex> lock(tail_mutex);
    if (stopped)
        throw ringbuffer_stopped();
    else
        throw ringbuffer_full();
}
268

269
/// Construct with space for @a cap items. Note that the member @c cap becomes
/// cap + 1, and @c stop_position is initialised to that same (invalid) slot
/// index, which encodes "not stopped".
template<typename T>
ringbuffer_base<T>::ringbuffer_base(size_t cap)
    : storage(new storage_type[cap + 1]), cap(cap + 1), stop_position(cap + 1)
{
    /* We allocate one extra slot so that the destructor can disambiguate empty
     * from full. We could also use the semaphore values, but after stopping
     * they get used for other things so it is simplest not to rely on them.
     */
    assert(cap > 0);
}
279

280
/// Destroy any items still in the ring (slots hold raw storage, so their
/// destructors must be run explicitly).
template<typename T>
ringbuffer_base<T>::~ringbuffer_base()
{
    // Drain any remaining elements
    while (head != tail)
    {
        storage[head].destroy();
        head = next(head);
    }
}
290

291
/// Construct an item in the tail slot. The caller is responsible for having
/// reserved the slot (via the space semaphore in the derived class), so no
/// full-check is needed here — only a stopped-check.
template<typename T>
template<typename... Args>
void ringbuffer_base<T>::emplace_internal(Args&&... args)
{
    std::lock_guard<std::mutex> lock(tail_mutex);
    if (stopped)
    {
        throw ringbuffer_stopped();
    }
    // Construct in-place
    storage[tail].construct(std::forward<Args>(args)...);
    tail = next(tail);
}
304

305
/// Remove and return the head item. The caller is responsible for having
/// claimed a filled slot (via the data semaphore in the derived class).
/// @throw ringbuffer_stopped once the ring is stopped and drained.
template<typename T>
T ringbuffer_base<T>::pop_internal()
{
    std::lock_guard<std::mutex> lock(head_mutex);
    if (stop_position == head)
    {
        throw ringbuffer_stopped();
    }
    auto &item = storage[head];
    T result = std::move(*item);
    item.destroy();  // run the destructor before releasing the slot
    head = next(head);
    return result;
}
319

320
/// Destroy the head item without returning it (used by discard_oldest mode).
template<typename T>
void ringbuffer_base<T>::discard_oldest_internal()
{
    std::lock_guard<std::mutex> lock(head_mutex);
    /* While running, stop_position equals cap (set in the constructor); after
     * a stop it holds the saved tail, which is always < cap. So this
     * comparison tests "has the ring been stopped".
     */
    if (stop_position < cap)
    {
        throw ringbuffer_stopped();
    }
    storage[head].destroy();
    head = next(head);
}
331

332
/// Core of stopping: set @c stopped (producer side) and @c stop_position
/// (consumer side). Returns false if @a remove_producer is true and other
/// producers remain.
template<typename T>
bool ringbuffer_base<T>::stop_internal(bool remove_producer)
{
    std::size_t saved_tail;

    {
        std::lock_guard<std::mutex> tail_lock(this->tail_mutex);
        if (remove_producer)
        {
            assert(producers != 0);
            producers--;
            if (producers != 0)
                return false;  // other registered producers still active
        }
        stopped = true;
        saved_tail = tail;
    }

    {
        // Taken separately (not nested inside tail_mutex) so the two mutexes
        // are never held together here.
        std::lock_guard<std::mutex> head_lock(this->head_mutex);
        stop_position = saved_tail;
    }
    return true;
}
356

357
/// Register a producer; pairs with @ref ringbuffer::remove_producer.
template<typename T>
void ringbuffer_base<T>::add_producer()
{
    std::lock_guard<std::mutex> tail_lock(this->tail_mutex);
    producers++;
}
363

364
///////////////////////////////////////////////////////////////////////
365

366
/**
367
 * Ring buffer with blocking and non-blocking push and pop. It supports
368
 * non-copyable objects using move semantics. The producer may signal that it
369
 * has finished producing data by calling @ref stop, which will gracefully shut
370
 * down consumers as well as other producers. This class is fully thread-safe
371
 * for multiple producers and consumers.
372
 *
373
 * With multiple producers it is sometimes desirable to only stop the
374
 * ringbuffer once all the producers are finished. To support this, a
375
 * producer may register itself with @ref add_producer, and indicate
376
 * completion with @ref remove_producer. If this causes the number of
377
 * producers to fall to zero, the stream is stopped.
378
 *
379
 * Normally, trying to push data when the ringbuffer is full will either block
380
 * (for @ref push and @ref emplace) or fail (for @ref try_push and
381
 * @ref try_emplace). However, if the ringbuffer is constructed with
382
 * @c discard_oldest set to true, then pushing to a full ringbuffer will
383
 * always succeed, dropping the oldest item if necessary. This can be useful
384
 * for lossy network applications where it is more important to have fresh
385
 * data.
386
 *
387
 * \internal
388
 *
389
 * The design is mostly standard: head and tail pointers, and semaphores
390
 * indicating the number of free and filled slots. One slot is always left
391
 * empty so that the destructor can distinguish between empty and full without
392
 * consulting the semaphores.
393
 *
394
 * The interesting part is @ref stop. On the producer side, this sets @ref
395
 * stopped, which is protected by the tail mutex to immediately prevent any
396
 * other pushes. On the consumer side, it sets @ref stop_position (protected
397
 * by the head mutex), so that consumers only get @ref ringbuffer_stopped
398
 * after consuming already-present elements. To wake everything up and prevent
399
 * any further waits, @ref stop ups both semaphores, and any functions that
400
 * observe a stop condition after downing a semaphore will re-up it. This
401
 * causes the semaphore to be transiently unavailable, which leads to the need
402
 * for @ref throw_empty_or_stopped and @ref throw_full_or_stopped.
403
 *
404
 * Another unusual aspect is @c discard_oldest mode: a producer that finds
 * the ring full briefly acts as a consumer, discarding the oldest item (see
 * @ref emplace_discard_oldest) so that its own push can proceed.
405
 */
406
template<typename T, typename DataSemaphore = semaphore, typename SpaceSemaphore = semaphore>
class ringbuffer : public ringbuffer_base<T>
{
private:
    const bool discard_oldest;  ///< If true, pushing to a full ring drops the oldest item instead of blocking/failing
    DataSemaphore data_sem;     ///< Number of filled slots
    SpaceSemaphore space_sem;   ///< Number of available slots

    /// Implement @ref emplace and @ref try_emplace when discard_oldest is true
    template<typename... Args>
    void emplace_discard_oldest(Args&&... args);

public:
    explicit ringbuffer(std::size_t cap, bool discard_oldest = false);

    /**
     * Append an item to the queue, if there is space. It uses move
     * semantics, so on success, the original value is undefined.
     *
     * @param value    Value to move
     * @throw ringbuffer_full if there is no space
     * @throw ringbuffer_stopped if @ref stop has already been called
     */
    void try_push(T &&value);

    /**
     * Construct a new item in the queue, if there is space.
     *
     * @param args     Arguments to the constructor
     * @throw ringbuffer_full if there is no space
     * @throw ringbuffer_stopped if @ref stop has already been called
     */
    template<typename... Args>
    void try_emplace(Args&&... args);

    /**
     * Append an item to the queue, blocking if necessary. It uses move
     * semantics, so on success, the original value is undefined.
     *
     * @param value     Value to move
     * @param sem_args  Arbitrary arguments to pass to the space semaphore
     * @throw ringbuffer_stopped if @ref stop is called first
     */
    template<typename... SemArgs>
    void push(T &&value, SemArgs&&... sem_args);

    /**
     * Construct a new item in the queue, blocking if necessary.
     *
     * @param args     Arguments to the constructor
     * @throw ringbuffer_stopped if @ref stop is called first
     */
    template<typename... Args>
    void emplace(Args&&... args);

    /**
     * Retrieve an item from the queue, if there is one.
     *
     * @throw ringbuffer_stopped if the queue is empty and @ref stop was called
     * @throw ringbuffer_empty if the queue is empty but still active
     */
    T try_pop();

    /**
     * Retrieve an item from the queue, blocking until there is one or until
     * the queue is stopped.
     *
     * @param sem_args  Arbitrary arguments to pass to the data semaphore
     * @throw ringbuffer_stopped if the queue is empty and @ref stop was called
     */
    template<typename... SemArgs>
    T pop(SemArgs&&... sem_args);

    /**
     * Indicate that no more items will be produced. This does not immediately
     * stop consumers if there are still items in the queue; instead,
     * consumers will continue to retrieve remaining items, and will only be
     * signalled once the queue has drained.
     *
     * @returns whether the ringbuffer was stopped
     */
    bool stop();

    /**
     * Indicate that a producer registered with @ref add_producer is
     * finished with the ringbuffer. If this was the last producer, the
     * ringbuffer is stopped.
     *
     * @returns whether the ringbuffer was stopped
     */
    bool remove_producer();

    /// Get access to the data semaphore
    const DataSemaphore &get_data_sem() const { return data_sem; }
    /// Get access to the free-space semaphore
    const SpaceSemaphore &get_space_sem() const { return space_sem; }
    /// Get the discard_oldest flag
    bool get_discard_oldest() const { return discard_oldest; }

    /**
     * Begin iteration over the items in the ringbuffer. This does not
     * return a full-blown iterator; it is only intended to be used for
     * a range-based for loop. For example:
     * <code>for (auto &&item : ringbuffer) { ... }</code>
     */
    detail::ringbuffer_iterator<ringbuffer> begin();
    /**
     * End iterator (see @ref begin).
     */
    detail::ringbuffer_sentinel end();
};
517

518
/// Construct with capacity @a cap. The data semaphore starts at 0 (nothing to
/// pop) and the space semaphore at cap (all usable slots free).
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
ringbuffer<T, DataSemaphore, SpaceSemaphore>::ringbuffer(size_t cap, bool discard_oldest)
    : ringbuffer_base<T>(cap), discard_oldest(discard_oldest), data_sem(0), space_sem(cap)
{
}
523

524
/// Push path used when discard_oldest is enabled: if no free slot is
/// available, take a filled slot instead and discard its item, then push.
/// @throw ringbuffer_stopped if the ring has been stopped.
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
template<typename... Args>
void ringbuffer<T, DataSemaphore, SpaceSemaphore>::emplace_discard_oldest(Args&&... args)
{
    while (space_sem.try_get() == -1)
    {
        if (data_sem.try_get() == -1)
        {
            /* This could happen because consumers have removed all the data
             * before we could (in which case space_sem will be available in
             * the near future) or because we're contending with other
             * producers that have taken space_sem but not yet written the
             * data (in which case data_sem will be available in the near
             * future). Spin until we can get one of them.
             *
             * Spinning is not ideal, but I don't see any other way to wait
             * until one of the two semaphores has a token.
             */
            std::this_thread::yield();
            continue;
        }
        try
        {
            this->discard_oldest_internal();
        }
        catch (ringbuffer_stopped &)
        {
            data_sem.put();  // We didn't actually add any data
            throw;
        }
    }
    try
    {
        this->emplace_internal(std::forward<Args>(args)...);
        data_sem.put();
    }
    catch (ringbuffer_stopped &e)
    {
        // We didn't actually use the slot we reserved with space_sem
        space_sem.put();
        throw;
    }
}
567

568
/// Blocking construct-in-place: wait for a free slot, build the item, then
/// signal consumers via the data semaphore.
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
template<typename... Args>
void ringbuffer<T, DataSemaphore, SpaceSemaphore>::emplace(Args&&... args)
{
    if (discard_oldest)
        return emplace_discard_oldest(std::forward<Args>(args)...);
    semaphore_get(space_sem);
    try
    {
        this->emplace_internal(std::forward<Args>(args)...);
        data_sem.put();
    }
    catch (ringbuffer_stopped &e)
    {
        // We didn't actually use the slot we reserved with space_sem
        space_sem.put();
        throw;
    }
}
587

588
/// Non-blocking construct-in-place: fails with ringbuffer_full/_stopped
/// instead of waiting for a slot.
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
template<typename... Args>
void ringbuffer<T, DataSemaphore, SpaceSemaphore>::try_emplace(Args&&... args)
{
    if (discard_oldest)
        return emplace_discard_oldest(std::forward<Args>(args)...);
    /* TODO: try_get needs to be modified to distinguish between interrupted
     * system calls and zero semaphore (EAGAIN vs EINTR). But in most (all?)
     * cases is impossible because try_get is not blocking.
     */
    if (space_sem.try_get() == -1)
        this->throw_full_or_stopped();
    try
    {
        this->emplace_internal(std::forward<Args>(args)...);
        data_sem.put();
    }
    catch (ringbuffer_stopped &e)
    {
        // We didn't actually use the slot we reserved with space_sem
        space_sem.put();
        throw;
    }
}
612

613
/// Blocking move-push; extra @a sem_args are forwarded to the space
/// semaphore's wait operation.
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
template<typename... SemArgs>
void ringbuffer<T, DataSemaphore, SpaceSemaphore>::push(T &&value, SemArgs&&... sem_args)
{
    if (discard_oldest)
        return emplace_discard_oldest(std::move(value));
    semaphore_get(space_sem, std::forward<SemArgs>(sem_args)...);
    try
    {
        this->emplace_internal(std::move(value));
        data_sem.put();
    }
    catch (ringbuffer_stopped &e)
    {
        // We didn't actually use the slot we reserved with space_sem
        space_sem.put();
        throw;
    }
}
632

633
/// Non-blocking move-push: fails with ringbuffer_full/_stopped instead of
/// waiting for a slot.
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
void ringbuffer<T, DataSemaphore, SpaceSemaphore>::try_push(T &&value)
{
    if (discard_oldest)
        return emplace_discard_oldest(std::move(value));
    if (space_sem.try_get() == -1)
        this->throw_full_or_stopped();
    try
    {
        this->emplace_internal(std::move(value));
        data_sem.put();
    }
    catch (ringbuffer_stopped &e)
    {
        // We didn't actually use the slot we reserved with space_sem
        space_sem.put();
        throw;
    }
}
652

653
/// Blocking pop; extra @a sem_args are forwarded to the data semaphore's
/// wait operation.
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
template<typename... SemArgs>
T ringbuffer<T, DataSemaphore, SpaceSemaphore>::pop(SemArgs&&... sem_args)
{
    semaphore_get(data_sem, std::forward<SemArgs>(sem_args)...);
    try
    {
        T result = this->pop_internal();
        space_sem.put();
        return result;
    }
    catch (ringbuffer_stopped &e)
    {
        // We didn't consume any data, wake up the next waiter
        data_sem.put();
        throw;
    }
}
671

672
/// Non-blocking pop: fails with ringbuffer_empty/_stopped instead of waiting.
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
T ringbuffer<T, DataSemaphore, SpaceSemaphore>::try_pop()
{
    if (data_sem.try_get() == -1)
        this->throw_empty_or_stopped();
    try
    {
        T result = this->pop_internal();
        space_sem.put();
        return result;
    }
    catch (ringbuffer_stopped &e)
    {
        // We didn't consume any data, wake up the next waiter
        data_sem.put();
        throw;
    }
}
690

691
/// Stop the ringbuffer (see class docs). On the first call, up both
/// semaphores to wake any blocked producer/consumer; waiters that then
/// observe the stop re-up the semaphore for the next waiter.
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
bool ringbuffer<T, DataSemaphore, SpaceSemaphore>::stop()
{
    if (this->stop_internal())
    {
        space_sem.put();
        data_sem.put();
        return true;
    }
    else
        return false;
}
703

704
/// Deregister a producer; stops the ring (and wakes waiters, as in @ref stop)
/// only when the last registered producer leaves.
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
bool ringbuffer<T, DataSemaphore, SpaceSemaphore>::remove_producer()
{
    if (this->stop_internal(true))
    {
        space_sem.put();
        data_sem.put();
        return true;
    }
    else
        return false;
}
716

717
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
718
auto ringbuffer<T, DataSemaphore, SpaceSemaphore>::begin() -> detail::ringbuffer_iterator<ringbuffer>
719
{
720
    return detail::ringbuffer_iterator(*this);
721
}
722

723
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
724
detail::ringbuffer_sentinel ringbuffer<T, DataSemaphore, SpaceSemaphore>::end()
725
{
726
    return detail::ringbuffer_sentinel();
727
}
728

729
} // namespace spead2
730

731
#endif // SPEAD2_COMMON_RINGBUFFER_H
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc