• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 18892717725

29 Oct 2025 12:03AM UTC coverage: 80.879%. First build
18892717725

Pull #407

github

web-flow
Merge 63a3d33f2 into f3ca22039
Pull Request #407: coro::thread_pool uses lockless mpmc queue

593 of 883 new or added lines in 7 files covered. (67.16%)

2191 of 2709 relevant lines covered (80.88%)

10826691.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.24
/include/coro/thread_pool.hpp
1
#pragma once
2

3
#include "coro/detail/vendor/cameron314/concurrentqueue/blockingconcurrentqueue.h"
4
#include "coro/concepts/range_of.hpp"
5
#include "coro/task.hpp"
6

7
#include <atomic>
8
#include <condition_variable>
9
#include <coroutine>
10
#include <deque>
11
#include <functional>
12
#include <mutex>
13
#include <optional>
14
#include <ranges>
15
#include <thread>
16
#include <variant>
17
#include <vector>
18

19
namespace coro
20
{
21
/**
22
 * Creates a thread pool that executes arbitrary coroutine tasks in a FIFO scheduler policy.
23
 * The thread pool by default will create an execution thread per available core on the system.
24
 *
25
 * When shutting down, either by the thread pool destructing or by manually calling shutdown()
26
 * the thread pool will stop accepting new tasks but will complete all tasks that were scheduled
27
 * prior to the shutdown request.
28
 */
29
class thread_pool
30
{
31
    struct private_constructor
32
    {
33
        explicit private_constructor() = default;
34
    };
35
public:
36
    /**
37
     * A schedule operation is an awaitable type with a coroutine to resume the task scheduled on one of
38
     * the executor threads.
39
     */
40
    class schedule_operation
41
    {
42
        friend class thread_pool;
43
        /**
44
         * Only thread_pools can create schedule operations when a task is being scheduled.
45
         * @param tp The thread pool that created this schedule operation.
46
         */
47
        explicit schedule_operation(thread_pool& tp) noexcept;
48

49
    public:
50
        /**
         * Creates a schedule operation bound to the given thread pool.
         * @param tp The thread pool this schedule operation is associated with.
         * @param force_global_queue NOTE(review): presumably requests that the coroutine be placed on the
         *                           pool's global queue rather than an executor-local queue (yield() passes
         *                           true) -- confirm against the definition.
         */
        explicit schedule_operation(thread_pool& tp, bool force_global_queue) noexcept;
51

52
        /**
53
         * Schedule operations always pause so the executing thread can be switched.
54
         */
55
        auto await_ready() noexcept -> bool { return false; }
41,277,716✔
56

57
        /**
58
         * Suspending always returns to the caller (using void return of await_suspend()) and
59
         * stores the coroutine internally for the executing thread to resume from.
60
         */
61
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void;
62

63
        /**
64
         * no-op as this is the function called first by the thread pool's executing thread.
65
         */
66
        auto await_resume() noexcept -> void {}
41,286,346✔
67

68
    private:
69
        /// @brief The thread pool that this schedule operation will execute on.
70
        thread_pool& m_thread_pool;
71
        bool m_force_global_queue{false};
72
    };
73

74
    struct options
75
    {
76
        /// The number of executor threads for this thread pool.  Uses the hardware concurrency
77
        /// value by default.
78
        uint32_t thread_count = std::thread::hardware_concurrency();
79
        /// Functor to call on each executor thread upon starting execution.  The parameter is the
80
        /// thread's ID assigned to it by the thread pool.
81
        std::function<void(std::size_t)> on_thread_start_functor = nullptr;
82
        /// Functor to call on each executor thread upon stopping execution.  The parameter is the
83
        /// thread's ID assigned to it by the thread pool.
84
        std::function<void(std::size_t)> on_thread_stop_functor = nullptr;
85
    };
86

87
    /**
88
     * @see thread_pool::make_unique
89
     */
90
    explicit thread_pool(options&& opts, private_constructor);
91

92
    /**
93
     * @brief Creates a thread pool executor.
94
     *
95
     * @param opts The thread pool's options.
96
     * @return std::unique_ptr<thread_pool>
97
     */
98
    static auto make_unique(
99
        options opts = options{
100
            .thread_count            = std::thread::hardware_concurrency(),
101
            .on_thread_start_functor = nullptr,
102
            .on_thread_stop_functor  = nullptr}) -> std::unique_ptr<thread_pool>;
103

104
    thread_pool(const thread_pool&)                    = delete;
105
    thread_pool(thread_pool&&)                         = delete;
106
    auto operator=(const thread_pool&) -> thread_pool& = delete;
107
    auto operator=(thread_pool&&) -> thread_pool&      = delete;
108

109
    virtual ~thread_pool();
110

111
    /**
112
     * @return The number of executor threads for processing tasks.
113
     */
114
    auto thread_count() const noexcept -> size_t { return m_threads.size(); }
115

116
    /**
117
     * Schedules the currently executing coroutine to be run on this thread pool.  This must be
118
     * called from within the coroutines function body to schedule the coroutine on the thread pool.
119
     * @throw std::runtime_error If the thread pool has been `shutdown()`; scheduling new tasks is not permitted.
120
     * @return The schedule operation to switch from the calling scheduling thread to the executor thread
121
     *         pool thread.
122
     */
123
    [[nodiscard]] auto schedule() -> schedule_operation;
124

125
    /**
126
     * Spawns the given task to be run on this thread pool, the task is detached from the user.
127
     * @param task The task to spawn onto the thread pool.
128
     * @return True if the task has been spawned onto this thread pool.
129
     */
130
    auto spawn(coro::task<void>&& task) noexcept -> bool;
131

132
    /**
133
     * Schedules a task on the thread pool and returns another task that must be awaited on for completion.
134
     * This can be done via co_await in a coroutine context or coro::sync_wait() outside of coroutine context.
135
     * @tparam return_type The return value of the task.
136
     * @param task The task to schedule on the thread pool.
137
     * @return The task to await for the input task to complete.
138
     */
139
    template<typename return_type>
140
    [[nodiscard]] auto schedule(coro::task<return_type> task) -> coro::task<return_type>
5✔
141
    {
142
        co_await schedule();
143
        co_return co_await task;
144
    }
10✔
145

146
    /**
147
     * Schedules any coroutine handle that is ready to be resumed.
148
     * @param handle The coroutine handle to schedule.
149
     * @return True if the coroutine is resumed, false if it is a nullptr or the coroutine is already done.
150
     */
151
    auto resume(std::coroutine_handle<> handle) noexcept -> bool;
152

153
    /**
154
     * Schedules the set of coroutine handles that are ready to be resumed.
155
     * @param handles The coroutine handles to schedule.
156
     * @return The number of tasks resumed; null handles are discarded and not counted.
157
     */
158
    template<coro::concepts::range_of<std::coroutine_handle<>> range_type>
159
    auto resume(const range_type& handles) noexcept -> uint64_t
997,545✔
160
    {
161
        m_size.fetch_add(std::size(handles), std::memory_order::release);
997,545✔
162

163
        size_t null_handles{0};
997,533✔
164

165
        for (const auto& handle : handles)
6,598,155✔
166
        {
167
            if (handle != nullptr) [[likely]]
5,600,569✔
168
            {
169
                m_global_queue.enqueue(handle);
5,600,670✔
170
            }
171
            else
172
            {
NEW
173
                ++null_handles;
×
174
            }
175
        }
176

177
        if (null_handles > 0)
997,381✔
178
        {
179
            m_size.fetch_sub(null_handles, std::memory_order::release);
×
180
        }
181

182
        return std::size(handles) - null_handles;
997,381✔
183
    }
184

185
    /**
186
     * Immediately yields the current task and places it at the end of the queue of tasks waiting
187
     * to be processed.  This will immediately be picked up again once it naturally goes through the
188
     * FIFO task queue.  This function is useful for yielding long processing tasks to let other tasks
189
     * get processing time.
190
     */
191
    [[nodiscard]] auto yield() -> schedule_operation { return schedule_operation{*this, true}; }
11,074,965✔
192

193
    /**
194
     * Shuts down the thread pool.  This will finish any tasks scheduled prior to calling this
195
     * function but will prevent the thread pool from scheduling any new tasks.  This call is
196
     * blocking and will wait until all inflight tasks are completed before returning.
197
     */
198
    auto shutdown() noexcept -> void;
199

200
    /**
     * @return True if a shutdown of this thread pool has been requested (acquire-load of the
     *         shutdown-requested flag).
     */
    [[nodiscard]] auto is_shutdown() const -> bool { return m_shutdown_requested.load(std::memory_order::acquire); }
201

202
    /**
203
     * @return The number of tasks waiting in the task queue + the executing tasks.
204
     */
205
    [[nodiscard]] auto size() const noexcept -> std::size_t { return m_size.load(std::memory_order::acquire); }
51,220✔
206

207
    /**
208
     * @return True if the task queue is empty and zero tasks are currently executing.
209
     */
210
    [[nodiscard]] auto empty() const noexcept -> bool { return size() == 0; }
5✔
211

212
    /**
213
     * @return The approximate number of tasks waiting in the task queue to be executed.
214
     */
215
    [[nodiscard]] auto queue_size() const noexcept -> std::size_t
216
    {
217
        std::size_t approx_size{0};
218
        for (auto& queue : m_local_queues)
219
        {
220
            approx_size += queue.size_approx();
221
        }
222
        return approx_size + m_global_queue.size_approx();
223
    }
224

225
    /**
226
     * @return True if the task queue is currently empty.
227
     */
228
    [[nodiscard]] auto queue_empty() const noexcept -> bool { return queue_size() == 0; }
229

230
private:
231
    struct executor_state
232
    {
233
        std::thread::id m_thread_id;
234
        std::mutex m_mutex;
235
    };
236

237
    /// The configuration options.
238
    options m_opts;
239
    /// The background executor threads.
240
    std::vector<std::thread> m_threads;
241
    /// Local executor worker thread queues.
242
    std::vector<moodycamel::ConcurrentQueue<std::coroutine_handle<>>> m_local_queues;
243
    /// Global queue.
244
    moodycamel::BlockingConcurrentQueue<std::coroutine_handle<>> m_global_queue;
245

246
    std::vector<std::unique_ptr<executor_state>> m_executor_state;
247

248
    /**
     * NOTE(review): the definition is not visible in this header. Presumably attempts to take pending
     * coroutine handles from other executors' queues on behalf of the executor at `my_idx` -- confirm.
     * @param my_idx The calling executor's index.
     * @param handles Fixed-size buffer of two coroutine handles; presumably filled with stolen work -- confirm.
     * @return Presumably true if any work was obtained -- confirm against the definition.
     */
    auto try_steal_work(std::size_t my_idx, std::array<std::coroutine_handle<>, 2>& handles) -> bool;
249

250
    /**
251
     * Each background thread runs from this function.
252
     * @param idx The executor's idx for internal data structure accesses.
253
     */
254
    auto executor(std::size_t idx) -> void;
255
    /**
256
     * Schedules the given coroutine to be executed upon the first available thread.
     * @param handle The coroutine handle to schedule.
257
     */
258
    auto schedule_impl(std::coroutine_handle<> handle, bool force_global_queue) noexcept -> void;
259

260
    /// The number of tasks in the queue + currently executing.
261
    std::atomic<std::size_t> m_size{0};
262
    /// Has the thread pool been requested to shut down?
263
    std::atomic<bool> m_shutdown_requested{false};
264
};
265

266
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc