• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 18876249349

28 Oct 2025 01:23PM UTC coverage: 82.318%. First build
18876249349

Pull #407

github

web-flow
Merge 40d637537 into f3ca22039
Pull Request #407: coro::thread_pool uses lockless mpmc queue

611 of 865 new or added lines in 5 files covered. (70.64%)

2230 of 2709 relevant lines covered (82.32%)

10470571.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.76
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2
#include "coro/detail/task_self_deleting.hpp"
3

4
namespace coro
5
{
6
// Builds the awaitable returned by thread_pool::schedule(); it only remembers
// which pool the awaiting coroutine must be handed to on suspension.
thread_pool::schedule_operation::schedule_operation(thread_pool& tp) noexcept
    : m_thread_pool(tp)
{
}
46,272,079✔
10

11
// Suspension hook of the schedule awaitable: forwards the suspended coroutine
// to the owning pool's schedule_impl() so it can be queued.
auto thread_pool::schedule_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    auto& pool = m_thread_pool;
    pool.schedule_impl(awaiting_coroutine);
}
46,269,527✔
15

16
// Constructs the pool state without starting any worker threads; the workers
// are launched by make_unique() once the object is fully constructed.
//
// opts is taken by rvalue reference, so it is moved into m_opts. A named rvalue
// reference is an lvalue, which means plain `m_opts(opts)` would silently copy
// the options instead of moving them.
//
// The queue is created with an initial capacity argument of six queue blocks;
// the two trailing `1` arguments are presumably producer-count hints -- confirm
// against the queue's constructor documentation.
thread_pool::thread_pool(options&& opts, private_constructor)
    : m_opts(std::move(opts)),
      m_queue(6 * moodycamel::ConcurrentQueue<std::coroutine_handle<>>::BLOCK_SIZE, 1, 1)
{
    // Reserve up front so launching the workers never reallocates m_threads.
    m_threads.reserve(m_opts.thread_count);
}
90✔
22

23
// Factory for thread_pool: constructs the pool, then starts one worker thread
// per configured m_opts.thread_count, each running executor() with its index.
auto thread_pool::make_unique(options opts) -> std::unique_ptr<thread_pool>
{
    auto pool = std::make_unique<thread_pool>(std::move(opts), private_constructor{});

    // The workers are only started after construction has finished so that each
    // of them observes a fully initialized thread_pool object.
    thread_pool* const self = pool.get();
    for (uint32_t worker_idx = 0; worker_idx < self->m_opts.thread_count; ++worker_idx)
    {
        self->m_threads.emplace_back([self, worker_idx]() { self->executor(worker_idx); });
    }

    return pool;
}
×
36

37
// Destruction goes through shutdown(), which requests shutdown and joins the
// worker threads (a no-op if shutdown was already requested).
thread_pool::~thread_pool()
{
    this->shutdown();
}
180✔
41

42
// Returns an awaitable that, when co_awaited, moves the awaiting coroutine
// onto this pool.
//
// The pending-work counter is bumped before the shutdown flag is inspected and
// rolled back again if the pool turns out to be shutting down.
//
// Throws std::runtime_error if shutdown has already been requested.
auto thread_pool::schedule() -> schedule_operation
{
    m_size.fetch_add(1, std::memory_order::release);

    const bool shutting_down = m_shutdown_requested.load(std::memory_order::acquire);
    if (shutting_down)
    {
        // Undo the optimistic increment; this request will never be queued.
        m_size.fetch_sub(1, std::memory_order::release);
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
    }

    return schedule_operation{*this};
}
55

56
// Wraps the given task in a self-deleting task, gives the wrapper's promise
// access to this pool's size counter, and queues the wrapper through resume().
//
// Returns the result of resume(): true if the wrapper was queued, false if it
// was rejected (for example because shutdown has been requested).
auto thread_pool::spawn(coro::task<void>&& task) noexcept -> bool
{
    // Counted once here for the spawned task itself; resume() counts the queued
    // handle separately.
    m_size.fetch_add(1, std::memory_order::release);

    auto self_deleting = detail::make_task_self_deleting(std::move(task));
    auto& wrapper_promise = self_deleting.promise();
    wrapper_promise.executor_size(m_size);

    return resume(self_deleting.handle());
}
200,003✔
63

64
// Queues an existing coroutine handle to be resumed on one of the pool's worker
// threads.
//
// Returns false, without queueing anything, when the handle is null, already
// done, or when shutdown has been requested; returns true once the handle has
// been passed to schedule_impl().
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
{
    if (handle == nullptr)
    {
        return false;
    }
    if (handle.done())
    {
        return false;
    }

    // Count the work before checking the shutdown flag, and roll the count back
    // if the pool is no longer accepting work.
    m_size.fetch_add(1, std::memory_order::release);
    const bool shutting_down = m_shutdown_requested.load(std::memory_order::acquire);
    if (shutting_down)
    {
        m_size.fetch_sub(1, std::memory_order::release);
        return false;
    }

    schedule_impl(handle);
    return true;
}
81

82
auto thread_pool::shutdown() noexcept -> void
146✔
83
{
84
    // Only allow shutdown to occur once.
85
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
146✔
86
    {
87
        for (auto& thread : m_threads)
256✔
88
        {
89
            if (thread.joinable())
166✔
90
            {
91
                thread.join();
166✔
92
            }
93
        }
94
    }
95
}
146✔
96

97
auto thread_pool::executor(std::size_t idx) -> void
166✔
98
{
99
    std::chrono::milliseconds wait_timeout{100};
166✔
100
    if (m_opts.on_thread_start_functor != nullptr)
166✔
101
    {
102
        m_opts.on_thread_start_functor(idx);
4✔
103
    }
104

105
    // Process until shutdown is requested.
106
    while (!m_shutdown_requested.load(std::memory_order::acquire))
56,188,719✔
107
    {
108
        // todo: make this constexpr.
109
        if (m_threads.size() > 1)
56,086,491✔
110
        {
111
            std::coroutine_handle<> handle{nullptr};
11,875,008✔
112
            if (!m_queue.wait_dequeue_timed(handle, wait_timeout))
11,875,008✔
113
            {
114
                continue;
377✔
115
            }
116

117
            handle.resume();
11,872,096✔
118
            m_size.fetch_sub(1, std::memory_order::release);
11,881,696✔
119
        }
120
        else
121
        {
122
            // dequeue multiple items so a continuous yield() on a coroutine doesn't hold the executor thread indefinitly.
123
            std::coroutine_handle<> handles[32] = { nullptr };
44,203,842✔
124
            if (!m_queue.wait_dequeue_bulk_timed(handles, 32, wait_timeout))
44,203,842✔
125
            {
126
                continue;
168✔
127
            }
128

129
            for (auto& handle : handles)
110,720,331✔
130
            {
131
                if (handle == nullptr)
110,078,400✔
132
                {
133
                    break;
43,664,381✔
134
                }
135

136
                handle.resume();
66,397,144✔
137
                m_size.fetch_sub(1, std::memory_order::release);
66,376,751✔
138
            }
139
        }
140
    }
141

142
    // Process until there are no ready tasks left.
143
    while (m_size.load(std::memory_order::acquire) > 0)
316✔
144
    {
145
        if (m_threads.size() > 1)
1✔
146
        {
147
            std::coroutine_handle<> handle{nullptr};
1✔
148
            if (!m_queue.try_dequeue(handle))
1✔
149
            {
NEW
150
                break;
×
151
            }
152

153
            handle.resume();
1✔
154
            m_size.fetch_sub(1, std::memory_order::release);
1✔
155
        }
156
        else
157
        {
NEW
158
            std::coroutine_handle<> handles[32] = { nullptr };
×
NEW
159
            if (!m_queue.wait_dequeue_bulk_timed(handles, 32, wait_timeout))
×
160
            {
NEW
161
                break;
×
162
            }
163

NEW
164
            for (auto& handle : handles)
×
165
            {
NEW
166
                if (handle == nullptr)
×
167
                {
NEW
168
                    break;
×
169
                }
170

NEW
171
                handle.resume();
×
NEW
172
                m_size.fetch_sub(1, std::memory_order::release);
×
173
            }
174
        }
175
    }
176

177
    if (m_opts.on_thread_stop_functor != nullptr)
158✔
178
    {
179
        m_opts.on_thread_stop_functor(idx);
×
180
    }
181
}
153✔
182

183
// Pushes a coroutine handle onto the work queue. Null handles and handles whose
// coroutine has already completed are ignored.
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
{
    if (handle == nullptr)
    {
        return;
    }

    if (!handle.done())
    {
        m_queue.enqueue(handle);
    }
}
192

193
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc