• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 25259372037

02 May 2026 06:56PM UTC coverage: 86.791%. First build
25259372037

Pull #450

github

web-flow
Merge d4b9b5af9 into 277467ffd
Pull Request #450: coro::thread_pool_ws (work stealing)

297 of 315 new or added lines in 17 files covered. (94.29%)

2096 of 2415 relevant lines covered (86.79%)

4506320.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.86
/src/thread_pool_ws.cpp
1
#include "coro/thread_pool_ws.hpp"
2
#include "coro/task_group.hpp"
3

4
#include <iostream>
5
#include <mutex>
6

7
#ifdef LIBCORO_TSAN
8
#include <sanitizer/tsan_interface.h>
9
#endif
10

11
namespace coro
12
{
13
namespace detail
14
{
15
static auto make_spawned_joinable_wait_task(std::unique_ptr<coro::task_group<coro::thread_pool_ws>> group_ptr)
8✔
16
    -> coro::task<void>
17
{
18
    co_await *group_ptr;
19
    co_return;
20
}
16✔
21

22
} // namespace detail
23

24
// Per-thread worker slot index. execute() assigns it on worker threads; it starts out empty
// (std::nullopt) on every thread and await_suspend() uses it to decide between a worker-local
// queue and the global queue.
// NOTE(review): this is a static thread_local, so the value is per thread, not per pool instance.
thread_local std::optional<uint32_t> thread_pool_ws::m_thread_pool_queue_idx{std::nullopt};
25

26
// Records the owning pool and this worker's index. The worker thread itself is not
// launched here; that happens in start().
//
// @param tp The thread pool this worker belongs to.
// @param i  Index of this worker within the pool.
thread_pool_ws::worker_info::worker_info(thread_pool_ws& tp, uint32_t i)
    : m_thread_pool(tp),
      m_idx(i)
{
}
28✔
29

30
// Stores the options and reserves room for one worker entry per configured thread.
// No workers are created or started here; make_unique() does that after construction.
//
// @param opts Pool options; moved into the pool.
thread_pool_ws::thread_pool_ws(options opts, private_constructor)
    : m_opts(std::move(opts))
{
    const auto worker_count = m_opts.thread_count;
    m_workers.reserve(worker_count);
}
15✔
34

35
auto thread_pool_ws::worker_info::start() -> void
28✔
36
{
37
    // Each worker's thread is only started after *all* worker_info structures are initialized
38
    // since they all reference each other's queues for stealing.
39
    m_thread = std::thread([this]() -> void { m_thread_pool.execute(m_idx); });
56✔
40
}
28✔
41

42
// Creates a thread pool and blocks until every worker thread has started.
//
// Construction is two-phase: all worker_info entries are created first and only then are
// the threads started, since workers access each other's queues.
//
// @param opts Pool options; moved into the pool.
// @return The fully started thread pool.
auto thread_pool_ws::make_unique(options opts) -> std::unique_ptr<thread_pool_ws>
{
    auto       pool         = std::make_unique<thread_pool_ws>(std::move(opts), private_constructor{});
    const auto worker_count = pool->m_opts.thread_count;

    // Phase 1: create every worker entry.
    for (uint32_t worker_idx = 0; worker_idx < worker_count; ++worker_idx)
    {
        pool->m_workers.emplace_back(std::make_unique<worker_info>(*pool, worker_idx));
    }

    // Phase 2: launch the threads now that all entries exist.
    for (auto& entry : pool->m_workers)
    {
        entry->start();
    }

    // Each worker bumps m_workers_started at the top of execute(); wait until all have done so.
    while (pool->m_workers_started.load(std::memory_order::acquire) != worker_count)
    {
        std::this_thread::yield();
    }

    return pool;
}
×
63

64
// Shuts the pool down on destruction; shutdown() is a no-op if it was already requested.
thread_pool_ws::~thread_pool_ws()
{
    this->shutdown();
}
15✔
68

69
// Creates a schedule operation bound to a thread pool.
//
// @param tp        The pool the awaiting coroutine will be enqueued on.
// @param allocated True when this operation was created with `new` (see resume()); such
//                  operations are deleted by resume_task() after the coroutine is resumed.
thread_pool_ws::schedule_operation::schedule_operation(thread_pool_ws& tp, bool allocated) noexcept
    : m_thread_pool(tp), m_allocated(allocated)
{
#ifdef LIBCORO_TSAN
    // TSAN reports the non-atomic initialization above as racing with the atomic loads of
    // m_allocated performed on other threads, so annotate it as a release.
    __tsan_release(&m_allocated);
#endif
}
4,448,171✔
78

79
// Enqueues the suspended coroutine on the thread pool and attempts to wake a sleeping worker.
//
// When called on one of this pool's own worker threads the operation goes onto that worker's
// local queue; from any other thread it goes onto the pool's global queue.
//
// @param awaiting_coroutine The coroutine to resume on a worker thread.
auto thread_pool_ws::schedule_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    m_awaiting_coroutine.store(awaiting_coroutine, std::memory_order::release);

    // Bind the pool to a local reference up front: a heap allocated schedule_operation can be
    // deleted by a worker as soon as it has been enqueued, after which no member of `this`
    // may be touched.
    auto& tp = m_thread_pool;

    // m_thread_pool_queue_idx is a static thread_local, so it is set on the workers of *every*
    // thread_pool_ws instance. An index that belongs to a different pool must not be used here:
    // it may be out of range for this pool's workers, and a worker's deque is only meant to be
    // pushed to by the thread that owns it. Only take the local fast path when the calling
    // thread really is the worker that owns that slot in this pool.
    const auto& idx = thread_pool_ws::m_thread_pool_queue_idx;
    const bool  is_own_worker = idx.has_value() && *idx < tp.m_workers.size() &&
                               tp.m_workers[*idx]->m_thread.get_id() == std::this_thread::get_id();

    if (is_own_worker)
    {
        tp.m_workers[*idx]->m_queue.emplace(this);
    }
    else
    {
        detail::awaiter_list_push(tp.m_global_queue, this);
    }

    // At this point `this` is no longer safe to use since a worker thread could have completed this coroutine.
    tp.try_wake_worker();
}
4,867,875✔
98

99
// Creates an awaitable that moves the awaiting coroutine onto this thread pool.
//
// The bookkeeping counters are incremented before the shutdown flag is checked and rolled
// back again if the pool turns out to be shutting down.
//
// @return The schedule operation to co_await.
// @throws std::runtime_error if shutdown has been requested.
auto thread_pool_ws::schedule() -> schedule_operation
{
    m_try_wake_workers_size.fetch_add(1, std::memory_order::release);
    m_queue_size.fetch_add(1, std::memory_order::release);
    m_size.fetch_add(1, std::memory_order::release);

    const bool shutting_down = m_shutdown_requested.load(std::memory_order::acquire);
    if (!shutting_down)
    {
        return schedule_operation{*this};
    }

    // Undo the reservations made above, nothing will be enqueued.
    m_try_wake_workers_size.fetch_sub(1, std::memory_order::release);
    m_queue_size.fetch_sub(1, std::memory_order::release);
    m_size.fetch_sub(1, std::memory_order::release);
    throw std::runtime_error("coro::thread_pool_ws is shutting down, unable to schedule new tasks.");
}
114

115
// Starts a task on the thread pool without providing a way to join it.
//
// The user task is wrapped via detail::make_task_self_deleting() and the wrapper's handle
// is handed to resume().
//
// @param user_task The task to run.
// @return False if shutdown has been requested, otherwise the result of resume().
auto thread_pool_ws::spawn_detached(coro::task<void> user_task) -> bool
{
    const bool shutting_down = m_shutdown_requested.load(std::memory_order::acquire);
    if (shutting_down)
    {
        return false;
    }

    auto self_deleting_task = detail::make_task_self_deleting(std::move(user_task));
    return resume(self_deleting_task.handle());
}
125

126
// Starts a task on the thread pool and returns a task that can be awaited to join it.
//
// The user task is placed into a task_group bound to this pool; the returned task owns
// that group and awaits it.
//
// @param user_task The task to run.
// @return A task that completes once the task group has been awaited.
// @throws std::runtime_error if shutdown has been requested.
auto thread_pool_ws::spawn_joinable(coro::task<void> user_task) -> coro::task<void>
{
    const bool shutting_down = m_shutdown_requested.load(std::memory_order::acquire);
    if (shutting_down)
    {
        throw std::runtime_error("coro::thread_pool_ws is shutting down, unable to spawn new tasks.");
    }

    using group_type = coro::task_group<coro::thread_pool_ws>;
    auto group       = std::make_unique<group_type>(this, std::move(user_task));
    return detail::make_spawned_joinable_wait_task(std::move(group));
}
8✔
136

137
// Schedules an existing coroutine handle to be resumed on the thread pool.
//
// A heap allocated schedule_operation carries the handle through the queues; it is marked
// as allocated so that resume_task() deletes it after resuming the coroutine.
//
// @param handle The coroutine to resume.
// @return False if the handle is null or already done, or if shutdown has been requested;
//         true once the handle has been enqueued.
auto thread_pool_ws::resume(std::coroutine_handle<> handle) noexcept -> bool
{
    if (!handle || handle.done())
    {
        return false;
    }

    m_try_wake_workers_size.fetch_add(1, std::memory_order::release);
    m_queue_size.fetch_add(1, std::memory_order::release);
    m_size.fetch_add(1, std::memory_order::release);

    const bool shutting_down = m_shutdown_requested.load(std::memory_order::acquire);
    if (shutting_down)
    {
        // Undo the reservations made above, nothing will be enqueued.
        m_try_wake_workers_size.fetch_sub(1, std::memory_order::release);
        m_queue_size.fetch_sub(1, std::memory_order::release);
        m_size.fetch_sub(1, std::memory_order::release);
        return false;
    }

    // Ownership passes to the queues; resume_task() performs the matching delete.
    auto* heap_op = new schedule_operation{*this, true};
    heap_op->await_suspend(handle);
    return true;
}
159

160
auto thread_pool_ws::shutdown() noexcept -> void
18✔
161
{
162
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
18✔
163
    {
164
        {
165
            for (auto& info : m_workers)
43✔
166
            {
167
                std::unique_lock<std::mutex> lk{info->m_wait_mutex};
28✔
168
                info->m_sleeping.store(false, std::memory_order::release);
28✔
169
                info->m_wait_cv.notify_one();
28✔
170
            }
28✔
171
            // std::unique_lock<std::mutex> lk{m_wait_mutex};
172
            // m_wait_cv.notify_all();
173
        }
174

175
        while (m_workers_stopped.load(std::memory_order::acquire) < m_opts.thread_count)
262✔
176
        {
177
            std::this_thread::yield();
116✔
178
        }
179

180
        for (auto& worker : m_workers)
43✔
181
        {
182
            if (worker->m_thread.joinable())
28✔
183
            {
184
                worker->m_thread.join();
28✔
185
            }
186
        }
187

188
        // Make sure all try wake tasks are completed prior to destructing this thread pool.
189
        while (m_try_wake_workers_size.load(std::memory_order::acquire) > 0)
30✔
190
        {
NEW
191
            std::this_thread::yield();
×
192
        }
193
    }
194
}
18✔
195

196
auto thread_pool_ws::execute(uint32_t idx) -> void
28✔
197
{
198
    m_thread_pool_queue_idx = {idx};
28✔
199

200
    m_workers_started.fetch_add(1, std::memory_order::release);
28✔
201

202
    if (m_opts.on_thread_start_functor != nullptr)
28✔
203
    {
NEW
204
        m_opts.on_thread_start_functor(idx);
×
205
    }
206

207
    auto&  info         = m_workers[idx];
28✔
208
    auto&  thread_queue = info->m_queue;
28✔
209
    size_t spin_counter{0};
28✔
210
    while (!m_shutdown_requested.load(std::memory_order::acquire))
903,568✔
211
    {
212
        bool had_tasks = drain_thread_queue(thread_queue);
903,441✔
213
        had_tasks |= try_steal(idx);
903,440✔
214
        had_tasks |= drain_global_queue(thread_queue);
903,444✔
215

216
        if (had_tasks)
903,540✔
217
        {
218
            spin_counter = 0;
315,734✔
219
            continue;
315,734✔
220
        }
221

222
        if (++spin_counter <= 100)
587,806✔
223
        {
224
            continue;
587,525✔
225
        }
226

227
        // Notify to all scheduled tasks that this worker is going to sleep and upon
228
        // scheduling a task this worker will need to be woken up.
229
        {
230
            std::scoped_lock lk{m_wait_mutex, info->m_wait_mutex};
281✔
231
            info->m_sleeping.exchange(true);
281✔
232
            m_sleeping.fetch_add(1, std::memory_order::release);
281✔
233
        }
281✔
234

235
        // Go to sleep.
236
        {
237
            std::unique_lock lk{info->m_wait_mutex};
281✔
238
            info->m_wait_cv.wait(
281✔
239
                lk,
240
                [&]() {
368✔
241
                    return !info->m_sleeping.load(std::memory_order::acquire) ||
368✔
242
                           m_queue_size.load(std::memory_order::acquire) > 0 ||
731✔
243
                           m_shutdown_requested.load(std::memory_order::acquire);
454✔
244
                });
245
        }
280✔
246
    }
247

248
    // while true; do gdb -ex run -ex quit --args taskset -c 0-1 ./test/libcoro_test "[thread_pool_ws]"; done
249

250
    // Run until the queue is fully drained.
251
    while (m_queue_size.load(std::memory_order::acquire) > 0)
56✔
252
    {
NEW
253
        (void)drain_thread_queue(thread_queue);
×
NEW
254
        (void)try_steal(idx);
×
NEW
255
        (void)drain_global_queue(thread_queue);
×
256
    }
257

258
    if (m_opts.on_thread_stop_functor != nullptr)
28✔
259
    {
NEW
260
        m_opts.on_thread_stop_functor(idx);
×
261
    }
262

263
    m_workers_stopped.fetch_add(1, std::memory_order::release);
28✔
264
}
28✔
265

266
// Pops and resumes operations from the given queue until it reports empty.
//
// @param queue The calling worker's own queue.
// @return True if at least one operation was resumed.
auto thread_pool_ws::drain_thread_queue(riften::Deque<schedule_operation*>& queue) -> bool
{
    bool resumed_any{false};
    while (!queue.empty())
    {
        // pop() may still come back empty; resume_task() reports false in that case.
        if (resume_task(queue.pop()))
        {
            resumed_any = true;
        }
    }
    return resumed_any;
}
275

276
auto thread_pool_ws::try_steal(uint32_t my_idx) -> bool
903,352✔
277
{
278
    bool had_tasks{false};
903,352✔
279
    auto queue_size = m_workers.size();
903,352✔
280
    for (size_t i = 1; i < queue_size; ++i)
962,940✔
281
    {
282
        auto steal_idx = (my_idx + i) % queue_size;
59,490✔
283
        had_tasks |= drain_peer_queue(m_workers[steal_idx]->m_queue);
59,490✔
284
    }
285
    return had_tasks;
903,450✔
286
}
287

288
// Steals and resumes operations from another worker's queue until it reports empty.
//
// @param queue A peer worker's queue.
// @return True if at least one stolen operation was resumed.
auto thread_pool_ws::drain_peer_queue(riften::Deque<schedule_operation*>& queue) -> bool
{
    bool resumed_any{false};
    while (!queue.empty())
    {
        // steal() may still come back empty; resume_task() reports false in that case.
        if (resume_task(queue.steal()))
        {
            resumed_any = true;
        }
    }
    return resumed_any;
}
297

298
// Moves everything currently on the global queue into the given worker queue.
//
// @param queue The calling worker's own queue, which receives the operations.
// @return False if the global queue was observed empty; true otherwise, even when another
//         thread took the list first.
auto thread_pool_ws::drain_global_queue(riften::Deque<schedule_operation*>& queue) -> bool
{
    if (m_global_queue.load(std::memory_order::acquire) == nullptr)
    {
        return false;
    }

    for (auto* op = detail::awaiter_list_pop_all(m_global_queue); op != nullptr;)
    {
        // Read the link before enqueueing: once the operation is in the queue another
        // worker may steal and resume it.
        auto* following = op->m_next;
        queue.emplace(op);
        op = following;
    }

    // 1. If we got the global list we have tasks.
    // 2. If we didn't get the global list we know we can possibly steal.
    // Either way there should be tasks to process.
    return true;
}
321

322
// Resumes the coroutine held by a dequeued schedule operation, if there is one.
//
// m_queue_size is decremented before the coroutine runs and m_size after it returns.
//
// @param op Result of a pop()/steal(); empty when nothing was obtained.
// @return True if an operation was present and its coroutine was resumed.
auto thread_pool_ws::resume_task(std::optional<schedule_operation*> op) -> bool
{
    if (!op.has_value())
    {
        return false;
    }

    m_queue_size.fetch_sub(1, std::memory_order::release);

    auto* scheduled = *op;
    // Read the flag before resuming; the operation is not touched again afterwards unless
    // it is one of the heap allocated ones deleted below.
    const auto heap_allocated = scheduled->m_allocated.load(std::memory_order::acquire);
    scheduled->m_awaiting_coroutine.load(std::memory_order::acquire).resume();

    m_size.fetch_sub(1, std::memory_order::release);

    if (heap_allocated)
    {
        delete scheduled;
    }
    return true;
}
339

340
auto thread_pool_ws::try_wake_worker() noexcept -> void
4,727,391✔
341
{
342
    // We're shutting down, no need to even attempt to wake any workers.
343
    if (m_shutdown_requested.load(std::memory_order::acquire))
4,727,391✔
344
    {
NEW
345
        m_try_wake_workers_size.fetch_sub(1, std::memory_order::release);
×
NEW
346
        return;
×
347
    }
348

349
    auto asleep = m_sleeping.load(std::memory_order::acquire);
4,613,216✔
350
    while (asleep > 0 && !m_shutdown_requested.load(std::memory_order::acquire))
4,848,705✔
351
    {
352
        if (m_sleeping.compare_exchange_weak(
518✔
353
                asleep, asleep - 1, std::memory_order::acq_rel, std::memory_order::acquire))
354
        {
355
            for (auto & info : m_workers)
1,150✔
356
            {
357
                std::unique_lock lk {info->m_wait_mutex};
989✔
358
                if (info->m_sleeping.load(std::memory_order::acquire))
988✔
359
                {
360
                    info->m_sleeping.store(false, std::memory_order::release);
99✔
361
                    info->m_wait_cv.notify_one();
99✔
362
                    break; // for
99✔
363
                }
364
            }
952✔
365
            break; // while
257✔
366
        }
367
    }
368

369
    m_try_wake_workers_size.fetch_sub(1, std::memory_order::release);
4,848,703✔
370

371
    // // Attempt to wake a sleeper if there are any.
372
    // if (m_sleeping.load(std::memory_order::acquire) > 0)
373
    // {
374
    //     std::unique_lock<std::mutex> lk{m_wait_mutex};
375
    //     // Check again after acquiring the lock to see if there are any sleeping workers.
376
    //     if (m_sleeping.load(std::memory_order::acquire) == 0)
377
    //     {
378
    //         return;
379
    //     }
380

381
    //     m_sleeping.fetch_sub(1, std::memory_order::release);
382
    //     m_wait_cv.notify_one();
383
    // }
384
}
385

386
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc