jbaldwin / libcoro, build 18884179943
28 Oct 2025 05:48PM UTC coverage: 80.372%. First build.
Pull Request #407: coro::thread_pool uses lockless mpmc queue
Merge 5e95f8910 into f3ca22039 (github / web-flow)

621 of 943 new or added lines in 5 files covered (65.85%)
2244 of 2792 relevant lines covered (80.37%)
10552253.86 hits per line

Source File: /src/thread_pool.cpp (85.15% covered)
#include "coro/thread_pool.hpp"
#include "coro/detail/task_self_deleting.hpp"

namespace coro
{
thread_pool::schedule_operation::schedule_operation(thread_pool& tp) noexcept : m_thread_pool(tp)
{
}

auto thread_pool::schedule_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    m_thread_pool.schedule_impl(awaiting_coroutine);
}

thread_pool::thread_pool(options&& opts, private_constructor)
    : m_opts(opts)
{
    m_threads.reserve(m_opts.thread_count);
    m_queues.reserve(m_opts.thread_count);
}

auto thread_pool::make_unique(options opts) -> std::unique_ptr<thread_pool>
{
    auto tp = std::make_unique<thread_pool>(std::move(opts), private_constructor{});

    // Initialize the background worker threads once the thread pool is fully constructed
    // so the workers have a fully ready object to work with.
    for (uint32_t i = 0; i < tp->m_opts.thread_count; ++i)
    {
        tp->m_queues.emplace_back();
        tp->m_threads.emplace_back([tp = tp.get(), i]() { tp->executor(i); });
    }

    return tp;
}

thread_pool::~thread_pool()
{
    shutdown();
}

auto thread_pool::schedule() -> schedule_operation
{
    m_size.fetch_add(1, std::memory_order::release);
    if (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        return schedule_operation{*this};
    }
    else
    {
        m_size.fetch_sub(1, std::memory_order::release);
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
    }
}

auto thread_pool::spawn(coro::task<void>&& task) noexcept -> bool
{
    // Wrap the task so it destroys itself upon completion, then resume it on the pool.
    m_size.fetch_add(1, std::memory_order::release);
    auto wrapper_task = detail::make_task_self_deleting(std::move(task));
    wrapper_task.promise().executor_size(m_size);
    return resume(wrapper_task.handle());
}

auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
{
    if (handle == nullptr || handle.done())
    {
        return false;
    }

    m_size.fetch_add(1, std::memory_order::release);
    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        m_size.fetch_sub(1, std::memory_order::release);
        return false;
    }

    schedule_impl(handle);
    return true;
}

auto thread_pool::shutdown() noexcept -> void
{
    // Only allow shutdown to occur once.
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
    {
        for (auto& thread : m_threads)
        {
            if (thread.joinable())
            {
                thread.join();
            }
        }
    }
}

auto thread_pool::executor(std::size_t idx) -> void
{
    auto& queue = m_queues[idx];

    std::chrono::milliseconds wait_timeout{10};
    if (m_opts.on_thread_start_functor != nullptr)
    {
        m_opts.on_thread_start_functor(idx);
    }

    // Process until shutdown is requested.
    while (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        // todo: make this constexpr.
        if (m_threads.size() > 1)
        {
            std::coroutine_handle<> handle{nullptr};
            if (!queue.wait_dequeue_timed(handle, wait_timeout))
            {
                // Attempt to steal some work from our neighbors.
                std::size_t i = (idx + 1) % m_queues.size();
                while (i != idx)
                {
                    auto& q = m_queues[i];
                    if (q.try_dequeue(handle))
                    {
                        break;
                    }

                    i = (i + 1) % m_queues.size();
                }

                if (handle == nullptr)
                {
                    continue;
                }
            }

            handle.resume();
            m_size.fetch_sub(1, std::memory_order::release);
        }
        else
        {
            // Dequeue multiple items so a continuous yield() on a coroutine doesn't hold the executor thread indefinitely.
            std::coroutine_handle<> handles[32] = { nullptr };
            if (!queue.wait_dequeue_bulk_timed(handles, 32, wait_timeout))
            {
                continue;
            }

            for (auto& handle : handles)
            {
                if (handle == nullptr)
                {
                    break;
                }

                handle.resume();
                m_size.fetch_sub(1, std::memory_order::release);
            }
        }
    }

    // Process until there are no ready tasks left.
    while (m_size.load(std::memory_order::acquire) > 0)
    {
        if (m_threads.size() > 1)
        {
            bool empty{true};
            for (auto& q : m_queues)
            {
                std::coroutine_handle<> handle{nullptr};
                if (!q.try_dequeue(handle))
                {
                    continue;
                }

                handle.resume();
                m_size.fetch_sub(1, std::memory_order::release);
                empty = false;
            }

            if (empty)
            {
                break;
            }
        }
        else
        {
            // This is the only queue, drain and quit.
            std::coroutine_handle<> handles[32] = { nullptr };
            if (!queue.wait_dequeue_bulk_timed(handles, 32, wait_timeout))
            {
                break;
            }

            for (auto& handle : handles)
            {
                if (handle == nullptr)
                {
                    break;
                }

                handle.resume();
                m_size.fetch_sub(1, std::memory_order::release);
            }
        }
    }

    if (m_opts.on_thread_stop_functor != nullptr)
    {
        m_opts.on_thread_stop_functor(idx);
    }
}

auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
{
    if (handle == nullptr || handle.done())
    {
        return;
    }

    // Round-robin the handle across the per-thread queues.
    m_queues[m_round_robin.fetch_add(1, std::memory_order::acq_rel) % m_queues.size()].enqueue(handle);
}

} // namespace coro
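
For reference, here is a minimal usage sketch of the interface this file implements: make_unique() builds the pool, awaiting schedule() moves the coroutine onto a worker thread, and the destructor performs shutdown(). Everything beyond that (the thread_count options field, the coro/coro.hpp umbrella header, and the coro::sync_wait / coro::when_all helpers) is assumed from the wider libcoro API rather than shown in this file, so treat the details as an approximation.

#include "coro/coro.hpp"

#include <iostream>
#include <utility>
#include <vector>

int main()
{
    // Assumed options shape: this file only touches thread_count and the two thread functors.
    auto tp = coro::thread_pool::make_unique(coro::thread_pool::options{.thread_count = 4});

    auto make_task = [](coro::thread_pool& pool, int i) -> coro::task<void>
    {
        // Awaiting schedule() suspends this coroutine and re-queues it onto one of the
        // pool's worker threads via schedule_impl().
        co_await pool.schedule();
        std::cout << "task " << i << " resumed on the pool\n";
        co_return;
    };

    std::vector<coro::task<void>> tasks;
    for (int i = 0; i < 8; ++i)
    {
        tasks.emplace_back(make_task(*tp, i));
    }

    // Block until every task has completed before the pool goes out of scope;
    // ~thread_pool() then calls shutdown() and joins the workers.
    coro::sync_wait(coro::when_all(std::move(tasks)));

    return 0;
}

Waiting on the tasks before the pool is destroyed matters here because schedule() throws once shutdown has been requested.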