• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 18892717725

29 Oct 2025 12:03AM UTC coverage: 80.879%. First build
18892717725

Pull #407

github

web-flow
Merge 63a3d33f2 into f3ca22039
Pull Request #407: coro::thread_pool uses lockless mpmc queue

593 of 883 new or added lines in 7 files covered. (67.16%)

2191 of 2709 relevant lines covered (80.88%)

10826691.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.61
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
#include "coro/detail/task_self_deleting.hpp"

#include <queue>
#include <utility>

6
namespace coro
{
// Maximum number of coroutine handles moved per bulk queue operation.  Every
// dequeue in this file (local, global, and steal paths) pulls at most this
// many handles at once before re-checking the queues and the shutdown flag.
static constexpr std::size_t MAX_HANDLES{2};
9

10
/// Awaitable that re-schedules the awaiting coroutine onto @p tp.  The
/// default construction leaves m_force_global_queue at its in-class value,
/// so the executor may choose a local queue.
thread_pool::schedule_operation::schedule_operation(thread_pool& tp) noexcept
    : m_thread_pool(tp)
{
}
14

15
/// Awaitable that re-schedules the awaiting coroutine onto @p tp.
/// @param force_global_queue when true, schedule_impl bypasses the local
///        per-thread queues and enqueues directly on the global queue.
thread_pool::schedule_operation::schedule_operation(thread_pool& tp, bool force_global_queue) noexcept
    : m_thread_pool(tp), m_force_global_queue(force_global_queue)
{
}
21

22
/// Hands the suspended coroutine to the owning pool; one of the pool's
/// executor threads will resume it later.
auto thread_pool::schedule_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    auto& pool = m_thread_pool;
    pool.schedule_impl(awaiting_coroutine, m_force_global_queue);
}
26

27
/// Private constructor invoked via make_unique(); only reserves capacity,
/// the worker threads are started afterwards so they never observe a
/// partially-constructed pool.
/// @param opts pool configuration, consumed by this constructor.
thread_pool::thread_pool(options&& opts, private_constructor)
    // The parameter is an rvalue reference; move it instead of copying.
    : m_opts(std::move(opts))
{
    // `opts` is not read past this point — all sizing uses m_opts.
    m_threads.reserve(m_opts.thread_count);
    m_executor_state.reserve(m_opts.thread_count);
    m_local_queues.reserve(m_opts.thread_count);
}
34

35
/// Factory for thread_pool instances.  Construction is split in two: the
/// constructor only reserves storage, then the worker threads are started
/// here so every worker sees a fully-constructed pool object.
auto thread_pool::make_unique(options opts) -> std::unique_ptr<thread_pool>
{
    auto pool = std::make_unique<thread_pool>(std::move(opts), private_constructor{});

    const auto worker_count = pool->m_opts.thread_count;
    for (uint32_t i = 0; i < worker_count; ++i)
    {
        pool->m_local_queues.emplace_back();
        pool->m_executor_state.emplace_back(std::make_unique<executor_state>());
        // The raw pointer is safe: the destructor joins every worker before
        // the pool is destroyed.
        pool->m_threads.emplace_back([raw = pool.get(), i]() { raw->executor(i); });
    }

    return pool;
}
50

51
/// Joins all worker threads.  shutdown() uses an atomic exchange, so this is
/// safe even if shutdown() was already called explicitly.
thread_pool::~thread_pool()
{
    shutdown();
}
55

56
/// Creates an awaitable that moves the current coroutine onto the pool.
/// @throws std::runtime_error if the pool is shutting down.
auto thread_pool::schedule() -> schedule_operation
{
    // Count the pending operation first, then verify we are still accepting
    // work; roll the counter back on the failure path.
    m_size.fetch_add(1, std::memory_order::release);
    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        m_size.fetch_sub(1, std::memory_order::release);
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
    }

    return schedule_operation{*this};
}
69

70
/// Detaches @p task onto the pool.  The task is wrapped so it destroys
/// itself when it completes; the wrapper also decrements m_size on exit.
/// @return false if the coroutine could not be resumed (see resume()).
auto thread_pool::spawn(coro::task<void>&& task) noexcept -> bool
{
    m_size.fetch_add(1, std::memory_order::release);
    auto self_deleting = detail::make_task_self_deleting(std::move(task));
    self_deleting.promise().executor_size(m_size);
    return resume(self_deleting.handle());
}
77

78
/// Resumes @p handle on the pool.
/// @return false for a null/completed handle or when shutdown is in
///         progress; true once the handle has been enqueued.
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
{
    const bool resumable = handle != nullptr && !handle.done();
    if (!resumable)
    {
        return false;
    }

    // Count the operation before checking shutdown; undo on rejection.
    m_size.fetch_add(1, std::memory_order::release);
    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        m_size.fetch_sub(1, std::memory_order::release);
        return false;
    }

    schedule_impl(handle, false);
    return true;
}
95

96
auto thread_pool::shutdown() noexcept -> void
146✔
97
{
98
    // Only allow shutdown to occur once.
99
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
146✔
100
    {
101
        for (auto& thread : m_threads)
256✔
102
        {
103
            if (thread.joinable())
166✔
104
            {
105
                thread.join();
166✔
106
            }
107
        }
108
    }
109
}
146✔
110

111
auto thread_pool::try_steal_work(std::size_t my_idx, std::array<std::coroutine_handle<>, MAX_HANDLES>& handles) -> bool
20,458,235✔
112
{
113
    for (std::size_t i = 0; i < m_local_queues.size(); ++i)
61,027,385✔
114
    {
115
        if (i == my_idx)
40,573,384✔
116
        {
117
            continue;
20,459,554✔
118
        }
119

120
        auto& queue = m_local_queues[i];
20,113,830✔
121
        if (queue.try_dequeue_bulk(handles.data(), MAX_HANDLES))
20,108,398✔
122
        {
123
            return true;
289✔
124
        }
125
    }
126

127
    return false;
20,439,939✔
128
}
129

130
/// Worker-thread main loop for executor @p idx.
///
/// Work acquisition order per iteration: (1) this worker's local queue,
/// (2) the global queue, (3) stealing from another worker's local queue,
/// (4) a timed blocking wait on the global queue while holding this
/// worker's mutex (the held mutex is what schedule_impl uses to detect a
/// sleeping worker).  After shutdown is requested the local queue is
/// drained under the lock, then the global queue is drained while m_size
/// reports outstanding work.
auto thread_pool::executor(std::size_t idx) -> void
{
    auto& state = *m_executor_state[idx].get();
    // Publish this worker's thread id so schedule_impl can recognize calls
    // made from an executor thread.
    state.m_thread_id = std::this_thread::get_id();
    auto& local_queue = m_local_queues[idx];

    constexpr std::chrono::milliseconds wait_timeout{100};
    if (m_opts.on_thread_start_functor != nullptr)
    {
        m_opts.on_thread_start_functor(idx);
    }

    // Process until shutdown is requested.
    while (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        // Try and grab work from our local queue first.
        std::array<std::coroutine_handle<>, MAX_HANDLES> handles{nullptr};
        if (!local_queue.try_dequeue_bulk(handles.data(), MAX_HANDLES))
        {
            // Try and grab work from the global queue next.
            if (!m_global_queue.try_dequeue_bulk(handles.data(), MAX_HANDLES))
            {
                // Try and steal work from another local queue last.
                if (!try_steal_work(idx, handles))
                {
                    // Nothing on global and nothing to steal: take our mutex
                    // to signal "sleeping" and block on the global queue.
                    if (state.m_mutex.try_lock())
                    {
                        // Hold the (already acquired) lock for the duration of
                        // the sleep so producers see this worker as asleep.
                        std::scoped_lock lk{std::adopt_lock, state.m_mutex};
                        if (!m_global_queue.wait_dequeue_bulk_timed(handles.data(), MAX_HANDLES, wait_timeout))
                        {
                            // Full timeout elapsed with no work; loop around
                            // and probe the queues again.
                            continue;
                        }
                    }
                    // else: we failed to take our own mutex, which would mean
                    // this thread enqueued to itself (should be impossible?).
                }
            }
        }

        // Resume whatever the bulk dequeue produced; a nullptr entry marks
        // the end of the valid handles.
        for (std::size_t i = 0; i < MAX_HANDLES; ++i)
        {
            auto& handle = handles[i];
            if (handle == nullptr)
            {
                break;
            }

            handle.resume();
            m_size.fetch_sub(1, std::memory_order::release);
        }
    }

    // Shutdown path: hold our mutex so nothing new gets enqueued locally.
    std::scoped_lock lk{state.m_mutex};

    // Process until there are no ready tasks left, starting by draining the
    // local queue.
    while (true)
    {
        std::array<std::coroutine_handle<>, MAX_HANDLES> handles{nullptr};
        if (!local_queue.try_dequeue_bulk(handles.data(), MAX_HANDLES))
        {
            break;
        }

        for (std::size_t i = 0; i < MAX_HANDLES; ++i)
        {
            auto& handle = handles[i];
            if (handle == nullptr)
            {
                break;
            }

            handle.resume();
            m_size.fetch_sub(1, std::memory_order::release);
        }
    }

    // Now finish by draining the global queue while work is outstanding.
    while (m_size.load(std::memory_order::acquire) > 0)
    {
        std::array<std::coroutine_handle<>, MAX_HANDLES> handles{nullptr};
        if (!m_global_queue.try_dequeue_bulk(handles.data(), MAX_HANDLES))
        {
            break;
        }

        for (std::size_t i = 0; i < MAX_HANDLES; ++i)
        {
            auto& handle = handles[i];
            if (handle == nullptr)
            {
                break;
            }

            handle.resume();
            m_size.fetch_sub(1, std::memory_order::release);
        }
    }

    if (m_opts.on_thread_stop_functor != nullptr)
    {
        m_opts.on_thread_stop_functor(idx);
    }
}
237

238
/// Enqueues @p handle for execution.  Prefers the calling executor's local
/// queue when the caller is itself a pool worker that is awake (its mutex
/// can be locked without contention); otherwise falls back to the global
/// queue, which can wake a sleeping worker.
/// @param force_global_queue when true, skip the local-queue fast path.
auto thread_pool::schedule_impl(std::coroutine_handle<> handle, bool force_global_queue) noexcept -> void
{
    // Ignore null or already-completed coroutines.
    if (handle == nullptr || handle.done())
    {
        return;
    }

    if (!force_global_queue)
    {
        // Attempt to see if we are on one of the thread_pool threads and
        // enqueue to our local queue.  Linear scan over the executor states.
        for (std::size_t i = 0; i < m_executor_state.size(); i++)
        {
            // If we're on an executor thread and it is not sleeping enqueue locally, otherwise enqueue on the global queue to wake up a sleeping worker.
            auto& state = *m_executor_state[i].get();
            if (state.m_thread_id == std::this_thread::get_id())
            {
                // A successful uncontended try_lock means the worker is awake
                // (executor() holds this mutex while sleeping).
                if (state.m_mutex.try_lock())
                {
                    std::scoped_lock lk{std::adopt_lock, state.m_mutex};
                    m_local_queues[i].enqueue(handle);
                    return;
                }

                // Either the lock couldn't be acquired without contention or
                // the thread is sleeping, so enqueue globally instead.
                break;
            }
        }
    }

    m_global_queue.enqueue(handle);
}
271

272
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc