• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 15404406158

02 Jun 2025 10:48PM UTC coverage: 87.689%. First build
15404406158

Pull #334

github

web-flow
Merge 9103293e2 into ae3662d54
Pull Request #334: Semaphore test failure

89 of 95 new or added lines in 7 files covered. (93.68%)

1624 of 1852 relevant lines covered (87.69%)

3815837.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.35
/include/coro/ring_buffer.hpp
1
#pragma once
2

3
#include "coro/expected.hpp"
4

5
#include <array>
6
#include <atomic>
7
#include <coroutine>
8
#include <mutex>
9
#include <optional>
10

11
namespace coro
12
{
13
namespace ring_buffer_result
14
{
15
enum class produce
16
{
17
    produced,
18
    stopped
19
};
20

21
enum class consume
22
{
23
    stopped
24
};
25
} // namespace ring_buffer_result
26

27
/**
28
 * @tparam element The type of element the ring buffer will store.  Note that this type should be
29
 *         cheap to move if possible as it is moved into and out of the buffer upon produce and
30
 *         consume operations.
31
 * @tparam num_elements The maximum number of elements the ring buffer can store, must be >= 1.
32
 */
33
template<typename element, size_t num_elements>
34
class ring_buffer
35
{
36
private:
37
    enum running_state_t
38
    {
39
        /// @brief The ring buffer is still running.
40
        running,
41
        /// @brief The ring buffer is draining all elements, produce is no longer allowed.
42
        draining,
43
        /// @brief The ring buffer is fully shutdown, all produce and consume tasks will be woken up with result::stopped.
44
        stopped,
45
    };
46

47
public:
48
    /**
49
     * static_assert If `num_elements` == 0.
50
     */
51
    ring_buffer() { static_assert(num_elements != 0, "num_elements cannot be zero"); }
6✔
52

53
    ~ring_buffer()
6✔
54
    {
55
        // Wake up anyone still using the ring buffer.
56
        shutdown();
6✔
57
    }
6✔
58

59
    ring_buffer(const ring_buffer<element, num_elements>&) = delete;
60
    ring_buffer(ring_buffer<element, num_elements>&&)      = delete;
61

62
    auto operator=(const ring_buffer<element, num_elements>&) noexcept -> ring_buffer<element, num_elements>& = delete;
63
    auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>&      = delete;
64

65
    struct produce_operation
66
    {
67
        produce_operation(ring_buffer<element, num_elements>& rb, element e) : m_rb(rb), m_e(std::move(e)) {}
10,995,427✔
68

69
        auto await_ready() noexcept -> bool
10,993,509✔
70
        {
71
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
10,993,509✔
72
            {
73
                return true;
×
74
            }
75

76
            std::unique_lock lk{m_rb.m_mutex};
10,994,818✔
77
            return m_rb.try_produce_locked(lk, m_e);
11,000,013✔
78
        }
11,000,013✔
79

80
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
3,496,109✔
81
        {
82
            std::unique_lock lk{m_rb.m_mutex};
3,496,109✔
83
            // Its possible a consumer on another thread consumed an item between await_ready() and await_suspend()
84
            // so we must check to see if there is space again.
85
            if (m_rb.try_produce_locked(lk, m_e))
3,500,270✔
86
            {
87
                return false;
648✔
88
            }
89

90
            // Don't suspend if stopping has been requested.
91
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
3,499,622✔
92
            {
93
                return false;
×
94
            }
95

96
            m_awaiting_coroutine   = awaiting_coroutine;
3,499,622✔
97
            m_next                 = m_rb.m_produce_waiters;
3,499,622✔
98
            m_rb.m_produce_waiters = this;
3,499,622✔
99
            return true;
3,499,622✔
100
        }
3,500,270✔
101

102
        /**
103
         * @return produce_result
104
         */
105
        auto await_resume() -> ring_buffer_result::produce
10,997,617✔
106
        {
107
            return m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::running
10,997,617✔
108
                    ? ring_buffer_result::produce::produced
10,997,520✔
109
                    : ring_buffer_result::produce::stopped;
10,997,520✔
110
        }
111

112
    private:
113
        template<typename element_subtype, size_t num_elements_subtype>
114
        friend class ring_buffer;
115

116
        /// The ring buffer the element is being produced into.
117
        ring_buffer<element, num_elements>& m_rb;
118
        /// If the operation needs to suspend, the coroutine to resume when the element can be produced.
119
        std::coroutine_handle<> m_awaiting_coroutine;
120
        /// Linked list of produce operations that are awaiting to produce their element.
121
        produce_operation* m_next{nullptr};
122
        /// The element this produce operation is producing into the ring buffer.
123
        element m_e;
124
    };
125

126
    struct consume_operation
127
    {
128
        explicit consume_operation(ring_buffer<element, num_elements>& rb) : m_rb(rb) {}
10,996,554✔
129

130
        auto await_ready() noexcept -> bool
10,995,630✔
131
        {
132
            std::unique_lock lk{m_rb.m_mutex};
10,995,630✔
133
            return m_rb.try_consume_locked(lk, this);
11,000,161✔
134
        }
10,997,517✔
135

136
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
2,497,991✔
137
        {
138
            std::unique_lock lk{m_rb.m_mutex};
2,497,991✔
139
            // We have to check again as there is a race condition between await_ready() and now on the mutex acquire.
140
            // It is possible that a producer added items between await_ready() and await_suspend().
141
            if (m_rb.try_consume_locked(lk, this))
2,497,993✔
142
            {
143
                return false;
150✔
144
            }
145

146
            // Don't suspend if the stop signal has been set.
147
            if (m_rb.m_running_state.load(std::memory_order::acquire) != running_state_t::running)
2,497,843✔
148
            {
149
                return false;
148✔
150
            }
151
            m_awaiting_coroutine   = awaiting_coroutine;
2,497,695✔
152
            m_next                 = m_rb.m_consume_waiters;
2,497,695✔
153
            m_rb.m_consume_waiters = this;
2,497,695✔
154
            return true;
2,497,695✔
155
        }
2,497,993✔
156

157
        /**
158
         * @return The consumed element or ring_buffer_stopped if the ring buffer has been shutdown.
159
         */
160
        auto await_resume() -> expected<element, ring_buffer_result::consume>
10,996,621✔
161
        {
162
            if (m_rb.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
10,996,621✔
163
            {
164
                return unexpected<ring_buffer_result::consume>(ring_buffer_result::consume::stopped);
101✔
165
            }
166

167
            return std::move(m_e);
10,996,792✔
168
        }
169

170
    private:
171
        template<typename element_subtype, size_t num_elements_subtype>
172
        friend class ring_buffer;
173

174
        /// The ring buffer to consume an element from.
175
        ring_buffer<element, num_elements>& m_rb;
176
        /// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
177
        std::coroutine_handle<> m_awaiting_coroutine;
178
        /// Linked list of consume operations that are awaiting to consume an element.
179
        consume_operation* m_next{nullptr};
180
        /// The element this consume operation will consume.
181
        element m_e;
182
    };
183

184
    /**
185
     * Produces the given element into the ring buffer.  This operation will suspend until a slot
186
     * in the ring buffer becomes available.
187
     * @param e The element to produce.
188
     */
189
    [[nodiscard]] auto produce(element e) -> produce_operation { return produce_operation{*this, std::move(e)}; }
10,996,157✔
190

191
    /**
192
     * Consumes an element from the ring buffer.  This operation will suspend until an element in
193
     * the ring buffer becomes available.
194
     */
195
    [[nodiscard]] auto consume() -> consume_operation { return consume_operation{*this}; }
10,996,797✔
196

197
    /**
198
     * @return The current number of elements contained in the ring buffer.
199
     */
200
    auto size() const -> size_t
6✔
201
    {
202
        return m_used.load();
12✔
203
    }
204

205
    /**
206
     * @return True if the ring buffer contains zero elements.
207
     */
208
    auto empty() const -> bool { return size() == 0; }
6✔
209

210
    /**
211
     * @brief Wakes up all currently awaiting producers and consumers.  Their await_resume() function
212
     *        will return an expected consume result that the ring buffer has stopped.
213
     */
214
    auto shutdown() -> void
8✔
215
    {
216
        // Only wake up waiters once.
217
        auto expected = m_running_state.load(std::memory_order::acquire);
8✔
218
        if (expected == running_state_t::stopped)
8✔
219
        {
220
            return;
2✔
221
        }
222

223
        // Only let one caller do the wake-ups, this can go from running or draining to stopped
224
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::stopped, std::memory_order::acq_rel, std::memory_order::relaxed))
6✔
225
        {
226
            return;
×
227
        }
228

229
        m_mutex.lock();
6✔
230
        auto* produce_waiters = m_produce_waiters.load(std::memory_order::acquire);
6✔
231
        auto* consume_waiters = m_consume_waiters.load(std::memory_order::acquire);
6✔
232
        m_produce_waiters = nullptr;
6✔
233
        m_consume_waiters = nullptr;
6✔
234
        m_mutex.unlock();
6✔
235

236
        while (produce_waiters != nullptr)
6✔
237
        {
NEW
238
            auto* to_resume = produce_waiters;
×
NEW
239
            produce_waiters = produce_waiters->m_next;
×
240
            to_resume->m_awaiting_coroutine.resume();
×
241
        }
242

243
        while (consume_waiters != nullptr)
6✔
244
        {
NEW
245
            auto* to_resume = consume_waiters;
×
NEW
246
            consume_waiters = consume_waiters->m_next;
×
247
            to_resume->m_awaiting_coroutine.resume();
×
248
        }
249

250
        return;
6✔
251
    }
252

253
    template<coro::concepts::executor executor_t>
254
    [[nodiscard]] auto shutdown_drain(executor_t& e) -> coro::task<void>
2✔
255
    {
256
        // Do not allow any more produces, the state must be in running to drain.
257
        auto expected = running_state_t::running;
258
        if (!m_running_state.compare_exchange_strong(expected, running_state_t::draining, std::memory_order::acq_rel, std::memory_order::relaxed))
259
        {
260
            co_return;
261
        }
262

263
        while (!empty())
264
        {
265
            co_await e.yield();
266
        }
267

268
        shutdown();
269
        co_return;
270
    }
4✔
271

272
private:
273
    friend produce_operation;
274
    friend consume_operation;
275

276
    std::mutex m_mutex{};
277

278
    std::array<element, num_elements> m_elements{};
279
    /// The current front pointer to an open slot if not full.
280
    std::atomic<size_t> m_front{0};
281
    /// The current back pointer to the oldest item in the buffer if not empty.
282
    std::atomic<size_t> m_back{0};
283
    /// The number of items in the ring buffer.
284
    std::atomic<size_t> m_used{0};
285

286
    /// The LIFO list of produce waiters.
287
    std::atomic<produce_operation*> m_produce_waiters{nullptr};
288
    /// The LIFO list of consume watier.
289
    std::atomic<consume_operation*> m_consume_waiters{nullptr};
290

291
    std::atomic<running_state_t> m_running_state{running_state_t::running};
292

293
    auto try_produce_locked(std::unique_lock<std::mutex>& lk, element& e) -> bool
14,500,283✔
294
    {
295
        if (m_used == num_elements)
14,500,283✔
296
        {
297
            return false;
6,999,892✔
298
        }
299

300
        m_elements[m_front] = std::move(e);
7,500,391✔
301
        m_front             = (m_front + 1) % num_elements;
7,500,391✔
302
        ++m_used;
7,500,391✔
303

304
        consume_operation* to_resume = m_consume_waiters.load();
7,500,391✔
305
        if (to_resume != nullptr)
7,500,391✔
306
        {
307
            m_consume_waiters            = to_resume->m_next;
2,497,695✔
308

309
            // Since the consume operation suspended it needs to be provided an element to consume.
310
            to_resume->m_e = std::move(m_elements[m_back]);
2,497,695✔
311
            m_back         = (m_back + 1) % num_elements;
2,497,695✔
312
            --m_used; // And we just consumed up another item.
2,497,695✔
313

314
            lk.unlock();
2,497,695✔
315
            to_resume->m_awaiting_coroutine.resume();
2,497,695✔
316
        }
317

318
        return true;
7,500,391✔
319
    }
320

321
    auto try_consume_locked(std::unique_lock<std::mutex>& lk, consume_operation* op) -> bool
13,498,154✔
322
    {
323
        if (m_used == 0)
13,498,154✔
324
        {
325
            return false;
4,995,836✔
326
        }
327

328
        op->m_e = std::move(m_elements[m_back]);
8,502,318✔
329
        m_back  = (m_back + 1) % num_elements;
8,502,318✔
330
        --m_used;
8,502,318✔
331

332
        produce_operation* to_resume = m_produce_waiters.load();
8,502,318✔
333
        if (to_resume != nullptr)
8,502,318✔
334
        {
335
            m_produce_waiters            = to_resume->m_next;
3,499,622✔
336

337
            // Since the produce operation suspended it needs to be provided a slot to place its element.
338
            m_elements[m_front] = std::move(to_resume->m_e);
3,499,622✔
339
            m_front             = (m_front + 1) % num_elements;
3,499,622✔
340
            ++m_used; // And we just produced another item.
3,499,622✔
341

342
            lk.unlock();
3,499,622✔
343
            to_resume->m_awaiting_coroutine.resume();
3,498,497✔
344
        }
345

346
        return true;
8,500,040✔
347
    }
348
};
349

350
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc