
jbaldwin / libcoro / build 18877428141

28 Oct 2025 02:03PM UTC coverage: 82.271%. First build.

Pull Request #407: coro::thread_pool uses lockless mpmc queue
Merge b9eb98ad5 into f3ca22039

612 of 868 new or added lines in 5 files covered (70.51%)
2232 of 2713 relevant lines covered (82.27%)
10440887.33 hits per line

Source File

/include/coro/thread_pool.hpp: 88.24% line coverage
#pragma once

#include "coro/detail/vendor/cameron314/concurrentqueue/blockingconcurrentqueue.h"
#include "coro/concepts/range_of.hpp"
#include "coro/task.hpp"

#include <atomic>
#include <condition_variable>
#include <coroutine>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <ranges>
#include <thread>
#include <variant>
#include <vector>

namespace coro
{
/**
 * Creates a thread pool that executes arbitrary coroutine tasks using a FIFO scheduling policy.
 * The thread pool by default will create an execution thread per available core on the system.
 *
 * When shutting down, either by the thread pool destructing or by manually calling shutdown(),
 * the thread pool will stop accepting new tasks but will complete all tasks that were scheduled
 * prior to the shutdown request.
 */
class thread_pool
{
    struct private_constructor
    {
        explicit private_constructor() = default;
    };
public:
    /**
     * A schedule operation is an awaitable type with a coroutine to resume the task scheduled on one of
     * the executor threads.
     */
    class schedule_operation
    {
        friend class thread_pool;
        /**
         * Only thread_pools can create schedule operations when a task is being scheduled.
         * @param tp The thread pool that created this schedule operation.
         */
        explicit schedule_operation(thread_pool& tp) noexcept;

    public:
        /**
         * Schedule operations always pause so the executing thread can be switched.
         */
        auto await_ready() noexcept -> bool { return false; }

        /**
         * Suspending always returns to the caller (using void return of await_suspend()) and
         * stores the coroutine internally for the executing thread to resume from.
         */
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void;

        /**
         * no-op as this is the function called first by the thread pool's executing thread.
         */
        auto await_resume() noexcept -> void {}

    private:
        /// @brief The thread pool that this schedule operation will execute on.
        thread_pool& m_thread_pool;
    };

    struct options
    {
        /// The number of executor threads for this thread pool.  Uses the hardware concurrency
        /// value by default.
        uint32_t thread_count = std::thread::hardware_concurrency();
        /// Functor to call on each executor thread upon starting execution.  The parameter is the
        /// thread's ID assigned to it by the thread pool.
        std::function<void(std::size_t)> on_thread_start_functor = nullptr;
        /// Functor to call on each executor thread upon stopping execution.  The parameter is the
        /// thread's ID assigned to it by the thread pool.
        std::function<void(std::size_t)> on_thread_stop_functor = nullptr;
    };

    /**
     * @see thread_pool::make_unique
     */
    explicit thread_pool(options&& opts, private_constructor);

    /**
     * @brief Creates a thread pool executor.
     *
     * @param opts The thread pool's options.
     * @return std::unique_ptr<thread_pool>
     */
    static auto make_unique(
        options opts = options{
            .thread_count            = std::thread::hardware_concurrency(),
            .on_thread_start_functor = nullptr,
            .on_thread_stop_functor  = nullptr}) -> std::unique_ptr<thread_pool>;

    thread_pool(const thread_pool&)                    = delete;
    thread_pool(thread_pool&&)                         = delete;
    auto operator=(const thread_pool&) -> thread_pool& = delete;
    auto operator=(thread_pool&&) -> thread_pool&      = delete;

    virtual ~thread_pool();

    /**
     * @return The number of executor threads for processing tasks.
     */
    auto thread_count() const noexcept -> size_t { return m_threads.size(); }

    /**
     * Schedules the currently executing coroutine to be run on this thread pool.  This must be
     * called from within the coroutine's function body to schedule the coroutine on the thread pool.
     * @throw std::runtime_error If the thread pool has been `shutdown()`, scheduling new tasks is not permitted.
     * @return The schedule operation to switch from the calling scheduling thread to the executor thread
     *         pool thread.
     */
    [[nodiscard]] auto schedule() -> schedule_operation;

    /**
     * Spawns the given task to be run on this thread pool; the task is detached from the user.
     * @param task The task to spawn onto the thread pool.
     * @return True if the task has been spawned onto this thread pool.
     */
    auto spawn(coro::task<void>&& task) noexcept -> bool;

    /**
     * Schedules a task on the thread pool and returns another task that must be awaited on for completion.
     * This can be done via co_await in a coroutine context or coro::sync_wait() outside of a coroutine context.
     * @tparam return_type The return value of the task.
     * @param task The task to schedule on the thread pool.
     * @return The task to await for the input task to complete.
     */
    template<typename return_type>
    [[nodiscard]] auto schedule(coro::task<return_type> task) -> coro::task<return_type>
    {
        co_await schedule();
        co_return co_await task;
    }

    /**
     * Schedules any coroutine handle that is ready to be resumed.
     * @param handle The coroutine handle to schedule.
     * @return True if the coroutine is resumed, false if it is a nullptr or the coroutine is already done.
     */
    auto resume(std::coroutine_handle<> handle) noexcept -> bool;

    /**
     * Schedules the set of coroutine handles that are ready to be resumed.
     * @param handles The coroutine handles to schedule.
     * @return The number of tasks resumed; any null handles are discarded.
     */
    template<coro::concepts::range_of<std::coroutine_handle<>> range_type>
    auto resume(const range_type& handles) noexcept -> uint64_t
    {
        m_size.fetch_add(std::size(handles), std::memory_order::release);

        size_t null_handles{0};

        for (const auto& handle : handles)
        {
            if (handle != nullptr) [[likely]]
            {
                m_queue.enqueue(handle);
            }
            else
            {
                ++null_handles;
            }
        }

        if (null_handles > 0)
        {
            m_size.fetch_sub(null_handles, std::memory_order::release);
        }

        return std::size(handles) - null_handles;
    }

    /**
     * Immediately yields the current task and places it at the end of the queue of tasks waiting
     * to be processed.  This will immediately be picked up again once it naturally goes through the
     * FIFO task queue.  This function is useful for yielding long-running tasks so that other tasks
     * can get processing time.
     */
    [[nodiscard]] auto yield() -> schedule_operation { return schedule(); }

    /**
     * Shuts down the thread pool.  This will finish any tasks scheduled prior to calling this
     * function but will prevent the thread pool from scheduling any new tasks.  This call is
     * blocking and will wait until all inflight tasks are completed before returning.
     */
    auto shutdown() noexcept -> void;

    [[nodiscard]] auto is_shutdown() const -> bool { return m_shutdown_requested.load(std::memory_order::acquire); }

    /**
     * @return The number of tasks waiting in the task queue + the executing tasks.
     */
    auto size() const noexcept -> std::size_t { return m_size.load(std::memory_order::acquire); }

    /**
     * @return True if the task queue is empty and zero tasks are currently executing.
     */
    auto empty() const noexcept -> bool { return size() == 0; }

    /**
     * @return The approximate number of tasks waiting in the task queue to be executed.
     */
    auto queue_size() const noexcept -> std::size_t
    {
        std::atomic_thread_fence(std::memory_order::acquire);
        return m_queue.size_approx();
    }

    /**
     * @return True if the task queue is currently empty.
     */
    auto queue_empty() const noexcept -> bool { return queue_size() == 0; }

private:
    /// The configuration options.
    options m_opts;
    /// The background executor threads.
    std::vector<std::thread> m_threads;
    /// The tasks waiting to be executed.
    moodycamel::BlockingConcurrentQueue<std::coroutine_handle<>> m_queue;

    /**
     * Each background thread runs from this function.
     * @param idx The executor's idx for internal data structure accesses.
     */
    auto executor(std::size_t idx) -> void;
    /**
     * @param handle Schedules the given coroutine to be executed upon the first available thread.
     */
    auto schedule_impl(std::coroutine_handle<> handle) noexcept -> void;

    /// The number of tasks in the queue + currently executing.
    std::atomic<std::size_t> m_size{0};
    /// Has the thread pool been requested to shut down?
    std::atomic<bool> m_shutdown_requested{false};
};

} // namespace coro
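
For context, here is a minimal usage sketch of the class above; it is not part of the file under coverage. It assumes libcoro's umbrella header coro/coro.hpp and uses coro::sync_wait(), which the schedule() documentation above mentions for awaiting from outside a coroutine context; treat the include path and option values as illustrative.

#include <coro/coro.hpp>

#include <iostream>

int main()
{
    // Create an executor with four threads via the make_unique() factory shown above.
    auto tp = coro::thread_pool::make_unique(coro::thread_pool::options{.thread_count = 4});

    auto compute = [&]() -> coro::task<int>
    {
        // Suspend here; one of the pool's executor threads resumes the coroutine.
        co_await tp->schedule();
        co_return 21 * 2;
    };

    // Drive the task to completion from non-coroutine code.
    std::cout << coro::sync_wait(compute()) << "\n";

    // Optional: destruction also shuts the pool down and drains already-scheduled work.
    tp->shutdown();
    return 0;
}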