• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 24749973918

21 Apr 2026 10:35PM UTC coverage: 86.948%. First build
24749973918

Pull #450

github

web-flow
Merge c27bef5ba into 277467ffd
Pull Request #450: coro::thread_pool_ws (work stealing)

294 of 309 new or added lines in 17 files covered. (95.15%)

2105 of 2421 relevant lines covered (86.95%)

4453278.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.53
/src/thread_pool_ws.cpp
1
#include "coro/thread_pool_ws.hpp"
2
#include "coro/task_group.hpp"
3

4
#include <iostream>
5
#include <mutex>
6

7
#ifdef LIBCORO_TSAN
8
#include <sanitizer/tsan_interface.h>
9
#endif
10

11
namespace coro
12
{
13
namespace detail
14
{
15
static auto make_spawned_joinable_wait_task(std::unique_ptr<coro::task_group<coro::thread_pool_ws>> group_ptr)
8✔
16
    -> coro::task<void>
17
{
18
    co_await *group_ptr;
19
    co_return;
20
}
16✔
21

22
} // namespace detail
23

24
// One slot per OS thread, shared by every thread_pool_ws instance (static thread_local member): execute()
// stores the worker's index here, and it stays std::nullopt on threads that never ran execute().
thread_local std::optional<uint32_t> thread_pool_ws::m_thread_pool_queue_idx{std::nullopt};
25

26
thread_pool_ws::worker_info::worker_info(thread_pool_ws& tp, uint32_t i)
    : m_thread_pool(tp)
    , m_idx(i)
{
    // Only records the owning pool and this worker's slot index; the thread itself is launched later by start().
}
28✔
29

30
thread_pool_ws::thread_pool_ws(options opts, private_constructor)
    : m_opts(std::move(opts))
{
    // make_unique() appends exactly thread_count workers afterwards; size the vector for them up front.
    const auto expected_workers = m_opts.thread_count;
    m_workers.reserve(expected_workers);
}
15✔
34

35
auto thread_pool_ws::worker_info::start() -> void
28✔
36
{
37
    // Each worker's thread is only started after *all* worker_info structures are initialized
38
    // since they all reference each other's queues for stealing.
39
    m_thread = std::thread([this]() -> void { m_thread_pool.execute(m_idx); });
56✔
40
}
28✔
41

42
auto thread_pool_ws::make_unique(options opts) -> std::unique_ptr<thread_pool_ws>
{
    auto tp = std::make_unique<thread_pool_ws>(std::move(opts), private_constructor{});

    // Phase 1: build every worker_info first. Workers steal from each other's queues, so all of them
    // must exist before any thread is launched.
    for (uint32_t worker_idx = 0; worker_idx < tp->m_opts.thread_count; ++worker_idx)
    {
        tp->m_workers.emplace_back(std::make_unique<worker_info>(*tp, worker_idx));
    }

    // Phase 2: launch the threads.
    for (auto& w : tp->m_workers)
    {
        w->start();
    }

    // Phase 3: wait until every thread has entered execute() (it bumps m_workers_started right after
    // recording its thread-local queue index).
    for (;;)
    {
        if (tp->m_workers_started.load(std::memory_order::acquire) == tp->m_opts.thread_count)
        {
            break;
        }
        std::this_thread::yield();
    }

    return tp;
}
×
63

64
thread_pool_ws::~thread_pool_ws()
{
    // Stops, drains and joins every worker; a no-op if shutdown() has already been called.
    shutdown();
}
15✔
68

69
thread_pool_ws::schedule_operation::schedule_operation(thread_pool_ws& tp, bool allocated) noexcept
    : m_thread_pool(tp)
    , m_allocated(allocated)
{
#ifdef LIBCORO_TSAN
    // m_allocated is initialised here without an atomic store but later loaded atomically by whichever
    // worker resumes the operation; TSAN reports that as a race, so publish the constructor's write explicitly.
    __tsan_release(&m_allocated);
#endif
}
4,518,516✔
78

79
auto thread_pool_ws::schedule_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    m_awaiting_coroutine.store(awaiting_coroutine, std::memory_order::release);

    // Once `this` is pushed onto any queue, another worker may pop/steal it and resume the awaiting coroutine.
    // After that `this` can already be destroyed: it lives in that coroutine's frame, or resume_task() deletes
    // it when it was heap allocated. So copy what is needed afterwards into a local now and never touch
    // `this` after publishing it.
    thread_pool_ws& tp = m_thread_pool;

    // The worker index is a single static thread_local shared by every thread_pool_ws instance. It only
    // names one of tp's queues when the calling thread really is tp's worker for that slot. A worker of a
    // *different* pool would otherwise push onto tp's deque from a foreign thread (every other emplace in this
    // file is done by the owning worker) or index past the end of tp.m_workers.
    const auto& idx           = thread_pool_ws::m_thread_pool_queue_idx;
    const bool  on_own_worker = idx.has_value() && idx.value() < tp.m_workers.size() &&
                               tp.m_workers[idx.value()]->m_thread.get_id() == std::this_thread::get_id();

    if (on_own_worker)
    {
        // Fast path: push onto this worker's own deque; peers can steal it if we are busy.
        tp.m_workers[idx.value()]->m_queue.emplace(this);
    }
    else
    {
        // Any other thread (including workers of other pools) goes through the shared global list.
        detail::awaiter_list_push(tp.m_global_queue, this);
    }

    // Must not go through `this` (see above); also balances the m_try_wake_workers_size increment.
    tp.try_wake_worker();
}
4,829,726✔
95

96
auto thread_pool_ws::schedule() -> schedule_operation
{
    // Reserve the operation in all counters *before* checking for shutdown. Workers keep draining while
    // m_queue_size > 0 and shutdown() waits for m_try_wake_workers_size to reach 0, so the reservation is
    // intended to be visible to a concurrent shutdown.
    m_try_wake_workers_size.fetch_add(1, std::memory_order::release);
    m_queue_size.fetch_add(1, std::memory_order::release);
    m_size.fetch_add(1, std::memory_order::release);

    const bool stopping = m_shutdown_requested.load(std::memory_order::acquire);
    if (!stopping)
    {
        // The counters are consumed by await_suspend()/try_wake_worker() and resume_task().
        return schedule_operation{*this};
    }

    // Shutting down: roll the reservation back and reject the request.
    m_try_wake_workers_size.fetch_sub(1, std::memory_order::release);
    m_queue_size.fetch_sub(1, std::memory_order::release);
    m_size.fetch_sub(1, std::memory_order::release);
    throw std::runtime_error("coro::thread_pool_ws is shutting down, unable to schedule new tasks.");
}
111

112
auto thread_pool_ws::spawn_detached(coro::task<void> user_task) -> bool
{
    // New work is refused once shutdown has begun.
    if (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        // Wrap the task so the caller does not have to keep it alive; the wrapper is presumably responsible
        // for releasing its own frame when it completes (see make_task_self_deleting).
        auto wrapper_task = detail::make_task_self_deleting(std::move(user_task));
        // NOTE(review): resume() can still return false if shutdown races in after the check above; confirm
        // the wrapper's frame is released in that case.
        return resume(wrapper_task.handle());
    }

    return false;
}
122

123
auto thread_pool_ws::spawn_joinable(coro::task<void> user_task) -> coro::task<void>
{
    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        throw std::runtime_error("coro::thread_pool_ws is shutting down, unable to spawn new tasks.");
    }

    // Hand the user task to a task_group bound to this pool, then move the group into the returned task.
    // Its frame owns the group, so the group lives until the caller's co_await on the result finishes.
    return detail::make_spawned_joinable_wait_task(
        std::make_unique<coro::task_group<coro::thread_pool_ws>>(this, std::move(user_task)));
}
8✔
133

134
auto thread_pool_ws::resume(std::coroutine_handle<> handle) noexcept -> bool
{
    // Nothing to run for an empty handle or a coroutine that has already finished.
    const bool runnable = handle != nullptr && !handle.done();
    if (!runnable)
    {
        return false;
    }

    // Same reserve-then-check protocol as schedule(), but report failure instead of throwing.
    m_try_wake_workers_size.fetch_add(1, std::memory_order::release);
    m_queue_size.fetch_add(1, std::memory_order::release);
    m_size.fetch_add(1, std::memory_order::release);

    if (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        // No awaiting frame exists to hold the operation, so it goes on the heap; allocated == true tells
        // resume_task() to delete it after resuming the handle. `op` must not be used after await_suspend().
        auto* op = new schedule_operation{*this, true};
        op->await_suspend(handle);
        return true;
    }

    // Shutting down: undo the reservation.
    m_try_wake_workers_size.fetch_sub(1, std::memory_order::release);
    m_queue_size.fetch_sub(1, std::memory_order::release);
    m_size.fetch_sub(1, std::memory_order::release);
    return false;
}
156

157
auto thread_pool_ws::shutdown() noexcept -> void
18✔
158
{
159
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
18✔
160
    {
161
        {
162
            for (auto& info : m_workers)
43✔
163
            {
164
                std::unique_lock<std::mutex> lk{info->m_wait_mutex};
28✔
165
                info->m_sleeping.store(false, std::memory_order::release);
28✔
166
                info->m_wait_cv.notify_one();
28✔
167
            }
28✔
168
            // std::unique_lock<std::mutex> lk{m_wait_mutex};
169
            // m_wait_cv.notify_all();
170
        }
171

172
        while (m_workers_stopped.load(std::memory_order::acquire) < m_opts.thread_count)
228✔
173
        {
174
            std::this_thread::yield();
99✔
175
        }
176

177
        for (auto& worker : m_workers)
43✔
178
        {
179
            if (worker->m_thread.joinable())
28✔
180
            {
181
                worker->m_thread.join();
28✔
182
            }
183
        }
184

185
        // Make sure all try wake tasks are completed prior to destructing this thread pool.
186
        while (m_try_wake_workers_size.load(std::memory_order::acquire) > 0)
30✔
187
        {
NEW
188
            std::this_thread::yield();
×
189
        }
190
    }
191
}
18✔
192

193
auto thread_pool_ws::execute(uint32_t idx) -> void
28✔
194
{
195
    m_thread_pool_queue_idx = {idx};
28✔
196

197
    m_workers_started.fetch_add(1, std::memory_order::release);
28✔
198

199
    if (m_opts.on_thread_start_functor != nullptr)
28✔
200
    {
NEW
201
        m_opts.on_thread_start_functor(idx);
×
202
    }
203

204
    auto&  info         = m_workers[idx];
28✔
205
    auto&  thread_queue = info->m_queue;
28✔
206
    size_t spin_counter{0};
28✔
207
    while (!m_shutdown_requested.load(std::memory_order::acquire))
867,784✔
208
    {
209
        bool had_tasks = drain_thread_queue(thread_queue);
867,658✔
210
        had_tasks |= try_steal(idx);
867,698✔
211
        had_tasks |= drain_global_queue(thread_queue);
867,626✔
212

213
        if (had_tasks)
867,756✔
214
        {
215
            spin_counter = 0;
351,422✔
216
            continue;
351,422✔
217
        }
218

219
        if (++spin_counter <= 100)
516,334✔
220
        {
221
            continue;
516,269✔
222
        }
223

224
        // Notify to all scheduled tasks that this worker is going to sleep and upon
225
        // scheduling a task this worker will need to be woken up.
226
        {
227
            std::scoped_lock lk{m_wait_mutex, info->m_wait_mutex};
65✔
228
            info->m_sleeping.exchange(true);
65✔
229
            m_sleeping.fetch_add(1, std::memory_order::release);
65✔
230
        }
65✔
231

232
        // Go to sleep.
233
        {
234
            std::unique_lock lk{info->m_wait_mutex};
65✔
235
            info->m_wait_cv.wait(
65✔
236
                lk,
237
                [&]() {
119✔
238
                    return !info->m_sleeping.load(std::memory_order::acquire) ||
119✔
239
                           m_queue_size.load(std::memory_order::acquire) > 0 ||
237✔
240
                           m_shutdown_requested.load(std::memory_order::acquire);
174✔
241
                });
242
        }
65✔
243
    }
244

245
    // while true; do gdb -ex run -ex quit --args taskset -c 0-1 ./test/libcoro_test "[thread_pool_ws]"; done
246

247
    // Run until the queue is fully drained.
248
    while (m_queue_size.load(std::memory_order::acquire) > 0)
58✔
249
    {
250
        (void)drain_thread_queue(thread_queue);
1✔
251
        (void)try_steal(idx);
1✔
252
        (void)drain_global_queue(thread_queue);
1✔
253
    }
254

255
    if (m_opts.on_thread_stop_functor != nullptr)
28✔
256
    {
NEW
257
        m_opts.on_thread_stop_functor(idx);
×
258
    }
259

260
    m_workers_stopped.fetch_add(1, std::memory_order::release);
28✔
261
}
28✔
262

263
auto thread_pool_ws::drain_thread_queue(riften::Deque<schedule_operation*>& queue) -> bool
{
    // Owner-side drain: keep popping this worker's own deque until it reports empty.
    bool ran_any = false;
    while (!queue.empty())
    {
        if (resume_task(queue.pop()))
        {
            ran_any = true;
        }
    }
    return ran_any;
}
272

273
auto thread_pool_ws::try_steal(uint32_t my_idx) -> bool
867,634✔
274
{
275
    bool had_tasks{false};
867,634✔
276
    auto queue_size = m_workers.size();
867,634✔
277
    for (size_t i = 1; i < queue_size; ++i)
983,044✔
278
    {
279
        auto steal_idx = (my_idx + i) % queue_size;
115,390✔
280
        had_tasks |= drain_peer_queue(m_workers[steal_idx]->m_queue);
115,390✔
281
    }
282
    return had_tasks;
867,654✔
283
}
284

285
auto thread_pool_ws::drain_peer_queue(riften::Deque<schedule_operation*>& queue) -> bool
{
    // Thief-side drain of another worker's deque. A failed steal yields an empty optional, which
    // resume_task() ignores; the loop simply retries while the deque still reports work.
    bool stole_any = false;
    while (!queue.empty())
    {
        if (resume_task(queue.steal()))
        {
            stole_any = true;
        }
    }
    return stole_any;
}
294

295
auto thread_pool_ws::drain_global_queue(riften::Deque<schedule_operation*>& queue) -> bool
{
    // Cheap pre-check: only call awaiter_list_pop_all() when the shared list appears non-empty.
    if (m_global_queue.load(std::memory_order::acquire) == nullptr)
    {
        return false;
    }

    // Take the whole global list and re-queue each operation on this worker's own deque. Read m_next before
    // emplacing: once an operation is on the deque a peer may steal and resume it, after which it may be gone.
    auto* op = detail::awaiter_list_pop_all(m_global_queue);
    while (op != nullptr)
    {
        auto* const next = op->m_next;
        queue.emplace(op);
        op = next;
    }

    // Report work even if another worker grabbed the list first: the operations now sit in some worker's
    // deque and can be stolen on the next pass.
    return true;
}
318

319
auto thread_pool_ws::resume_task(std::optional<schedule_operation*> op) -> bool
{
    // pop()/steal() came back empty: nothing was run.
    if (!op.has_value())
    {
        return false;
    }

    auto* operation = *op;
    m_queue_size.fetch_sub(1, std::memory_order::release);

    // Read the ownership flag *before* resuming. A non-allocated operation presumably lives in the awaiting
    // coroutine's frame and must not be touched once resume() returns.
    const auto heap_owned = operation->m_allocated.load(std::memory_order::acquire);
    operation->m_awaiting_coroutine.load(std::memory_order::acquire).resume();
    m_size.fetch_sub(1, std::memory_order::release);

    // Operations created by resume() with allocated == true are owned by the pool.
    if (heap_owned)
    {
        delete operation;
    }
    return true;
}
336

337
auto thread_pool_ws::try_wake_worker() noexcept -> void
4,627,746✔
338
{
339
    // We're shutting down, no need to even attempt to wake any workers.
340
    if (m_shutdown_requested.load(std::memory_order::acquire))
4,627,746✔
341
    {
NEW
342
        m_try_wake_workers_size.fetch_sub(1, std::memory_order::release);
×
NEW
343
        return;
×
344
    }
345

346
    auto asleep = m_sleeping.load(std::memory_order::acquire);
4,523,884✔
347
    while (asleep > 0 && !m_shutdown_requested.load(std::memory_order::acquire))
4,853,847✔
348
    {
349
        if (m_sleeping.compare_exchange_weak(
92✔
350
                asleep, asleep - 1, std::memory_order::acq_rel, std::memory_order::acquire))
351
        {
352
            for (auto & info : m_workers)
107✔
353
            {
354
                std::unique_lock lk {info->m_wait_mutex};
107✔
355
                if (info->m_sleeping.load(std::memory_order::acquire))
107✔
356
                {
357
                    info->m_sleeping.store(false, std::memory_order::release);
45✔
358
                    info->m_wait_cv.notify_one();
45✔
359
                    break; // for
45✔
360
                }
361
            }
107✔
362
            break; // while
46✔
363
        }
364
    }
365

366
    m_try_wake_workers_size.fetch_sub(1, std::memory_order::release);
4,853,847✔
367

368
    // // Attempt to wake a sleeper if there are any.
369
    // if (m_sleeping.load(std::memory_order::acquire) > 0)
370
    // {
371
    //     std::unique_lock<std::mutex> lk{m_wait_mutex};
372
    //     // Check again after acquiring the lock to see if there are any sleeping workers.
373
    //     if (m_sleeping.load(std::memory_order::acquire) == 0)
374
    //     {
375
    //         return;
376
    //     }
377

378
    //     m_sleeping.fetch_sub(1, std::memory_order::release);
379
    //     m_wait_cv.notify_one();
380
    // }
381
}
382

383
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc