• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 15456509665

05 Jun 2025 01:45AM UTC coverage: 87.987%. First build
15456509665

Pull #336

github

web-flow
Merge d3c189c61 into f0cccaaf4
Pull Request #336: coro::ring_buffer use coro::mutex instead of std::mutex

78 of 81 new or added lines in 3 files covered. (96.3%)

1582 of 1798 relevant lines covered (87.99%)

5569429.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.18
/include/coro/ring_buffer.hpp
1
#pragma once
2

3
#include "coro/expected.hpp"
4
#include "coro/mutex.hpp"
5
#include "coro/task.hpp"
6

7
#include <array>
8
#include <atomic>
9
#include <coroutine>
10
#include <mutex>
11
#include <optional>
12

13
namespace coro
14
{
15
/// Result codes reported by the ring buffer's produce and consume operations.
namespace ring_buffer_result
16
{
17
/// Outcome reported to the caller of a produce operation.
enum class produce
18
{
19
    /// The ring buffer was in its running state when the produce operation resumed.
    produced,
20
    /// The ring buffer was not in its running state when the produce operation resumed.
    stopped
21
};
22

23
/// Error value reported to the caller of a consume operation that obtained no element.
enum class consume
24
{
25
    /// No element was handed to the consume operation.
    stopped
26
};
27
} // namespace ring_buffer_result
28

29
/**
30
 * @tparam element The type of element the ring buffer will store.  Note that this type should be
31
 *         cheap to move if possible as it is moved into and out of the buffer upon produce and
32
 *         consume operations.
33
 * @tparam num_elements The maximum number of elements the ring buffer can store, must be >= 1.
34
 */
35
template<typename element, size_t num_elements>
36
class ring_buffer
37
{
38
private:
39
    /// Lifecycle states of the ring buffer; the current value is held in m_running_state.
    enum running_state_t
40
    {
41
        /// @brief The ring buffer is still running, both produce and consume operations may proceed.
        running,
43
        /// @brief The ring buffer is draining all elements, produce is no longer allowed.
        draining,
45
        /// @brief The ring buffer is fully shutdown, all produce and consume tasks will be woken up with result::stopped.
        stopped,
47
    };
48

49
public:
50
    /**
51
     * Default constructor; all state comes from the in-class member initializers.
     * Compilation fails through a static_assert if `num_elements` == 0.
52
     */
53
    ring_buffer()
6✔
54
    {
6✔
55
        static_assert(num_elements != 0, "num_elements cannot be zero");
56
    }
6✔
57

58
    /// Runs shutdown() through coro::sync_wait before the members are destroyed.
    ~ring_buffer()
6✔
59
    {
60
        // Wake up anyone still using the ring buffer.
61
        coro::sync_wait(shutdown());
6✔
62
    }
6✔
63

64
    // The ring buffer can be neither copied nor moved.
    // NOTE(review): produce/consume operations keep a reference to the ring buffer (m_rb),
    // presumably the reason relocation is disallowed -- confirm.
    ring_buffer(const ring_buffer<element, num_elements>&) = delete;
65
    ring_buffer(ring_buffer<element, num_elements>&&)      = delete;
66

67
    auto operator=(const ring_buffer<element, num_elements>&) noexcept -> ring_buffer<element, num_elements>& = delete;
68
    auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>&      = delete;
69

70
    /**
     * Awaitable that stores one element into the ring buffer.
     *
     * The operation never locks m_rb.m_mutex itself, it only unlocks it: ring_buffer::produce()
     * acquires the mutex right before awaiting this operation, and every path below that either
     * completes immediately or suspends releases it exactly once.
     */
    struct produce_operation
71
    {
72
        /// @param rb The ring buffer to produce into.
        /// @param e The element to produce, it is moved into this operation.
        produce_operation(ring_buffer<element, num_elements>& rb, element e)
11,000,013✔
73
            : m_rb(rb),
11,000,013✔
74
              m_e(std::move(e))
11,000,013✔
75
        {}
11,000,013✔
76

77
        /**
         * Tries to complete the produce without suspending.
         * @return True if the ring buffer is not running (nothing is stored) or if the element was
         *         stored into a free slot; in both cases the mutex has been unlocked.  False if the
         *         ring buffer is full, in which case the mutex is left locked for await_suspend().
         */
        auto await_ready() noexcept -> bool
11,000,013✔
78
        {
79
            auto& mutex = m_rb.m_mutex;
11,000,013✔
80

81
            // Produce operations can only proceed if running.
82
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
11,000,013✔
83
            {
NEW
84
                mutex.unlock();
×
NEW
85
                return true; // Will be awoken with produce::stopped
×
86
            }
87

88
            if (m_rb.m_used.load(std::memory_order::acquire) < num_elements)
22,000,026✔
89
            {
90
                // There is guaranteed space to store
                // m_front only ever increases, the slot index is its previous value modulo num_elements.
91
                auto slot = m_rb.m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
7,906,751✔
92
                m_rb.m_elements[slot] = std::move(m_e);
7,906,751✔
93
                m_rb.m_used.fetch_add(1, std::memory_order::release);
7,906,751✔
94
                mutex.unlock();
7,906,751✔
95
                return true; // Will be awoken with produce::produced
7,906,621✔
96
            }
97

98
            return false; // ring buffer full, suspend
3,093,262✔
99
        }
100

101
        /**
         * Registers this operation as a waiting producer and releases the mutex.
         * @param awaiting_coroutine The coroutine to resume once the produce can complete.
         * @return Always true, the awaiting coroutine stays suspended.
         */
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
3,093,262✔
102
        {
103
            m_awaiting_coroutine = awaiting_coroutine;
3,093,262✔
104
            // Push onto the head of the waiter list; the previous head becomes m_next.
            m_next = m_rb.m_produce_waiters.exchange(this, std::memory_order::acq_rel);
3,093,262✔
105
            m_rb.m_mutex.unlock();
3,093,262✔
106
            return true;
3,092,377✔
107
        }
108

109
        /**
110
         * @return produce::produced if the ring buffer is in its running state at the time of
         *         resumption, otherwise produce::stopped.
         * NOTE(review): the result is derived from the current running state rather than from
         * whether this operation's element was stored; a state change between storing the element
         * and this call would report stopped -- confirm this is intended.
111
         */
112
        auto await_resume() -> ring_buffer_result::produce
10,996,567✔
113
        {
114
            return m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::running
10,996,567✔
115
                    ? ring_buffer_result::produce::produced
10,997,314✔
116
                    : ring_buffer_result::produce::stopped;
10,997,314✔
117
        }
118

119
        /// If the operation needs to suspend, the coroutine to resume when the element can be produced.
120
        std::coroutine_handle<> m_awaiting_coroutine;
121
        /// Linked list of produce operations that are awaiting to produce their element.
122
        produce_operation* m_next{nullptr};
123

124
    private:
125
        template<typename element_subtype, size_t num_elements_subtype>
126
        friend class ring_buffer;
127

128
        /// The ring buffer the element is being produced into.
129
        ring_buffer<element, num_elements>& m_rb;
130
        /// The element this produce operation is producing into the ring buffer.
131
        std::optional<element> m_e{std::nullopt};
132
    };
133

134
    /**
     * Awaitable that takes one element out of the ring buffer.
     *
     * The operation never locks m_rb.m_mutex itself, it only unlocks it: ring_buffer::consume()
     * acquires the mutex right before awaiting this operation, and every path below that either
     * completes immediately or suspends releases it exactly once.
     */
    struct consume_operation
135
    {
136
        /// @param rb The ring buffer to consume from.
        explicit consume_operation(ring_buffer<element, num_elements>& rb)
11,000,014✔
137
            : m_rb(rb)
11,000,014✔
138
        {}
11,000,014✔
139

140
        /**
         * Tries to complete the consume without suspending.
         * @return True if the ring buffer is stopped (m_e stays empty) or if an element was taken
         *         from the buffer into m_e; in both cases the mutex has been unlocked.  False if
         *         the ring buffer is empty, in which case the mutex is left locked for await_suspend().
         */
        auto await_ready() noexcept -> bool
11,000,014✔
141
        {
142
            auto& mutex = m_rb.m_mutex;
11,000,014✔
143

144
            // Consume operations proceed until stopped.
145
            if (m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
11,000,014✔
146
            {
147
                mutex.unlock();
1✔
148
                return true;
1✔
149
            }
150

151
            if (m_rb.m_used.load(std::memory_order::acquire) > 0)
22,000,026✔
152
            {
153
                // m_back only ever increases, the slot index is its previous value modulo num_elements.
                auto slot = m_rb.m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
8,637,064✔
154
                m_e = std::move(m_rb.m_elements[slot]);
8,637,064✔
155
                // Disengage the slot explicitly, a moved-from optional still holds a value.
                m_rb.m_elements[slot] = std::nullopt;
8,637,064✔
156
                m_rb.m_used.fetch_sub(1, std::memory_order::release);
8,637,064✔
157
                mutex.unlock();
8,637,064✔
158
                return true;
8,635,229✔
159
            }
160

161
            return false; // ring buffer is empty, suspend.
2,362,949✔
162
        }
163

164
        /**
         * Registers this operation as a waiting consumer and releases the mutex.
         * @param awaiting_coroutine The coroutine to resume once an element can be consumed.
         * @return Always true, the awaiting coroutine stays suspended.
         */
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
2,362,949✔
165
        {
166
            m_awaiting_coroutine = awaiting_coroutine;
2,362,949✔
167
            // Push onto the head of the waiter list; the previous head becomes m_next.
            m_next = m_rb.m_consume_waiters.exchange(this, std::memory_order::acq_rel);
2,362,949✔
168
            m_rb.m_mutex.unlock();
2,362,949✔
169
            return true;
2,362,921✔
170
        }
171

172
        /**
173
         * @return The consumed element if one was placed into m_e, otherwise
         *         ring_buffer_result::consume::stopped as the unexpected value.
174
         */
175
        auto await_resume() -> expected<element, ring_buffer_result::consume>
10,997,612✔
176
        {
177
            if (m_e.has_value())
10,997,612✔
178
            {
179
                return expected<element, ring_buffer_result::consume>(std::move(m_e).value());
10,997,593✔
180
            }
181
            else // state is stopped
182
            {
183
                return unexpected<ring_buffer_result::consume>(ring_buffer_result::consume::stopped);
1✔
184
            }
185
        }
186

187
        /// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
188
        std::coroutine_handle<> m_awaiting_coroutine;
189
        /// Linked list of consume operations that are awaiting to consume an element.
190
        consume_operation* m_next{nullptr};
191

192
    private:
193
        template<typename element_subtype, size_t num_elements_subtype>
194
        friend class ring_buffer;
195

196
        /// The ring buffer to consume an element from.
197
        ring_buffer<element, num_elements>& m_rb;
198
        /// The element this consume operation will consume.
199
        std::optional<element> m_e{std::nullopt};
200
    };
201

202
    /**
203
     * Produces the given element into the ring buffer.  This operation will suspend until a slot
204
     * in the ring buffer becomes available.
205
     * @param e The element to produce.
     * @return The value reported by produce_operation::await_resume(): produce::produced or
     *         produce::stopped.
206
     */
207
    [[nodiscard]] auto produce(element e) -> coro::task<ring_buffer_result::produce>
10,984,777✔
208
    {
209
        // The mutex acquired here is released inside produce_operation, not in this function.
        co_await m_mutex.lock();
210
        auto result = co_await produce_operation{*this, std::move(e)};
211
        // Hand elements to any consumers that suspended while the buffer was empty.
        co_await try_resume_consumers();
212
        co_return result;
213
    }
21,962,740✔
214

215
    /**
216
     * Consumes an element from the ring buffer.  This operation will suspend until an element in
217
     * the ring buffer becomes available.
     * @return The consumed element, or ring_buffer_result::consume::stopped as the unexpected
     *         value when no element was handed to the operation.
218
     */
219
    [[nodiscard]] auto consume() -> coro::task<expected<element, ring_buffer_result::consume>>
10,997,889✔
220
    {
221
        // The mutex acquired here is released inside consume_operation, not in this function.
        co_await m_mutex.lock();
222
        auto result = co_await consume_operation{*this};
223
        // Let any producers that suspended while the buffer was full store their element.
        co_await try_resume_producers();
224
        co_return result;
225
    }
21,995,407✔
226

227
    /**
228
     * @return The current number of elements contained in the ring buffer.  The value is read
     *         from the atomic counter m_used without taking the mutex.
229
     */
230
    auto size() const -> size_t
250✔
231
    {
232
        return m_used.load(std::memory_order::acquire);
500✔
233
    }
234

235
    /**
236
     * @return True if the ring buffer contains zero elements, i.e. size() == 0.
237
     */
238
    auto empty() const -> bool { return size() == 0; }
250✔
239

240
    /**
241
     * @brief Moves the ring buffer to the stopped state and wakes up all currently awaiting
242
     *        producers and consumers.  Woken producers see a non-running state in await_resume()
     *        and woken consumers have no element, so both report stopped.  Does nothing if the
     *        ring buffer is already stopped.
243
     */
244
    auto shutdown() -> coro::task<void>
8✔
245
    {
246
        // Only wake up waiters once.
247
        auto expected = m_running_state.load(std::memory_order::acquire);
248
        if (expected == running_state_t::stopped)
249
        {
250
            co_return;
251
        }
252

253
        auto lk = co_await m_mutex.scoped_lock();
254
        // Only let one caller do the wake-ups, this can go from running or draining to stopped
        // NOTE(review): `expected` was read before the mutex was acquired and the exchange is
        // attempted once; if the state changed (e.g. running -> draining) in between, the exchange
        // fails and this call returns without stopping -- confirm this is intended.
255
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::stopped, std::memory_order::acq_rel, std::memory_order::relaxed))
256
        {
257
            co_return;
258
        }
259
        lk.unlock();
260

261
        // NOTE(review): the mutex is released just above and re-acquired here; looks redundant,
        // confirm whether the waiter lists could be detached under the first lock.
        co_await m_mutex.lock();
262
        // Detach both waiter lists while holding the mutex, the waiters are resumed after unlocking.
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
263
        auto* consume_waiters = m_consume_waiters.exchange(nullptr, std::memory_order::acq_rel);
264
        m_mutex.unlock();
265

266
        while (produce_waiters != nullptr)
267
        {
268
            // Read m_next before resuming the waiter.
            auto* next = produce_waiters->m_next;
269
            produce_waiters->m_awaiting_coroutine.resume();
270
            produce_waiters = next;
271
        }
272

273
        while (consume_waiters != nullptr)
274
        {
275
            // Read m_next before resuming the waiter.
            auto* next = consume_waiters->m_next;
276
            consume_waiters->m_awaiting_coroutine.resume();
277
            consume_waiters = next;
278
        }
279

280
        co_return;
281
    }
16✔
282

283
    /**
     * Stops accepting new elements, waits for the buffered elements to be consumed and then
     * calls shutdown().  Does nothing unless the ring buffer is currently in its running state.
     * @param e Executor that is yielded to while the ring buffer is not yet empty.
     */
    template<coro::concepts::executor executor_t>
284
    [[nodiscard]] auto shutdown_drain(executor_t& e) -> coro::task<void>
2✔
285
    {
286
        auto lk = co_await m_mutex.scoped_lock();
287
        // Do not allow any more produces, the state must be in running to drain.
288
        auto expected = running_state_t::running;
289
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::draining, std::memory_order::acq_rel, std::memory_order::relaxed))
290
        {
291
            co_return;
292
        }
293

294
        // Detach the waiting producers under the mutex, they are resumed after unlocking and
        // observe the draining state in await_resume().
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
295
        lk.unlock();
296

297
        while (produce_waiters != nullptr)
298
        {
299
            // Read m_next before resuming the waiter.
            auto* next = produce_waiters->m_next;
300
            produce_waiters->m_awaiting_coroutine.resume();
301
            produce_waiters = next;
302
        }
303

304
        // Poll until every buffered element has been consumed.
        while (!empty())
305
        {
306
            co_await e.yield();
307
        }
308

309
        co_await shutdown();
310
        co_return;
311
    }
4✔
312

313
private:
314
    friend produce_operation;
315
    friend consume_operation;
316

317
    /// Mutex locked by produce(), consume(), shutdown(), shutdown_drain() and the try_resume_*() helpers.
    coro::mutex m_mutex{};
318

319
    /// Element storage, a slot is an empty optional while it holds no element.
    std::array<std::optional<element>, num_elements> m_elements{};
320
    /// The current front pointer to an open slot if not full.
    /// Only ever incremented, the slot index is the value modulo num_elements.
321
    std::atomic<size_t> m_front{0};
322
    /// The current back pointer to the oldest item in the buffer if not empty.
    /// Only ever incremented, the slot index is the value modulo num_elements.
323
    std::atomic<size_t> m_back{0};
324
    /// The number of items in the ring buffer.
325
    std::atomic<size_t> m_used{0};
326

327
    /// The LIFO list of produce waiters.
328
    std::atomic<produce_operation*> m_produce_waiters{nullptr};
329
    /// The LIFO list of consume waiters.
330
    std::atomic<consume_operation*> m_consume_waiters{nullptr};
331

332
    /// The current lifecycle state, starts out as running.
    std::atomic<running_state_t> m_running_state{running_state_t::running};
333

334
    /**
     * Called by consume() after its consume operation finished.  While the buffer has a free slot
     * and a producer is waiting, stores that producer's element and resumes its coroutine.
     * Returns once the buffer is full or no waiting producer is left.
     */
    auto try_resume_producers() -> coro::task<void>
10,995,636✔
335
    {
336
        while (true)
337
        {
338
            auto lk = co_await m_mutex.scoped_lock();
339
            if (m_used.load(std::memory_order::acquire) < num_elements)
340
            {
341
                // Presumably detaches one waiter from the list and yields nullptr when it is empty.
                auto* op = detail::awaiter_list_pop(m_produce_waiters);
342
                if (op != nullptr)
343
                {
344
                    // Store the waiting producer's element on its behalf.
                    auto slot = m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
345
                    m_elements[slot] = std::move(op->m_e);
346
                    m_used.fetch_add(1, std::memory_order::release);
347

348
                    // Release the mutex before resuming the producer.
                    lk.unlock();
349
                    op->m_awaiting_coroutine.resume();
350
                    continue;
351
                }
352
            }
353
            co_return;
354
        }
355
    }
21,988,633✔
356

357
    /**
     * Called by produce() after its produce operation finished.  While the buffer holds an element
     * and a consumer is waiting, moves the oldest element into that consumer and resumes its
     * coroutine.  Returns once the buffer is empty or no waiting consumer is left.
     */
    auto try_resume_consumers() -> coro::task<void>
10,995,780✔
358
    {
359
        while (true)
360
        {
361
            auto lk = co_await m_mutex.scoped_lock();
362
            if (m_used.load(std::memory_order::acquire) > 0)
363
            {
364
                // Presumably detaches one waiter from the list and yields nullptr when it is empty.
                auto* op = detail::awaiter_list_pop(m_consume_waiters);
365
                if (op != nullptr)
366
                {
367
                    // Take the oldest element out on behalf of the waiting consumer.
                    auto slot = m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
368
                    op->m_e = std::move(m_elements[slot]);
369
                    // Disengage the slot explicitly, a moved-from optional still holds a value.
                    m_elements[slot] = std::nullopt;
370
                    m_used.fetch_sub(1, std::memory_order::release);
371
                    // Release the mutex before resuming the consumer.
                    lk.unlock();
372

373
                    op->m_awaiting_coroutine.resume();
374
                    continue;
375
                }
376
            }
377
            co_return;
378
        }
379
    }
21,989,505✔
380
};
381

382
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc