jbaldwin / libcoro / 15448013898

04 Jun 2025 04:49PM UTC coverage: 87.93%. First build (15448013898).

Pull Request #336: coro::ring_buffer use coro::mutex instead of std::mutex
Merge e238fa0b8 into f0cccaaf4 (committed via GitHub web-flow)

79 of 88 new or added lines in 4 files covered (89.77%).
1610 of 1831 relevant lines covered (87.93%).
4633499.96 hits per line

Source File: /include/coro/ring_buffer.hpp (95.18% covered)
#pragma once

#include "coro/expected.hpp"
#include "coro/mutex.hpp"
#include "coro/task.hpp"

#include <array>
#include <atomic>
#include <coroutine>
#include <mutex>
#include <optional>

#include <iostream>

namespace coro
{
namespace ring_buffer_result
{
enum class produce
{
    produced,
    stopped
};

enum class consume
{
    stopped
};
} // namespace ring_buffer_result

/**
 * @tparam element The type of element the ring buffer will store.  Note that this type should be
 *         cheap to move if possible as it is moved into and out of the buffer upon produce and
 *         consume operations.
 * @tparam num_elements The maximum number of elements the ring buffer can store, must be >= 1.
 */
template<typename element, size_t num_elements>
class ring_buffer
{
private:
    enum running_state_t
    {
        /// @brief The ring buffer is still running.
        running,
        /// @brief The ring buffer is draining all elements, produce is no longer allowed.
        draining,
        /// @brief The ring buffer is fully shutdown, all produce and consume tasks will be woken up with result::stopped.
        stopped,
    };

public:
    /**
     * static_asserts if `num_elements` == 0.
     */
    ring_buffer() { static_assert(num_elements != 0, "num_elements cannot be zero"); }

    ~ring_buffer()
    {
        // Wake up anyone still using the ring buffer.
        coro::sync_wait(shutdown());
        std::cerr << "~ring_buffer() exit\n";
    }

    ring_buffer(const ring_buffer<element, num_elements>&) = delete;
    ring_buffer(ring_buffer<element, num_elements>&&)      = delete;

    auto operator=(const ring_buffer<element, num_elements>&) noexcept -> ring_buffer<element, num_elements>& = delete;
    auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>&      = delete;

    struct consume_operation;

    struct produce_operation
    {
        produce_operation(ring_buffer<element, num_elements>& rb, element e)
            : m_rb(rb),
              m_e(std::move(e))
        {}

        auto await_ready() noexcept -> bool
        {
            auto& mutex = m_rb.m_mutex;

            // Produce operations can only proceed if running.
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
            {
                // Note: these two added lines are not exercised by this build's tests.
                mutex.unlock();
                return true; // Will be awoken with produce::stopped
            }

            if (m_rb.m_used.load(std::memory_order::acquire) == num_elements)
            {
                return false; // ring buffer full, suspend
            }

            // There is guaranteed space to store the element.
            auto slot = m_rb.m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
            m_rb.m_elements[slot] = std::move(m_e);
            m_rb.m_used.fetch_add(1, std::memory_order::release);

            auto* op = m_rb.m_consume_waiters.load(std::memory_order::acquire);
            if (op != nullptr)
            {
                // Advance the consume operation waiters.
                m_rb.m_consume_waiters.store(op->m_next, std::memory_order::release);

                // Store the back element into the consumer that will be resumed.
                auto slot = m_rb.m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
                op->m_e = std::move(m_rb.m_elements[slot]);
                m_rb.m_used.fetch_sub(1, std::memory_order::release);

                mutex.unlock();
                op->m_awaiting_coroutine.resume();
            }
            else
            {
                mutex.unlock();
            }

            return true;
        }

        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
        {
            m_awaiting_coroutine   = awaiting_coroutine;
            m_next                 = m_rb.m_produce_waiters;
            m_rb.m_produce_waiters = this;
            m_rb.m_mutex.unlock();
            return true;
        }

        /**
         * @return ring_buffer_result::produce
         */
        auto await_resume() -> ring_buffer_result::produce
        {
            return m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::running
                    ? ring_buffer_result::produce::produced
                    : ring_buffer_result::produce::stopped;
        }

    private:
        template<typename element_subtype, size_t num_elements_subtype>
        friend class ring_buffer;

        /// The ring buffer the element is being produced into.
        ring_buffer<element, num_elements>& m_rb;
        /// If the operation needs to suspend, the coroutine to resume when the element can be produced.
        std::coroutine_handle<> m_awaiting_coroutine;
        /// Linked list of produce operations that are awaiting to produce their element.
        produce_operation* m_next{nullptr};
        /// The element this produce operation is producing into the ring buffer.
        element m_e;
    };

    struct consume_operation
    {
        explicit consume_operation(ring_buffer<element, num_elements>& rb)
            : m_rb(rb)
        {}

        auto await_ready() noexcept -> bool
        {
            auto& mutex = m_rb.m_mutex;

            // Consume operations proceed until stopped.
            if (m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
            {
                // Note: these two added lines are not exercised by this build's tests.
                mutex.unlock();
                return true;
            }

            if (m_rb.m_used.load(std::memory_order::acquire) == 0)
            {
                return false; // ring buffer is empty, suspend
            }

            auto slot = m_rb.m_back.fetch_add(1, std::memory_order::acq_rel) % num_elements;
            m_e = std::move(m_rb.m_elements[slot]);
            m_rb.m_used.fetch_sub(1, std::memory_order::release);

            auto* op = m_rb.m_produce_waiters.load(std::memory_order::acquire);
            if (op != nullptr)
            {
                m_rb.m_produce_waiters.store(op->m_next, std::memory_order::release);

                auto slot = m_rb.m_front.fetch_add(1, std::memory_order::acq_rel) % num_elements;
                m_rb.m_elements[slot] = std::move(op->m_e);
                m_rb.m_used.fetch_add(1, std::memory_order::release);

                mutex.unlock();
                op->m_awaiting_coroutine.resume();
            }
            else
            {
                mutex.unlock();
            }

            return true;
        }

        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
        {
            m_awaiting_coroutine   = awaiting_coroutine;
            m_next                 = m_rb.m_consume_waiters;
            m_rb.m_consume_waiters = this;
            m_rb.m_mutex.unlock();
            return true;
        }

        /**
         * @return The consumed element, or ring_buffer_result::consume::stopped if the ring buffer has been shutdown.
         */
        auto await_resume() -> expected<element, ring_buffer_result::consume>
        {
            if (m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
            {
                return unexpected<ring_buffer_result::consume>(ring_buffer_result::consume::stopped);
            }

            return std::move(m_e);
        }

    private:
        template<typename element_subtype, size_t num_elements_subtype>
        friend class ring_buffer;

        /// The ring buffer to consume an element from.
        ring_buffer<element, num_elements>& m_rb;
        /// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
        std::coroutine_handle<> m_awaiting_coroutine;
        /// Linked list of consume operations that are awaiting to consume an element.
        consume_operation* m_next{nullptr};
        /// The element this consume operation will consume.
        element m_e;
    };

    /**
     * Produces the given element into the ring buffer.  This operation will suspend until a slot
     * in the ring buffer becomes available.
     * @param e The element to produce.
     */
    [[nodiscard]] auto produce(element e) -> coro::task<ring_buffer_result::produce>
    {
        co_await m_mutex.lock();
        co_return co_await produce_operation{*this, std::move(e)};
    }

    /**
     * Consumes an element from the ring buffer.  This operation will suspend until an element in
     * the ring buffer becomes available.
     */
    [[nodiscard]] auto consume() -> coro::task<expected<element, ring_buffer_result::consume>>
    {
        co_await m_mutex.lock();
        co_return co_await consume_operation{*this};
    }

    /**
     * @return The current number of elements contained in the ring buffer.
     */
    auto size() const -> size_t
    {
        return m_used.load(std::memory_order::acquire);
    }

    /**
     * @return True if the ring buffer contains zero elements.
     */
    auto empty() const -> bool { return size() == 0; }

    /**
     * @brief Wakes up all currently awaiting producers and consumers.  Producers will be resumed
     *        with produce::stopped and consumers with an unexpected consume::stopped result.
     */
    auto shutdown() -> coro::task<void>
    {
        std::cerr << "ring_buffer::shutdown()\n";
        // Only wake up waiters once.
        auto expected = m_running_state.load(std::memory_order::acquire);
        if (expected == running_state_t::stopped)
        {
            std::cerr << "ring_buffer::shutdown() already stopped returning\n";
            co_return;
        }

        std::cerr << "ring_buffer::shutdown() acquire lock\n";
        auto lk = co_await m_mutex.scoped_lock();
        // Only let one caller do the wake-ups; this can go from running or draining to stopped.
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::stopped, std::memory_order::acq_rel, std::memory_order::relaxed))
        {
            co_return;
        }

        std::cerr << "ring_buffer::shutdown() acquire m_produce waiters and m_consume_waiters\n";
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
        auto* consume_waiters = m_consume_waiters.exchange(nullptr, std::memory_order::acq_rel);
        lk.unlock();

        while (produce_waiters != nullptr)
        {
            auto* next = produce_waiters->m_next;
            std::cerr << "ring_buffer::shutdown() produce_waiters->resume()\n";
            produce_waiters->m_awaiting_coroutine.resume();
            produce_waiters = next;
        }

        while (consume_waiters != nullptr)
        {
            auto* next = consume_waiters->m_next;
            std::cerr << "ring_buffer::shutdown() consume_waiters->resume()\n";
            consume_waiters->m_awaiting_coroutine.resume();
            consume_waiters = next;
        }

        std::cerr << "ring_buffer::shutdown() co_return\n";
        co_return;
    }

    template<coro::concepts::executor executor_t>
    [[nodiscard]] auto shutdown_drain(executor_t& e) -> coro::task<void>
    {
        std::cerr << "ring_buffer::shutdown_drain()\n";
        auto lk = co_await m_mutex.scoped_lock();
        // Do not allow any more produces; the state must be running to drain.
        auto expected = running_state_t::running;
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::draining, std::memory_order::acq_rel, std::memory_order::relaxed))
        {
            co_return;
        }

        std::cerr << "ring_buffer::shutdown_drain() exchange m_produce_waiters\n";
        auto* produce_waiters = m_produce_waiters.exchange(nullptr, std::memory_order::acq_rel);
        lk.unlock();

        while (produce_waiters != nullptr)
        {
            auto* next = produce_waiters->m_next;
            std::cerr << "ring_buffer::shutdown_drain() produce_waiters->resume()\n";
            produce_waiters->m_awaiting_coroutine.resume();
            produce_waiters = next;
        }

        std::cerr << "ring_buffer::shutdown_drain() while(!empty())\n";
        while (!empty())
        {
            co_await e.yield();
        }

        co_await shutdown();
        std::cerr << "ring_buffer::shutdown_drain() co_return\n";
        co_return;
    }

private:
    friend produce_operation;
    friend consume_operation;

    coro::mutex m_mutex{};

    std::array<element, num_elements> m_elements{};
    /// The current front pointer to an open slot if not full.
    std::atomic<size_t> m_front{0};
    /// The current back pointer to the oldest item in the buffer if not empty.
    std::atomic<size_t> m_back{0};
    /// The number of items in the ring buffer.
    std::atomic<size_t> m_used{0};

    /// The LIFO list of produce waiters.
    std::atomic<produce_operation*> m_produce_waiters{nullptr};
    /// The LIFO list of consume waiters.
    std::atomic<consume_operation*> m_consume_waiters{nullptr};

    std::atomic<running_state_t> m_running_state{running_state_t::running};
};

} // namespace coro
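Usage sketch (not part of the covered file or this PR): a single producer and a single consumer sharing the ring buffer, driven with coro::sync_wait and coro::when_all from the rest of libcoro. The umbrella include <coro/coro.hpp>, the capacity of 4, and the count of 16 items are assumptions made for this illustration only.

// Usage sketch only: not part of /include/coro/ring_buffer.hpp or PR #336.
// Assumes the libcoro umbrella header <coro/coro.hpp>; the capacity of 4 and the
// 16-item count are arbitrary values chosen for the illustration.
#include <coro/coro.hpp>

#include <cstdint>
#include <iostream>

int main()
{
    coro::ring_buffer<uint64_t, 4> rb{};

    auto producer = [&rb]() -> coro::task<void>
    {
        for (uint64_t i = 0; i < 16; ++i)
        {
            // produce() suspends while the ring buffer is full.
            co_await rb.produce(i);
        }
        co_return;
    };

    auto consumer = [&rb]() -> coro::task<void>
    {
        for (uint64_t i = 0; i < 16; ++i)
        {
            // consume() suspends while the ring buffer is empty and returns an
            // expected<> holding either the element or consume::stopped.
            auto value = co_await rb.consume();
            if (!value.has_value())
            {
                break; // the ring buffer was shut down
            }
            std::cout << *value << "\n";
        }
        co_return;
    };

    coro::sync_wait(coro::when_all(producer(), consumer()));
    return 0;
}

Both tasks complete on their own here, so the destructor's coro::sync_wait(shutdown()) finds no suspended waiters to wake.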
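For a consumer that loops until told to stop, shutdown_drain() is the intended path: it blocks further produces, yields on the supplied executor until the buffer is empty, then performs the shutdown() wake-ups. The sketch below additionally assumes a coro::thread_pool as the executor and the thread_pool::make_shared construction style of recent libcoro releases; the exact construction API varies by version, so treat those details as assumptions.

// Usage sketch only, with assumptions: coro::thread_pool as the executor and the
// thread_pool::make_shared construction style of recent libcoro releases; adjust
// the construction to whatever your libcoro version provides.
#include <coro/coro.hpp>

#include <atomic>
#include <cstdint>
#include <iostream>

int main()
{
    auto tp = coro::thread_pool::make_shared(coro::thread_pool::options{.thread_count = 2});

    coro::ring_buffer<uint64_t, 8> rb{};
    std::atomic<uint64_t>          consumed{0};

    auto consumer = [&]() -> coro::task<void>
    {
        co_await tp->schedule();
        while (true)
        {
            // Runs until shutdown_drain() finishes draining and the final
            // shutdown() wakes this task with consume::stopped.
            auto value = co_await rb.consume();
            if (!value.has_value())
            {
                break;
            }
            consumed.fetch_add(1, std::memory_order::relaxed);
        }
        co_return;
    };

    auto producer = [&]() -> coro::task<void>
    {
        co_await tp->schedule();
        for (uint64_t i = 0; i < 100; ++i)
        {
            co_await rb.produce(i);
        }
        // Disallow further produces, yield on the executor until the buffer is
        // empty, then wake any remaining waiters.
        co_await rb.shutdown_drain(*tp);
        co_return;
    };

    coro::sync_wait(coro::when_all(consumer(), producer()));
    std::cout << "consumed " << consumed.load() << " items\n";
    return 0;
}

shutdown_drain() only calls yield() on the executor it is given, so any type satisfying coro::concepts::executor works; *tp is passed because the parameter is a plain reference.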