• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 10219129047

02 Aug 2024 04:47PM UTC coverage: 84.375% (-0.1%) from 84.498%
10219129047

Pull #272

github

web-flow
Merge c53f67690 into 5697678d7
Pull Request #272: Use lock for sync_wait completion

3 of 4 new or added lines in 1 file covered. (75.0%)

6 existing lines in 1 file now uncovered.

1377 of 1632 relevant lines covered (84.38%)

4240055.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.86
/include/coro/ring_buffer.hpp
1
#pragma once
2

3
#include <coro/expected.hpp>
4

5
#include <array>
6
#include <atomic>
7
#include <coroutine>
8
#include <mutex>
9
#include <optional>
10

11
namespace coro
12
{
13
/// Result codes returned by awaiting ring_buffer produce and consume operations.
namespace rb
14
{
15
/// Outcome of awaiting a ring_buffer produce operation.
enum class produce_result
16
{
17
    /// The produce operation was not stopped.
    produced,
18
    /// The ring buffer was stopped before the produce operation could complete.
    ring_buffer_stopped
19
};
20

21
/// Failure reason carried in the unexpected value of a ring_buffer consume operation.
enum class consume_result
22
{
23
    /// The ring buffer was stopped before an element could be consumed.
    ring_buffer_stopped
24
};
25
} // namespace rb
26

27
/**
28
 * @tparam element The type of element the ring buffer will store.  Note that this type should be
29
 *         cheap to move if possible as it is moved into and out of the buffer upon produce and
30
 *         consume operations.
31
 * @tparam num_elements The maximum number of elements the ring buffer can store, must be >= 1.
32
 */
33
template<typename element, size_t num_elements>
34
class ring_buffer
35
{
36
public:
37
    /**
38
     * static_assert If `num_elements` == 0.
39
     */
40
    ring_buffer() { static_assert(num_elements != 0, "num_elements cannot be zero"); }
6✔
41

42
    ~ring_buffer()
6✔
43
    {
44
        // Wake up anyone still using the ring buffer.
45
        notify_waiters();
6✔
46
    }
6✔
47

48
    ring_buffer(const ring_buffer<element, num_elements>&) = delete;
49
    ring_buffer(ring_buffer<element, num_elements>&&)      = delete;
50

51
    auto operator=(const ring_buffer<element, num_elements>&) noexcept -> ring_buffer<element, num_elements>& = delete;
52
    auto operator=(ring_buffer<element, num_elements>&&) noexcept -> ring_buffer<element, num_elements>&      = delete;
53

54
    struct produce_operation
55
    {
56
        produce_operation(ring_buffer<element, num_elements>& rb, element e) : m_rb(rb), m_e(std::move(e)) {}
10,997,993✔
57

58
        auto await_ready() noexcept -> bool
10,997,369✔
59
        {
60
            std::unique_lock lk{m_rb.m_mutex};
10,997,369✔
61
            return m_rb.try_produce_locked(lk, m_e);
11,000,013✔
62
        }
11,000,013✔
63

64
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
3,497,739✔
65
        {
66
            std::unique_lock lk{m_rb.m_mutex};
3,497,739✔
67
            // Its possible a consumer on another thread consumed an item between await_ready() and await_suspend()
68
            // so we must check to see if there is space again.
69
            if (m_rb.try_produce_locked(lk, m_e))
3,499,942✔
70
            {
71
                return false;
268✔
72
            }
73

74
            // Don't suspend if the stop signal has been set.
75
            if (m_rb.m_stopped.load(std::memory_order::acquire))
3,499,674✔
76
            {
77
                m_stopped = true;
×
78
                return false;
×
79
            }
80

81
            m_awaiting_coroutine   = awaiting_coroutine;
3,499,674✔
82
            m_next                 = m_rb.m_produce_waiters;
3,499,674✔
83
            m_rb.m_produce_waiters = this;
3,499,674✔
84
            return true;
3,499,674✔
85
        }
3,499,942✔
86

87
        /**
88
         * @return produce_result
89
         */
90
        auto await_resume() -> rb::produce_result
10,998,987✔
91
        {
92
            return !m_stopped ? rb::produce_result::produced : rb::produce_result::ring_buffer_stopped;
10,998,987✔
93
        }
94

95
    private:
96
        template<typename element_subtype, size_t num_elements_subtype>
97
        friend class ring_buffer;
98

99
        /// The ring buffer the element is being produced into.
100
        ring_buffer<element, num_elements>& m_rb;
101
        /// If the operation needs to suspend, the coroutine to resume when the element can be produced.
102
        std::coroutine_handle<> m_awaiting_coroutine;
103
        /// Linked list of produce operations that are awaiting to produce their element.
104
        produce_operation* m_next{nullptr};
105
        /// The element this produce operation is producing into the ring buffer.
106
        element m_e;
107
        /// Was the operation stopped?
108
        bool m_stopped{false};
109
    };
110

111
    struct consume_operation
112
    {
113
        explicit consume_operation(ring_buffer<element, num_elements>& rb) : m_rb(rb) {}
10,996,331✔
114

115
        auto await_ready() noexcept -> bool
10,995,160✔
116
        {
117
            std::unique_lock lk{m_rb.m_mutex};
10,995,160✔
118
            return m_rb.try_consume_locked(lk, this);
11,000,114✔
119
        }
10,998,353✔
120

121
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
2,499,594✔
122
        {
123
            std::unique_lock lk{m_rb.m_mutex};
2,499,594✔
124
            // We have to check again as there is a race condition between await_ready() and now on the mutex acquire.
125
            // It is possible that a producer added items between await_ready() and await_suspend().
126
            if (m_rb.try_consume_locked(lk, this))
2,499,594✔
127
            {
128
                return false;
48✔
129
            }
130

131
            // Don't suspend if the stop signal has been set.
132
            if (m_rb.m_stopped.load(std::memory_order::acquire))
2,499,546✔
133
            {
134
                m_stopped = true;
101✔
135
                return false;
101✔
136
            }
137
            m_awaiting_coroutine   = awaiting_coroutine;
2,499,445✔
138
            m_next                 = m_rb.m_consume_waiters;
2,499,445✔
139
            m_rb.m_consume_waiters = this;
2,499,445✔
140
            return true;
2,499,445✔
141
        }
2,499,594✔
142

143
        /**
144
         * @return The consumed element or std::nullopt if the consume has failed.
145
         */
146
        auto await_resume() -> expected<element, rb::consume_result>
10,998,230✔
147
        {
148
            if (m_stopped)
10,998,230✔
149
            {
150
                return unexpected<rb::consume_result>(rb::consume_result::ring_buffer_stopped);
101✔
151
            }
152

153
            return std::move(m_e);
10,998,129✔
154
        }
155

156
    private:
157
        template<typename element_subtype, size_t num_elements_subtype>
158
        friend class ring_buffer;
159

160
        /// The ring buffer to consume an element from.
161
        ring_buffer<element, num_elements>& m_rb;
162
        /// If the operation needs to suspend, the coroutine to resume when the element can be consumed.
163
        std::coroutine_handle<> m_awaiting_coroutine;
164
        /// Linked list of consume operations that are awaiting to consume an element.
165
        consume_operation* m_next{nullptr};
166
        /// The element this consume operation will consume.
167
        element m_e;
168
        /// Was the operation stopped?
169
        bool m_stopped{false};
170
    };
171

172
    /**
173
     * Produces the given element into the ring buffer.  This operation will suspend until a slot
174
     * in the ring buffer becomes available.
175
     * @param e The element to produce.
176
     */
177
    [[nodiscard]] auto produce(element e) -> produce_operation { return produce_operation{*this, std::move(e)}; }
10,997,956✔
178

179
    /**
180
     * Consumes an element from the ring buffer.  This operation will suspend until an element in
181
     * the ring buffer becomes available.
182
     */
183
    [[nodiscard]] auto consume() -> consume_operation { return consume_operation{*this}; }
10,996,546✔
184

185
    /**
186
     * @return The current number of elements contained in the ring buffer.
187
     */
188
    auto size() const -> size_t
482,246✔
189
    {
190
        std::atomic_thread_fence(std::memory_order::acquire);
191
        return m_used;
482,246✔
192
    }
193

194
    /**
195
     * @return True if the ring buffer contains zero elements.
196
     */
197
    auto empty() const -> bool { return size() == 0; }
482,347✔
198

199
    /**
200
     * Wakes up all currently awaiting producers and consumers.  Their await_resume() function
201
     * will return an expected consume result that the ring buffer has stopped.
202
     */
203
    auto notify_waiters() -> void
106✔
204
    {
205
        std::unique_lock lk{m_mutex};
106✔
206
        // Only wake up waiters once.
207
        if (m_stopped.load(std::memory_order::acquire))
107✔
208
        {
209
            return;
101✔
210
        }
211

212
        m_stopped.exchange(true, std::memory_order::release);
6✔
213

214
        while (m_produce_waiters != nullptr)
6✔
215
        {
216
            auto* to_resume      = m_produce_waiters;
×
217
            to_resume->m_stopped = true;
×
218
            m_produce_waiters    = m_produce_waiters->m_next;
×
219

220
            lk.unlock();
×
221
            to_resume->m_awaiting_coroutine.resume();
×
222
            lk.lock();
×
223
        }
224

225
        while (m_consume_waiters != nullptr)
6✔
226
        {
UNCOV
227
            auto* to_resume      = m_consume_waiters;
×
UNCOV
228
            to_resume->m_stopped = true;
×
UNCOV
229
            m_consume_waiters    = m_consume_waiters->m_next;
×
230

UNCOV
231
            lk.unlock();
×
UNCOV
232
            to_resume->m_awaiting_coroutine.resume();
×
UNCOV
233
            lk.lock();
×
234
        }
235
    }
107✔
236

237
private:
238
    friend produce_operation;
239
    friend consume_operation;
240

241
    std::mutex m_mutex{};
242

243
    std::array<element, num_elements> m_elements{};
244
    /// The current front pointer to an open slot if not full.
245
    size_t m_front{0};
246
    /// The current back pointer to the oldest item in the buffer if not empty.
247
    size_t m_back{0};
248
    /// The number of items in the ring buffer.
249
    size_t m_used{0};
250

251
    /// The LIFO list of produce waiters.
252
    produce_operation* m_produce_waiters{nullptr};
253
    /// The LIFO list of consume watier.
254
    consume_operation* m_consume_waiters{nullptr};
255

256
    std::atomic<bool> m_stopped{false};
257

258
    auto try_produce_locked(std::unique_lock<std::mutex>& lk, element& e) -> bool
14,499,955✔
259
    {
260
        if (m_used == num_elements)
14,499,955✔
261
        {
262
            return false;
6,999,616✔
263
        }
264

265
        m_elements[m_front] = std::move(e);
7,500,339✔
266
        m_front             = (m_front + 1) % num_elements;
7,500,339✔
267
        ++m_used;
7,500,339✔
268

269
        if (m_consume_waiters != nullptr)
7,500,339✔
270
        {
271
            consume_operation* to_resume = m_consume_waiters;
2,499,445✔
272
            m_consume_waiters            = m_consume_waiters->m_next;
2,499,445✔
273

274
            // Since the consume operation suspended it needs to be provided an element to consume.
275
            to_resume->m_e = std::move(m_elements[m_back]);
2,499,445✔
276
            m_back         = (m_back + 1) % num_elements;
2,499,445✔
277
            --m_used; // And we just consumed up another item.
2,499,445✔
278

279
            lk.unlock();
2,499,445✔
280
            to_resume->m_awaiting_coroutine.resume();
2,499,445✔
281
        }
282

283
        return true;
7,500,339✔
284
    }
285

286
    auto try_consume_locked(std::unique_lock<std::mutex>& lk, consume_operation* op) -> bool
13,499,708✔
287
    {
288
        if (m_used == 0)
13,499,708✔
289
        {
290
            return false;
4,999,140✔
291
        }
292

293
        op->m_e = std::move(m_elements[m_back]);
8,500,568✔
294
        m_back  = (m_back + 1) % num_elements;
8,500,568✔
295
        --m_used;
8,500,568✔
296

297
        if (m_produce_waiters != nullptr)
8,500,568✔
298
        {
299
            produce_operation* to_resume = m_produce_waiters;
3,499,674✔
300
            m_produce_waiters            = m_produce_waiters->m_next;
3,499,674✔
301

302
            // Since the produce operation suspended it needs to be provided a slot to place its element.
303
            m_elements[m_front] = std::move(to_resume->m_e);
3,499,674✔
304
            m_front             = (m_front + 1) % num_elements;
3,499,674✔
305
            ++m_used; // And we just produced another item.
3,499,674✔
306

307
            lk.unlock();
3,499,674✔
308
            to_resume->m_awaiting_coroutine.resume();
3,499,373✔
309
        }
310

311
        return true;
8,498,942✔
312
    }
313
};
314

315
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc