• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 14068483290

25 Mar 2025 07:30PM UTC coverage: 86.241%. First build
14068483290

Pull #311

github

web-flow
Merge 4e92de6d0 into c298be096
Pull Request #311: Adds coro::queue<T>

70 of 82 new or added lines in 2 files covered. (85.37%)

1473 of 1708 relevant lines covered (86.24%)

4086678.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.11
/include/coro/queue.hpp
1
#pragma once
2

3
#include "coro/expected.hpp"

#include <atomic>
#include <coroutine>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <type_traits>
#include <utility>
7

8
namespace coro
9
{
10

11
/// @brief Result returned by queue::push() and queue::emplace().
enum class queue_produce_result
{
    /// The element was accepted: either buffered in the queue or handed to a waiting consumer.
    produced,
    /// The queue has been shut down; the element was not accepted.
    queue_stopped
};
17

18
/// @brief Error value carried by the expected<> that co_await queue::pop() yields when no
///        element could be produced.
enum class queue_consume_result
{
    /// The queue has been shut down; no element was consumed.
    queue_stopped
};
23

24
/// @brief
25
/// @tparam element_type
26
template<typename element_type>
27
class queue
28
{
29
public:
30
    struct awaiter
31
    {
32
        explicit awaiter(queue<element_type>& q) noexcept : m_queue(q) {}
324✔
33

34
        auto await_ready() noexcept -> bool
323✔
35
        {
36
            // This awaiter is ready when it has actually acquired an element or it is shutting down.
37
            if (m_queue.m_stopped.load(std::memory_order::acquire))
323✔
38
            {
39
                return false;
2✔
40
            }
41

42
            std::scoped_lock lock{m_queue.m_mutex};
323✔
43
            if (!m_queue.empty())
325✔
44
            {
45
                if constexpr (std::is_move_constructible_v<element_type>)
46
                {
47
                    m_element = std::move(m_queue.m_elements.front());
12✔
48
                }
49
                else
50
                {
51
                    m_element = m_queue.m_elements.front();
52
                }
53

54
                m_queue.m_elements.pop();
12✔
55
                return true;
12✔
56
            }
57

58
            return false;
313✔
59
        }
325✔
60

61
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
314✔
62
        {
63
            // Don't suspend if the stop signal has been set.
64
            if (m_queue.m_stopped.load(std::memory_order::acquire))
314✔
65
            {
66
                return false;
2✔
67
            }
68

69
            std::scoped_lock lock{m_queue.m_mutex};
312✔
70
            if (!m_queue.empty())
313✔
71
            {
72
                if constexpr (std::is_move_constructible_v<element_type>)
73
                {
NEW
74
                    m_element = std::move(m_queue.m_elements.front());
×
75
                }
76
                else
77
                {
78
                    m_element = m_queue.m_elements.front();
79
                }
80

NEW
81
                m_queue.m_elements.pop();
×
NEW
82
                return false;
×
83
            }
84

85
            // No element is ready, put ourselves on the waiter list and suspend.
86
            this->m_next         = m_queue.m_waiters;
313✔
87
            m_queue.m_waiters    = this;
313✔
88
            m_awaiting_coroutine = awaiting_coroutine;
313✔
89

90
            return true;
313✔
91
        }
313✔
92

93
        [[nodiscard]] auto await_resume() noexcept -> expected<element_type, queue_consume_result>
326✔
94
        {
95
            if (m_queue.m_stopped.load(std::memory_order::acquire))
326✔
96
            {
97
                return unexpected<queue_consume_result>(queue_consume_result::queue_stopped);
6✔
98
            }
99

100
            if constexpr (std::is_move_constructible_v<element_type>)
101
            {
102
                return std::move(m_element.value());
320✔
103
            }
104
            else
105
            {
106
                return m_element.value();
107
            }
108
        }
109

110
        std::optional<element_type> m_element{std::nullopt};
111
        queue&                      m_queue;
112
        std::coroutine_handle<>     m_awaiting_coroutine{nullptr};
113
        /// The next awaiter in line for this queue, nullptr if this is the end.
114
        awaiter* m_next{nullptr};
115
        bool     m_stopped{false};
116
    };
117

118
    queue() {}
6✔
119
    ~queue() {}
6✔
120

121
    queue(const queue&) = delete;
122
    queue(queue&& other)
123
    {
124
        m_waiters  = std::exchange(other.m_waiters, nullptr);
125
        m_mutex    = std::move(other.m_mutex);
126
        m_elements = std::move(other.m_elements);
127
    }
128

129
    auto operator=(const queue&) -> queue& = delete;
130
    auto operator=(queue&& other) -> queue&
131
    {
132
        if (std::addressof(other) != this)
133
        {
134
            m_waiters  = std::exchange(other.m_waiters, nullptr);
135
            m_mutex    = std::move(other.m_mutex);
136
            m_elements = std::move(other.m_elements);
137
        }
138

139
        return *this;
140
    }
141

142
    /// @brief Determines if the queue is empty.
143
    /// @return True if the queue has no elements.
144
    auto empty() const -> bool { return size() == 0; }
642✔
145

146
    /// @brief Gets the current number of elements in the queue.
147
    /// @return The number of elements in the queue.
148
    auto size() const -> std::size_t
643✔
149
    {
150
        std::atomic_thread_fence(std::memory_order::acquire);
151
        return m_elements.size();
643✔
152
    }
153

154
    /// @brief Pushes the element into the queue.
155
    /// @param element The element to push.
156
    /// @return void.
157
    auto push(const element_type& element) -> queue_produce_result
320✔
158
    {
159
        if (m_stopped.load(std::memory_order::acquire))
320✔
160
        {
NEW
161
            return queue_produce_result::queue_stopped;
×
162
        }
163

164
        // The general idea is to see if anyone is waiting, and if so directly transfer the element
165
        // to that waiter. If there is nobody waiting then move the element into the queue.
166
        std::unique_lock lock{m_mutex};
320✔
167

168
        if (m_waiters != nullptr)
320✔
169
        {
170
            awaiter* waiter = m_waiters;
309✔
171
            m_waiters       = m_waiters->m_next;
309✔
172
            lock.unlock();
309✔
173

174
            // Transfer the element directly to the awaiter.
175
            waiter->m_element = element;
309✔
176
            waiter->m_awaiting_coroutine.resume();
309✔
177
        }
178
        else
179
        {
180
            m_elements.push(element);
11✔
181
        }
182

183
        return queue_produce_result::produced;
320✔
184
    }
320✔
185

186
    /// @brief Pushes the element into the queue.
187
    /// @param element The element to push.
188
    /// @return void.
189
    auto push(element_type&& element) -> queue_produce_result
3✔
190
    {
191
        if (m_stopped.load(std::memory_order::acquire))
3✔
192
        {
193
            return queue_produce_result::queue_stopped;
1✔
194
        }
195

196
        std::unique_lock lock{m_mutex};
2✔
197

198
        if (m_waiters != nullptr)
2✔
199
        {
NEW
200
            awaiter* waiter = m_waiters;
×
NEW
201
            m_waiters       = m_waiters->m_next;
×
NEW
202
            lock.unlock();
×
203

204
            // Transfer the element directly to the awaiter.
NEW
205
            waiter->m_element = std::move(element);
×
NEW
206
            waiter->m_awaiting_coroutine.resume();
×
207
        }
208
        else
209
        {
210
            m_elements.push(std::move(element));
2✔
211
        }
212

213
        return queue_produce_result::produced;
2✔
214
    }
2✔
215

216
    /// @brief Emplaces the element into the queue.
217
    /// @tparam ...args_type The element's constructor argument types.
218
    /// @param ...args The element's constructor arguments.
219
    /// @return void.
220
    template<class... args_type>
221
    auto emplace(args_type&&... args) -> queue_produce_result
222
    {
223
        if (m_stopped.load(std::memory_order::acquire))
224
        {
225
            return queue_produce_result::queue_stopped;
226
        }
227

228
        std::unique_lock lock{m_mutex};
229

230
        if (m_waiters != nullptr)
231
        {
232
            awaiter* waiter = m_waiters;
233
            m_waiters       = m_waiters->m_next;
234
            lock.unlock();
235

236
            waiter->m_element.emplace(std::forward<args_type>(args)...);
237
            waiter->m_awaiting_coroutine.resume();
238
        }
239
        else
240
        {
241
            m_elements.emplace(std::forward<args_type>(args)...);
242
        }
243

244
        return queue_produce_result::produced;
245
    }
246

247
    /// @brief Pops the head element of the queue if available, or waits for one to
248
    ///        be available.
249
    /// @return The head element, or coro::queue_consume_result::queue_stopped if the
250
    ///         queue has been shutdown.
251
    [[nodiscard]] auto pop() -> awaiter { return awaiter{*this}; }
325✔
252

253
    /// @brief Shutsdown the queue and notifies all waiters of pop() to stop waiting.
254
    /// @return void.
255
    auto shutdown_notify_waiters() -> void
4✔
256
    {
257
        auto expected = false;
4✔
258
        if (!m_stopped.compare_exchange_strong(expected, true, std::memory_order::acq_rel, std::memory_order::relaxed))
4✔
259
        {
NEW
260
            return;
×
261
        }
262

263
        std::unique_lock lock{m_mutex};
4✔
264
        while (m_waiters != nullptr)
8✔
265
        {
266
            auto* to_resume = m_waiters;
4✔
267
            m_waiters       = m_waiters->m_next;
4✔
268

269
            lock.unlock();
4✔
270
            to_resume->m_awaiting_coroutine.resume();
4✔
271
            lock.lock();
4✔
272
        }
273
    }
4✔
274

275
private:
276
    friend awaiter;
277
    /// @brief The list of pop() awaiters.
278
    awaiter* m_waiters{nullptr};
279
    /// @brief Mutex for properly maintaining the queue.
280
    std::mutex m_mutex{};
281
    /// @brief The underlying queue datastructure.
282
    std::queue<element_type> m_elements{};
283
    /// @brief Has this queue been shutdown?
284
    std::atomic<bool> m_stopped{false};
285
};
286

287
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc