• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 18886238189

28 Oct 2025 07:04PM UTC coverage: 80.614%. First build
18886238189

Pull #407

github

web-flow
Merge add42f5e8 into f3ca22039
Pull Request #407: coro::thread_pool uses lockless mpmc queue

641 of 960 new or added lines in 5 files covered. (66.77%)

2258 of 2801 relevant lines covered (80.61%)

10556979.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.05
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2
#include "coro/detail/task_self_deleting.hpp"
3

4
namespace coro
5
{
6
thread_pool::schedule_operation::schedule_operation(thread_pool& tp) noexcept : m_thread_pool(tp)
46,271,902✔
7
{
8

9
}
46,271,902✔
10

11
auto thread_pool::schedule_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
46,266,292✔
12
{
13
    m_thread_pool.schedule_impl(awaiting_coroutine);
46,266,292✔
14
}
46,265,502✔
15

16
thread_pool::thread_pool(options&& opts, private_constructor)
90✔
17
    : m_opts(opts)
90✔
18
{
19
    m_threads.reserve(m_opts.thread_count);
90✔
20
    m_executor_idx.reserve(m_opts.thread_count);
90✔
21
    m_queues.reserve(m_opts.thread_count);
90✔
22
}
90✔
23

24
auto thread_pool::make_unique(options opts) -> std::unique_ptr<thread_pool>
90✔
25
{
26
    auto tp = std::make_unique<thread_pool>(std::move(opts), private_constructor{});
90✔
27

28
    // Initialize the background worker threads once the thread pool is fully constructed
29
    // so the workers have a full ready object to work with.
30
    for (uint32_t i = 0; i < tp->m_opts.thread_count; ++i)
256✔
31
    {
32
        tp->m_queues.emplace_back();
166✔
33
        tp->m_executor_idx.emplace_back(i);
166✔
34
        tp->m_threads.emplace_back([tp = tp.get(), i]() { tp->executor(i); });
332✔
35
    }
36

37
    return tp;
90✔
38
}
×
39

40
thread_pool::~thread_pool()
180✔
41
{
42
    shutdown();
90✔
43
}
180✔
44

45
auto thread_pool::schedule() -> schedule_operation
46,274,063✔
46
{
47
    m_size.fetch_add(1, std::memory_order::release);
46,274,063✔
48
    if (!m_shutdown_requested.load(std::memory_order::acquire))
46,274,063✔
49
    {
50
        return schedule_operation{*this};
46,274,978✔
51
    }
52
    else
53
    {
54
        m_size.fetch_sub(1, std::memory_order::release);
1✔
55
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
1✔
56
    }
57
}
58

59
auto thread_pool::spawn(coro::task<void>&& task) noexcept -> bool
200,003✔
60
{
61
    m_size.fetch_add(1, std::memory_order::release);
200,003✔
62
    auto wrapper_task = detail::make_task_self_deleting(std::move(task));
200,003✔
63
    wrapper_task.promise().executor_size(m_size);
200,003✔
64
    return resume(wrapper_task.handle());
200,003✔
65
}
200,003✔
66

67
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
25,777,087✔
68
{
69
    if (handle == nullptr || handle.done())
25,777,087✔
70
    {
71
        return false;
×
72
    }
73

74
    m_size.fetch_add(1, std::memory_order::release);
25,681,389✔
75
    if (m_shutdown_requested.load(std::memory_order::acquire))
25,681,389✔
76
    {
77
        m_size.fetch_sub(1, std::memory_order::release);
×
78
        return false;
×
79
    }
80

81
    schedule_impl(handle);
25,622,392✔
82
    return true;
24,926,405✔
83
}
84

85
auto thread_pool::shutdown() noexcept -> void
146✔
86
{
87
    // Only allow shutdown to occur once.
88
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
146✔
89
    {
90
        for (auto& thread : m_threads)
256✔
91
        {
92
            if (thread.joinable())
166✔
93
            {
94
                thread.join();
166✔
95
            }
96
        }
97
    }
98
}
146✔
99

100
auto thread_pool::executor(std::size_t idx) -> void
165✔
101
{
102
    auto& queue = m_queues[idx];
165✔
103

104
    std::chrono::milliseconds wait_timeout{10};
166✔
105
    if (m_opts.on_thread_start_functor != nullptr)
166✔
106
    {
107
        m_opts.on_thread_start_functor(idx);
4✔
108
    }
109

110
    // Process until shutdown is requested.
111
    while (!m_shutdown_requested.load(std::memory_order::acquire))
56,272,629✔
112
    {
113
        // todo: make this constexpr.
114
        if (m_threads.size() > 1)
56,181,722✔
115
        {
116
            std::coroutine_handle<> handle{nullptr};
11,823,135✔
117
            if (!queue.try_dequeue(handle))
11,823,135✔
118
            {
119
                // attempt to steal some work from our neighbors.
120
                std::size_t i = (idx + 1) % m_queues.size();
10,387,896✔
121
                while (i != idx)
27,422,674✔
122
                {
123
                    auto& q = m_queues[i];
19,044,987✔
124
                    if (q.try_dequeue(handle))
19,072,248✔
125
                    {
126
                        break;
2,084,026✔
127
                    }
128

129
                    i = (i + 1) % m_queues.size();
17,007,247✔
130
                }
131

132
                if (handle == nullptr)
10,461,713✔
133
                {
134
                    // If we cannot get any work off our queue and we tried to steal from all our neighbors go to asleep
135
                    if (!queue.wait_dequeue_timed(handle, wait_timeout))
8,357,326✔
136
                    {
137
                        // If we still didn't get anything loop around.
138
                        continue;
2,800✔
139
                    }
140
                }
141
            }
142

143
            handle.resume();
11,842,105✔
144
            m_size.fetch_sub(1, std::memory_order::release);
11,827,834✔
145
        }
146
        else
147
        {
148
            // dequeue multiple items so a continuous yield() on a coroutine doesn't hold the executor thread indefinitely.
149
            std::coroutine_handle<> handles[32] = { nullptr };
44,350,397✔
150
            if (!queue.wait_dequeue_bulk_timed(handles, 32, wait_timeout))
44,350,397✔
151
            {
152
                continue;
1,157✔
153
            }
154

155
            for (auto& handle : handles)
110,849,509✔
156
            {
157
                if (handle == nullptr)
110,218,155✔
158
                {
159
                    break;
43,809,318✔
160
                }
161

162
                handle.resume();
66,395,327✔
163
                m_size.fetch_sub(1, std::memory_order::release);
66,380,172✔
164
            }
165
        }
166
    }
167

168
    // Process until there are no ready tasks left.
169
    while (m_size.load(std::memory_order::acquire) > 0)
328✔
170
    {
171
        if (m_threads.size() > 1)
1✔
172
        {
173
            bool empty{true};
1✔
174
            for (auto& q : m_queues)
3✔
175
            {
176
                std::coroutine_handle<> handle{nullptr};
2✔
177
                if (!q.try_dequeue(handle))
2✔
178
                {
179
                    continue;
1✔
180
                }
181

182
                handle.resume();
1✔
183
                m_size.fetch_sub(1, std::memory_order::release);
1✔
184
                empty = false;
1✔
185
            }
186

187
            if (empty)
1✔
188
            {
NEW
189
                break;
×
190
            }
191
        }
192
        else
193
        {
194
            // this is the only queue, drain and quit.
NEW
195
            std::coroutine_handle<> handles[32] = { nullptr };
×
NEW
196
            if (!queue.wait_dequeue_bulk_timed(handles, 32, wait_timeout))
×
197
            {
NEW
198
                break;
×
199
            }
200

NEW
201
            for (auto& handle : handles)
×
202
            {
NEW
203
                if (handle == nullptr)
×
204
                {
NEW
205
                    break;
×
206
                }
207

NEW
208
                handle.resume();
×
NEW
209
                m_size.fetch_sub(1, std::memory_order::release);
×
210
            }
211
        }
212
    }
213

214
    if (m_opts.on_thread_stop_functor != nullptr)
164✔
215
    {
216
        m_opts.on_thread_stop_functor(idx);
×
217
    }
218
}
162✔
219

220
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
71,838,018✔
221
{
222
    if (handle == nullptr || handle.done())
71,838,018✔
223
    {
224
        return;
×
225
    }
226

227
    // Attempt to see if we are on one of the thread_pool threads and enqueue to our local queue.
228
    for (std::size_t i = 0; i < m_executor_idx.size(); i++)
167,702,760✔
229
    {
230
        if (m_executor_idx[i] == std::this_thread::get_id())
94,653,442✔
231
        {
NEW
232
            m_queues[i].enqueue(handle);
×
233
        }
234
    }
235

236
    m_queues[m_round_robin.fetch_add(1, std::memory_order::acq_rel) % m_queues.size()].enqueue(handle);
145,120,534✔
237
}
238

239
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc