• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 14090895846

26 Mar 2025 06:33PM UTC coverage: 86.507%. First build
14090895846

Pull #311

github

web-flow
Merge 2a5ee7fd5 into 8fc3e2712
Pull Request #311: Adds coro::queue<T>

73 of 82 new or added lines in 2 files covered. (89.02%)

1481 of 1712 relevant lines covered (86.51%)

4077289.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.28
/include/coro/queue.hpp
1
#pragma once
2

3
#include "coro/expected.hpp"
4

5
#include <mutex>
6
#include <queue>
7

8
namespace coro
9
{
10

11
/// @brief Result code returned by queue::push() and queue::emplace().
12
enum class queue_produce_result
13
{
14
    /// The element was handed to a waiting pop() or stored in the queue.
    produced,
15
    /// The queue's stopped flag was already set, the element was not accepted.
    queue_stopped
16
};
17

18
/// @brief Error code carried in the unexpected state of the value produced by awaiting queue::pop().
19
enum class queue_consume_result
20
{
21
    /// The queue's stopped flag was set, so no element is returned to the consumer.
    queue_stopped
22
};
23

24
/// @brief
25
/// @tparam element_type
26
template<typename element_type>
27
class queue
28
{
29
public:
30
    struct awaiter
31
    {
32
        explicit awaiter(queue<element_type>& q) noexcept : m_queue(q) {}
326✔
33

34
        auto await_ready() noexcept -> bool
326✔
35
        {
36
            // This awaiter is ready when it has actually acquired an element or it is shutting down.
37
            if (m_queue.m_stopped.load(std::memory_order::acquire))
326✔
38
            {
39
                return false;
2✔
40
            }
41

42
            std::scoped_lock lock{m_queue.m_mutex};
323✔
43
            if (!m_queue.empty())
325✔
44
            {
45
                if constexpr (std::is_move_constructible_v<element_type>)
46
                {
47
                    m_element = std::move(m_queue.m_elements.front());
12✔
48
                }
49
                else
50
                {
51
                    m_element = m_queue.m_elements.front();
52
                }
53

54
                m_queue.m_elements.pop();
12✔
55
                return true;
12✔
56
            }
57

58
            return false;
313✔
59
        }
325✔
60

61
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
314✔
62
        {
63
            // Don't suspend if the stop signal has been set.
64
            if (m_queue.m_stopped.load(std::memory_order::acquire))
314✔
65
            {
66
                return false;
2✔
67
            }
68

69
            std::scoped_lock lock{m_queue.m_mutex};
313✔
70
            if (!m_queue.empty())
313✔
71
            {
72
                if constexpr (std::is_move_constructible_v<element_type>)
73
                {
74
                    m_element = std::move(m_queue.m_elements.front());
1✔
75
                }
76
                else
77
                {
78
                    m_element = m_queue.m_elements.front();
79
                }
80

81
                m_queue.m_elements.pop();
1✔
82
                return false;
1✔
83
            }
84

85
            // No element is ready, put ourselves on the waiter list and suspend.
86
            this->m_next         = m_queue.m_waiters;
312✔
87
            m_queue.m_waiters    = this;
312✔
88
            m_awaiting_coroutine = awaiting_coroutine;
312✔
89

90
            return true;
312✔
91
        }
313✔
92

93
        [[nodiscard]] auto await_resume() noexcept -> expected<element_type, queue_consume_result>
326✔
94
        {
95
            if (m_queue.m_stopped.load(std::memory_order::acquire))
326✔
96
            {
97
                return unexpected<queue_consume_result>(queue_consume_result::queue_stopped);
6✔
98
            }
99

100
            if constexpr (std::is_move_constructible_v<element_type>)
101
            {
102
                return std::move(m_element.value());
319✔
103
            }
104
            else
105
            {
106
                return m_element.value();
107
            }
108
        }
109

110
        std::optional<element_type> m_element{std::nullopt};
111
        queue&                      m_queue;
112
        std::coroutine_handle<>     m_awaiting_coroutine{nullptr};
113
        /// The next awaiter in line for this queue, nullptr if this is the end.
114
        awaiter* m_next{nullptr};
115
    };
116

117
    queue() {}
6✔
118
    ~queue() {}
6✔
119

120
    queue(const queue&) = delete;
121
    queue(queue&& other)
122
    {
123
        m_waiters  = std::exchange(other.m_waiters, nullptr);
124
        m_mutex    = std::move(other.m_mutex);
125
        m_elements = std::move(other.m_elements);
126
    }
127

128
    auto operator=(const queue&) -> queue& = delete;
129
    auto operator=(queue&& other) -> queue&
130
    {
131
        if (std::addressof(other) != this)
132
        {
133
            m_waiters  = std::exchange(other.m_waiters, nullptr);
134
            m_mutex    = std::move(other.m_mutex);
135
            m_elements = std::move(other.m_elements);
136
        }
137

138
        return *this;
139
    }
140

141
    /// @brief Determines if the queue is empty.
142
    /// @return True if the queue has no elements.
143
    auto empty() const -> bool { return size() == 0; }
642✔
144

145
    /// @brief Gets the current number of elements in the queue.
146
    /// @return The number of elements in the queue.
147
    auto size() const -> std::size_t
643✔
148
    {
149
        std::atomic_thread_fence(std::memory_order::acquire);
150
        return m_elements.size();
643✔
151
    }
152

153
    /// @brief Pushes the element into the queue.
154
    /// @param element The element to push.
155
    /// @return void.
156
    auto push(const element_type& element) -> queue_produce_result
320✔
157
    {
158
        if (m_stopped.load(std::memory_order::acquire))
320✔
159
        {
NEW
160
            return queue_produce_result::queue_stopped;
×
161
        }
162

163
        // The general idea is to see if anyone is waiting, and if so directly transfer the element
164
        // to that waiter. If there is nobody waiting then move the element into the queue.
165
        std::unique_lock lock{m_mutex};
320✔
166

167
        if (m_waiters != nullptr)
320✔
168
        {
169
            awaiter* waiter = m_waiters;
308✔
170
            m_waiters       = m_waiters->m_next;
308✔
171
            lock.unlock();
308✔
172

173
            // Transfer the element directly to the awaiter.
174
            waiter->m_element = element;
308✔
175
            waiter->m_awaiting_coroutine.resume();
307✔
176
        }
177
        else
178
        {
179
            m_elements.push(element);
12✔
180
        }
181

182
        return queue_produce_result::produced;
320✔
183
    }
320✔
184

185
    /// @brief Pushes the element into the queue.
186
    /// @param element The element to push.
187
    /// @return void.
188
    auto push(element_type&& element) -> queue_produce_result
3✔
189
    {
190
        if (m_stopped.load(std::memory_order::acquire))
3✔
191
        {
192
            return queue_produce_result::queue_stopped;
1✔
193
        }
194

195
        std::unique_lock lock{m_mutex};
2✔
196

197
        if (m_waiters != nullptr)
2✔
198
        {
NEW
199
            awaiter* waiter = m_waiters;
×
NEW
200
            m_waiters       = m_waiters->m_next;
×
NEW
201
            lock.unlock();
×
202

203
            // Transfer the element directly to the awaiter.
NEW
204
            waiter->m_element = std::move(element);
×
NEW
205
            waiter->m_awaiting_coroutine.resume();
×
206
        }
207
        else
208
        {
209
            m_elements.push(std::move(element));
2✔
210
        }
211

212
        return queue_produce_result::produced;
2✔
213
    }
2✔
214

215
    /// @brief Emplaces the element into the queue.
216
    /// @tparam ...args_type The element's constructor argument types.
217
    /// @param ...args The element's constructor arguments.
218
    /// @return void.
219
    template<class... args_type>
220
    auto emplace(args_type&&... args) -> queue_produce_result
221
    {
222
        if (m_stopped.load(std::memory_order::acquire))
223
        {
224
            return queue_produce_result::queue_stopped;
225
        }
226

227
        std::unique_lock lock{m_mutex};
228

229
        if (m_waiters != nullptr)
230
        {
231
            awaiter* waiter = m_waiters;
232
            m_waiters       = m_waiters->m_next;
233
            lock.unlock();
234

235
            waiter->m_element.emplace(std::forward<args_type>(args)...);
236
            waiter->m_awaiting_coroutine.resume();
237
        }
238
        else
239
        {
240
            m_elements.emplace(std::forward<args_type>(args)...);
241
        }
242

243
        return queue_produce_result::produced;
244
    }
245

246
    /// @brief Pops the head element of the queue if available, or waits for one to
247
    ///        be available.
248
    /// @return The head element, or coro::queue_consume_result::queue_stopped if the
249
    ///         queue has been shutdown.
250
    [[nodiscard]] auto pop() -> awaiter { return awaiter{*this}; }
326✔
251

252
    /// @brief Shutsdown the queue and notifies all waiters of pop() to stop waiting.
253
    /// @return void.
254
    auto shutdown_notify_waiters() -> void
4✔
255
    {
256
        auto expected = false;
4✔
257
        if (!m_stopped.compare_exchange_strong(expected, true, std::memory_order::acq_rel, std::memory_order::relaxed))
4✔
258
        {
NEW
259
            return;
×
260
        }
261

262
        std::unique_lock lock{m_mutex};
4✔
263
        while (m_waiters != nullptr)
8✔
264
        {
265
            auto* to_resume = m_waiters;
4✔
266
            m_waiters       = m_waiters->m_next;
4✔
267

268
            lock.unlock();
4✔
269
            to_resume->m_awaiting_coroutine.resume();
4✔
270
            lock.lock();
4✔
271
        }
272
    }
4✔
273

274
private:
275
    friend awaiter;
276
    /// @brief The list of pop() awaiters.
277
    awaiter* m_waiters{nullptr};
278
    /// @brief Mutex for properly maintaining the queue.
279
    std::mutex m_mutex{};
280
    /// @brief The underlying queue datastructure.
281
    std::queue<element_type> m_elements{};
282
    /// @brief Has this queue been shutdown?
283
    std::atomic<bool> m_stopped{false};
284
};
285

286
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc