• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 15456023351

05 Jun 2025 01:06AM UTC coverage: 87.773%. First build
15456023351

Pull #336

github

web-flow
Merge cc71e2c9e into f0cccaaf4
Pull Request #336: coro::ring_buffer use coro::mutex instead of std::mutex

104 of 115 new or added lines in 5 files covered. (90.43%)

1608 of 1832 relevant lines covered (87.77%)

5664906.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.87
/include/coro/ring_buffer.hpp
1
#pragma once
2

3
#include "coro/expected.hpp"
4
#include "coro/mutex.hpp"
5
#include "coro/task.hpp"
6

7
#include <array>
8
#include <atomic>
9
#include <coroutine>
10
#include <mutex>
11
#include <optional>
12

13
#include <iostream>
14

15
namespace coro
16
{
17
/// Result types returned by the ring buffer's produce and consume awaitables.
namespace ring_buffer_result
{
/// Outcome of awaiting a produce operation.
enum class produce
{
    /// The element was stored into the ring buffer.
    produced,
    /// The ring buffer was shut down; the element was not stored.
    stopped
};

/// Failure outcome of awaiting a consume operation; on success the awaitable
/// yields the consumed element instead of this enum.
enum class consume
{
    /// The ring buffer was shut down; no element was consumed.
    stopped
};
} // namespace ring_buffer_result
30

31
/**
32
 * @tparam element The type of element the ring buffer will store.  Note that this type should be
33
 *         cheap to move if possible as it is moved into and out of the buffer upon produce and
34
 *         consume operations.
35
 * @tparam num_elements The maximum number of elements the ring buffer can store, must be >= 1.
36
 */
37
template<typename element, size_t num_elements>
38
class ring_buffer
39
{
40
private:
41
    enum running_state_t
42
    {
43
        /// @brief The ring buffer is still running.
44
        running,
45
        /// @brief The ring buffer is draining all elements, produce is no longer allowed.
46
        draining,
47
        /// @brief The ring buffer is fully shutdown, all produce and consume tasks will be woken up with result::stopped.
48
        stopped,
49
    };
50

51
public:
52
    /**
53
     * static_assert If `num_elements` == 0.
54
     */
55
    ring_buffer()
6✔
56
    {
6✔
57
        static_assert(num_elements != 0, "num_elements cannot be zero");
58
    }
6✔
59

60
    ~ring_buffer()
6✔
61
    {
62
        // Wake up anyone still using the ring buffer.
63
        coro::sync_wait(shutdown());
6✔
64
    }
6✔
65

66
    ring_buffer(const ring_buffer<element, num_elements>&) = delete;
67
    ring_buffer(ring_buffer<element, num_elements>&&)      = delete;
68

69
    auto operator=(const ring_buffer<element, num_elements>&) noexcept -> ring_buffer<element, num_elements>& = delete;
70
    auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>&      = delete;
71

72
    struct consume_operation;
73

74
    struct produce_operation
75
    {
76
        produce_operation(ring_buffer<element, num_elements>& rb, element e)
11,000,013✔
77
            : m_rb(rb),
11,000,013✔
78
              m_e(std::move(e))
11,000,013✔
79
        {}
11,000,013✔
80

81
        auto await_ready() noexcept -> bool
11,000,013✔
82
        {
83
            auto& mutex = m_rb.m_mutex;
11,000,013✔
84

85
            // Produce operations can only proceed if running.
86
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
11,000,013✔
87
            {
NEW
88
                mutex.unlock();
×
89
                std::atomic_thread_fence(std::memory_order::acq_rel);
NEW
90
                return true; // Will be awoken with produce::stopped
×
91
            }
92

93
            if (m_rb.m_used.load(std::memory_order::acquire) < num_elements)
22,000,026✔
94
            {
95
                // There is guaranteed space to store
96
                auto slot = m_rb.m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
7,932,340✔
97
                if (m_rb.m_elements[slot].has_value())
7,932,340✔
98
                {
NEW
99
                    std::cerr << "[ERROR] produce_operation slot=[" << slot << "] has a value already\n";
×
100
                }
101

102
                m_rb.m_elements[slot] = std::move(m_e);
7,932,340✔
103
                m_rb.m_used.fetch_add(1, std::memory_order::release);
7,932,340✔
104
                mutex.unlock();
7,932,340✔
105
                std::atomic_thread_fence(std::memory_order::acq_rel);
106
                return true; // Will be awoken with produce::produced
7,932,208✔
107
            }
108

109
            return false; // ring buffer full, suspend
3,067,673✔
110
        }
111

112
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
3,067,673✔
113
        {
114
            m_awaiting_coroutine   = awaiting_coroutine;
3,067,673✔
115
            m_next = m_rb.m_produce_waiters.exchange(this, std::memory_order::acq_rel);
3,067,673✔
116
            m_rb.m_mutex.unlock();
3,067,673✔
117
            std::atomic_thread_fence(std::memory_order::acq_rel);
118
            return true;
3,067,111✔
119
        }
120

121
        /**
122
         * @return produce_result
123
         */
124
        auto await_resume() -> ring_buffer_result::produce
10,996,308✔
125
        {
126
            return m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::running
10,996,308✔
127
                    ? ring_buffer_result::produce::produced
10,996,453✔
128
                    : ring_buffer_result::produce::stopped;
10,996,453✔
129
        }
130

131
        /// If the operation needs to suspend, the coroutine to resume when the element can be produced.
132
        std::coroutine_handle<> m_awaiting_coroutine;
133
        /// Linked list of produce operations that are awaiting to produce their element.
134
        produce_operation* m_next{nullptr};
135

136
    private:
137
        template<typename element_subtype, size_t num_elements_subtype>
138
        friend class ring_buffer;
139

140
        /// The ring buffer the element is being produced into.
141
        ring_buffer<element, num_elements>& m_rb;
142
        /// The element this produce operation is producing into the ring buffer.
143
        std::optional<element> m_e{std::nullopt};
144
    };
145

146
    struct consume_operation
147
    {
148
        explicit consume_operation(ring_buffer<element, num_elements>& rb)
11,000,114✔
149
            : m_rb(rb)
11,000,114✔
150
        {}
11,000,114✔
151

152
        auto await_ready() noexcept -> bool
11,000,114✔
153
        {
154
            m_rb.internal_consume_await_ready++;
11,000,114✔
155
            auto& mutex = m_rb.m_mutex;
11,000,114✔
156

157
            // Consume operations proceed until stopped.
158
            if (m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
11,000,114✔
159
            {
160
                mutex.unlock();
1✔
161
                std::atomic_thread_fence(std::memory_order::acq_rel);
162
                return true;
1✔
163
            }
164

165
            if (m_rb.m_used.load(std::memory_order::acquire) > 0)
22,000,226✔
166
            {
167
                auto slot = m_rb.m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
8,687,373✔
168
                if (!m_rb.m_elements[slot].has_value())
8,687,373✔
169
                {
NEW
170
                    std::cerr << "[ERROR] consume_operation slot=[" << slot << "] doesn't have a value\n";
×
171
                }
172

173
                m_e = std::move(m_rb.m_elements[slot]);
8,687,373✔
174
                m_rb.m_elements[slot] = std::nullopt;
8,687,373✔
175
                m_rb.m_used.fetch_sub(1, std::memory_order::release);
8,687,373✔
176
                mutex.unlock();
8,687,373✔
177
                std::atomic_thread_fence(std::memory_order::acq_rel);
178
                return true;
8,685,453✔
179
            }
180

181
            return false; // ring buffer is empty, suspend.
2,312,740✔
182
        }
183

184
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
2,312,740✔
185
        {
186
            m_rb.internal_consume_await_suspend++;
2,312,740✔
187
            m_awaiting_coroutine   = awaiting_coroutine;
2,312,740✔
188
            m_next = m_rb.m_consume_waiters.exchange(this, std::memory_order::acq_rel);
2,312,740✔
189
            m_rb.m_mutex.unlock();
2,312,740✔
190
            std::atomic_thread_fence(std::memory_order::acq_rel);
191
            return true;
2,312,711✔
192
        }
193

194
        /**
195
         * @return The consumed element or ring_buffer_stopped if the ring buffer has been shutdown.
196
         */
197
        auto await_resume() -> expected<element, ring_buffer_result::consume>
10,997,542✔
198
        {
199
            m_rb.internal_consume_await_resume++;
10,997,542✔
200
            if (m_e.has_value())
10,999,017✔
201
            {
202
                return expected<element, ring_buffer_result::consume>(std::move(m_e).value());
10,996,860✔
203
            }
204
            else // state is stopped
205
            {
206
                return unexpected<ring_buffer_result::consume>(ring_buffer_result::consume::stopped);
101✔
207
            }
208
        }
209

210
        /// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
211
        std::coroutine_handle<> m_awaiting_coroutine;
212
        /// Linked list of consume operations that are awaiting to consume an element.
213
        consume_operation* m_next{nullptr};
214

215
    private:
216
        template<typename element_subtype, size_t num_elements_subtype>
217
        friend class ring_buffer;
218

219
        /// The ring buffer to consume an element from.
220
        ring_buffer<element, num_elements>& m_rb;
221
        /// The element this consume operation will consume.
222
        std::optional<element> m_e{std::nullopt};
223
    };
224

225
    std::atomic<uint64_t> internal_consume{0};
226
    std::atomic<uint64_t> internal_consume_await_ready{0};
227
    std::atomic<uint64_t> internal_consume_await_suspend{0};
228
    std::atomic<uint64_t> internal_consume_await_resume{0};
229

230
    auto try_resume_producers() -> coro::task<void>
10,995,119✔
231
    {
232
        while (true)
233
        {
234
            auto lk = co_await m_mutex.scoped_lock();
235
            std::atomic_thread_fence(std::memory_order::acq_rel);
236
            if (m_used.load(std::memory_order::acquire) < num_elements)
237
            {
238
                auto* op = detail::awaiter_list_pop(m_produce_waiters);
239
                if (op != nullptr)
240
                {
241
                    auto slot = m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
242
                    if (m_elements[slot].has_value())
243
                    {
244
                        std::cerr << "[ERROR] try_resume_producer slot=[" << slot << "] has a value already\n";
245
                    }
246

247
                    m_elements[slot] = std::move(op->m_e);
248
                    m_used.fetch_add(1, std::memory_order::release);
249

250
                    lk.unlock();
251
                    std::atomic_thread_fence(std::memory_order::acq_rel);
252
                    op->m_awaiting_coroutine.resume();
253
                    continue;
254
                }
255
            }
256
            co_return;
257
        }
258
    }
21,987,519✔
259

260
    auto try_resume_consumers() -> coro::task<void>
10,995,085✔
261
    {
262
        while (true)
263
        {
264
            auto lk = co_await m_mutex.scoped_lock();
265
            std::atomic_thread_fence(std::memory_order::acq_rel);
266
            if (m_used.load(std::memory_order::acquire) > 0)
267
            {
268
                auto* op = detail::awaiter_list_pop(m_consume_waiters);
269
                if (op != nullptr)
270
                {
271
                    auto slot = m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
272
                    if (!m_elements[slot].has_value())
273
                    {
274
                        std::cerr << "[ERROR] try_resume_consumer slot=[" << slot << "] doesn't have a value\n";
275
                    }
276

277
                    op->m_e = std::move(m_elements[slot]);
278
                    m_elements[slot] = std::nullopt;
279
                    m_used.fetch_sub(1, std::memory_order::release);
280

281
                    lk.unlock();
282
                    std::atomic_thread_fence(std::memory_order::acq_rel);
283
                    op->m_awaiting_coroutine.resume();
284
                    continue;
285
                }
286
            }
287
            co_return;
288
        }
289

290
    }
21,987,666✔
291

292
    /**
293
     * Produces the given element into the ring buffer.  This operation will suspend until a slot
294
     * in the ring buffer becomes available.
295
     * @param e The element to produce.
296
     */
297
    [[nodiscard]] auto produce(element e) -> coro::task<ring_buffer_result::produce>
10,984,195✔
298
    {
299
        co_await m_mutex.lock();
300
        std::atomic_thread_fence(std::memory_order::acq_rel);
301
        auto result = co_await produce_operation{*this, std::move(e)};
302
        co_await try_resume_consumers();
303
        co_return result;
304
    }
21,962,497✔
305

306
    /**
307
     * Consumes an element from the ring buffer.  This operation will suspend until an element in
308
     * the ring buffer becomes available.
309
     */
310
    [[nodiscard]] auto consume() -> coro::task<expected<element, ring_buffer_result::consume>>
10,997,698✔
311
    {
312
        internal_consume++;
313
        co_await m_mutex.lock();
314
        std::atomic_thread_fence(std::memory_order::acq_rel);
315
        auto result = co_await consume_operation{*this};
316
        co_await try_resume_producers();
317
        co_return result;
318
    }
21,995,119✔
319

320
    /**
321
     * @return The current number of elements contained in the ring buffer.
322
     */
323
    auto size() const -> size_t
8✔
324
    {
325
        return m_used.load(std::memory_order::acquire);
16✔
326
    }
327

328
    /**
329
     * @return True if the ring buffer contains zero elements.
330
     */
331
    auto empty() const -> bool { return size() == 0; }
7✔
332

333
    /**
334
     * @brief Wakes up all currently awaiting producers and consumers.  Their await_resume() function
335
     *        will return an expected consume result that the ring buffer has stopped.
336
     */
337
    auto shutdown() -> coro::task<void>
8✔
338
    {
339
        std::cerr << "ring_buffer::shutdown()\n";
340
        // Only wake up waiters once.
341
        auto expected = m_running_state.load(std::memory_order::acquire);
342
        if (expected == running_state_t::stopped)
343
        {
344
            std::cerr << "ring_buffer::shutdown() already stopped returning\n";
345
            co_return;
346
        }
347

348
        std::cerr << "ring_buffer::shutdown() acquire lock\n";
349
        auto lk = co_await m_mutex.scoped_lock();
350
        // Only let one caller do the wake-ups, this can go from running or draining to stopped
351
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::stopped, std::memory_order::acq_rel, std::memory_order::relaxed))
352
        {
353
            co_return;
354
        }
355
        lk.unlock();
356

357
        co_await m_mutex.lock();
358
        std::cerr << "ring_buffer::shutdown() acquire m_produce waiters and m_consume_waiters\n";
359
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
360
        auto* consume_waiters = m_consume_waiters.exchange(nullptr, std::memory_order::acq_rel);
361
        m_mutex.unlock();
362

363
        while (produce_waiters != nullptr)
364
        {
365
            auto* next = produce_waiters->m_next;
366
            std::cerr << "ring_buffer::shutdown() produce_waiters->resume()\n";
367
            produce_waiters->m_awaiting_coroutine.resume();
368
            produce_waiters = next;
369
        }
370

371
        while (consume_waiters != nullptr)
372
        {
373
            auto* next = consume_waiters->m_next;
374
            std::cerr << "ring_buffer::shutdown() consume_waiters->resume()\n";
375
            consume_waiters->m_awaiting_coroutine.resume();
376
            consume_waiters = next;
377
        }
378

379
        std::cerr << "ring_buffer::shutdown() co_return\n";
380
        co_return;
381
    }
16✔
382

383
    template<coro::concepts::executor executor_t>
384
    [[nodiscard]] auto shutdown_drain(executor_t& e) -> coro::task<void>
2✔
385
    {
386
        std::cerr << "ring_buffer::shutdown_drain()\n";
387
        auto lk = co_await m_mutex.scoped_lock();
388
        // Do not allow any more produces, the state must be in running to drain.
389
        auto expected = running_state_t::running;
390
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::draining, std::memory_order::acq_rel, std::memory_order::relaxed))
391
        {
392
            co_return;
393
        }
394

395
        std::cerr << "ring_buffer::shutdown_drain() exchange m_produce_waiters\n";
396
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
397
        lk.unlock();
398

399
        while (produce_waiters != nullptr)
400
        {
401
            auto* next = produce_waiters->m_next;
402
            std::cerr << "ring_buffer::shutdown_drain() produce_waiters->resume()\n";
403
            produce_waiters->m_awaiting_coroutine.resume();
404
            produce_waiters = next;
405
        }
406

407
        std::cerr << "ring_buffer::shutdown_drain() while(!empty())\n";
408
        while (!empty())
409
        {
410
            co_await e.yield();
411
        }
412

413
        co_await shutdown();
414
        std::cerr << "ring_buffer::shutdown_drain() co_return\n";
415
        co_return;
416
    }
4✔
417

418
// private:
419
    friend produce_operation;
420
    friend consume_operation;
421

422
    coro::mutex m_mutex{};
423

424
    std::array<std::optional<element>, num_elements> m_elements{};
425
    /// The current front pointer to an open slot if not full.
426
    std::atomic<size_t> m_front{0};
427
    /// The current back pointer to the oldest item in the buffer if not empty.
428
    std::atomic<size_t> m_back{0};
429
    /// The number of items in the ring buffer.
430
    std::atomic<size_t> m_used{0};
431

432
    /// The LIFO list of produce waiters.
433
    std::atomic<produce_operation*> m_produce_waiters{nullptr};
434
    /// The LIFO list of consume watier.
435
    std::atomic<consume_operation*> m_consume_waiters{nullptr};
436

437
    std::atomic<running_state_t> m_running_state{running_state_t::running};
438
};
439

440
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc