• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 15792692670

21 Jun 2025 05:47AM UTC coverage: 88.08%. First build
15792692670

Pull #353

github

web-flow
Merge 8baf1cce4 into c52f4aac8
Pull Request #353: ring_buffer: add max_size, full, notify_producers, and notify_consumers

3 of 4 new or added lines in 1 file covered. (75.0%)

1633 of 1854 relevant lines covered (88.08%)

13715961.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.77
/include/coro/ring_buffer.hpp
1
#pragma once
2

3
#include "coro/expected.hpp"
4
#include "coro/mutex.hpp"
5
#include "coro/task.hpp"
6

7
#include <array>
8
#include <atomic>
9
#include <coroutine>
10
#include <mutex>
11
#include <optional>
12

13
namespace coro
14
{
15
namespace ring_buffer_result
{
/// Result of awaiting ring_buffer::produce().
enum class produce
{
    /// The element was stored in the ring buffer.
    produced,
    /// The suspended producer was woken via notify_producers(); its element was NOT stored.
    notified,
    /// The ring buffer has been shutdown; the element was NOT stored.
    stopped
};

/// Error result of awaiting ring_buffer::consume(); the success case carries the element instead.
enum class consume
{
    /// The suspended consumer was woken via notify_consumers(); no element was consumed.
    notified,
    /// The ring buffer has been shutdown; no element was consumed.
    stopped
};
} // namespace ring_buffer_result
30

31
/**
32
 * @tparam element The type of element the ring buffer will store.  Note that this type should be
33
 *         cheap to move if possible as it is moved into and out of the buffer upon produce and
34
 *         consume operations.
35
 * @tparam num_elements The maximum number of elements the ring buffer can store, must be >= 1.
36
 */
37
template<typename element, size_t num_elements>
class ring_buffer
{
private:
    /// Lifecycle state of the ring buffer.
    enum running_state_t
    {
        /// @brief The ring buffer is still running.
        running,
        /// @brief The ring buffer is draining all elements, produce is no longer allowed.
        draining,
        /// @brief The ring buffer is fully shutdown, all produce and consume tasks will be woken up with result::stopped.
        stopped,
    };

public:
    /**
     * static_assert If `num_elements` == 0.
     */
    ring_buffer()
    {
        static_assert(num_elements != 0, "num_elements cannot be zero");
    }

    ~ring_buffer()
    {
        // Wake up anyone still using the ring buffer.
        // NOTE(review): coro::sync_wait is not among this header's direct #includes;
        // presumably it arrives transitively (e.g. via coro/mutex.hpp or coro/task.hpp) -- confirm.
        coro::sync_wait(shutdown());
    }

    // Non-copyable and non-movable: suspended waiter nodes hold a reference to this buffer.
    ring_buffer(const ring_buffer<element, num_elements>&) = delete;
    ring_buffer(ring_buffer<element, num_elements>&&)      = delete;

    auto operator=(const ring_buffer<element, num_elements>&) noexcept -> ring_buffer<element, num_elements>& = delete;
    auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>&      = delete;

    /// Awaitable used by produce().  produce() acquires m_rb.m_mutex before awaiting this
    /// operation; every path below (immediate-ready or suspend) releases that mutex exactly once.
    struct produce_operation
    {
        produce_operation(ring_buffer<element, num_elements>& rb, element e)
            : m_rb(rb),
              m_e(std::move(e))
        {}

        /// Attempts to complete without suspending: fails fast with produce::stopped if the
        /// buffer is no longer running, otherwise stores the element if a slot is free.
        /// Returns false (suspend) only when the buffer is full.
        auto await_ready() noexcept -> bool
        {
            auto& mutex = m_rb.m_mutex;

            // Produce operations can only proceed if running.
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
            {
                m_result = ring_buffer_result::produce::stopped;
                mutex.unlock();
                return true; // Will be awoken with produce::stopped
            }

            if (m_rb.m_used.load(std::memory_order::acquire) < num_elements)
            {
                // There is guaranteed space to store
                auto slot = m_rb.m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
                m_rb.m_elements[slot] = std::move(m_e);
                m_rb.m_used.fetch_add(1, std::memory_order::release);
                mutex.unlock();
                return true; // Will be awoken with produce::produced
            }

            return false; // ring buffer full, suspend
        }

        /// Pushes this node onto the LIFO produce waiter list and releases the mutex
        /// (still held from produce()).
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
        {
            m_awaiting_coroutine = awaiting_coroutine;
            m_next = m_rb.m_produce_waiters.exchange(this, std::memory_order::acq_rel);
            m_rb.m_mutex.unlock();
            return true;
        }

        /**
         * @return produce_result
         */
        auto await_resume() -> ring_buffer_result::produce
        {
            return m_result;
        }

        /// If the operation needs to suspend, the coroutine to resume when the element can be produced.
        std::coroutine_handle<> m_awaiting_coroutine;
        /// The result that should be returned when this coroutine resumes.
        ring_buffer_result::produce m_result{ring_buffer_result::produce::produced};
        /// Linked list of produce operations that are awaiting to produce their element.
        produce_operation* m_next{nullptr};

    private:
        template<typename element_subtype, size_t num_elements_subtype>
        friend class ring_buffer;

        /// The ring buffer the element is being produced into.
        ring_buffer<element, num_elements>& m_rb;
        /// The element this produce operation is producing into the ring buffer.
        std::optional<element> m_e{std::nullopt};
    };

    /// Awaitable used by consume().  consume() acquires m_rb.m_mutex before awaiting this
    /// operation; every path below (immediate-ready or suspend) releases that mutex exactly once.
    struct consume_operation
    {
        explicit consume_operation(ring_buffer<element, num_elements>& rb)
            : m_rb(rb)
        {}

        /// Attempts to complete without suspending: fails fast with consume::stopped if the
        /// buffer is fully stopped (consuming is still allowed while draining), otherwise takes
        /// an element if one is present.  Returns false (suspend) only when the buffer is empty.
        auto await_ready() noexcept -> bool
        {
            auto& mutex = m_rb.m_mutex;

            // Consume operations proceed until stopped.
            if (m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
            {
                m_result = ring_buffer_result::consume::stopped;
                mutex.unlock();
                return true;
            }

            if (m_rb.m_used.load(std::memory_order::acquire) > 0)
            {
                auto slot = m_rb.m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
                m_e = std::move(m_rb.m_elements[slot]);
                m_rb.m_elements[slot] = std::nullopt; // clear the vacated slot
                m_rb.m_used.fetch_sub(1, std::memory_order::release);
                mutex.unlock();
                return true;
            }

            return false; // ring buffer is empty, suspend.
        }

        /// Pushes this node onto the LIFO consume waiter list and releases the mutex
        /// (still held from consume()).
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
        {
            m_awaiting_coroutine = awaiting_coroutine;
            m_next = m_rb.m_consume_waiters.exchange(this, std::memory_order::acq_rel);
            m_rb.m_mutex.unlock();
            return true;
        }

        /**
         * @return The consumed element or ring_buffer_stopped if the ring buffer has been shutdown.
         */
        auto await_resume() -> expected<element, ring_buffer_result::consume>
        {
            if (m_e.has_value())
            {
                return expected<element, ring_buffer_result::consume>(std::move(m_e).value());
            }
            else // state is stopped
            {
                return unexpected<ring_buffer_result::consume>(m_result);
            }
        }

        /// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
        std::coroutine_handle<> m_awaiting_coroutine;
        /// The unexpected result this should return on resume
        ring_buffer_result::consume m_result{ring_buffer_result::consume::stopped};
        /// Linked list of consume operations that are awaiting to consume an element.
        consume_operation* m_next{nullptr};

    private:
        template<typename element_subtype, size_t num_elements_subtype>
        friend class ring_buffer;

        /// The ring buffer to consume an element from.
        ring_buffer<element, num_elements>& m_rb;
        /// The element this consume operation will consume.
        std::optional<element> m_e{std::nullopt};
    };

    /**
     * Produces the given element into the ring buffer.  This operation will suspend until a slot
     * in the ring buffer becomes available.
     * @param e The element to produce.
     */
    [[nodiscard]] auto produce(element e) -> coro::task<ring_buffer_result::produce>
    {
        co_await m_mutex.lock(); // released by produce_operation on every path
        auto result = co_await produce_operation{*this, std::move(e)};
        // A new element may now be available; hand it to any suspended consumers.
        co_await try_resume_consumers();
        co_return result;
    }

    /**
     * Consumes an element from the ring buffer.  This operation will suspend until an element in
     * the ring buffer becomes available.
     */
    [[nodiscard]] auto consume() -> coro::task<expected<element, ring_buffer_result::consume>>
    {
        co_await m_mutex.lock(); // released by consume_operation on every path
        auto result = co_await consume_operation{*this};
        // A slot may now be free; let any suspended producers fill it.
        co_await try_resume_producers();
        co_return result;
    }

    /**
     * @return The maximum number of elements the ring buffer can hold.
     */
    constexpr auto max_size() const noexcept -> size_t
    {
        return num_elements;
    }

    /**
     * @return The current number of elements contained in the ring buffer.
     */
    auto size() const -> size_t
    {
        return m_used.load(std::memory_order::acquire);
    }

    /**
     * @return True if the ring buffer contains zero elements.
     */
    auto empty() const -> bool { return size() == 0; }

    /**
     * @return True if the ring buffer has no more space.
     */
    auto full() const -> bool { return size() == max_size(); }

    /**
     * @brief Wakes up all currently awaiting producers.  Their await_resume() function
     *        will return an expected produce result that producers have been notified.
     *        Note: a notified producer's element is NOT stored in the buffer.
     */
    auto notify_producers() -> coro::task<void>
    {
        auto expected = m_running_state.load(std::memory_order::acquire);
        if (expected == running_state_t::stopped)
        {
            // shutdown() wakes everyone with result::stopped; nothing to do here.
            co_return;
        }

        // Detach the entire waiter list under the lock, then resume everyone outside it.
        co_await m_mutex.lock();
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
        m_mutex.unlock();

        while (produce_waiters != nullptr)
        {
            // Read m_next before resuming; the resumed coroutine may destroy its operation node.
            auto* next = produce_waiters->m_next;
            produce_waiters->m_result = ring_buffer_result::produce::notified;
            produce_waiters->m_awaiting_coroutine.resume();
            produce_waiters = next;
        }

        co_return;
    }

    /**
     * @brief Wakes up all currently awaiting consumers.  Their await_resume() function
     *        will return an expected consume result that consumers have been notified.
     *        Note: a notified consumer receives no element.
     */
    auto notify_consumers() -> coro::task<void>
    {
        auto expected = m_running_state.load(std::memory_order::acquire);
        if (expected == running_state_t::stopped)
        {
            // shutdown() wakes everyone with result::stopped; nothing to do here.
            co_return;
        }

        // Detach the entire waiter list under the lock, then resume everyone outside it.
        co_await m_mutex.lock();
        auto* consume_waiters = m_consume_waiters.exchange(nullptr, std::memory_order::acq_rel);
        m_mutex.unlock();

        while (consume_waiters != nullptr)
        {
            // Read m_next before resuming; the resumed coroutine may destroy its operation node.
            auto* next = consume_waiters->m_next;
            consume_waiters->m_result = ring_buffer_result::consume::notified;
            consume_waiters->m_awaiting_coroutine.resume();
            consume_waiters = next;
        }

        co_return;
    }

    /**
     * @brief Wakes up all currently awaiting producers and consumers.  Their await_resume() function
     *        will return an expected consume result that the ring buffer has stopped.
     */
    auto shutdown() -> coro::task<void>
    {
        // Only wake up waiters once.
        auto expected = m_running_state.load(std::memory_order::acquire);
        if (expected == running_state_t::stopped)
        {
            co_return;
        }

        auto lk = co_await m_mutex.scoped_lock();
        // Only let one caller do the wake-ups, this can go from running or draining to stopped
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::stopped, std::memory_order::acq_rel, std::memory_order::relaxed))
        {
            co_return;
        }
        lk.unlock();

        // Detach both waiter lists under the lock, then resume everyone outside it.
        co_await m_mutex.lock();
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
        auto* consume_waiters = m_consume_waiters.exchange(nullptr, std::memory_order::acq_rel);
        m_mutex.unlock();

        while (produce_waiters != nullptr)
        {
            // Read m_next before resuming; the resumed coroutine may destroy its operation node.
            auto* next = produce_waiters->m_next;
            produce_waiters->m_result = ring_buffer_result::produce::stopped;
            produce_waiters->m_awaiting_coroutine.resume();
            produce_waiters = next;
        }

        while (consume_waiters != nullptr)
        {
            auto* next = consume_waiters->m_next;
            consume_waiters->m_result = ring_buffer_result::consume::stopped;
            consume_waiters->m_awaiting_coroutine.resume();
            consume_waiters = next;
        }

        co_return;
    }

    /**
     * @brief Transitions the buffer to draining (no more produces), yields on the given executor
     *        until consumers have emptied the buffer, then performs a full shutdown().
     * @param e Executor to yield on while waiting for the buffer to drain.
     */
    template<coro::concepts::executor executor_type>
    [[nodiscard]] auto shutdown_drain(std::shared_ptr<executor_type> e) -> coro::task<void>
    {
        auto lk = co_await m_mutex.scoped_lock();
        // Do not allow any more produces, the state must be in running to drain.
        auto expected = running_state_t::running;
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::draining, std::memory_order::acq_rel, std::memory_order::relaxed))
        {
            co_return;
        }

        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
        lk.unlock();

        // NOTE(review): producers resumed here keep their default result (produce::produced)
        // even though their element was never stored in the buffer -- confirm this is intended
        // rather than e.g. produce::stopped.
        while (produce_waiters != nullptr)
        {
            auto* next = produce_waiters->m_next;
            produce_waiters->m_awaiting_coroutine.resume();
            produce_waiters = next;
        }

        // Yield until consumers have drained all remaining elements.
        while (!empty())
        {
            co_await e->yield();
        }

        co_await shutdown();
        co_return;
    }

private:
    friend produce_operation;
    friend consume_operation;

    /// Serializes access to the element slots and coordinates waiter-list hand-offs.
    coro::mutex m_mutex{};

    /// The fixed-capacity storage; empty slots hold std::nullopt.
    std::array<std::optional<element>, num_elements> m_elements{};
    /// The current front pointer to an open slot if not full.
    std::atomic<size_t> m_front{0};
    /// The current back pointer to the oldest item in the buffer if not empty.
    std::atomic<size_t> m_back{0};
    /// The number of items in the ring buffer.
    std::atomic<size_t> m_used{0};

    /// The LIFO list of produce waiters.
    std::atomic<produce_operation*> m_produce_waiters{nullptr};
    /// The LIFO list of consume waiters.
    std::atomic<consume_operation*> m_consume_waiters{nullptr};

    /// Lifecycle state; starts running, see running_state_t.
    std::atomic<running_state_t> m_running_state{running_state_t::running};

    /// While there is free space and a parked producer, move its element into the buffer and
    /// resume it; loops until no further producer can make progress.
    auto try_resume_producers() -> coro::task<void>
    {
        while (true)
        {
            auto lk = co_await m_mutex.scoped_lock();
            if (m_used.load(std::memory_order::acquire) < num_elements)
            {
                // NOTE(review): detail::awaiter_list_pop is not declared by this header's
                // direct #includes; presumably provided transitively -- confirm.
                auto* op = detail::awaiter_list_pop(m_produce_waiters);
                if (op != nullptr)
                {
                    auto slot = m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
                    m_elements[slot] = std::move(op->m_e);
                    m_used.fetch_add(1, std::memory_order::release);

                    // Resume outside the lock; the resumed coroutine may re-enter the buffer.
                    lk.unlock();
                    op->m_awaiting_coroutine.resume();
                    continue;
                }
            }
            co_return;
        }
    }

    /// While there is an element and a parked consumer, hand the element to it and resume it;
    /// loops until no further consumer can make progress.
    auto try_resume_consumers() -> coro::task<void>
    {
        while (true)
        {
            auto lk = co_await m_mutex.scoped_lock();
            if (m_used.load(std::memory_order::acquire) > 0)
            {
                auto* op = detail::awaiter_list_pop(m_consume_waiters);
                if (op != nullptr)
                {
                    auto slot = m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
                    op->m_e = std::move(m_elements[slot]);
                    m_elements[slot] = std::nullopt; // clear the vacated slot
                    m_used.fetch_sub(1, std::memory_order::release);
                    // Resume outside the lock; the resumed coroutine may re-enter the buffer.
                    lk.unlock();

                    op->m_awaiting_coroutine.resume();
                    continue;
                }
            }
            co_return;
        }
    }
};
456

457
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc