• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 18890974085

28 Oct 2025 10:25PM UTC coverage: 80.731%. First build
18890974085

Pull #407

github

web-flow
Merge a651ef3d2 into f3ca22039
Pull Request #407: coro::thread_pool uses lockless mpmc queue

576 of 870 new or added lines in 5 files covered. (66.21%)

2187 of 2709 relevant lines covered (80.73%)

10476713.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.52
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2
#include "coro/detail/task_self_deleting.hpp"
3

4
#include <queue>
5

6
namespace coro
7
{
8
// Maximum number of coroutine handles dequeued/resumed per bulk queue operation.
static constexpr std::size_t MAX_HANDLES{4};
9

10
// Creates a schedule operation bound to the given thread pool. The operation uses the
// default queueing policy (m_force_global_queue keeps its in-class default), allowing
// the awaiting coroutine to be enqueued onto a worker's local queue when possible.
thread_pool::schedule_operation::schedule_operation(thread_pool& tp) noexcept : m_thread_pool(tp)
{

}
14

15
// Creates a schedule operation bound to the given thread pool.
// When force_global_queue is true the awaiting coroutine bypasses the per-worker
// local queues and is enqueued directly onto the shared global queue.
thread_pool::schedule_operation::schedule_operation(thread_pool& tp, bool force_global_queue) noexcept
    : m_thread_pool(tp),
      m_force_global_queue(force_global_queue)
{

}
21

22
// Suspension point of the awaitable: hands the awaiting coroutine off to the owning
// thread pool, which will resume it later on one of its executor threads.
auto thread_pool::schedule_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    m_thread_pool.schedule_impl(awaiting_coroutine, m_force_global_queue);
}
26

27
// Constructs the thread pool state without starting any worker threads; workers are
// started by make_unique() once the object is fully constructed.
// Fix: the options parameter is taken by rvalue reference but was previously
// copy-initialized into m_opts; move it instead so the functor members
// (on_thread_start_functor / on_thread_stop_functor) are not copied.
thread_pool::thread_pool(options&& opts, private_constructor)
    : m_opts(std::move(opts))
{
    // Pre-size all per-worker containers so worker start-up never reallocates.
    m_threads.reserve(m_opts.thread_count);
    m_executor_state.reserve(m_opts.thread_count);
    m_local_queues.reserve(m_opts.thread_count);
}
34

35
// Factory for thread pools. Two-phase construction: the pool object is created first,
// then the worker threads are launched, so every worker observes a fully constructed
// thread_pool when it starts executing.
auto thread_pool::make_unique(options opts) -> std::unique_ptr<thread_pool>
{
    auto pool = std::make_unique<thread_pool>(std::move(opts), private_constructor{});

    for (uint32_t worker_idx = 0; worker_idx < pool->m_opts.thread_count; ++worker_idx)
    {
        // Per-worker state must exist before the worker thread starts touching it.
        pool->m_local_queues.emplace_back();
        pool->m_executor_state.emplace_back(std::make_unique<executor_state>());
        // Capture the raw pointer: the workers are joined in shutdown() before the
        // unique_ptr is destroyed, so the pointer outlives every worker.
        pool->m_threads.emplace_back([raw_pool = pool.get(), worker_idx]() { raw_pool->executor(worker_idx); });
    }

    return pool;
}
50

51
// Requests shutdown and joins all worker threads; shutdown() is idempotent so an
// earlier explicit shutdown() call makes this a no-op.
thread_pool::~thread_pool()
{
    shutdown();
}
55

56
// Returns an awaitable that, when awaited, transfers the current coroutine onto the
// thread pool. The outstanding-task counter is bumped eagerly; if the pool is already
// shutting down the bump is rolled back and scheduling is rejected with an exception.
auto thread_pool::schedule() -> schedule_operation
{
    m_size.fetch_add(1, std::memory_order::release);
    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        // Roll back the optimistic increment before rejecting.
        m_size.fetch_sub(1, std::memory_order::release);
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
    }

    return schedule_operation{*this};
}
69

70
// Detaches the given task onto the pool. The task is wrapped in a self-deleting
// adapter that owns its own lifetime and decrements the pool's size counter when it
// finishes. Returns whether the wrapped task was successfully enqueued for resumption.
auto thread_pool::spawn(coro::task<void>&& task) noexcept -> bool
{
    m_size.fetch_add(1, std::memory_order::release);
    auto detached = detail::make_task_self_deleting(std::move(task));
    // Give the wrapper access to the size counter so completion can account for itself.
    detached.promise().executor_size(m_size);
    return resume(detached.handle());
}
77

78
// Resumes the given coroutine on the thread pool. Returns false for null/completed
// handles or when the pool is shutting down; returns true once the handle has been
// enqueued for execution.
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
{
    // Null or already-completed coroutines have nothing left to run.
    if (handle == nullptr || handle.done())
    {
        return false;
    }

    m_size.fetch_add(1, std::memory_order::release);
    if (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        schedule_impl(handle, false);
        return true;
    }

    // Shutting down: undo the optimistic size bump and reject the coroutine.
    m_size.fetch_sub(1, std::memory_order::release);
    return false;
}
95

96
auto thread_pool::shutdown() noexcept -> void
146✔
97
{
98
    // Only allow shutdown to occur once.
99
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
146✔
100
    {
101
        for (auto& thread : m_threads)
256✔
102
        {
103
            if (thread.joinable())
166✔
104
            {
105
                thread.join();
166✔
106
            }
107
        }
108
    }
109
}
146✔
110

111
auto thread_pool::try_steal_work(std::size_t my_idx, std::array<std::coroutine_handle<>, MAX_HANDLES>& handles) -> bool
22,133,177✔
112
{
113
    for (std::size_t i = 0; i < m_local_queues.size(); ++i)
64,318,063✔
114
    {
115
        if (i == my_idx)
42,193,824✔
116
        {
117
            continue;
22,135,891✔
118
        }
119

120
        auto& queue = m_local_queues[i];
20,057,933✔
121
        if (queue.try_dequeue_bulk(handles.data(), MAX_HANDLES))
20,060,031✔
122
        {
123
            return true;
48✔
124
        }
125
    }
126

127
    return false;
22,115,284✔
128
}
129

130
auto thread_pool::executor(std::size_t idx) -> void
166✔
131
{
132
    auto& state = *m_executor_state[idx].get();
166✔
133
    state.m_thread_id = std::this_thread::get_id();
166✔
134
    auto& local_queue = m_local_queues[idx];
166✔
135

136
    constexpr std::chrono::milliseconds wait_timeout{100};
166✔
137
    if (m_opts.on_thread_start_functor != nullptr)
166✔
138
    {
139
        m_opts.on_thread_start_functor(idx);
4✔
140
    }
141

142
    // Process until shutdown is requested.
143
    while (!m_shutdown_requested.load(std::memory_order::acquire))
58,341,324✔
144
    {
145
        // Try and grab work from out local queue first.
146
        std::array<std::coroutine_handle<>, MAX_HANDLES> handles{nullptr};
57,826,333✔
147
        if (!local_queue.try_dequeue_bulk(handles.data(), MAX_HANDLES))
57,826,333✔
148
        {
149
            // Try and grab work from the global queue next.
150
            if (!m_global_queue.try_dequeue_bulk(handles.data(), MAX_HANDLES))
43,749,879✔
151
            {
152
                // Try and steal work from another local queue last.
153
                if (!try_steal_work(idx, handles))
22,137,320✔
154
                {
155
                    // If there is nothing on global and nothing to steal, try and lock to sleep
156
                    if (state.m_mutex.try_lock())
22,137,187✔
157
                    {
158
                        // Hold the lock for the duration of sleeping
159
                        std::scoped_lock lk{std::adopt_lock, state.m_mutex};
22,139,213✔
160
                        if (!m_global_queue.wait_dequeue_bulk_timed(handles.data(), MAX_HANDLES, wait_timeout))
22,137,259✔
161
                        {
162
                            // If we wait the full timeout and there is no work, probe around.
163
                            continue;
550✔
164
                        }
165
                    }
22,132,720✔
166
                    // else if we didn't get the lock we just enqueued on this thread (which should be impossible?)
167
                }
168
            }
169
        }
170

171
        for (std::size_t i = 0; i < MAX_HANDLES; ++i)
137,464,139✔
172
        {
173
            auto& handle = handles[i];
132,108,040✔
174
            if (handle == nullptr)
131,498,163✔
175
            {
176
                break;
52,982,533✔
177
            }
178

179
            handle.resume();
78,285,984✔
180
            m_size.fetch_sub(1, std::memory_order::release);
77,902,846✔
181
        }
182
    }
183

184
    // We'll lock our local thread so nothing new gets enqueued to it.
185
    std::scoped_lock lk{state.m_mutex};
153,629✔
186

187
    // Process until there are no ready tasks left, start by draining the local queue.
188
    while (true)
189
    {
190
        // Try and grab work from out local queue first.
191
        std::array<std::coroutine_handle<>, MAX_HANDLES> handles{nullptr};
161✔
192
        if (!local_queue.try_dequeue_bulk(handles.data(), MAX_HANDLES))
161✔
193
        {
194
            break;
162✔
195
        }
196

NEW
197
        for (std::size_t i = 0; i < MAX_HANDLES; ++i)
×
198
        {
NEW
199
            auto& handle = handles[i];
×
NEW
200
            if (handle == nullptr)
×
201
            {
NEW
202
                break;
×
203
            }
204

NEW
205
            handle.resume();
×
NEW
206
            m_size.fetch_sub(1, std::memory_order::release);
×
207
        }
208
    }
×
209

210
    // Now finish by draining the global queue.
211
    while (m_size.load(std::memory_order::acquire) > 0)
324✔
212
    {
213
        std::array<std::coroutine_handle<>, MAX_HANDLES> handles{nullptr};
24✔
214
        if (!m_global_queue.try_dequeue_bulk(handles.data(), MAX_HANDLES))
24✔
215
        {
216
            break;
23✔
217
        }
218

219
        for (std::size_t i = 0; i < MAX_HANDLES; ++i)
2✔
220
        {
221
            auto& handle = handles[i];
2✔
222
            if (handle == nullptr)
2✔
223
            {
224
                break;
1✔
225
            }
226

227
            handle.resume();
1✔
228
            m_size.fetch_sub(1, std::memory_order::release);
1✔
229
        }
230
    }
231

232
    if (m_opts.on_thread_stop_functor != nullptr)
160✔
233
    {
234
        m_opts.on_thread_stop_functor(idx);
×
235
    }
236
}
163✔
237

238
// Routes a coroutine handle to a queue for execution.
// Preference order: the calling worker's own local queue (fast path, avoids waking
// anyone), falling back to the global queue when the caller is not a worker, the
// worker appears to be sleeping (its state mutex is held), or force_global_queue is set.
// Note: m_size accounting is the caller's responsibility, not done here.
auto thread_pool::schedule_impl(std::coroutine_handle<> handle, bool force_global_queue) noexcept -> void
{
    // Null or already-completed coroutines are silently dropped.
    if (handle == nullptr || handle.done())
    {
        return;
    }

    if (!force_global_queue)
    {
        // Attempt to see if we are on one of the thread_pool threads and enqueue to our local queue.
        for (std::size_t i = 0; i < m_executor_state.size(); i++)
        {
            // If we're on an executor thread and it is not sleeping enqueue locally, otherwise enqueue on the global queue to wake up a sleeping worker.
            auto& state = *m_executor_state[i].get();
            if (state.m_thread_id == std::this_thread::get_id())
            {
                // If we can lock and we're not sleeping enqueue locally.
                // (The worker holds this mutex while sleeping on the global queue, so a
                // successful try_lock proves the worker is awake and will see local work.)
                if (state.m_mutex.try_lock())
                {
                    std::scoped_lock lk{std::adopt_lock, state.m_mutex};
                    m_local_queues[i].enqueue(handle);
                    return;
                }

                // Either the lock couldn't be acquired without contention or the thread is sleeping
                // so enqueue globally.
                break;
            }
        }
    }

    // Global queue: reaches any worker, including one sleeping in wait_dequeue_bulk_timed.
    m_global_queue.enqueue(handle);
}
271

272
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc