• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 14814048204

03 May 2025 07:45PM UTC coverage: 86.694%. First build
14814048204

Pull #311

github

web-flow
Merge 062d4f197 into 8fc3e2712
Pull Request #311: Adds coro::queue<T>

52 of 54 new or added lines in 2 files covered. (96.3%)

1466 of 1691 relevant lines covered (86.69%)

4127117.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.75
/include/coro/ring_buffer.hpp
1
#pragma once
2

3
#include "coro/expected.hpp"
4

5
#include <array>
6
#include <atomic>
7
#include <coroutine>
8
#include <mutex>
9
#include <optional>
10

11
namespace coro
12
{
13
namespace rb
14
{
15
/// Value yielded by co_await on a ring buffer produce operation.
enum class produce_result
16
{
17
    /// The ring buffer's stop flag was clear when the producer's await_resume() ran.
    produced,
18
    /// The ring buffer's stop flag was set when the producer's await_resume() ran.
    ring_buffer_stopped
19
};
20

21
/// Error value carried in the unexpected branch of a consume operation's result.
enum class consume_result
22
{
23
    /// The ring buffer's stop flag was set when the consumer's await_resume() ran.
    ring_buffer_stopped
24
};
25
} // namespace rb
26

27
/**
28
 * @tparam element The type of element the ring buffer will store.  Note that this type should be
29
 *         cheap to move if possible as it is moved into and out of the buffer upon produce and
30
 *         consume operations.
31
 * @tparam num_elements The maximum number of elements the ring buffer can store, must be >= 1.
32
 */
33
template<typename element, size_t num_elements>
34
class ring_buffer
35
{
36
public:
37
    /**
38
     * Constructs a ring buffer.  All members take their in-class defaults: zero elements in use,
     * empty waiter lists and a clear stop flag.
     * static_assert If `num_elements` == 0.
39
     */
40
    ring_buffer() { static_assert(num_elements != 0, "num_elements cannot be zero"); }
6✔
41

42
    /**
     * Calls notify_waiters(), which sets the stop flag and resumes every producer and consumer
     * that is still linked into the waiter lists.
     */
    ~ring_buffer()
6✔
43
    {
44
        // Wake up anyone still using the ring buffer.
45
        notify_waiters();
6✔
46
    }
6✔
47

48
    // The ring buffer is neither copyable nor movable.  Produce/consume operations hold a
    // reference back to the ring buffer they were created from (their m_rb member).
    ring_buffer(const ring_buffer<element, num_elements>&) = delete;
49
    ring_buffer(ring_buffer<element, num_elements>&&)      = delete;
50

51
    auto operator=(const ring_buffer<element, num_elements>&) noexcept -> ring_buffer<element, num_elements>& = delete;
52
    auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>&      = delete;
53

54
    struct produce_operation
55
    {
56
        /**
         * @param rb The ring buffer to produce into; stored by reference.
         * @param e The element to produce; moved into this operation and held in m_e until it is
         *          moved into the ring buffer.
         */
        produce_operation(ring_buffer<element, num_elements>& rb, element e) : m_rb(rb), m_e(std::move(e)) {}
10,995,785✔
57

58
        /**
         * Takes the ring buffer mutex and tries to store the element immediately.
         * @return True if the element was moved into the ring buffer (the awaiting coroutine does
         *         not suspend), false if the ring buffer was full.
         */
        auto await_ready() noexcept -> bool
10,994,623✔
59
        {
60
            std::unique_lock lk{m_rb.m_mutex};
10,994,623✔
61
            return m_rb.try_produce_locked(lk, m_e);
11,000,013✔
62
        }
11,000,013✔
63

64
        /**
         * Called when await_ready() returned false.  Retries the produce under the mutex and, if the
         * ring buffer is still full and not stopped, links this operation into the producer waiter list.
         * @param awaiting_coroutine The coroutine to resume once the element has been stored or the
         *                           ring buffer has been stopped.
         * @return True if the coroutine should stay suspended, false to resume it immediately.
         */
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
3,496,353✔
65
        {
66
            std::unique_lock lk{m_rb.m_mutex};
3,496,353✔
67
            // It's possible a consumer on another thread consumed an item between await_ready() and await_suspend()
68
            // so we must check to see if there is space again.
69
            if (m_rb.try_produce_locked(lk, m_e))
3,500,183✔
70
            {
71
                return false;
1,032✔
72
            }
73

74
            // Don't suspend if the stop signal has been set.
75
            if (m_rb.m_stopped.load(std::memory_order::acquire))
3,499,151✔
76
            {
77
                return false;
×
78
            }
79

80
            // Push this operation onto the head of the producer waiter list; the mutex is still held here.
            m_awaiting_coroutine   = awaiting_coroutine;
3,499,151✔
81
            m_next                 = m_rb.m_produce_waiters;
3,499,151✔
82
            m_rb.m_produce_waiters = this;
3,499,151✔
83
            return true;
3,499,151✔
84
        }
3,500,183✔
85

86
        /**
87
         * @return produce_result::produced if the ring buffer's stop flag is clear at the time of
         *         the call, otherwise produce_result::ring_buffer_stopped.
         *
         * NOTE(review): the result is derived only from the stop flag read here, not from whether
         * the element was actually stored earlier — confirm this is the intended contract.
88
         */
89
        auto await_resume() -> rb::produce_result
10,997,686✔
90
        {
91
            return !m_rb.m_stopped.load(std::memory_order::acquire) ? rb::produce_result::produced
10,997,686✔
92
                                                                    : rb::produce_result::ring_buffer_stopped;
10,997,760✔
93
        }
94

95
    private:
96
        // The ring buffer reads and writes m_e, m_next and m_awaiting_coroutine directly when it
        // hands a slot to a suspended producer or wakes waiters.
        template<typename element_subtype, size_t num_elements_subtype>
97
        friend class ring_buffer;
98

99
        /// The ring buffer the element is being produced into.
100
        ring_buffer<element, num_elements>& m_rb;
101
        /// If the operation needs to suspend, the coroutine to resume when the element can be produced.
102
        std::coroutine_handle<> m_awaiting_coroutine;
103
        /// Linked list of produce operations that are awaiting to produce their element.
104
        produce_operation* m_next{nullptr};
105
        /// The element this produce operation is producing into the ring buffer.
106
        element m_e;
107
    };
108

109
    struct consume_operation
110
    {
111
        /**
         * @param rb The ring buffer to consume from; stored by reference.  The element slot m_e is
         *           default-initialized until an element is moved into it.
         */
        explicit consume_operation(ring_buffer<element, num_elements>& rb) : m_rb(rb) {}
10,996,312✔
112

113
        /**
         * Takes the ring buffer mutex and tries to take an element immediately.
         * @return True if an element was moved into this operation (the awaiting coroutine does not
         *         suspend), false if the ring buffer was empty.
         */
        auto await_ready() noexcept -> bool
10,994,747✔
114
        {
115
            std::unique_lock lk{m_rb.m_mutex};
10,994,747✔
116
            return m_rb.try_consume_locked(lk, this);
11,000,114✔
117
        }
10,997,188✔
118

119
        /**
         * Called when await_ready() returned false.  Retries the consume under the mutex and, if the
         * ring buffer is still empty and not stopped, links this operation into the consumer waiter list.
         * @param awaiting_coroutine The coroutine to resume once an element has been handed over or
         *                           the ring buffer has been stopped.
         * @return True if the coroutine should stay suspended, false to resume it immediately.
         */
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
2,497,574✔
120
        {
121
            std::unique_lock lk{m_rb.m_mutex};
2,497,574✔
122
            // We have to check again as there is a race condition between await_ready() and now on the mutex acquire.
123
            // It is possible that a producer added items between await_ready() and await_suspend().
124
            if (m_rb.try_consume_locked(lk, this))
2,497,578✔
125
            {
126
                return false;
135✔
127
            }
128

129
            // Don't suspend if the stop signal has been set.
130
            if (m_rb.m_stopped.load(std::memory_order::acquire))
2,497,443✔
131
            {
132
                return false;
66✔
133
            }
134
            // Push this operation onto the head of the consumer waiter list; the mutex is still held here.
            m_awaiting_coroutine   = awaiting_coroutine;
2,497,377✔
135
            m_next                 = m_rb.m_consume_waiters;
2,497,377✔
136
            m_rb.m_consume_waiters = this;
2,497,377✔
137
            return true;
2,497,377✔
138
        }
2,497,578✔
139

140
        /**
141
         * @return The consumed element, or an unexpected rb::consume_result::ring_buffer_stopped if
         *         the ring buffer's stop flag is set at the time of the call.
         *
         * NOTE(review): the stop flag is checked first, so the error is returned even if an element
         * had already been moved into m_e — confirm this is the intended contract.
142
         */
143
        auto await_resume() -> expected<element, rb::consume_result>
10,996,414✔
144
        {
145
            if (m_rb.m_stopped.load(std::memory_order::acquire))
10,996,414✔
146
            {
147
                return unexpected<rb::consume_result>(rb::consume_result::ring_buffer_stopped);
101✔
148
            }
149

150
            // Hand the element that was moved into this operation out to the awaiting coroutine.
            return std::move(m_e);
10,996,421✔
151
        }
152

153
    private:
154
        // The ring buffer writes m_e and reads m_next and m_awaiting_coroutine directly when it
        // hands an element to a suspended consumer or wakes waiters.
        template<typename element_subtype, size_t num_elements_subtype>
155
        friend class ring_buffer;
156

157
        /// The ring buffer to consume an element from.
158
        ring_buffer<element, num_elements>& m_rb;
159
        /// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
160
        std::coroutine_handle<> m_awaiting_coroutine;
161
        /// Linked list of consume operations that are awaiting to consume an element.
162
        consume_operation* m_next{nullptr};
163
        /// The element this consume operation will consume.
164
        element m_e;
165
    };
166

167
    /**
168
     * Produces the given element into the ring buffer.  This operation will suspend until a slot
169
     * in the ring buffer becomes available.  If a slot is already free the element is stored
     * without suspending.
170
     * @param e The element to produce.
     * @return An awaitable produce_operation; awaiting it yields an rb::produce_result.
171
     */
172
    [[nodiscard]] auto produce(element e) -> produce_operation { return produce_operation{*this, std::move(e)}; }
10,996,713✔
173

174
    /**
175
     * Consumes an element from the ring buffer.  This operation will suspend until an element in
176
     * the ring buffer becomes available.  If an element is already available it is taken without
     * suspending.
     * @return An awaitable consume_operation; awaiting it yields an
     *         expected<element, rb::consume_result>.
177
     */
178
    [[nodiscard]] auto consume() -> consume_operation { return consume_operation{*this}; }
10,996,623✔
179

180
    /**
181
     * @return The current number of elements contained in the ring buffer.
     *
     * NOTE(review): m_used is a plain size_t that is modified while m_mutex is held, but it is read
     * here after only an acquire fence and without taking m_mutex.  Confirm whether an unlocked
     * read is acceptable for callers on other threads.
182
     */
183
    auto size() const -> size_t
7✔
184
    {
185
        std::atomic_thread_fence(std::memory_order::acquire);
186
        return m_used;
7✔
187
    }
188

189
    /**
190
     * Implemented as `size() == 0`, so the answer reflects the element count at the moment size()
     * read it.
     * @return True if the ring buffer contains zero elements.
191
     */
192
    auto empty() const -> bool { return size() == 0; }
7✔
193

194
    /**
195
     * Sets the stop flag and wakes up all currently awaiting producers and consumers.  Their
196
     * await_resume() functions will then report ring_buffer_stopped (produce_result for producers,
     * an unexpected consume_result for consumers).  Only the first call wakes waiters; later calls
     * return immediately.
197
     */
198
    auto notify_waiters() -> void
8✔
199
    {
200
        // Local expected value for the compare-exchange below (a bool, not the expected<> type).
        auto expected = false;
8✔
201
        if (!m_stopped.compare_exchange_strong(expected, true, std::memory_order::acq_rel, std::memory_order::relaxed))
8✔
202
        {
203
            // Only wake up waiters once.
204
            return;
2✔
205
        }
206

207
        std::unique_lock lk{m_mutex};
6✔
208

209
        // Drain the producer waiter list.  Each waiter is unlinked while the mutex is held, and the
        // mutex is released around resume() so the resumed coroutine does not run under the lock.
        while (m_produce_waiters != nullptr)
6✔
210
        {
NEW
211
            auto* to_resume   = m_produce_waiters;
×
NEW
212
            m_produce_waiters = m_produce_waiters->m_next;
×
213

214
            lk.unlock();
×
215
            to_resume->m_awaiting_coroutine.resume();
×
216
            lk.lock();
×
217
        }
218

219
        // Drain the consumer waiter list in the same way.
        while (m_consume_waiters != nullptr)
41✔
220
        {
221
            auto* to_resume   = m_consume_waiters;
35✔
222
            m_consume_waiters = m_consume_waiters->m_next;
35✔
223

224
            lk.unlock();
35✔
225
            to_resume->m_awaiting_coroutine.resume();
35✔
226
            lk.lock();
35✔
227
        }
228
    }
6✔
229

230
private:
231
    // The operations lock m_mutex, read m_stopped, call the try_*_locked() helpers and link
    // themselves into the waiter lists.
    friend produce_operation;
232
    friend consume_operation;
233

234
    /// Held while the element storage, indices, element count and waiter lists are modified.
    std::mutex m_mutex{};
235

236
    /// Fixed-size storage for the buffered elements.
    std::array<element, num_elements> m_elements{};
237
    /// The current front pointer to an open slot if not full.
238
    size_t m_front{0};
239
    /// The current back pointer to the oldest item in the buffer if not empty.
240
    size_t m_back{0};
241
    /// The number of items in the ring buffer.
242
    size_t m_used{0};
243

244
    /// The LIFO list of produce waiters.
245
    produce_operation* m_produce_waiters{nullptr};
246
    /// The LIFO list of consume waiters.
247
    consume_operation* m_consume_waiters{nullptr};
248

249
    /// Set to true exactly once by notify_waiters(); operations check it before suspending and in await_resume().
    std::atomic<bool> m_stopped{false};
250

251
    /**
     * Tries to store an element; callers take m_mutex before calling.  If a consumer is waiting,
     * the oldest buffered element is handed to it and it is resumed inline.
     * @param lk The caller's lock.  It is released before a waiting consumer is resumed and is not
     *           re-acquired, so it may or may not still be held on return.
     * @param e The element to store; moved from on success, untouched on failure.
     * @return False if the ring buffer is full, true if the element was stored.
     */
    auto try_produce_locked(std::unique_lock<std::mutex>& lk, element& e) -> bool
14,500,196✔
252
    {
253
        if (m_used == num_elements)
14,500,196✔
254
        {
255
            return false;
6,999,334✔
256
        }
257

258
        // Store into the open slot and advance the front index, wrapping at num_elements.
        m_elements[m_front] = std::move(e);
7,500,862✔
259
        m_front             = (m_front + 1) % num_elements;
7,500,862✔
260
        ++m_used;
7,500,862✔
261

262
        if (m_consume_waiters != nullptr)
7,500,862✔
263
        {
264
            // Unlink the waiter at the head of the consumer list.
            consume_operation* to_resume = m_consume_waiters;
2,497,342✔
265
            m_consume_waiters            = m_consume_waiters->m_next;
2,497,342✔
266

267
            // Since the consume operation suspended it needs to be provided an element to consume.
268
            to_resume->m_e = std::move(m_elements[m_back]);
2,497,342✔
269
            m_back         = (m_back + 1) % num_elements;
2,497,342✔
270
            --m_used; // And we just consumed up another item.
2,497,342✔
271

272
            // Release the mutex before resuming so the consumer's coroutine does not run under the lock.
            lk.unlock();
2,497,342✔
273
            to_resume->m_awaiting_coroutine.resume();
2,497,342✔
274
        }
275

276
        return true;
7,500,862✔
277
    }
278

279
    /**
     * Tries to take the oldest element; callers take m_mutex before calling.  If a producer is
     * waiting, its element is moved into the freed slot and it is resumed inline.
     * @param lk The caller's lock.  It is released before a waiting producer is resumed and is not
     *           re-acquired, so it may or may not still be held on return.
     * @param op The consume operation that receives the element in its m_e member.
     * @return False if the ring buffer is empty, true if an element was moved into `op`.
     */
    auto try_consume_locked(std::unique_lock<std::mutex>& lk, consume_operation* op) -> bool
13,497,692✔
280
    {
281
        if (m_used == 0)
13,497,692✔
282
        {
283
            return false;
4,995,021✔
284
        }
285

286
        // Take the oldest element and advance the back index, wrapping at num_elements.
        op->m_e = std::move(m_elements[m_back]);
8,502,671✔
287
        m_back  = (m_back + 1) % num_elements;
8,502,671✔
288
        --m_used;
8,502,671✔
289

290
        if (m_produce_waiters != nullptr)
8,502,671✔
291
        {
292
            // Unlink the waiter at the head of the producer list.
            produce_operation* to_resume = m_produce_waiters;
3,499,151✔
293
            m_produce_waiters            = m_produce_waiters->m_next;
3,499,151✔
294

295
            // Since the produce operation suspended it needs to be provided a slot to place its element.
296
            m_elements[m_front] = std::move(to_resume->m_e);
3,499,151✔
297
            m_front             = (m_front + 1) % num_elements;
3,499,151✔
298
            ++m_used; // And we just produced another item.
3,499,151✔
299

300
            // Release the mutex before resuming so the producer's coroutine does not run under the lock.
            lk.unlock();
3,499,151✔
301
            to_resume->m_awaiting_coroutine.resume();
3,498,424✔
302
        }
303

304
        return true;
8,500,086✔
305
    }
306
};
307

308
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc