• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 21761568859

06 Feb 2026 06:32PM UTC coverage: 86.475%. First build
21761568859

Pull #443

github

web-flow
Merge 03dd5f874 into 0cab473a7
Pull Request #443: Ring buffer features

10 of 11 new or added lines in 1 file covered. (90.91%)

1803 of 2085 relevant lines covered (86.47%)

4778926.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.2
/include/coro/ring_buffer.hpp
1
#pragma once
2

3
#include "coro/concepts/executor.hpp"
4
#include "coro/detail/awaiter_list.hpp"
5
#include "coro/expected.hpp"
6
#include "coro/mutex.hpp"
7
#include "coro/sync_wait.hpp"
8
#include "coro/task.hpp"
9

10
#include <array>
11
#include <atomic>
12
#include <coroutine>
13
#include <memory>
14
#include <optional>
15

16
namespace coro
17
{
18
namespace ring_buffer_result
19
{
20
enum class produce
21
{
22
    produced,
23
    notified,
24
    stopped
25
};
26

27
enum class consume
28
{
29
    notified,
30
    stopped
31
};
32
} // namespace ring_buffer_result
33

34
/**
35
 * @tparam element The type of element the ring buffer will store.  Note that this type should be
36
 *         cheap to move if possible as it is moved into and out of the buffer upon produce and
37
 *         consume operations.
38
 * @tparam num_elements The maximum number of elements the ring buffer can store, must be >= 1.
39
 */
40
template<typename element, size_t num_elements>
41
class ring_buffer
42
{
43
private:
44
    enum running_state_t
45
    {
46
        /// @brief The ring buffer is still running.
47
        running,
48
        /// @brief The ring buffer is draining all elements, produce is no longer allowed.
49
        draining,
50
        /// @brief The ring buffer is fully shutdown, all produce and consume tasks will be woken up with result::stopped.
51
        stopped,
52
    };
53

54
public:
55
    /**
56
     * static_assert If `num_elements` == 0.
57
     */
58
    ring_buffer()
14✔
59
    {
14✔
60
        static_assert(num_elements != 0, "num_elements cannot be zero");
61
    }
14✔
62

63
    ~ring_buffer()
14✔
64
    {
65
        // Wake up anyone still using the ring buffer.
66
        coro::sync_wait(shutdown());
14✔
67
    }
14✔
68

69
    ring_buffer(const ring_buffer<element, num_elements>&) = delete;
70
    ring_buffer(ring_buffer<element, num_elements>&&)      = delete;
71

72
    auto operator=(const ring_buffer<element, num_elements>&) noexcept -> ring_buffer<element, num_elements>& = delete;
73
    auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>&      = delete;
74

75
    struct produce_operation
76
    {
77
        produce_operation(ring_buffer<element, num_elements>& rb, element e)
11,000,022✔
78
            : m_rb(rb),
11,000,022✔
79
              m_e(std::move(e))
11,000,022✔
80
        {}
11,000,022✔
81

82
        auto await_ready() noexcept -> bool
11,000,022✔
83
        {
84
            auto& mutex = m_rb.m_mutex;
11,000,022✔
85

86
            // Produce operations can only proceed if running.
87
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
11,000,022✔
88
            {
NEW
89
                m_result = ring_buffer_result::produce::stopped;
×
90
                mutex.unlock();
×
91
                return true; // Will be awoken with produce::stopped
×
92
            }
93

94
            if (m_rb.m_used.load(std::memory_order::acquire) < num_elements)
22,000,044✔
95
            {
96
                // There is guaranteed space to store
97
                auto slot = m_rb.m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
7,916,497✔
98
                m_rb.m_elements[slot] = std::move(m_e);
7,916,497✔
99
                m_rb.m_used.fetch_add(1, std::memory_order::release);
7,916,497✔
100
                mutex.unlock();
7,916,497✔
101
                return true; // Will be awoken with produce::produced
7,916,346✔
102
            }
103

104
            return false; // ring buffer full, suspend
3,083,525✔
105
        }
106

107
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
3,083,525✔
108
        {
109
            m_awaiting_coroutine = awaiting_coroutine;
3,083,525✔
110
            m_next = m_rb.m_produce_waiters.exchange(this, std::memory_order::acq_rel);
3,083,525✔
111
            m_rb.m_mutex.unlock();
3,083,525✔
112
            return true;
3,082,860✔
113
        }
114

115
        /**
116
         * @return produce_result
117
         */
118
        auto await_resume() -> ring_buffer_result::produce
10,996,851✔
119
        {
120
            return m_result;
10,996,851✔
121
        }
122

123
        /// If the operation needs to suspend, the coroutine to resume when the element can be produced.
124
        std::coroutine_handle<> m_awaiting_coroutine;
125
        /// The result that should be returned when this coroutine resumes.
126
        ring_buffer_result::produce m_result{ring_buffer_result::produce::produced};
127
        /// Linked list of produce operations that are awaiting to produce their element.
128
        produce_operation* m_next{nullptr};
129

130
    private:
131
        template<typename element_subtype, size_t num_elements_subtype>
132
        friend class ring_buffer;
133

134
        /// The ring buffer the element is being produced into.
135
        ring_buffer<element, num_elements>& m_rb;
136
        /// The element this produce operation is producing into the ring buffer.
137
        std::optional<element> m_e{std::nullopt};
138
    };
139

140
    struct consume_operation
141
    {
142
        explicit consume_operation(ring_buffer<element, num_elements>& rb)
11,000,020✔
143
            : m_rb(rb)
11,000,020✔
144
        {}
11,000,020✔
145

146
        auto await_ready() noexcept -> bool
11,000,020✔
147
        {
148
            auto& mutex = m_rb.m_mutex;
11,000,020✔
149

150
            // Consume operations proceed until stopped.
151
            if (m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
11,000,020✔
152
            {
153
                m_result = ring_buffer_result::consume::stopped;
1✔
154
                mutex.unlock();
1✔
155
                return true;
1✔
156
            }
157

158
            if (m_rb.m_used.load(std::memory_order::acquire) > 0)
22,000,038✔
159
            {
160
                auto slot = m_rb.m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
8,606,060✔
161
                m_e = std::move(m_rb.m_elements[slot]);
8,606,060✔
162
                m_rb.m_elements[slot] = std::nullopt;
8,606,060✔
163
                m_rb.m_used.fetch_sub(1, std::memory_order::release);
8,606,060✔
164
                mutex.unlock();
8,606,060✔
165
                return true;
8,604,350✔
166
            }
167

168
            return false; // ring buffer is empty, suspend.
2,393,959✔
169
        }
170

171
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
2,393,959✔
172
        {
173
            m_awaiting_coroutine = awaiting_coroutine;
2,393,959✔
174
            m_next = m_rb.m_consume_waiters.exchange(this, std::memory_order::acq_rel);
2,393,959✔
175
            m_rb.m_mutex.unlock();
2,393,959✔
176
            return true;
2,393,924✔
177
        }
178

179
        /**
180
         * @return The consumed element or ring_buffer_stopped if the ring buffer has been shutdown.
181
         */
182
        auto await_resume() -> expected<element, ring_buffer_result::consume>
10,997,700✔
183
        {
184
            if (m_e.has_value())
10,997,700✔
185
            {
186
                return expected<element, ring_buffer_result::consume>(std::move(m_e).value());
10,997,842✔
187
            }
188
            else // state is stopped
189
            {
190
                return unexpected<ring_buffer_result::consume>(m_result);
6✔
191
            }
192
        }
193

194
        /// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
195
        std::coroutine_handle<> m_awaiting_coroutine;
196
        /// The unexpected result this should return on resume
197
        ring_buffer_result::consume m_result{ring_buffer_result::consume::stopped};
198
        /// Linked list of consume operations that are awaiting to consume an element.
199
        consume_operation* m_next{nullptr};
200

201
    private:
202
        template<typename element_subtype, size_t num_elements_subtype>
203
        friend class ring_buffer;
204

205
        /// The ring buffer to consume an element from.
206
        ring_buffer<element, num_elements>& m_rb;
207
        /// The element this consume operation will consume.
208
        std::optional<element> m_e{std::nullopt};
209
    };
210

211
    /**
212
     * Produces the given element into the ring buffer.  This operation will suspend until a slot
213
     * in the ring buffer becomes available.
214
     * @param e The element to produce.
215
     */
216
    [[nodiscard]] auto produce(element e) -> coro::task<ring_buffer_result::produce>
10,978,020✔
217
    {
218
        co_await m_mutex.lock();
219
        auto result = co_await produce_operation{*this, std::move(e)};
220
        co_await try_resume_consumers();
221
        co_return result;
222
    }
21,952,842✔
223

224
    /**
225
     * Consumes an element from the ring buffer.  This operation will suspend until an element in
226
     * the ring buffer becomes available.
227
     */
228
    [[nodiscard]] auto consume() -> coro::task<expected<element, ring_buffer_result::consume>>
10,997,726✔
229
    {
230
        co_await m_mutex.lock();
231
        auto result = co_await consume_operation{*this};
232
        co_await try_resume_producers();
233
        co_return result;
234
    }
21,995,260✔
235

236
    /**
237
     * @return The maximum number of elements the ring buffer can hold.
238
     */
239
    constexpr auto max_size() const noexcept -> size_t
6✔
240
    {
241
        return num_elements;
6✔
242
    }
243

244
    /**
245
     * @return The current number of elements contained in the ring buffer.
246
     */
247
    auto size() const -> size_t
144✔
248
    {
249
        return m_used.load(std::memory_order::acquire);
288✔
250
    }
251

252
    /**
253
     * @return True if the ring buffer contains zero elements.
254
     */
255
    [[nodiscard]] auto empty() const -> bool { return size() == 0; }
142✔
256

257
    /**
258
     * @return True if the ring buffer has no more space.
259
     */
260
    auto full() const -> bool { return size() == max_size(); }
2✔
261

262
    /**
263
     * @brief Wakes up all currently awaiting producers.  Their await_resume() function
264
     *        will return an expected produce result that producers have been notified.
265
     */
266
    auto notify_producers() -> coro::task<void>
1✔
267
    {
268
        auto expected = m_running_state.load(std::memory_order::acquire);
269
        if (expected == running_state_t::stopped)
270
        {
271
            co_return;
272
        }
273

274
        co_await m_mutex.lock();
275
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
276
        m_mutex.unlock();
277

278
        while (produce_waiters != nullptr)
279
        {
280
            auto* next = produce_waiters->m_next;
281
            produce_waiters->m_result = ring_buffer_result::produce::notified;
282
            produce_waiters->m_awaiting_coroutine.resume();
283
            produce_waiters = next;
284
        }
285

286
        co_return;
287
    }
2✔
288

289
    /**
290
     * @brief Wakes up all currently awaiting consumers.  Their await_resume() function
291
     *        will return an expected consume result that consumers have been notified.
292
     */
293
    auto notify_consumers() -> coro::task<void>
1✔
294
    {
295
        auto expected = m_running_state.load(std::memory_order::acquire);
296
        if (expected == running_state_t::stopped)
297
        {
298
            co_return;
299
        }
300

301
        co_await m_mutex.lock();
302
        auto* consume_waiters = m_consume_waiters.exchange(nullptr, std::memory_order::acq_rel);
303
        m_mutex.unlock();
304

305
        while (consume_waiters != nullptr)
306
        {
307
            auto* next = consume_waiters->m_next;
308
            consume_waiters->m_result = ring_buffer_result::consume::notified;
309
            consume_waiters->m_awaiting_coroutine.resume();
310
            consume_waiters = next;
311
        }
312

313
        co_return;
314
    }
2✔
315

316
    /**
317
     * @brief Wakes up all currently awaiting producers and consumers.  Their await_resume() function
318
     *        will return an expected consume result that the ring buffer has stopped.
319
     */
320
    auto shutdown() -> coro::task<void>
18✔
321
    {
322
        // Only wake up waiters once.
323
        auto expected = m_running_state.load(std::memory_order::acquire);
324
        if (expected == running_state_t::stopped)
325
        {
326
            co_return;
327
        }
328

329
        auto lk = co_await m_mutex.scoped_lock();
330
        // Only let one caller do the wake-ups, this can go from running or draining to stopped
331
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::stopped, std::memory_order::acq_rel, std::memory_order::relaxed))
332
        {
333
            co_return;
334
        }
335
        lk.unlock();
336

337
        co_await m_mutex.lock();
338
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
339
        auto* consume_waiters = m_consume_waiters.exchange(nullptr, std::memory_order::acq_rel);
340
        m_mutex.unlock();
341

342
        while (produce_waiters != nullptr)
343
        {
344
            auto* next = produce_waiters->m_next;
345
            produce_waiters->m_result = ring_buffer_result::produce::stopped;
346
            produce_waiters->m_awaiting_coroutine.resume();
347
            produce_waiters = next;
348
        }
349

350
        while (consume_waiters != nullptr)
351
        {
352
            auto* next = consume_waiters->m_next;
353
            consume_waiters->m_result = ring_buffer_result::consume::stopped;
354
            consume_waiters->m_awaiting_coroutine.resume();
355
            consume_waiters = next;
356
        }
357

358
        co_return;
359
    }
36✔
360

361
    template<coro::concepts::executor executor_type>
362
    [[nodiscard]] auto shutdown_drain(std::unique_ptr<executor_type>& e) -> coro::task<void>
3✔
363
    {
364
        auto lk = co_await m_mutex.scoped_lock();
365
        // Do not allow any more produces, the state must be in running to drain.
366
        auto expected = running_state_t::running;
367
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::draining, std::memory_order::acq_rel, std::memory_order::relaxed))
368
        {
369
            co_return;
370
        }
371

372
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
373
        lk.unlock();
374

375
        while (produce_waiters != nullptr)
376
        {
377
            auto* next = produce_waiters->m_next;
378
            produce_waiters->m_awaiting_coroutine.resume();
379
            produce_waiters = next;
380
        }
381

382
        while (!empty() && m_running_state.load(std::memory_order::acquire) == running_state_t::draining)
383
        {
384
            co_await e->yield();
385
        }
386

387
        co_await shutdown();
388
        co_return;
389
    }
6✔
390

391
    /**
392
     * Returns true if shutdown() or shutdown_drain() have been called on this coro::ring_buffer.
393
     * @return True if the coro::ring_buffer has been shutdown.
394
     */
395
    [[nodiscard]] auto is_shutdown() const -> bool { return m_running_state.load(std::memory_order::acquire) != running_state_t::running; }
2✔
396

397
private:
398
    friend produce_operation;
399
    friend consume_operation;
400

401
    coro::mutex m_mutex{};
402

403
    std::array<std::optional<element>, num_elements> m_elements{};
404
    /// The current front pointer to an open slot if not full.
405
    std::atomic<size_t> m_front{0};
406
    /// The current back pointer to the oldest item in the buffer if not empty.
407
    std::atomic<size_t> m_back{0};
408
    /// The number of items in the ring buffer.
409
    std::atomic<size_t> m_used{0};
410

411
    /// The LIFO list of produce waiters.
412
    std::atomic<produce_operation*> m_produce_waiters{nullptr};
413
    /// The LIFO list of consume waiters.
414
    std::atomic<consume_operation*> m_consume_waiters{nullptr};
415

416
    std::atomic<running_state_t> m_running_state{running_state_t::running};
417

418
    auto try_resume_producers() -> coro::task<void>
10,994,712✔
419
    {
420
        while (true)
421
        {
422
            auto lk = co_await m_mutex.scoped_lock();
423
            if (m_used.load(std::memory_order::acquire) < num_elements)
424
            {
425
                auto* op = detail::awaiter_list_pop(m_produce_waiters);
426
                if (op != nullptr)
427
                {
428
                    auto slot = m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
429
                    m_elements[slot] = std::move(op->m_e);
430
                    m_used.fetch_add(1, std::memory_order::release);
431

432
                    lk.unlock();
433
                    op->m_awaiting_coroutine.resume();
434
                    continue;
435
                }
436
            }
437
            co_return;
438
        }
439
    }
21,986,951✔
440

441
    auto try_resume_consumers() -> coro::task<void>
10,995,019✔
442
    {
443
        while (true)
444
        {
445
            auto lk = co_await m_mutex.scoped_lock();
446
            if (m_used.load(std::memory_order::acquire) > 0)
447
            {
448
                auto* op = detail::awaiter_list_pop(m_consume_waiters);
449
                if (op != nullptr)
450
                {
451
                    auto slot = m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
452
                    op->m_e = std::move(m_elements[slot]);
453
                    m_elements[slot] = std::nullopt;
454
                    m_used.fetch_sub(1, std::memory_order::release);
455
                    lk.unlock();
456

457
                    op->m_awaiting_coroutine.resume();
458
                    continue;
459
                }
460
            }
461
            co_return;
462
        }
463
    }
21,988,905✔
464
};
465

466
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc