• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 23984203554

04 Apr 2026 05:45PM UTC coverage: 86.624%. First build
23984203554

Pull #450

github

web-flow
Merge e1ffe621d into 277467ffd
Pull Request #450: coro::thread_pool_ws (work stealing)

206 of 215 new or added lines in 16 files covered. (95.81%)

2014 of 2325 relevant lines covered (86.62%)

4584537.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.1
/src/thread_pool_ws.cpp
1
#include "coro/thread_pool_ws.hpp"
2
#include "coro/task_group.hpp"
3

4
#include <iostream>
5

6
namespace coro
7
{
8
namespace detail
9
{
10
static auto make_spawned_joinable_wait_task(std::unique_ptr<coro::task_group<coro::thread_pool_ws>> group_ptr)
7✔
11
    -> coro::task<void>
12
{
13
    co_await *group_ptr;
14
    co_return;
15
}
14✔
16

17
} // namespace detail
18

19
thread_local std::optional<uint32_t> thread_pool_ws::m_thread_pool_queue_idx{std::nullopt};
20

21
// Records the owning pool and this worker's stable queue index. The worker's
// thread is not started here; start() is invoked separately once all workers
// have been constructed.
thread_pool_ws::worker_info::worker_info(thread_pool_ws& pool, uint32_t worker_idx)
    : m_thread_pool(pool),
      m_idx(worker_idx)
{
}
28✔
24

25
// Private constructor (gated by the private_constructor tag); use make_unique()
// to create a pool. Reserves worker storage up front so worker_info pointers
// are never invalidated by vector growth.
thread_pool_ws::thread_pool_ws(options opts, private_constructor)
    : m_opts(std::move(opts))
{
    m_workers.reserve(m_opts.thread_count);
}
15✔
29

30
auto thread_pool_ws::worker_info::start() -> void
28✔
31
{
32
    // Each worker's thread is only started after *all* worker_info structures are initialized
33
    // since they all reference each other's queues for stealing.
34
    m_thread = std::thread([this]() -> void { m_thread_pool.execute(m_idx); });
56✔
35
}
28✔
36

37
// Factory for thread_pool_ws. Construction is two-phase: first every
// worker_info is created, then all threads are started. No worker thread may
// run before all workers exist because they reference each other's queues
// when stealing.
auto thread_pool_ws::make_unique(options opts) -> std::unique_ptr<thread_pool_ws>
{
    auto pool = std::make_unique<thread_pool_ws>(std::move(opts), private_constructor{});

    // Phase 1: build all worker structures.
    for (uint32_t worker_idx = 0; worker_idx < pool->m_opts.thread_count; ++worker_idx)
    {
        pool->m_workers.emplace_back(std::make_unique<worker_info>(*pool, worker_idx));
    }

    // Phase 2: launch the worker threads.
    for (auto& worker : pool->m_workers)
    {
        worker->start();
    }

    return pool;
}
×
53

54
// Destructor drains and joins all worker threads via shutdown(), which is
// idempotent (guarded by an atomic exchange), so an explicit prior shutdown()
// call is harmless.
thread_pool_ws::~thread_pool_ws()
{
    shutdown();
}
15✔
58

59
// Registers one pending operation with the pool at construction time.
// m_queue_size counts operations sitting in a queue and feeds the sleeping
// workers' wake predicate (see execute()); m_size counts all outstanding
// operations and gates the shutdown drain loop in execute(). Both are
// decremented in resume_task().
thread_pool_ws::schedule_operation::schedule_operation(thread_pool_ws& tp) noexcept : m_thread_pool(tp)
{
    m_thread_pool.m_queue_size.fetch_add(1, std::memory_order::release);
    m_thread_pool.m_size.fetch_add(1, std::memory_order::release);
}
5,191,059✔
64

65
/// @brief Publishes the awaiting coroutine and enqueues this operation for a worker.
///
/// If the current thread is a pool worker (the thread_local queue index is
/// set) the operation goes into that worker's local deque; otherwise it is
/// pushed onto the pool's global awaiter list.
///
/// @param awaiting_coroutine The coroutine to resume once a worker picks up
///        this operation.
auto thread_pool_ws::schedule_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    m_awaiting_coroutine.store(awaiting_coroutine, std::memory_order::release);

    // Cache the pool reference NOW: the moment this operation is enqueued
    // below, another worker thread may dequeue it, resume the coroutine, and
    // even delete this operation (see resume()/resume_task()). Any access
    // through `this` after enqueueing would be a potential use-after-free.
    auto& tp = m_thread_pool;

    // See if we are running on a thread pool worker to enqueue locally.
    auto& idx = thread_pool_ws::m_thread_pool_queue_idx;
    if (idx.has_value())
    {
        tp.m_workers[idx.value()]->m_queue.emplace(this);
    }
    else
    {
        detail::awaiter_list_push(tp.m_global_queue, this);
    }

    tp.try_wake_worker();
}
4,355,874✔
81

82
/// @brief Returns an awaitable that transfers the awaiting coroutine onto a
///        pool worker thread.
/// @throws std::runtime_error if shutdown has already been requested.
auto thread_pool_ws::schedule() -> schedule_operation
{
    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        throw std::runtime_error("coro::thread_pool_ws is shutting down, unable to schedule new tasks.");
    }

    // The operation registers itself with the pool's counters on construction.
    return schedule_operation{*this};
}
91

92
/// @brief Fire-and-forget execution of a task on the pool.
///
/// The user task is wrapped in a self-deleting shell so its coroutine frame is
/// reclaimed automatically upon completion.
///
/// @return false if the pool is shutting down or the handle could not be
///         resumed; true once the wrapped task has been handed to a worker.
auto thread_pool_ws::spawn_detached(coro::task<void> user_task) -> bool
{
    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        return false;
    }

    auto detached = detail::make_task_self_deleting(std::move(user_task));
    return resume(detached.handle());
}
102

103
/// @brief Spawns the user task on the pool and returns a task that completes
///        once the spawned work has finished (i.e. it can be co_awaited to
///        "join" the spawned task).
/// @throws std::runtime_error if shutdown has already been requested.
auto thread_pool_ws::spawn_joinable(coro::task<void> user_task) -> coro::task<void>
{
    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        throw std::runtime_error("coro::thread_pool_ws is shutting down, unable to spawn new tasks.");
    }

    // The task group owns the spawned task; the wait task takes ownership of
    // the group so it lives until the join completes.
    auto group = std::make_unique<coro::task_group<coro::thread_pool_ws>>(this, std::move(user_task));
    return detail::make_spawned_joinable_wait_task(std::move(group));
}
7✔
113

114
/// @brief Schedules an externally-owned coroutine handle onto the pool.
///
/// @param handle Coroutine to resume on a worker thread.
/// @return false for a null/finished handle or when shutdown has begun;
///         true once the operation is enqueued.
auto thread_pool_ws::resume(std::coroutine_handle<> handle) noexcept -> bool
{
    const bool resumable = handle != nullptr && !handle.done();
    if (!resumable || m_shutdown_requested.load(std::memory_order::acquire))
    {
        return false;
    }

    // Heap-allocate the operation and flag it as allocated so the worker that
    // eventually runs it deletes it (see resume_task()). The flag is set
    // before await_suspend() publishes the operation to any queue.
    auto* op = new schedule_operation{*this};
    op->m_allocated.store(true, std::memory_order::release);
    op->await_suspend(handle);
    return true;
}
131

132
auto thread_pool_ws::shutdown() noexcept -> void
18✔
133
{
134
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
18✔
135
    {
136
        {
137
            std::unique_lock<std::mutex> lk{m_wait_mutex};
15✔
138
            m_wait_cv.notify_all();
15✔
139
        }
15✔
140

141
        for (auto& worker : m_workers)
43✔
142
        {
143
            if (worker->m_thread.joinable())
28✔
144
            {
145
                worker->m_thread.join();
28✔
146
            }
147
        }
148
    }
149
}
18✔
150

151
// Worker thread main loop; runs on the worker's own std::thread (see
// worker_info::start()). Drains the local queue, steals from peers, and pulls
// from the global queue until shutdown, then performs a final drain of any
// outstanding work.
auto thread_pool_ws::execute(uint32_t idx) -> void
{
    // Record this worker's queue index in the thread_local so that
    // schedule_operation::await_suspend() enqueues locally on this thread.
    m_thread_pool_queue_idx = {idx};

    if (m_opts.on_thread_start_functor != nullptr)
    {
        m_opts.on_thread_start_functor(idx);
    }

    auto&  info         = m_workers[idx];
    auto&  thread_queue = info->m_queue;
    size_t spin_counter{0};
    while (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        // Work sources in priority order: own queue, peers' queues, global list.
        bool had_tasks = drain_thread_queue(thread_queue);
        had_tasks |= try_steal(idx);
        had_tasks |= drain_global_queue(thread_queue);

        if (had_tasks)
        {
            // Found work: reset the empty-iteration counter and loop again.
            spin_counter = 0;
            continue;
        }

        // Spin for a bounded number of empty iterations before sleeping, to
        // avoid condition-variable overhead under bursty load.
        if (++spin_counter <= 100)
        {
            continue;
        }

        // Go to sleep until a task is scheduled.
        // NOTE(review): m_sleeping is incremented here but is decremented by
        // the waker in try_wake_worker(), not by this thread on wakeup —
        // confirm the accounting stays balanced when the wait predicate is
        // already true on entry.
        std::unique_lock<std::mutex> lk{m_wait_mutex};
        m_sleeping.fetch_add(1, std::memory_order::release);
        m_wait_cv.wait(
            lk,
            [&]() {
                return m_queue_size.load(std::memory_order::acquire) > 0 ||
                       m_shutdown_requested.load(std::memory_order::acquire);
            });
    }

    // Shutdown drain: keep processing while operations remain outstanding
    // (m_size > 0) and runnable work can still be found.
    while (m_size.load(std::memory_order::acquire) > 0)
    {
        bool had_tasks = drain_thread_queue(thread_queue);
        had_tasks |= try_steal(idx);
        had_tasks |= drain_global_queue(thread_queue);

        // If there were no tasks left to work on, this thread can exit.
        if (!had_tasks)
        {
            break;
        }
    }

    if (m_opts.on_thread_stop_functor != nullptr)
    {
        m_opts.on_thread_stop_functor(idx);
    }
}
28✔
209

210
// Pops and resumes every operation currently in this worker's own deque
// (owner side uses pop(), not steal()).
// Returns true if at least one task was resumed.
auto thread_pool_ws::drain_thread_queue(riften::Deque<schedule_operation*>& queue) -> bool
{
    bool resumed_any{false};
    while (!queue.empty())
    {
        resumed_any |= resume_task(queue.pop());
    }
    return resumed_any;
}
219

220
auto thread_pool_ws::try_steal(uint32_t my_idx) -> bool
1,147,021✔
221
{
222
    bool had_tasks{false};
1,147,021✔
223
    auto queue_size = m_workers.size();
1,147,021✔
224
    for (size_t i = 0; i < queue_size; ++i)
2,363,886✔
225
    {
226
        if (i == my_idx)
1,220,001✔
227
        {
228
            continue;
1,147,328✔
229
        }
230

231
        had_tasks |= drain_peer_queue(m_workers[i]->m_queue);
72,673✔
232
    }
233
    return had_tasks;
1,143,885✔
234
}
235

236
// Steals and resumes operations from another worker's deque (thief side uses
// steal(), which may legitimately come back empty under contention).
// Returns true if at least one task was resumed.
auto thread_pool_ws::drain_peer_queue(riften::Deque<schedule_operation*>& queue) -> bool
{
    bool resumed_any{false};
    while (!queue.empty())
    {
        resumed_any |= resume_task(queue.steal());
    }
    return resumed_any;
}
245

246
// Moves the entire global awaiter list (if any) into this worker's local
// deque for execution.
auto thread_pool_ws::drain_global_queue(riften::Deque<schedule_operation*>& queue) -> bool
{
    if (m_global_queue.load(std::memory_order::acquire) == nullptr)
    {
        return false;
    }

    // Atomically detach the whole list; another worker may have raced us and
    // taken it first, in which case head_op is null and the loop is a no-op.
    auto* head_op = detail::awaiter_list_pop_all(m_global_queue);
    while (head_op != nullptr)
    {
        auto* next_op = head_op->m_next;
        queue.emplace(head_op);
        head_op = next_op;
    }

    // 1. If we got the global list we have tasks.
    // 2. If we didn't get the global list we know we can possibly steal.
    // Either way there should be tasks to process.
    return true;
}
269

270
/// @brief Resumes the coroutine held by a dequeued operation and updates the
///        pool's counters.
///
/// @param op Result of a deque pop()/steal(); empty when the deque operation
///        lost a race and yielded nothing.
/// @return true if a coroutine was resumed, false for an empty optional.
auto thread_pool_ws::resume_task(std::optional<schedule_operation*> op) -> bool
{
    if (op.has_value())
    {
        m_queue_size.fetch_sub(1, std::memory_order::release);
        auto v = op.value();
        // Read m_allocated BEFORE resuming: when the operation lives inside
        // the awaiting coroutine's frame (m_allocated == false), resume() may
        // run that coroutine to completion and destroy the frame — touching
        // *v afterwards would be a use-after-free. The flag is set before the
        // operation is ever enqueued (see resume()), so the value read here
        // is final.
        const bool allocated = v->m_allocated.load(std::memory_order::acquire);
        v->m_awaiting_coroutine.load(std::memory_order::acquire).resume();
        // m_size is decremented only after the task has run so the shutdown
        // drain loop in execute() keeps workers alive while work is in flight.
        m_size.fetch_sub(1, std::memory_order::release);
        if (allocated)
        {
            // Heap-allocated by resume(); this worker owns the deletion.
            delete v;
        }
        return true;
    }
    return false;
}
286

287
// Wakes at most one sleeping worker, if any. Called after an operation has
// been enqueued (see schedule_operation::await_suspend()).
auto thread_pool_ws::try_wake_worker() noexcept -> void
{
    // Attempt to wake a sleeper if there are any.
    // Fast path: unsynchronized peek avoids taking the mutex when no worker
    // is asleep (the common case).
    if (m_sleeping.load(std::memory_order::acquire) > 0)
    {
        std::unique_lock<std::mutex> lk{m_wait_mutex};
        // Check again after acquiring the lock to see if there are any sleeping workers.
        if (m_sleeping.load(std::memory_order::acquire) == 0)
        {
            return;
        }

        // Decrement on the sleeper's behalf — workers do not decrement
        // m_sleeping themselves when they wake (see execute()).
        m_sleeping.fetch_sub(1, std::memory_order::release);
        m_wait_cv.notify_one();
    }
}
303

304
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc