• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 15542916462

09 Jun 2025 07:30PM UTC coverage: 87.982%. First build
15542916462

Pull #342

github

web-flow
Merge 23b2abe6b into 770368f83
Pull Request #342: coro::queue::try_pop()

12 of 13 new or added lines in 1 file covered. (92.31%)

1596 of 1814 relevant lines covered (87.98%)

5494552.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.04
/include/coro/queue.hpp
1
#pragma once
2

3
#include "coro/concepts/executor.hpp"
4
#include "coro/expected.hpp"
5
#include "coro/sync_wait.hpp"
6
#include "coro/task.hpp"
7

8
#include <queue>
9

10
namespace coro
11
{
12

13
/**
 * @brief Result reported to a producer by queue::push() / queue::emplace().
 */
enum class queue_produce_result
{
    /// @brief The item was successfully produced.
    produced,
    /// @brief The queue is shutting down or stopped, no more items are allowed to be produced.
    stopped
};
24

25
/**
 * @brief Failure reasons reported to a consumer by queue::pop() / queue::try_pop().
 */
enum class queue_consume_result
{
    /// @brief The queue has shut down/stopped and the user should stop calling pop().
    stopped,

    /// @brief try_pop() failed to acquire the lock.
    try_lock_failure,

    /// @brief try_pop() acquired the lock but there are no items in the queue.
    empty,
};
42

43
/**
44
 * @brief An unbounded queue. If the queue is empty and there are waiters to consume then
45
 *        there are no allocations and the coroutine context will simply be passed to the
46
 *        waiter. If there are no waiters the item being produced will be placed into the
47
 *        queue.
48
 *
49
 * @tparam element_type The type of items being produced and consumed.
50
 */
51
template<typename element_type>
52
class queue
53
{
54
private:
55
    /**
     * @brief Lifecycle state of the queue.
     *
     * Scoped so the enumerator names do not leak into the class scope; every use in this class is
     * already qualified as `running_state_t::...`.
     */
    enum class running_state_t
    {
        /// @brief Producers may push and consumers may pop.
        running,
        /// @brief Producers are rejected; elements already queued can still be popped.
        draining,
        /// @brief Producers and consumers are both told the queue has stopped.
        stopped,
    };
61

62
public:
63
    struct awaiter
64
    {
65
        explicit awaiter(queue<element_type>& q) noexcept : m_queue(q) {}
327✔
66

67
        auto await_ready() noexcept -> bool
327✔
68
        {
69
            // This awaiter is ready when it has actually acquired an element or it is shutting down.
70
            if (m_queue.m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
327✔
71
            {
72
                m_queue.m_mutex.unlock();
2✔
73
                return true; // await_resume with stopped
2✔
74
            }
75

76
            // If we have items return it.
77
            if (!m_queue.empty())
325✔
78
            {
79
                if constexpr (std::is_move_constructible_v<element_type>)
80
                {
81
                    m_element = std::move(m_queue.m_elements.front());
16✔
82
                }
83
                else
84
                {
85
                    m_element = m_queue.m_elements.front();
86
                }
87

88
                m_queue.m_elements.pop();
16✔
89
                m_queue.m_mutex.unlock();
16✔
90
                return true;
16✔
91
            }
92

93
            // Nothing available suspend, mutex will be unlocked in await_suspend.
94
            return false;
309✔
95
        }
96

97
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
309✔
98
        {
99
            // No element is ready, put ourselves on the waiter list and suspend.
100
            this->m_next         = m_queue.m_waiters;
309✔
101
            m_queue.m_waiters    = this;
309✔
102
            m_awaiting_coroutine = awaiting_coroutine;
309✔
103
            m_queue.m_mutex.unlock();
309✔
104
            return true;
309✔
105
        }
106

107
        [[nodiscard]] auto await_resume() noexcept -> expected<element_type, queue_consume_result>
327✔
108
        {
109
            if (m_element.has_value())
327✔
110
            {
111
                if constexpr (std::is_move_constructible_v<element_type>)
112
                {
113
                    return std::move(m_element.value());
321✔
114
                }
115
                else
116
                {
117
                    return m_element.value();
118
                }
119
            }
120
            else
121
            {
122
                // If we don't have an item the queue has stopped, the prior functions will have checked the state.
123
                return unexpected<queue_consume_result>(queue_consume_result::stopped);
6✔
124
            }
125
        }
126

127
        std::optional<element_type> m_element{std::nullopt};
128
        queue&                      m_queue;
129
        std::coroutine_handle<>     m_awaiting_coroutine{nullptr};
130
        awaiter*                    m_next{nullptr};
131
    };
132

133
    queue() {}
7✔
134
    ~queue()
7✔
135
    {
136
        coro::sync_wait(shutdown());
7✔
137
    }
7✔
138

139
    queue(const queue&)  = delete;
140
    queue(queue&& other) = delete;
141

142
    auto operator=(const queue&) -> queue&  = delete;
143
    auto operator=(queue&& other) -> queue& = delete;
144

145
    /**
146
     * @brief Determines if the queue is empty.
147
     *
148
     * @return true If the queue is empty.
149
     * @return false If the queue is not empty.
150
     */
151
    auto empty() const -> bool { return size() == 0; }
332✔
152

153
    /**
154
     * @brief Gets the number of elements in the queue.
155
     *
156
     * @return std::size_t The number of elements in the queue.
157
     */
158
    auto size() const -> std::size_t
333✔
159
    {
160
        std::atomic_thread_fence(std::memory_order::acquire);
161
        return m_elements.size();
333✔
162
    }
163

164
    /**
165
     * @brief Pushes the element into the queue. If the queue is empty and there are waiters
166
     *        then the element will be processed immediately by transfering the coroutine task
167
     *        context to the waiter.
168
     *
169
     * @param element The element being produced.
170
     * @return coro::task<queue_produce_result>
171
     */
172
    auto push(const element_type& element) -> coro::task<queue_produce_result>
319✔
173
    {
174
        // The general idea is to see if anyone is waiting, and if so directly transfer the element
175
        // to that waiter. If there is nobody waiting then move the element into the queue.
176
        auto lock = co_await m_mutex.scoped_lock();
177

178
        if (m_running_state.load(std::memory_order::acquire) != running_state_t::running)
179
        {
180
            co_return queue_produce_result::stopped;
181
        }
182

183
        // assert(m_element.empty())
184
        if (m_waiters != nullptr)
185
        {
186
            auto* waiter = std::exchange(m_waiters, m_waiters->m_next);
187
            lock.unlock();
188

189
            // Transfer the element directly to the awaiter.
190
            waiter->m_element = element;
191
            waiter->m_awaiting_coroutine.resume();
192
        }
193
        else
194
        {
195
            m_elements.push(element);
196
        }
197

198
        co_return queue_produce_result::produced;
199
    }
639✔
200

201
    /**
202
     * @brief Pushes the element into the queue. If the queue is empty and there are waiters
203
     *        then the element will be processed immediately by transfering the coroutine task
204
     *        context to the waiter.
205
     *
206
     * @param element The element being produced.
207
     * @return coro::task<queue_produce_result>
208
     */
209
    auto push(element_type&& element) -> coro::task<queue_produce_result>
4✔
210
    {
211
        auto lock = co_await m_mutex.scoped_lock();
212

213
        if (m_running_state.load(std::memory_order::acquire) != running_state_t::running)
214
        {
215
            co_return queue_produce_result::stopped;
216
        }
217

218
        if (m_waiters != nullptr)
219
        {
220
            auto* waiter = std::exchange(m_waiters, m_waiters->m_next);
221
            lock.unlock();
222

223
            // Transfer the element directly to the awaiter.
224
            waiter->m_element = std::move(element);
225
            waiter->m_awaiting_coroutine.resume();
226
        }
227
        else
228
        {
229
            m_elements.push(std::move(element));
230
        }
231

232
        co_return queue_produce_result::produced;
233
    }
8✔
234

235
    /**
236
     * @brief Emplaces an element into the queue. Has the same behavior as push if the queue
237
     *        is empty and has waiters.
238
     *
239
     * @param args The element's constructor argument types and values.
240
     * @return coro::task<queue_produce_result>
241
     */
242
    template<typename... args_type>
243
    auto emplace(args_type&&... args) -> coro::task<queue_produce_result>
244
    {
245
        auto lock = co_await m_mutex.scoped_lock();
246

247
        if (m_running_state.load(std::memory_order::acquire) != running_state_t::running)
248
        {
249
            co_return queue_produce_result::stopped;
250
        }
251

252
        if (m_waiters != nullptr)
253
        {
254
            auto* waiter = std::exchange(m_waiters, m_waiters->m_next);
255
            lock.unlock();
256

257
            waiter->m_element.emplace(std::forward<args_type>(args)...);
258
            waiter->m_awaiting_coroutine.resume();
259
        }
260
        else
261
        {
262
            m_elements.emplace(std::forward<args_type>(args)...);
263
        }
264

265
        co_return queue_produce_result::produced;
266
    }
267

268
    /**
269
     * @brief Pops the head element of the queue if available, or waits for one to be available.
270
     *
271
     * @return awaiter A waiter task that upon co_await complete returns an element or the queue
272
     *                 status that it is shut down.
273
     */
274
    [[nodiscard]] auto pop() -> coro::task<expected<element_type, queue_consume_result>>
324✔
275
    {
276
        co_await m_mutex.lock();
277
        co_return co_await awaiter{*this};
278
    }
646✔
279

280
    /**
281
     * @brief Tries to pop the head element of the queue if available. This can fail if it cannot
282
     *        acquire the lock via `coro::mutex::try_lock()` or if there are no elements available.
283
     *        Does not block.
284
     *
285
     * @return expected<element_type, queue_consume_result> The head element if the lock was acquired
286
     *         and an element is available.
287
     *         queue_consume_result::stopped if the queue has been shutdown.
288
     *         queue_consume_result::empty if lock was acquired but the queue is empty.
289
     *         queue_consume_result::try_lock_failure if the queue is in use and the lock could not be acquired.
290
     */
291
    [[nodiscard]] auto try_pop() -> expected<element_type, queue_consume_result>
3✔
292
    {
293
        if (m_mutex.try_lock())
3✔
294
        {
295
            // Capture mutex into a scoped lock to manage unlocking correctly.
296
            coro::scoped_lock lk{m_mutex};
3✔
297

298
            // Return if stopped.
299
            if (m_running_state.load(std::memory_order::acquire) == running_state_t::stopped)
3✔
300
            {
301
                return unexpected<queue_consume_result>(queue_consume_result::stopped);
1✔
302
            }
303

304
            // Return if empty.
305
            if (empty())
2✔
306
            {
307
                return unexpected<queue_consume_result>(queue_consume_result::empty);
1✔
308
            }
309

310
            expected<element_type, queue_consume_result> value;
1✔
311
            if constexpr (std::is_move_constructible_v<element_type>)
312
            {
313
                value = std::move(m_elements.front());
1✔
314
            }
315
            else
316
            {
317
                value = m_elements.front();
318
            }
319

320
            m_elements.pop();
1✔
321
            return value;
1✔
322
        }
3✔
323

NEW
324
        return unexpected<queue_consume_result>(queue_consume_result::try_lock_failure);
×
325
    }
326

327
    /**
328
     * @brief Shuts down the queue immediately discarding any elements that haven't been processed.
329
     *
330
     * @return coro::task<void>
331
     */
332
    auto shutdown() -> coro::task<void>
12✔
333
    {
334
        auto expected = m_running_state.load(std::memory_order::acquire);
335
        if (expected == running_state_t::stopped)
336
        {
337
            co_return;
338
        }
339

340
        // We use the lock to guarantee the m_running_state has propagated.
341
        auto lk = co_await m_mutex.scoped_lock();
342
        if (!m_running_state.compare_exchange_strong(
343
                expected, running_state_t::stopped, std::memory_order::acq_rel, std::memory_order::relaxed))
344
        {
345
            co_return;
346
        }
347

348
        auto* waiters = m_waiters;
349
        m_waiters = nullptr;
350
        lk.unlock();
351
        while (waiters != nullptr)
352
        {
353
            auto* next = waiters->m_next;
354
            waiters->m_awaiting_coroutine.resume();
355
            waiters = next;
356
        }
357
    }
24✔
358

359
    /**
360
     * @brief Shuts down the queue but waits for it to be drained so all elements are processed.
361
     *        Will yield on the given executor between checking if the queue is empty so the tasks
362
     *        can be processed.
363
     *
364
     * @tparam executor_t The executor type.
365
     * @param e The executor to yield this task to wait for elements to be processed.
366
     * @return coro::task<void>
367
     */
368
    template<coro::concepts::executor executor_type>
369
    auto shutdown_drain(std::shared_ptr<executor_type> e) -> coro::task<void>
2✔
370
    {
371
        auto lk = co_await m_mutex.scoped_lock();
372
        auto expected = running_state_t::running;
373
        if (!m_running_state.compare_exchange_strong(
374
                expected, running_state_t::draining, std::memory_order::acq_rel, std::memory_order::relaxed))
375
        {
376
            co_return;
377
        }
378
        lk.unlock();
379

380
        while (!empty())
381
        {
382
            co_await e->yield();
383
        }
384

385
        co_return co_await shutdown();
386
    }
4✔
387

388
private:
389
    friend awaiter;
390
    /// @brief The list of pop() awaiters.
391
    awaiter* m_waiters{nullptr};
392
    /// @brief Mutex for properly maintaining the queue.
393
    coro::mutex m_mutex{};
394
    /// @brief The underlying queue data structure.
395
    std::queue<element_type> m_elements{};
396
    /// @brief The current running state of the queue.
397
    std::atomic<running_state_t> m_running_state{running_state_t::running};
398
};
399

400
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc