• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 16729047925

04 Aug 2025 04:44PM UTC coverage: 78.411% (-0.09%) from 78.504%
16729047925

push

github

bmerry
Wire set_pop_if_full up to Python

4 of 6 new or added lines in 1 file covered. (66.67%)

75 existing lines in 6 files now uncovered.

5586 of 7124 relevant lines covered (78.41%)

90538.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.0
/include/spead2/common_ringbuffer.h
1
/* Copyright 2015, 2021, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 *
20
 * Ring buffer.
21
 */
22

23
#ifndef SPEAD2_COMMON_RINGBUFFER_H
24
#define SPEAD2_COMMON_RINGBUFFER_H
25

26
#include <mutex>
27
#include <condition_variable>
28
#include <type_traits>
29
#include <memory>
30
#include <stdexcept>
31
#include <utility>
32
#include <optional>
33
#include <cassert>
34
#include <climits>
35
#include <iostream>
36
#include <new>
37
#include <optional>
38
#include <spead2/common_logging.h>
39
#include <spead2/common_semaphore.h>
40
#include <spead2/common_storage.h>
41

42
namespace spead2
43
{
44

45
/**
 * Exception raised by non-blocking push operations when every slot is
 * occupied.
 */
class ringbuffer_full : public std::runtime_error
{
public:
    ringbuffer_full() : std::runtime_error("ring buffer is full") {}
};
51

52
/**
 * Exception raised by non-blocking pop operations when no item is
 * available (but the ringbuffer is still active).
 */
class ringbuffer_empty : public std::runtime_error
{
public:
    ringbuffer_empty() : std::runtime_error("ring buffer is empty") {}
};
58

59
/**
 * Exception raised when popping from a drained, stopped ringbuffer, or
 * when pushing to a ringbuffer after it has been stopped.
 */
class ringbuffer_stopped : public std::runtime_error
{
public:
    ringbuffer_stopped() : std::runtime_error("ring buffer has been stopped") {}
};
68

69
namespace detail
70
{
71

72
/// Stateless end-of-range marker paired with @ref ringbuffer_iterator.
class ringbuffer_sentinel {};
74

75
/**
76
 * Basic iterator for @ref ringbuffer as well as ringbuffer-like classes: they
77
 * must provide @c pop which either returns when data is available or throws
78
 * @ref ringbuffer_stopped.
79
 *
80
 * This does not fully implement the iterator concept; it is suitable only for
81
 * range-based for loops.
82
 */
83
template<typename Ringbuffer>
84
class ringbuffer_iterator
85
{
86
private:
87
    using value_type = decltype(std::declval<Ringbuffer>().pop());
88

89
    Ringbuffer &owner;
90
    std::optional<value_type> current; // nullopt once ring has stopped
91

92
public:
93
    explicit ringbuffer_iterator(Ringbuffer &owner);
94
    bool operator!=(const ringbuffer_sentinel &);
95
    ringbuffer_iterator &operator++();
96
    value_type &&operator*();
97
};
98

99
template<typename Ringbuffer>
100
ringbuffer_iterator<Ringbuffer>::ringbuffer_iterator(Ringbuffer &owner)
1✔
101
    : owner(owner)
1✔
102
{
103
    ++*this;  // Load the first value
1✔
104
}
1✔
105

106
template<typename Ringbuffer>
107
bool ringbuffer_iterator<Ringbuffer>::operator!=(const ringbuffer_sentinel &)
4✔
108
{
109
    return bool(current);
4✔
110
}
111

112
template<typename Ringbuffer>
113
auto ringbuffer_iterator<Ringbuffer>::operator++() -> ringbuffer_iterator &
4✔
114
{
115
    /* Clear it first, so that we can reclaim the memory before making
116
     * space available in the ringbuffer, which might cause another
117
     * thread to allocate more memory.
118
     */
119
    current = std::nullopt;
4✔
120
    try
121
    {
122
        current = owner.pop();
4✔
123
    }
124
    catch (ringbuffer_stopped &)
1✔
125
    {
126
    }
127
    return *this;
4✔
128
}
129

130
template<typename Ringbuffer>
131
auto ringbuffer_iterator<Ringbuffer>::operator*() -> value_type &&
3✔
132
{
133
    return std::move(*current);
3✔
134
}
135

136
} // namespace detail
137

138
/**
139
 * Internal base class for @ref ringbuffer that is independent of the semaphore
140
 * type.
141
 */
142
template<typename T>
143
class ringbuffer_base
144
{
145
private:
146
    typedef detail::storage<T> storage_type;
147
    std::unique_ptr<storage_type[]> storage;
148
    const std::size_t cap;  ///< Number of slots
149

150
    /// Mutex held when reading from the head (needed for safe multi-consumer)
151
    mutable std::mutex head_mutex;
152
    std::size_t head = 0;   ///< first slot with data
153
    /**
154
     * Position in the queue which the receiver should treat as "please stop",
155
     * or an invalid position if not yet stopped. Unlike @ref stopped, this is
156
     * updated with @ref head_mutex rather than @ref tail_mutex held. Once
157
     * set, it is guaranteed to always equal @ref tail.
158
     */
159
    std::size_t stop_position = SIZE_MAX;
160

161
    /// Mutex held when writing to the tail (needed for safe multi-producer)
162
    mutable std::mutex tail_mutex;
163
    std::size_t tail = 0;   ///< first slot without data
164
    bool stopped = false;   ///< Whether stop has been called
165
    std::size_t producers = 0;  ///< Number of producers registered with @ref add_producer
166

167
    /// Increments @a idx, wrapping around.
168
    std::size_t next(std::size_t idx)
42,946✔
169
    {
170
        idx++;
42,946✔
171
        if (idx == cap)
42,946✔
172
            idx = 0;
282✔
173
        return idx;
42,946✔
174
    }
175

176
protected:
177
    explicit ringbuffer_base(std::size_t cap);
178
    ~ringbuffer_base();
179

180
    /**
181
     * Check whether we're stopped and throw an appropriate error.
182
     *
183
     * @throw ringbuffer_stopped if the ringbuffer is empty and stopped
184
     * @throw ringbuffer_empty otherwise
185
     */
186
    void throw_empty_or_stopped();
187

188
    /**
189
     * Check whether we're stopped and throw an appropriate error.
190
     *
191
     * @throw ringbuffer_stopped if the ringbuffer is stopped
192
     * @throw ringbuffer_full otherwise
193
     */
194
    void throw_full_or_stopped();
195

196
    /// Implementation of pushing functions, which doesn't touch semaphores
197
    template<typename... Args>
198
    void emplace_internal(Args&&... args);
199

200
    /// Implementation of popping functions, which doesn't touch semaphores
201
    T pop_internal();
202

203
    /**
204
     * Emplace a new item, and return the oldest item.
205
     *
206
     * If the queue is empty, does nothing and returns std::nullopt.
207
     */
208
    template<typename... Args>
209
    std::optional<T> emplace_pop_internal(Args&&... args);
210

211
    /**
212
     * Implementation of stopping, without the semaphores.
213
     *
214
     * @param remove_producer If true, remove a producer and only stop if none are left
215
     * @returns whether the ringbuffer was stopped (i.e., this is the first call)
216
     */
217
    bool stop_internal(bool remove_producer = false);
218

219
public:
220
    /// Maximum number of items that can be held at once
221
    std::size_t capacity() const;
222

223
    /**
224
     * Return the number of items currently in the ringbuffer.
225
     *
226
     * This should only be used for metrics, not for control flow, as
227
     * the result could be out of date by the time it is returned.
228
     */
229
    std::size_t size() const;
230

231
    /**
232
     * Register a new producer. Producers only need to call this if they
233
     * want to call @ref ringbuffer::remove_producer.
234
     */
235
    void add_producer();
236
};
237

238
template<typename T>
239
void ringbuffer_base<T>::throw_empty_or_stopped()
279✔
240
{
241
    std::lock_guard<std::mutex> lock(head_mutex);
279✔
242
    if (head == stop_position)
279✔
UNCOV
243
        throw ringbuffer_stopped();
×
244
    else
245
        throw ringbuffer_empty();
279✔
246
}
279✔
247

248
template<typename T>
249
std::size_t ringbuffer_base<T>::capacity() const
142✔
250
{
251
    return cap - 1;
142✔
252
}
253

254
template<typename T>
255
std::size_t ringbuffer_base<T>::size() const
145✔
256
{
257
    std::lock_guard<std::mutex> head_lock(head_mutex);
145✔
258
    std::lock_guard<std::mutex> tail_lock(tail_mutex);
145✔
259
    if (head <= tail)
145✔
260
        return tail - head;
145✔
261
    else
UNCOV
262
        return tail + cap - head;
×
263
}
145✔
264

265
template<typename T>
266
void ringbuffer_base<T>::throw_full_or_stopped()
9✔
267
{
268
    std::lock_guard<std::mutex> lock(tail_mutex);
9✔
269
    if (stopped)
9✔
UNCOV
270
        throw ringbuffer_stopped();
×
271
    else
272
        throw ringbuffer_full();
9✔
273
}
9✔
274

275
template<typename T>
276
ringbuffer_base<T>::ringbuffer_base(size_t cap)
618✔
277
    : storage(new storage_type[cap + 1]), cap(cap + 1), stop_position(cap + 1)
618✔
278
{
279
    /* We allocate one extra slot so that the destructor can disambiguate empty
280
     * from full. We could also use the semaphore values, but after stopping
281
     * they get used for other things so it is simplest not to rely on them.
282
     */
283
    assert(cap > 0);
618✔
284
}
618✔
285

286
template<typename T>
287
ringbuffer_base<T>::~ringbuffer_base()
618✔
288
{
289
    // Drain any remaining elements
290
    while (head != tail)
707✔
291
    {
292
        storage[head].destroy();
89✔
293
        head = next(head);
89✔
294
    }
295
}
618✔
296

297
template<typename T>
298
template<typename... Args>
299
void ringbuffer_base<T>::emplace_internal(Args&&... args)
21,498✔
300
{
301
    std::lock_guard<std::mutex> lock(tail_mutex);
21,498✔
302
    if (stopped)
21,498✔
303
    {
304
        throw ringbuffer_stopped();
25✔
305
    }
306
    // Construct in-place
307
    storage[tail].construct(std::forward<Args>(args)...);
21,473✔
308
    tail = next(tail);
21,473✔
309
}
21,498✔
310

311
template<typename T>
312
T ringbuffer_base<T>::pop_internal()
21,945✔
313
{
314
    std::lock_guard<std::mutex> lock(head_mutex);
21,945✔
315
    if (stop_position == head)
21,945✔
316
    {
317
        throw ringbuffer_stopped();
561✔
318
    }
319
    auto &item = storage[head];
21,384✔
320
    T result = std::move(*item);
21,384✔
321
    item.destroy();
21,384✔
322
    head = next(head);
21,384✔
323
    return result;
42,768✔
324
}
21,945✔
325

326
template<typename T>
327
template<typename... Args>
328
std::optional<T> ringbuffer_base<T>::emplace_pop_internal(Args&&... args)
×
329
{
330
    std::scoped_lock lock(head_mutex, tail_mutex);
×
UNCOV
331
    if (stopped)
×
332
    {
UNCOV
333
        throw ringbuffer_stopped();
×
334
    }
UNCOV
335
    if (head == tail)
×
UNCOV
336
        return std::nullopt;
×
UNCOV
337
    auto &item = storage[head];
×
UNCOV
338
    std::optional<T> result{std::move(*item)};
×
UNCOV
339
    item.destroy();
×
UNCOV
340
    head = next(head);
×
341

342
    // Construct new item in-place
UNCOV
343
    storage[tail].construct(std::forward<Args>(args)...);
×
UNCOV
344
    tail = next(tail);
×
UNCOV
345
    return result;
×
UNCOV
346
}
×
347

348
template<typename T>
349
bool ringbuffer_base<T>::stop_internal(bool remove_producer)
1,819✔
350
{
351
    std::size_t saved_tail;
352

353
    {
354
        std::lock_guard<std::mutex> tail_lock(this->tail_mutex);
1,819✔
355
        if (remove_producer)
1,819✔
356
        {
357
            assert(producers != 0);
108✔
358
            producers--;
108✔
359
            if (producers != 0)
108✔
360
                return false;
71✔
361
        }
362
        stopped = true;
1,748✔
363
        saved_tail = tail;
1,748✔
364
    }
1,819✔
365

366
    {
367
        std::lock_guard<std::mutex> head_lock(this->head_mutex);
1,748✔
368
        stop_position = saved_tail;
1,748✔
369
    }
1,748✔
370
    return true;
1,748✔
371
}
372

373
template<typename T>
374
void ringbuffer_base<T>::add_producer()
108✔
375
{
376
    std::lock_guard<std::mutex> tail_lock(this->tail_mutex);
108✔
377
    producers++;
108✔
378
}
108✔
379

380
///////////////////////////////////////////////////////////////////////
381

382
/**
383
 * Ring buffer with blocking and non-blocking push and pop. It supports
384
 * non-copyable objects using move semantics. The producer may signal that it
385
 * has finished producing data by calling @ref stop, which will gracefully shut
386
 * down consumers as well as other producers. This class is fully thread-safe
387
 * for multiple producers and consumers.
388
 *
389
 * With multiple producers it is sometimes desirable to only stop the
390
 * ringbuffer once all the producers are finished. To support this, a
391
 * producer may register itself with @ref add_producer, and indicate
392
 * completion with @ref remove_producer. If this causes the number of
393
 * producers to fall to zero, the stream is stopped.
394
 *
395
 * Normally, trying to push data when the ringbuffer is full will either block
396
 * (for @ref push and @ref emplace) or fail (for @ref try_push and
397
 * @ref try_emplace). However, one can use @ref push_pop_if_full or
398
 * @ref emplace_pop_if_full instead to pop and return the oldest item if there
399
 * is not enough space. This can be useful for lossy network applications where
400
 * it is more important to have fresh data.
401
 *
402
 * \internal
403
 *
404
 * The design is mostly standard: head and tail pointers, and semaphores
405
 * indicating the number of free and filled slots. One slot is always left
406
 * empty so that the destructor can distinguish between empty and full without
407
 * consulting the semaphores.
408
 *
409
 * The interesting part is @ref stop. On the producer side, this sets @ref
410
 * stopped, which is protected by the tail mutex to immediately prevent any
411
 * other pushes. On the consumer side, it sets @ref stop_position (protected
412
 * by the head mutex), so that consumers only get @ref ringbuffer_stopped
413
 * after consuming already-present elements. To wake everything up and prevent
414
 * any further waits, @ref stop ups both semaphores, and any functions that
415
 * observe a stop condition after downing a semaphore will re-up it. This
416
 * causes the semaphore to be transiently unavailable, which leads to the need
417
 * for @ref throw_empty_or_stopped and @ref throw_full_or_stopped.
418
 *
419
 * Another unusual aspect is @ref push_pop_if_full / @ref emplace_pop_if_full:
 * when no free slot can be reserved, they atomically pop the oldest item while
 * pushing the new one, holding both mutexes and bypassing the semaphores
 * entirely, since the net number of items does not change.
420
 */
421
template<typename T, typename DataSemaphore = semaphore, typename SpaceSemaphore = semaphore>
422
class ringbuffer : public ringbuffer_base<T>
423
{
424
private:
425
    DataSemaphore data_sem;     ///< Number of filled slots
426
    SpaceSemaphore space_sem;   ///< Number of available slots
427

428
public:
429
    explicit ringbuffer(std::size_t cap);
430

431
    /**
432
     * Append an item to the queue, if there is space. It uses move
433
     * semantics, so on success, the original value is undefined.
434
     *
435
     * @param value    Value to move
436
     * @throw ringbuffer_full if there is no space
437
     * @throw ringbuffer_stopped if @ref stop has already been called
438
     */
439
    void try_push(T &&value);
440

441
    /**
442
     * Construct a new item in the queue, if there is space.
443
     *
444
     * @param args     Arguments to the constructor
445
     * @throw ringbuffer_full if there is no space
446
     * @throw ringbuffer_stopped if @ref stop has already been called
447
     */
448
    template<typename... Args>
449
    void try_emplace(Args&&... args);
450

451
    /**
452
     * Append an item to the queue, blocking if necessary. It uses move
453
     * semantics, so on success, the original value is undefined.
454
     *
455
     * @param value     Value to move
456
     * @param sem_args  Arbitrary arguments to pass to the space semaphore
457
     * @throw ringbuffer_stopped if @ref stop is called first
458
     */
459
    template<typename... SemArgs>
460
    void push(T &&value, SemArgs&&... sem_args);
461

462
    /**
463
     * Construct a new item in the queue, blocking if necessary.
464
     *
465
     * @param args     Arguments to the constructor
466
     * @throw ringbuffer_stopped if @ref stop is called first
467
     */
468
    template<typename... Args>
469
    void emplace(Args&&... args);
470

471
    /**
472
     * Append an item to the queue, popping and returning the oldest item
473
     * if the ringbuffer is full.
474
     *
475
     * @param value     Value to move
476
     */
477
    std::optional<T> push_pop_if_full(T &&value);
478

479
    /**
480
     * Construct a new item in the queue, popping and returning the oldest
481
     * item if the ringbuffer is full.
482
     *
483
     * @param args     Arguments to the constructor
484
     * @throw ringbuffer_stopped if @ref stop is called first
485
     */
486
    template<typename... Args>
487
    std::optional<T> emplace_pop_if_full(Args&&... args);
488

489
    /**
490
     * Retrieve an item from the queue, if there is one.
491
     *
492
     * @throw ringbuffer_stopped if the queue is empty and @ref stop was called
493
     * @throw ringbuffer_empty if the queue is empty but still active
494
     */
495
    T try_pop();
496

497
    /**
498
     * Retrieve an item from the queue, blocking until there is one or until
499
     * the queue is stopped.
500
     *
501
     * @param sem_args  Arbitrary arguments to pass to the data semaphore
502
     * @throw ringbuffer_stopped if the queue is empty and @ref stop was called
503
     */
504
    template<typename... SemArgs>
505
    T pop(SemArgs&&... sem_args);
506

507
    /**
508
     * Indicate that no more items will be produced. This does not immediately
509
     * stop consumers if there are still items in the queue; instead,
510
     * consumers will continue to retrieve remaining items, and will only be
511
     * signalled once the queue has drained.
512
     *
513
     * @returns whether the ringbuffer was stopped
514
     */
515
    bool stop();
516

517
    /**
518
     * Indicate that a producer registered with @ref add_producer is
519
     * finished with the ringbuffer. If this was the last producer, the
520
     * ringbuffer is stopped.
521
     *
522
     * @returns whether the ringbuffer was stopped
523
     */
524
    bool remove_producer();
525

526
    /// Get access to the data semaphore
527
    const DataSemaphore &get_data_sem() const { return data_sem; }
157✔
528
    /// Get access to the free-space semaphore
529
    const SpaceSemaphore &get_space_sem() const { return space_sem; }
7✔
530

531
    /**
532
     * Begin iteration over the items in the ringbuffer. This does not
533
     * return a full-blown iterator; it is only intended to be used for
534
     * a range-based for loop. For example:
535
     * <code>for (auto &&item : ringbuffer) { ... }</code>
536
     */
537
    detail::ringbuffer_iterator<ringbuffer> begin();
538
    /**
539
     * End iterator (see @ref begin).
540
     */
541
    detail::ringbuffer_sentinel end();
542
};
543

544
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
545
ringbuffer<T, DataSemaphore, SpaceSemaphore>::ringbuffer(size_t cap)
618✔
546
    : ringbuffer_base<T>(cap), data_sem(0), space_sem(cap)
618✔
547
{
548
}
618✔
549

550
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
551
template<typename... Args>
552
void ringbuffer<T, DataSemaphore, SpaceSemaphore>::emplace(Args&&... args)
553
{
554
    semaphore_get(space_sem);
555
    try
556
    {
557
        this->emplace_internal(std::forward<Args>(args)...);
558
        data_sem.put();
559
    }
560
    catch (ringbuffer_stopped &e)
561
    {
562
        // We didn't actually use the slot we reserved with space_sem
563
        space_sem.put();
564
        throw;
565
    }
566
}
567

568
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
569
template<typename... Args>
570
void ringbuffer<T, DataSemaphore, SpaceSemaphore>::try_emplace(Args&&... args)
20,246✔
571
{
572
    /* TODO: try_get needs to be modified to distinguish between interrupted
573
     * system calls and zero semaphore (EAGAIN vs EINTR). But in most (all?)
574
     * cases is impossible because try_get is not blocking.
575
     */
576
    if (space_sem.try_get() == -1)
20,246✔
UNCOV
577
        this->throw_full_or_stopped();
×
578
    try
579
    {
580
        this->emplace_internal(std::forward<Args>(args)...);
20,246✔
581
        data_sem.put();
20,246✔
582
    }
UNCOV
583
    catch (ringbuffer_stopped &e)
×
584
    {
585
        // We didn't actually use the slot we reserved with space_sem
UNCOV
586
        space_sem.put();
×
UNCOV
587
        throw;
×
588
    }
589
}
20,246✔
590

591
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
592
template<typename... SemArgs>
593
void ringbuffer<T, DataSemaphore, SpaceSemaphore>::push(T &&value, SemArgs&&... sem_args)
427✔
594
{
595
    semaphore_get(space_sem, std::forward<SemArgs>(sem_args)...);
427✔
596
    try
597
    {
598
        this->emplace_internal(std::move(value));
427✔
599
        data_sem.put();
411✔
600
    }
601
    catch (ringbuffer_stopped &e)
32✔
602
    {
603
        // We didn't actually use the slot we reserved with space_sem
604
        space_sem.put();
16✔
605
        throw;
16✔
606
    }
607
}
411✔
608

609
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
610
void ringbuffer<T, DataSemaphore, SpaceSemaphore>::try_push(T &&value)
834✔
611
{
612
    if (space_sem.try_get() == -1)
834✔
613
        this->throw_full_or_stopped();
9✔
614
    try
615
    {
616
        this->emplace_internal(std::move(value));
825✔
617
        data_sem.put();
816✔
618
    }
619
    catch (ringbuffer_stopped &e)
18✔
620
    {
621
        // We didn't actually use the slot we reserved with space_sem
622
        space_sem.put();
9✔
623
        throw;
9✔
624
    }
625
}
816✔
626

627
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
UNCOV
628
std::optional<T> ringbuffer<T, DataSemaphore, SpaceSemaphore>::push_pop_if_full(T &&value)
×
629
{
UNCOV
630
    return emplace_pop_if_full(std::move(value));
×
631
}
632

633
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
634
template<typename... Args>
UNCOV
635
std::optional<T> ringbuffer<T, DataSemaphore, SpaceSemaphore>::emplace_pop_if_full(Args&&... args)
×
636
{
UNCOV
637
    while (space_sem.try_get() == -1)
×
638
    {
UNCOV
639
        std::optional<T> ret = this->emplace_pop_internal(std::forward<Args>(args)...);
×
UNCOV
640
        if (ret.has_value())
×
641
        {
UNCOV
642
            return ret;  // No semaphore manipulations needed
×
643
        }
644
        /* The ringbuffer is empty: either because consumers emptied it because we
645
         * got a chance to pop, or because other producers are holding all the
646
         * space_sem tokens and haven't had a chance to push yet. Just spin until
647
         * we can make forward progress. Spinning isn't great but there is no
648
         * primitive that lets us acquire one of two semaphores.
649
         */
650
    }
651
    try
652
    {
UNCOV
653
        this->emplace_internal(std::forward<Args>(args)...);
×
UNCOV
654
        data_sem.put();
×
655
    }
UNCOV
656
    catch (ringbuffer_stopped &e)
×
657
    {
658
        // We didn't actually use the slot we reserved with space_sem
UNCOV
659
        space_sem.put();
×
UNCOV
660
        throw;
×
661
    }
UNCOV
662
    return std::nullopt;
×
663
}
664

665
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
666
template<typename... SemArgs>
667
T ringbuffer<T, DataSemaphore, SpaceSemaphore>::pop(SemArgs&&... sem_args)
1,952✔
668
{
669
    semaphore_get(data_sem, std::forward<SemArgs>(sem_args)...);
1,952✔
670
    try
671
    {
672
        T result = this->pop_internal();
1,952✔
673
        space_sem.put();
1,398✔
674
        return result;
2,796✔
675
    }
1,398✔
676
    catch (ringbuffer_stopped &e)
1,108✔
677
    {
678
        // We didn't consume any data, wake up the next waiter
679
        data_sem.put();
554✔
680
        throw;
554✔
681
    }
682
}
683

684
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
685
T ringbuffer<T, DataSemaphore, SpaceSemaphore>::try_pop()
20,272✔
686
{
687
    if (data_sem.try_get() == -1)
20,272✔
688
        this->throw_empty_or_stopped();
279✔
689
    try
690
    {
691
        T result = this->pop_internal();
19,993✔
692
        space_sem.put();
19,986✔
693
        return result;
39,972✔
694
    }
19,986✔
695
    catch (ringbuffer_stopped &e)
14✔
696
    {
697
        // We didn't consume any data, wake up the next waiter
698
        data_sem.put();
7✔
699
        throw;
7✔
700
    }
701
}
702

703
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
704
bool ringbuffer<T, DataSemaphore, SpaceSemaphore>::stop()
1,711✔
705
{
706
    if (this->stop_internal())
1,711✔
707
    {
708
        space_sem.put();
1,711✔
709
        data_sem.put();
1,711✔
710
        return true;
1,711✔
711
    }
712
    else
UNCOV
713
        return false;
×
714
}
715

716
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
717
bool ringbuffer<T, DataSemaphore, SpaceSemaphore>::remove_producer()
108✔
718
{
719
    if (this->stop_internal(true))
108✔
720
    {
721
        space_sem.put();
37✔
722
        data_sem.put();
37✔
723
        return true;
37✔
724
    }
725
    else
726
        return false;
71✔
727
}
728

729
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
730
auto ringbuffer<T, DataSemaphore, SpaceSemaphore>::begin() -> detail::ringbuffer_iterator<ringbuffer>
731
{
732
    return detail::ringbuffer_iterator(*this);
733
}
734

735
template<typename T, typename DataSemaphore, typename SpaceSemaphore>
736
detail::ringbuffer_sentinel ringbuffer<T, DataSemaphore, SpaceSemaphore>::end()
737
{
738
    return detail::ringbuffer_sentinel();
739
}
740

741
} // namespace spead2
742

743
#endif // SPEAD2_COMMON_RINGBUFFER_H
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc