• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 15285972720

27 May 2025 09:20PM UTC coverage: 86.321%. First build
15285972720

Pull #327

github

web-flow
Merge 5cb833a4a into 544f33ba3
Pull Request #327: test ring_buffer hang

24 of 34 new or added lines in 1 file covered. (70.59%)

1464 of 1696 relevant lines covered (86.32%)

4135362.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.25
/include/coro/ring_buffer.hpp
1
#pragma once
2

3
#include "coro/expected.hpp"
4

5
#include <array>
6
#include <atomic>
7
#include <coroutine>
8
#include <mutex>
9
#include <optional>
10

11
namespace coro
12
{
13
namespace ring_buffer_result
14
{
15
enum class produce
16
{
17
    produced,
18
    stopped
19
};
20

21
enum class consume
22
{
23
    stopped
24
};
25
} // namespace ring_buffer_result
26

27
/**
28
 * @tparam element The type of element the ring buffer will store.  Note that this type should be
29
 *         cheap to move if possible as it is moved into and out of the buffer upon produce and
30
 *         consume operations.
31
 * @tparam num_elements The maximum number of elements the ring buffer can store, must be >= 1.
32
 */
33
template<typename element, size_t num_elements>
34
class ring_buffer
35
{
36
private:
37
    enum running_state_t
38
    {
39
        /// @brief The ring buffer is still running.
40
        running,
41
        /// @brief The ring buffer is draining all elements, produce is no longer allowed.
42
        draining,
43
        /// @brief The ring buffer is fully shutdown, all produce and consume tasks will be woken up with result::stopped.
44
        stopped,
45
    };
46

47
public:
48
    /**
49
     * static_assert If `num_elements` == 0.
50
     */
51
    ring_buffer() { static_assert(num_elements != 0, "num_elements cannot be zero"); }
6✔
52

53
    ~ring_buffer()
6✔
54
    {
55
        // Wake up anyone still using the ring buffer.
56
        shutdown();
6✔
57
    }
6✔
58

59
    ring_buffer(const ring_buffer<element, num_elements>&) = delete;
60
    ring_buffer(ring_buffer<element, num_elements>&&)      = delete;
61

62
    auto operator=(const ring_buffer<element, num_elements>&) noexcept -> ring_buffer<element, num_elements>& = delete;
63
    auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>&      = delete;
64

65
    struct produce_operation
66
    {
67
        produce_operation(ring_buffer<element, num_elements>& rb, element e) : m_rb(rb), m_e(std::move(e)) {}
10,994,194✔
68

69
        auto await_ready() noexcept -> bool
10,992,545✔
70
        {
71
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
10,992,545✔
72
            {
NEW
73
                return true;
×
74
            }
75

76
            std::unique_lock lk{m_rb.m_mutex};
10,993,852✔
77
            return m_rb.try_produce_locked(lk, m_e);
11,000,013✔
78
        }
11,000,013✔
79

80
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
3,496,796✔
81
        {
82
            std::unique_lock lk{m_rb.m_mutex};
3,496,796✔
83
            // Its possible a consumer on another thread consumed an item between await_ready() and await_suspend()
84
            // so we must check to see if there is space again.
85
            if (m_rb.try_produce_locked(lk, m_e))
3,500,754✔
86
            {
87
                return false;
604✔
88
            }
89

90
            // Don't suspend if stopping has been requested.
91
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
3,500,150✔
92
            {
93
                return false;
×
94
            }
95

96
            m_awaiting_coroutine   = awaiting_coroutine;
3,500,150✔
97
            m_next                 = m_rb.m_produce_waiters;
3,500,150✔
98
            m_rb.m_produce_waiters = this;
3,500,150✔
99
            return true;
3,500,150✔
100
        }
3,500,754✔
101

102
        /**
103
         * @return produce_result
104
         */
105
        auto await_resume() -> ring_buffer_result::produce
10,996,847✔
106
        {
107
            return m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::running
10,996,847✔
108
                    ? ring_buffer_result::produce::produced
10,996,379✔
109
                    : ring_buffer_result::produce::stopped;
10,996,379✔
110
        }
111

112
    private:
113
        template<typename element_subtype, size_t num_elements_subtype>
114
        friend class ring_buffer;
115

116
        /// The ring buffer the element is being produced into.
117
        ring_buffer<element, num_elements>& m_rb;
118
        /// If the operation needs to suspend, the coroutine to resume when the element can be produced.
119
        std::coroutine_handle<> m_awaiting_coroutine;
120
        /// Linked list of produce operations that are awaiting to produce their element.
121
        produce_operation* m_next{nullptr};
122
        /// The element this produce operation is producing into the ring buffer.
123
        element m_e;
124
    };
125

126
    struct consume_operation
127
    {
128
        explicit consume_operation(ring_buffer<element, num_elements>& rb) : m_rb(rb) {}
10,996,811✔
129

130
        auto await_ready() noexcept -> bool
10,996,774✔
131
        {
132
            std::unique_lock lk{m_rb.m_mutex};
10,996,774✔
133
            return m_rb.try_consume_locked(lk, this);
11,000,182✔
134
        }
10,997,244✔
135

136
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
2,497,850✔
137
        {
138
            std::unique_lock lk{m_rb.m_mutex};
2,497,850✔
139
            // We have to check again as there is a race condition between await_ready() and now on the mutex acquire.
140
            // It is possible that a producer added items between await_ready() and await_suspend().
141
            if (m_rb.try_consume_locked(lk, this))
2,497,850✔
142
            {
143
                return false;
159✔
144
            }
145

146
            // Don't suspend if the stop signal has been set.
147
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
2,497,691✔
148
            {
149
                return false;
169✔
150
            }
151
            m_awaiting_coroutine   = awaiting_coroutine;
2,497,522✔
152
            m_next                 = m_rb.m_consume_waiters;
2,497,522✔
153
            m_rb.m_consume_waiters = this;
2,497,522✔
154
            return true;
2,497,522✔
155
        }
2,497,850✔
156

157
        /**
158
         * @return The consumed element or ring_buffer_stopped if the ring buffer has been shutdown.
159
         */
160
        auto await_resume() -> expected<element, ring_buffer_result::consume>
10,996,749✔
161
        {
162
            if (m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
10,996,749✔
163
            {
164
                return unexpected<ring_buffer_result::consume>(ring_buffer_result::consume::stopped);
101✔
165
            }
166

167
            return std::move(m_e);
10,996,723✔
168
        }
169

170
    private:
171
        template<typename element_subtype, size_t num_elements_subtype>
172
        friend class ring_buffer;
173

174
        /// The ring buffer to consume an element from.
175
        ring_buffer<element, num_elements>& m_rb;
176
        /// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
177
        std::coroutine_handle<> m_awaiting_coroutine;
178
        /// Linked list of consume operations that are awaiting to consume an element.
179
        consume_operation* m_next{nullptr};
180
        /// The element this consume operation will consume.
181
        element m_e;
182
    };
183

184
    /**
185
     * Produces the given element into the ring buffer.  This operation will suspend until a slot
186
     * in the ring buffer becomes available.
187
     * @param e The element to produce.
188
     */
189
    [[nodiscard]] auto produce(element e) -> produce_operation { return produce_operation{*this, std::move(e)}; }
10,995,395✔
190

191
    /**
192
     * Consumes an element from the ring buffer.  This operation will suspend until an element in
193
     * the ring buffer becomes available.
194
     */
195
    [[nodiscard]] auto consume() -> consume_operation { return consume_operation{*this}; }
10,997,314✔
196

197
    /**
198
     * @return The current number of elements contained in the ring buffer.
199
     */
200
    auto size() const -> size_t
7✔
201
    {
202
        return m_used.load();
14✔
203
    }
204

205
    /**
206
     * @return True if the ring buffer contains zero elements.
207
     */
208
    auto empty() const -> bool { return size() == 0; }
7✔
209

210
    /**
211
     * @brief Wakes up all currently awaiting producers and consumers.  Their await_resume() function
212
     *        will return an expected consume result that the ring buffer has stopped.
213
     */
214
    auto shutdown() -> void
8✔
215
    {
216
        // Only wake up waiters once.
217
        auto expected = m_running_state.load(std::memory_order::acquire);
8✔
218
        if (expected == running_state_t::stopped)
8✔
219
        {
220
            return;
2✔
221
        }
222

223
        // Only let one caller do the wake-ups, this can go from running or draining to stopped
224
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::stopped, std::memory_order::acq_rel, std::memory_order::relaxed))
6✔
225
        {
NEW
226
            return;
×
227
        }
228

229
        while (m_produce_waiters != nullptr)
6✔
230
        {
NEW
231
            m_mutex.lock();
×
NEW
232
            auto* to_resume   = m_produce_waiters.load();
×
NEW
233
            m_produce_waiters = to_resume->m_next;
×
NEW
234
            m_mutex.unlock();
×
235

236
            to_resume->m_awaiting_coroutine.resume();
×
237
        }
238

239
        while (m_consume_waiters != nullptr)
6✔
240
        {
NEW
241
            m_mutex.lock();
×
NEW
242
            auto* to_resume   = m_consume_waiters.load();
×
NEW
243
            m_consume_waiters = to_resume->m_next;
×
NEW
244
            m_mutex.unlock();
×
245

246
            to_resume->m_awaiting_coroutine.resume();
×
247
        }
248

249
        return;
6✔
250
    }
251

252
    template<coro::concepts::executor executor_t>
253
    [[nodiscard]] auto shutdown_drain(executor_t& e) -> coro::task<void>
2✔
254
    {
255
        // Do not allow any more produces, the state must be in running to drain.
256
        auto expected = running_state_t::running;
257
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::draining, std::memory_order::acq_rel, std::memory_order::relaxed))
258
        {
259
            co_return;
260
        }
261

262
        while (!empty())
263
        {
264
            co_await e.yield();
265
        }
266

267
        shutdown();
268
        co_return;
269
    }
4✔
270

271
private:
272
    friend produce_operation;
273
    friend consume_operation;
274

275
    std::mutex m_mutex{};
276

277
    std::array<element, num_elements> m_elements{};
278
    /// The current front pointer to an open slot if not full.
279
    std::atomic<size_t> m_front{0};
280
    /// The current back pointer to the oldest item in the buffer if not empty.
281
    std::atomic<size_t> m_back{0};
282
    /// The number of items in the ring buffer.
283
    std::atomic<size_t> m_used{0};
284

285
    /// The LIFO list of produce waiters.
286
    std::atomic<produce_operation*> m_produce_waiters{nullptr};
287
    /// The LIFO list of consume watier.
288
    std::atomic<consume_operation*> m_consume_waiters{nullptr};
289

290
    std::atomic<running_state_t> m_running_state{running_state_t::running};
291

292
    auto try_produce_locked(std::unique_lock<std::mutex>& lk, element& e) -> bool
14,500,767✔
293
    {
294
        if (m_used == num_elements)
14,500,767✔
295
        {
296
            return false;
7,000,904✔
297
        }
298

299
        m_elements[m_front] = std::move(e);
7,499,863✔
300
        m_front             = (m_front + 1) % num_elements;
7,499,863✔
301
        ++m_used;
7,499,863✔
302

303
        consume_operation* to_resume = m_consume_waiters.load();
7,499,863✔
304
        if (to_resume != nullptr)
7,499,863✔
305
        {
306
            m_consume_waiters            = to_resume->m_next;
2,497,522✔
307

308
            // Since the consume operation suspended it needs to be provided an element to consume.
309
            to_resume->m_e = std::move(m_elements[m_back]);
2,497,522✔
310
            m_back         = (m_back + 1) % num_elements;
2,497,522✔
311
            --m_used; // And we just consumed up another item.
2,497,522✔
312

313
            lk.unlock();
2,497,522✔
314
            to_resume->m_awaiting_coroutine.resume();
2,497,522✔
315
        }
316

317
        return true;
7,499,863✔
318
    }
319

320
    auto try_consume_locked(std::unique_lock<std::mutex>& lk, consume_operation* op) -> bool
13,498,032✔
321
    {
322
        if (m_used == 0)
13,498,032✔
323
        {
324
            return false;
4,995,541✔
325
        }
326

327
        op->m_e = std::move(m_elements[m_back]);
8,502,491✔
328
        m_back  = (m_back + 1) % num_elements;
8,502,491✔
329
        --m_used;
8,502,491✔
330

331
        produce_operation* to_resume = m_produce_waiters.load();
8,502,491✔
332
        if (to_resume != nullptr)
8,502,491✔
333
        {
334
            m_produce_waiters            = to_resume->m_next;
3,500,150✔
335

336
            // Since the produce operation suspended it needs to be provided a slot to place its element.
337
            m_elements[m_front] = std::move(to_resume->m_e);
3,500,150✔
338
            m_front             = (m_front + 1) % num_elements;
3,500,150✔
339
            ++m_used; // And we just produced another item.
3,500,150✔
340

341
            lk.unlock();
3,500,150✔
342
            to_resume->m_awaiting_coroutine.resume();
3,498,946✔
343
        }
344

345
        return true;
8,499,704✔
346
    }
347
};
348

349
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc