• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 14064282151

25 Mar 2025 03:56PM UTC coverage: 86.209%. First build
14064282151

Pull #311

github

web-flow
Merge fea57f1e4 into c298be096
Pull Request #311: Adds coro::queue<T>

65 of 76 new or added lines in 2 files covered. (85.53%)

1469 of 1704 relevant lines covered (86.21%)

4105072.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.36
/include/coro/queue.hpp
1
#pragma once
2

3
#include "coro/expected.hpp"

#include <atomic>
#include <coroutine>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <type_traits>
#include <utility>
7

8
namespace coro
9
{
10

11
/// Error codes carried in the `unexpected` branch of the value produced by awaiting
/// coro::queue<T>::pop().
enum class queue_consume_result
12
{
13
    /// The queue was stopped (see queue::notify_waiters()); the awaiting pop() yields no element.
    queue_stopped
14
};
15

16
template<typename element_type>
17
class queue
18
{
19
public:
20
    struct awaiter
21
    {
22
        awaiter(queue<element_type>& q) noexcept : m_queue(q) {}
326✔
23

24
        auto await_ready() noexcept -> bool
326✔
25
        {
26
            // This awaiter is ready when it has actually acquired an element or it is shutting down.
27
            if (m_queue.m_stopped.load(std::memory_order::acquire))
326✔
28
            {
29
                return false;
1✔
30
            }
31

32
            std::scoped_lock lock{m_queue.m_mutex};
325✔
33
            if (!m_queue.empty())
325✔
34
            {
35
                if constexpr (std::is_move_constructible_v<element_type>)
36
                {
37
                    m_element = std::move(m_queue.m_elements.front());
12✔
38
                }
39
                else
40
                {
41
                    m_element = m_queue.m_elements.front();
42
                }
43

44
                m_queue.m_elements.pop();
12✔
45
                return true;
12✔
46
            }
47

48
            return false;
313✔
49
        }
325✔
50

51
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
314✔
52
        {
53
            // Don't suspend if the stop signal has been set.
54
            if (m_queue.m_stopped.load(std::memory_order::acquire))
314✔
55
            {
56
                return false;
1✔
57
            }
58

59
            std::scoped_lock lock{m_queue.m_mutex};
313✔
60
            if (!m_queue.empty())
313✔
61
            {
62
                if constexpr (std::is_move_constructible_v<element_type>)
63
                {
NEW
64
                    m_element = std::move(m_queue.m_elements.front());
×
65
                }
66
                else
67
                {
68
                    m_element = m_queue.m_elements.front();
69
                }
70

NEW
71
                m_queue.m_elements.pop();
×
NEW
72
                return false;
×
73
            }
74

75
            // No element is ready, put ourselves on the waiter list and suspend.
76
            this->m_next         = m_queue.m_waiters;
313✔
77
            m_queue.m_waiters    = this;
313✔
78
            m_awaiting_coroutine = awaiting_coroutine;
313✔
79

80
            return true;
313✔
81
        }
313✔
82

83
        [[nodiscard]] auto await_resume() noexcept -> expected<element_type, queue_consume_result>
326✔
84
        {
85
            if (m_queue.m_stopped.load(std::memory_order::acquire))
326✔
86
            {
87
                return unexpected<queue_consume_result>(queue_consume_result::queue_stopped);
5✔
88
            }
89

90
            if constexpr (std::is_move_constructible_v<element_type>)
91
            {
92
                return std::move(m_element.value());
321✔
93
            }
94
            else
95
            {
96
                return m_element.value();
97
            }
98
        }
99

100
        std::optional<element_type> m_element{std::nullopt};
101
        queue&                      m_queue;
102
        std::coroutine_handle<>     m_awaiting_coroutine{nullptr};
103
        /// The next awaiter in line for this queue, nullptr if this is the end.
104
        awaiter* m_next{nullptr};
105
        bool     m_stopped{false};
106
    };
107

108
    queue() {}
5✔
109

110
    ~queue() {}
5✔
111

112
    queue(const queue&) = delete;
113
    queue(queue&& other)
114
    {
115
        m_waiters  = std::exchange(other.m_waiters, nullptr);
116
        m_mutex    = std::move(other.m_mutex);
117
        m_elements = std::move(other.m_elements);
118
    }
119

120
    auto operator=(const queue&) -> queue& = delete;
121
    auto operator=(queue&& other) -> queue&
122
    {
123
        if (std::addressof(other) != this)
124
        {
125
            m_waiters  = std::exchange(other.m_waiters, nullptr);
126
            m_mutex    = std::move(other.m_mutex);
127
            m_elements = std::move(other.m_elements);
128
        }
129

130
        return *this;
131
    }
132

133
    auto empty() const -> bool { return size() == 0; }
641✔
134

135
    auto size() const -> std::size_t
642✔
136
    {
137
        std::atomic_thread_fence(std::memory_order::acquire);
138
        return m_elements.size();
642✔
139
    }
140

141
    auto push(const element_type& element) -> void
320✔
142
    {
143
        // The general idea is to see if anyone is waiting, and if so directly transfer the element
144
        // to that waiter. If there is nobody waiting then move the element into the queue.
145

146
        std::unique_lock lock{m_mutex};
320✔
147

148
        if (m_waiters != nullptr)
320✔
149
        {
150
            awaiter* waiter = m_waiters;
309✔
151
            m_waiters       = m_waiters->m_next;
309✔
152
            lock.unlock();
309✔
153

154
            // Transfer the element directly to the awaiter.
155
            waiter->m_element = element;
308✔
156
            waiter->m_awaiting_coroutine.resume();
309✔
157
        }
158
        else
159
        {
160
            m_elements.push(element);
11✔
161
        }
162
    }
320✔
163

164
    auto push(element_type&& element) -> void
2✔
165
    {
166
        std::unique_lock lock{m_mutex};
2✔
167

168
        if (m_waiters != nullptr)
2✔
169
        {
NEW
170
            awaiter* waiter = m_waiters;
×
NEW
171
            m_waiters       = m_waiters->m_next;
×
NEW
172
            lock.unlock();
×
173

174
            // Transfer the element directly to the awaiter.
NEW
175
            waiter->m_element = std::move(element);
×
NEW
176
            waiter->m_awaiting_coroutine.resume();
×
177
        }
178
        else
179
        {
180
            m_elements.push(std::move(element));
2✔
181
        }
182
    }
2✔
183

184
    template<class... args_type>
185
    auto emplace(args_type&&... args) -> void
186
    {
187
        std::unique_lock lock{m_mutex};
188

189
        if (m_waiters != nullptr)
190
        {
191
            awaiter* waiter = m_waiters;
192
            m_waiters       = m_waiters->m_next;
193
            lock.unlock();
194

195
            waiter->m_element.emplace(std::forward<args_type>(args)...);
196
            waiter->m_awaiting_coroutine.resume();
197
        }
198
        else
199
        {
200
            m_elements.emplace(std::forward<args_type>(args)...);
201
        }
202
    }
203

204
    [[nodiscard]] auto pop() -> awaiter { return awaiter{*this}; }
326✔
205

206
    auto notify_waiters() -> void
3✔
207
    {
208
        auto expected = false;
3✔
209
        if (!m_stopped.compare_exchange_strong(expected, true, std::memory_order::acq_rel, std::memory_order::relaxed))
3✔
210
        {
NEW
211
            return;
×
212
        }
213

214
        std::unique_lock lock{m_mutex};
3✔
215
        while (m_waiters != nullptr)
7✔
216
        {
217
            auto* to_resume = m_waiters;
4✔
218
            m_waiters       = m_waiters->m_next;
4✔
219

220
            lock.unlock();
4✔
221
            to_resume->m_awaiting_coroutine.resume();
4✔
222
            lock.lock();
4✔
223
        }
224
    }
3✔
225

226
private:
227
    friend awaiter;
228
    awaiter*                 m_waiters{nullptr};
229
    std::mutex               m_mutex{};
230
    std::queue<element_type> m_elements{};
231
    std::atomic<bool>        m_stopped{false};
232
};
233

234
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc