• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 18884332794

28 Oct 2025 05:54PM UTC coverage: 80.58%. First build
18884332794

Pull #407

github

web-flow
Merge cfc9cf8a6 into f3ca22039
Pull Request #407: coro::thread_pool uses lockless mpmc queue

633 of 950 new or added lines in 5 files covered. (66.63%)

2249 of 2791 relevant lines covered (80.58%)

10327635.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2
#include "coro/detail/task_self_deleting.hpp"
3

4
namespace coro
5
{
6
thread_pool::schedule_operation::schedule_operation(thread_pool& tp) noexcept : m_thread_pool(tp)
46,270,715✔
7
{
8

9
}
46,270,715✔
10

11
auto thread_pool::schedule_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
46,266,052✔
12
{
13
    m_thread_pool.schedule_impl(awaiting_coroutine);
46,266,052✔
14
}
46,272,115✔
15

16
thread_pool::thread_pool(options&& opts, private_constructor)
90✔
17
    : m_opts(opts)
90✔
18
{
19
    m_threads.reserve(m_opts.thread_count);
90✔
20
    m_queues.reserve(m_opts.thread_count);
90✔
21
}
90✔
22

23
auto thread_pool::make_unique(options opts) -> std::unique_ptr<thread_pool>
90✔
24
{
25
    auto tp = std::make_unique<thread_pool>(std::move(opts), private_constructor{});
90✔
26

27
    // Initialize the background worker threads once the thread pool is fully constructed
28
    // so the workers have a fully constructed object to work with.
29
    for (uint32_t i = 0; i < tp->m_opts.thread_count; ++i)
256✔
30
    {
31
        tp->m_queues.emplace_back();
166✔
32
        tp->m_threads.emplace_back([tp = tp.get(), i]() { tp->executor(i); });
331✔
33
    }
34

35
    return tp;
90✔
36
}
×
37

38
thread_pool::~thread_pool()
180✔
39
{
40
    shutdown();
90✔
41
}
180✔
42

43
auto thread_pool::schedule() -> schedule_operation
46,274,990✔
44
{
45
    m_size.fetch_add(1, std::memory_order::release);
46,274,990✔
46
    if (!m_shutdown_requested.load(std::memory_order::acquire))
46,274,990✔
47
    {
48
        return schedule_operation{*this};
46,275,084✔
49
    }
50
    else
51
    {
52
        m_size.fetch_sub(1, std::memory_order::release);
1✔
53
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
1✔
54
    }
55
}
56

57
auto thread_pool::spawn(coro::task<void>&& task) noexcept -> bool
200,003✔
58
{
59
    m_size.fetch_add(1, std::memory_order::release);
200,003✔
60
    auto wrapper_task = detail::make_task_self_deleting(std::move(task));
200,003✔
61
    wrapper_task.promise().executor_size(m_size);
200,003✔
62
    return resume(wrapper_task.handle());
200,003✔
63
}
200,003✔
64

65
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
25,810,622✔
66
{
67
    if (handle == nullptr || handle.done())
25,810,622✔
68
    {
69
        return false;
×
70
    }
71

72
    m_size.fetch_add(1, std::memory_order::release);
25,753,046✔
73
    if (m_shutdown_requested.load(std::memory_order::acquire))
25,753,046✔
74
    {
75
        m_size.fetch_sub(1, std::memory_order::release);
×
76
        return false;
×
77
    }
78

79
    schedule_impl(handle);
25,912,600✔
80
    return true;
25,361,457✔
81
}
82

83
auto thread_pool::shutdown() noexcept -> void
146✔
84
{
85
    // Only allow shutdown to occur once.
86
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
146✔
87
    {
88
        for (auto& thread : m_threads)
256✔
89
        {
90
            if (thread.joinable())
166✔
91
            {
92
                thread.join();
166✔
93
            }
94
        }
95
    }
96
}
146✔
97

98
auto thread_pool::executor(std::size_t idx) -> void
165✔
99
{
100
    auto& queue = m_queues[idx];
165✔
101

102
    std::chrono::milliseconds wait_timeout{10};
166✔
103
    if (m_opts.on_thread_start_functor != nullptr)
166✔
104
    {
105
        m_opts.on_thread_start_functor(idx);
4✔
106
    }
107

108
    // Process until shutdown is requested.
109
    while (!m_shutdown_requested.load(std::memory_order::acquire))
55,345,924✔
110
    {
111
        // todo: make this constexpr.
112
        if (m_threads.size() > 1)
55,192,413✔
113
        {
114
            std::coroutine_handle<> handle{nullptr};
11,846,190✔
115
            if (!queue.try_dequeue(handle))
11,846,190✔
116
            {
117
                // attempt to steal some work from our neighbors.
118
                std::size_t i = (idx + 1) % m_queues.size();
10,396,242✔
119
                while (i != idx)
26,689,168✔
120
                {
121
                    auto& q = m_queues[i];
18,287,983✔
122
                    if (q.try_dequeue(handle))
18,303,510✔
123
                    {
124
                        break;
2,049,111✔
125
                    }
126

127
                    i = (i + 1) % m_queues.size();
16,263,806✔
128
                }
129

130
                if (handle == nullptr)
10,450,296✔
131
                {
132
                    // If we cannot get any work off our queue and we tried to steal from all our neighbors, go to sleep.
133
                    if (!queue.wait_dequeue_timed(handle, wait_timeout))
8,379,327✔
134
                    {
135
                        // If we still didn't get anything loop around.
136
                        continue;
2,920✔
137
                    }
138
                }
139
            }
140

141
            handle.resume();
11,858,937✔
142
            m_size.fetch_sub(1, std::memory_order::release);
11,854,343✔
143
        }
144
        else
145
        {
146
            // dequeue multiple items so a continuous yield() on a coroutine doesn't hold the executor thread indefinitely.
147
            std::coroutine_handle<> handles[32] = { nullptr };
43,310,614✔
148
            if (!queue.wait_dequeue_bulk_timed(handles, 32, wait_timeout))
43,310,614✔
149
            {
150
                continue;
1,167✔
151
            }
152

153
            for (auto& handle : handles)
109,922,388✔
154
            {
155
                if (handle == nullptr)
109,338,704✔
156
                {
157
                    break;
42,903,644✔
158
                }
159

160
                handle.resume();
66,396,981✔
161
                m_size.fetch_sub(1, std::memory_order::release);
66,291,823✔
162
            }
163
        }
164
    }
165

166
    // Process until there are no ready tasks left.
167
    while (m_size.load(std::memory_order::acquire) > 0)
330✔
168
    {
169
        if (m_threads.size() > 1)
1✔
170
        {
171
            bool empty{true};
1✔
172
            for (auto& q : m_queues)
3✔
173
            {
174
                std::coroutine_handle<> handle{nullptr};
2✔
175
                if (!q.try_dequeue(handle))
2✔
176
                {
177
                    continue;
2✔
178
                }
179

NEW
180
                handle.resume();
×
NEW
181
                m_size.fetch_sub(1, std::memory_order::release);
×
NEW
182
                empty = false;
×
183
            }
184

185
            if (empty)
1✔
186
            {
187
                break;
1✔
188
            }
189
        }
190
        else
191
        {
192
            // this is the only queue, drain and quit.
NEW
193
            std::coroutine_handle<> handles[32] = { nullptr };
×
NEW
194
            if (!queue.wait_dequeue_bulk_timed(handles, 32, wait_timeout))
×
195
            {
NEW
196
                break;
×
197
            }
198

NEW
199
            for (auto& handle : handles)
×
200
            {
NEW
201
                if (handle == nullptr)
×
202
                {
NEW
203
                    break;
×
204
                }
205

NEW
206
                handle.resume();
×
NEW
207
                m_size.fetch_sub(1, std::memory_order::release);
×
208
            }
209
        }
210
    }
211

212
    if (m_opts.on_thread_stop_functor != nullptr)
165✔
213
    {
214
        m_opts.on_thread_stop_functor(idx);
×
215
    }
216
}
164✔
217

218
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
71,988,410✔
219
{
220
    if (handle == nullptr || handle.done())
71,988,410✔
221
    {
222
        return;
×
223
    }
224

225
    m_queues[m_round_robin.fetch_add(1, std::memory_order::acq_rel) % m_queues.size()].enqueue(handle);
144,012,180✔
226
}
227

228
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc