• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 15455559001

05 Jun 2025 12:27AM UTC coverage: 87.595%. First build
15455559001

Pull #336

github

web-flow
Merge 02c038965 into f0cccaaf4
Pull Request #336: coro::ring_buffer use coro::mutex instead of std::mutex

106 of 120 new or added lines in 5 files covered. (88.33%)

1610 of 1838 relevant lines covered (87.6%)

5644127.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.48
/include/coro/ring_buffer.hpp
1
#pragma once
2

3
#include "coro/expected.hpp"
4
#include "coro/mutex.hpp"
5
#include "coro/task.hpp"
6

7
#include <array>
8
#include <atomic>
9
#include <coroutine>
10
#include <mutex>
11
#include <optional>
12

13
#include <iostream>
14

15
namespace coro
{
/// Result types returned by ring_buffer's produce and consume awaitables.
namespace ring_buffer_result
{
/// Outcome of a produce operation.
enum class produce
{
    /// The element was stored in the ring buffer.
    produced,
    /// The ring buffer has been shutdown (or is draining); the element was not stored.
    stopped
};

/// Error outcome of a consume operation (success carries the element instead).
enum class consume
{
    /// The ring buffer has been shutdown; no element was consumed.
    stopped
};
} // namespace ring_buffer_result
30

31
/**
32
 * @tparam element The type of element the ring buffer will store.  Note that this type should be
33
 *         cheap to move if possible as it is moved into and out of the buffer upon produce and
34
 *         consume operations.
35
 * @tparam num_elements The maximum number of elements the ring buffer can store, must be >= 1.
36
 */
37
template<typename element, size_t num_elements>
38
class ring_buffer
39
{
40
private:
41
    enum running_state_t
42
    {
43
        /// @brief The ring buffer is still running.
44
        running,
45
        /// @brief The ring buffer is draining all elements, produce is no longer allowed.
46
        draining,
47
        /// @brief The ring buffer is fully shutdown, all produce and consume tasks will be woken up with result::stopped.
48
        stopped,
49
    };
50

51
public:
52
    /**
53
     * static_assert If `num_elements` == 0.
54
     */
55
    ring_buffer()
6✔
56
    {
6✔
57
        static_assert(num_elements != 0, "num_elements cannot be zero");
58

59
        for (auto& e : m_elements)
76✔
60
        {
61
            if (e.has_value())
70✔
62
            {
NEW
63
                std::cerr << "not nullopt\n";
×
64
            }
65
        }
66
    }
6✔
67

68
    ~ring_buffer()
6✔
69
    {
70
        // Wake up anyone still using the ring buffer.
71
        coro::sync_wait(shutdown());
6✔
72
        std::cerr << "~ring_buffer() exit\n";
6✔
73
    }
6✔
74

75
    ring_buffer(const ring_buffer<element, num_elements>&) = delete;
76
    ring_buffer(ring_buffer<element, num_elements>&&)      = delete;
77

78
    auto operator=(const ring_buffer<element, num_elements>&) noexcept -> ring_buffer<element, num_elements>& = delete;
79
    auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>&      = delete;
80

81
    struct consume_operation;
82

83
    struct produce_operation
84
    {
85
        produce_operation(ring_buffer<element, num_elements>& rb, element e)
11,000,013✔
86
            : m_rb(rb),
11,000,013✔
87
              m_e(std::move(e))
11,000,013✔
88
        {}
11,000,013✔
89

90
        auto await_ready() noexcept -> bool
11,000,013✔
91
        {
92
            auto& mutex = m_rb.m_mutex;
11,000,013✔
93

94
            // Produce operations can only proceed if running.
95
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
11,000,013✔
96
            {
NEW
97
                mutex.unlock();
×
98
                std::atomic_thread_fence(std::memory_order::acq_rel);
NEW
99
                return true; // Will be awoken with produce::stopped
×
100
            }
101

102
            if (m_rb.m_used.load(std::memory_order::acquire) < num_elements)
22,000,026✔
103
            {
104
                // There is guaranteed space to store
105
                auto slot = m_rb.m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
7,959,012✔
106
                if (m_rb.m_elements[slot].has_value())
7,959,012✔
107
                {
NEW
108
                    std::cerr << "[ERROR] produce_operation slot=[" << slot << "] has a value already\n";
×
109
                }
110

111
                m_rb.m_elements[slot] = std::move(m_e);
7,959,012✔
112
                m_rb.m_used.fetch_add(1, std::memory_order::release);
7,959,012✔
113
                mutex.unlock();
7,959,012✔
114
                std::atomic_thread_fence(std::memory_order::acq_rel);
115
                return true; // Will be awoken with produce::produced
7,958,854✔
116
            }
117

118
            return false; // ring buffer full, suspend
3,041,001✔
119

120
            // auto* op = m_rb.m_consume_waiters.load(std::memory_order::acquire);
121
            // if (op != nullptr)
122
            // {
123
            //     // Advance the consume operation waiters.
124
            //     m_rb.m_consume_waiters.store(op->m_next, std::memory_order::release);
125

126
            //     // Store the back element into the consumer that will be resumed.
127
            //     auto slot = m_rb.m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
128
            //     op->m_e = std::move(m_rb.m_elements[slot]);
129
            //     m_rb.m_used.fetch_sub(1, std::memory_order::release);
130

131
            //     mutex.unlock();
132
            //     op->m_awaiting_coroutine.resume();
133
            // }
134
            // else
135
            // {
136
            //     mutex.unlock();
137
            // }
138

139
            // return true;
140
        }
141

142
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
3,041,001✔
143
        {
144
            m_awaiting_coroutine   = awaiting_coroutine;
3,041,001✔
145
            m_next = m_rb.m_produce_waiters.exchange(this, std::memory_order::acq_rel);
3,041,001✔
146
            m_rb.m_mutex.unlock();
3,041,001✔
147
            std::atomic_thread_fence(std::memory_order::acq_rel);
148
            return true;
3,040,578✔
149
        }
150

151
        /**
152
         * @return produce_result
153
         */
154
        auto await_resume() -> ring_buffer_result::produce
10,996,475✔
155
        {
156
            return m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::running
10,996,475✔
157
                    ? ring_buffer_result::produce::produced
10,996,397✔
158
                    : ring_buffer_result::produce::stopped;
10,996,397✔
159
        }
160

161
        /// If the operation needs to suspend, the coroutine to resume when the element can be produced.
162
        std::coroutine_handle<> m_awaiting_coroutine;
163
        /// Linked list of produce operations that are awaiting to produce their element.
164
        produce_operation* m_next{nullptr};
165

166
    private:
167
        template<typename element_subtype, size_t num_elements_subtype>
168
        friend class ring_buffer;
169

170
        /// The ring buffer the element is being produced into.
171
        ring_buffer<element, num_elements>& m_rb;
172
        /// The element this produce operation is producing into the ring buffer.
173
        std::optional<element> m_e{std::nullopt};
174
    };
175

176
    struct consume_operation
177
    {
178
        explicit consume_operation(ring_buffer<element, num_elements>& rb)
11,000,014✔
179
            : m_rb(rb)
11,000,014✔
180
        {}
11,000,014✔
181

182
        auto await_ready() noexcept -> bool
11,000,014✔
183
        {
184
            m_rb.internal_consume_await_ready++;
11,000,014✔
185
            auto& mutex = m_rb.m_mutex;
11,000,014✔
186

187
            // Consume operations proceed until stopped.
188
            if (m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
11,000,014✔
189
            {
NEW
190
                mutex.unlock();
×
191
                std::atomic_thread_fence(std::memory_order::acq_rel);
NEW
192
                return true;
×
193
            }
194

195
            if (m_rb.m_used.load(std::memory_order::acquire) > 0)
22,000,028✔
196
            {
197
                auto slot = m_rb.m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
8,744,648✔
198
                if (!m_rb.m_elements[slot].has_value())
8,744,648✔
199
                {
NEW
200
                    std::cerr << "[ERROR] consume_operation slot=[" << slot << "] doesn't have a value\n";
×
201
                }
202

203
                m_e = std::move(m_rb.m_elements[slot]);
8,744,648✔
204
                m_rb.m_elements[slot] = std::nullopt;
8,744,648✔
205
                m_rb.m_used.fetch_sub(1, std::memory_order::release);
8,744,648✔
206
                mutex.unlock();
8,744,648✔
207
                std::atomic_thread_fence(std::memory_order::acq_rel);
208
                return true;
8,743,045✔
209
            }
210

211
            return false; // ring buffer is empty, suspend.
2,255,366✔
212

213
            // auto* op = m_rb.m_produce_waiters.load(std::memory_order::acquire);
214
            // if (op != nullptr)
215
            // {
216
            //     m_rb.m_produce_waiters.store(op->m_next, std::memory_order::release);
217

218
            //     auto slot = m_rb.m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
219
            //     m_rb.m_elements[slot] = std::move(op->m_e);
220
            //     m_rb.m_used.fetch_add(1, std::memory_order::release);
221

222
            //     mutex.unlock();
223
            //     op->m_awaiting_coroutine.resume();
224
            // }
225
            // else
226
            // {
227
            //     mutex.unlock();
228
            // }
229

230
            // return true;
231
        }
232

233
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
2,255,366✔
234
        {
235
            m_rb.internal_consume_await_suspend++;
2,255,366✔
236
            m_awaiting_coroutine   = awaiting_coroutine;
2,255,366✔
237
            m_next = m_rb.m_consume_waiters.exchange(this, std::memory_order::acq_rel);
2,255,366✔
238
            m_rb.m_mutex.unlock();
2,255,366✔
239
            std::atomic_thread_fence(std::memory_order::acq_rel);
240
            return true;
2,255,339✔
241
        }
242

243
        /**
244
         * @return The consumed element or ring_buffer_stopped if the ring buffer has been shutdown.
245
         */
246
        auto await_resume() -> expected<element, ring_buffer_result::consume>
10,997,738✔
247
        {
248
            m_rb.internal_consume_await_resume++;
10,997,738✔
249
            if (m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
10,998,660✔
250
            {
251
                if (m_rb.m_used != 0)
1✔
252
                {
NEW
253
                    std::cerr << "consume_operation return unexpected STOPPED\n";
×
254
                }
255
                return unexpected<ring_buffer_result::consume>(ring_buffer_result::consume::stopped);
1✔
256
            }
257

258
            return expected<element, ring_buffer_result::consume>(std::move(m_e).value());
10,996,931✔
259
        }
260

261
        /// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
262
        std::coroutine_handle<> m_awaiting_coroutine;
263
        /// Linked list of consume operations that are awaiting to consume an element.
264
        consume_operation* m_next{nullptr};
265

266
    private:
267
        template<typename element_subtype, size_t num_elements_subtype>
268
        friend class ring_buffer;
269

270
        /// The ring buffer to consume an element from.
271
        ring_buffer<element, num_elements>& m_rb;
272
        /// The element this consume operation will consume.
273
        std::optional<element> m_e{std::nullopt};
274
    };
275

276
    std::atomic<uint64_t> internal_consume{0};
277
    std::atomic<uint64_t> internal_consume_await_ready{0};
278
    std::atomic<uint64_t> internal_consume_await_suspend{0};
279
    std::atomic<uint64_t> internal_consume_await_resume{0};
280

281
    auto try_resume_producer() -> coro::task<void>
10,994,547✔
282
    {
283
again:
284
        auto lk = co_await m_mutex.scoped_lock();
285
        std::atomic_thread_fence(std::memory_order::acq_rel);
286
        if (m_used.load(std::memory_order::acquire) < num_elements)
287
        {
288
            auto* op = detail::awaiter_list_pop(m_produce_waiters);
289
            if (op != nullptr)
290
            {
291
                auto slot = m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
292
                if (m_elements[slot].has_value())
293
                {
294
                    std::cerr << "[ERROR] try_resume_producer slot=[" << slot << "] has a value already\n";
295
                }
296

297
                m_elements[slot] = std::move(op->m_e);
298
                m_used.fetch_add(1, std::memory_order::release);
299

300
                lk.unlock();
301
                std::atomic_thread_fence(std::memory_order::acq_rel);
302
                op->m_awaiting_coroutine.resume();
303
                goto again;
304
            }
305
        }
306
    }
21,986,098✔
307

308
    auto try_resume_consumer() -> coro::task<void>
10,995,250✔
309
    {
310
again:
311
        auto lk = co_await m_mutex.scoped_lock();
312
        std::atomic_thread_fence(std::memory_order::acq_rel);
313
        if (m_used.load(std::memory_order::acquire) > 0)
314
        {
315
            auto* op = detail::awaiter_list_pop(m_consume_waiters);
316
            if (op != nullptr)
317
            {
318
                auto slot = m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
319
                if (!m_elements[slot].has_value())
320
                {
321
                    std::cerr << "[ERROR] try_resume_consumer slot=[" << slot << "] doesn't have a value\n";
322
                }
323

324
                op->m_e = std::move(m_elements[slot]);
325
                m_elements[slot] = std::nullopt;
326
                m_used.fetch_sub(1, std::memory_order::release);
327

328
                lk.unlock();
329
                std::atomic_thread_fence(std::memory_order::acq_rel);
330
                op->m_awaiting_coroutine.resume();
331
                goto again;
332
            }
333
        }
334

335
    }
21,988,439✔
336

337
    /**
338
     * Produces the given element into the ring buffer.  This operation will suspend until a slot
339
     * in the ring buffer becomes available.
340
     * @param e The element to produce.
341
     */
342
    [[nodiscard]] auto produce(element e) -> coro::task<ring_buffer_result::produce>
10,984,234✔
343
    {
344
        // co_await try_resume_consumer();
345
        co_await m_mutex.lock();
346
        std::atomic_thread_fence(std::memory_order::acq_rel);
347
        auto result = co_await produce_operation{*this, std::move(e)};
348
        co_await try_resume_consumer();
349
        co_return result;
350
    }
21,960,712✔
351

352
    /**
353
     * Consumes an element from the ring buffer.  This operation will suspend until an element in
354
     * the ring buffer becomes available.
355
     */
356
    [[nodiscard]] auto consume() -> coro::task<expected<element, ring_buffer_result::consume>>
10,997,357✔
357
    {
358
        internal_consume++;
359
        // co_await try_resume_producer();
360
        co_await m_mutex.lock();
361
        std::atomic_thread_fence(std::memory_order::acq_rel);
362
        auto result = co_await consume_operation{*this};
363
        co_await try_resume_producer();
364
        co_return result;
365
    }
21,994,600✔
366

367
    /**
368
     * @return The current number of elements contained in the ring buffer.
369
     */
370
    auto size() const -> size_t
997✔
371
    {
372
        return m_used.load(std::memory_order::acquire);
1,994✔
373
    }
374

375
    /**
376
     * @return True if the ring buffer contains zero elements.
377
     */
378
    auto empty() const -> bool { return size() == 0; }
501✔
379

380
    /**
381
     * @brief Wakes up all currently awaiting producers and consumers.  Their await_resume() function
382
     *        will return an expected consume result that the ring buffer has stopped.
383
     */
384
    auto shutdown() -> coro::task<void>
8✔
385
    {
386
        std::cerr << "ring_buffer::shutdown()\n";
387
        // Only wake up waiters once.
388
        auto expected = m_running_state.load(std::memory_order::acquire);
389
        if (expected == running_state_t::stopped)
390
        {
391
            std::cerr << "ring_buffer::shutdown() already stopped returning\n";
392
            co_return;
393
        }
394

395
        std::cerr << "ring_buffer::shutdown() acquire lock\n";
396
        auto lk = co_await m_mutex.scoped_lock();
397
        // Only let one caller do the wake-ups, this can go from running or draining to stopped
398
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::stopped, std::memory_order::acq_rel, std::memory_order::relaxed))
399
        {
400
            co_return;
401
        }
402
        lk.unlock();
403

404
        co_await m_mutex.lock();
405
        std::cerr << "ring_buffer::shutdown() acquire m_produce waiters and m_consume_waiters\n";
406
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
407
        auto* consume_waiters = m_consume_waiters.exchange(nullptr, std::memory_order::acq_rel);
408
        m_mutex.unlock();
409

410
        while (produce_waiters != nullptr)
411
        {
412
            auto* next = produce_waiters->m_next;
413
            std::cerr << "ring_buffer::shutdown() produce_waiters->resume()\n";
414
            produce_waiters->m_awaiting_coroutine.resume();
415
            produce_waiters = next;
416
        }
417

418
        while (consume_waiters != nullptr)
419
        {
420
            auto* next = consume_waiters->m_next;
421
            std::cerr << "ring_buffer::shutdown() consume_waiters->resume()\n";
422
            consume_waiters->m_awaiting_coroutine.resume();
423
            consume_waiters = next;
424
        }
425

426
        std::cerr << "ring_buffer::shutdown() co_return\n";
427
        co_return;
428
    }
16✔
429

430
    template<coro::concepts::executor executor_t>
431
    [[nodiscard]] auto shutdown_drain(executor_t& e) -> coro::task<void>
2✔
432
    {
433
        std::cerr << "ring_buffer::shutdown_drain()\n";
434
        auto lk = co_await m_mutex.scoped_lock();
435
        // Do not allow any more produces, the state must be in running to drain.
436
        auto expected = running_state_t::running;
437
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::draining, std::memory_order::acq_rel, std::memory_order::relaxed))
438
        {
439
            co_return;
440
        }
441

442
        std::cerr << "ring_buffer::shutdown_drain() exchange m_produce_waiters\n";
443
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
444

445
        for (auto& e : m_elements)
446
        {
447
            if (e.has_value())
448
            {
449
                std::cerr << "ring_buffer::shutdown_drain() e.has_value()\n";
450
            }
451
        }
452

453
        lk.unlock();
454

455
        while (produce_waiters != nullptr)
456
        {
457
            auto* next = produce_waiters->m_next;
458
            std::cerr << "ring_buffer::shutdown_drain() produce_waiters->resume()\n";
459
            produce_waiters->m_awaiting_coroutine.resume();
460
            produce_waiters = next;
461
        }
462

463
        std::cerr << "ring_buffer::shutdown_drain() while(!empty())\n";
464
        while (!empty())
465
        {
466
            co_await e.yield();
467
        }
468

469
        co_await shutdown();
470
        std::cerr << "ring_buffer::shutdown_drain() co_return\n";
471
        co_return;
472
    }
4✔
473

474
// private:
475
    friend produce_operation;
476
    friend consume_operation;
477

478
    coro::mutex m_mutex{};
479

480
    std::array<std::optional<element>, num_elements> m_elements{};
481
    /// The current front pointer to an open slot if not full.
482
    std::atomic<size_t> m_front{0};
483
    /// The current back pointer to the oldest item in the buffer if not empty.
484
    std::atomic<size_t> m_back{0};
485
    /// The number of items in the ring buffer.
486
    std::atomic<size_t> m_used{0};
487

488
    /// The LIFO list of produce waiters.
489
    std::atomic<produce_operation*> m_produce_waiters{nullptr};
490
    /// The LIFO list of consume watier.
491
    std::atomic<consume_operation*> m_consume_waiters{nullptr};
492

493
    std::atomic<running_state_t> m_running_state{running_state_t::running};
494
};
495

496
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc