• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 18877428141

28 Oct 2025 02:03PM UTC coverage: 82.271%. First build
18877428141

Pull #407

github

web-flow
Merge b9eb98ad5 into f3ca22039
Pull Request #407: coro::thread_pool uses lockless mpmc queue

612 of 868 new or added lines in 5 files covered. (70.51%)

2232 of 2713 relevant lines covered (82.27%)

10440887.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.61
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2
#include "coro/detail/task_self_deleting.hpp"
3

4
namespace coro
5
{
6
thread_pool::schedule_operation::schedule_operation(thread_pool& tp) noexcept : m_thread_pool(tp)
46,272,911✔
7
{
8

9
}
46,272,911✔
10

11
auto thread_pool::schedule_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
46,267,774✔
12
{
13
    m_thread_pool.schedule_impl(awaiting_coroutine);
46,267,774✔
14
}
46,261,693✔
15

16
thread_pool::thread_pool(options&& opts, private_constructor)
90✔
17
    : m_opts(opts),
90✔
18
      m_queue(6 * moodycamel::ConcurrentQueue<std::coroutine_handle<>>::BLOCK_SIZE, 1, 1)
90✔
19
{
20
    m_threads.reserve(m_opts.thread_count);
90✔
21
}
90✔
22

23
auto thread_pool::make_unique(options opts) -> std::unique_ptr<thread_pool>
90✔
24
{
25
    auto tp = std::make_unique<thread_pool>(std::move(opts), private_constructor{});
90✔
26

27
    // Initialize the background worker threads once the thread pool is fully constructed
28
    // so the workers have a full ready object to work with.
29
    for (uint32_t i = 0; i < tp->m_opts.thread_count; ++i)
256✔
30
    {
31
        tp->m_threads.emplace_back([tp = tp.get(), i]() { tp->executor(i); });
332✔
32
    }
33

34
    return tp;
90✔
35
}
×
36

37
thread_pool::~thread_pool()
180✔
38
{
39
    shutdown();
90✔
40
}
180✔
41

42
auto thread_pool::schedule() -> schedule_operation
46,275,537✔
43
{
44
    m_size.fetch_add(1, std::memory_order::release);
46,275,537✔
45
    if (!m_shutdown_requested.load(std::memory_order::acquire))
46,275,537✔
46
    {
47
        return schedule_operation{*this};
46,276,354✔
48
    }
49
    else
50
    {
51
        m_size.fetch_sub(1, std::memory_order::release);
1✔
52
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
1✔
53
    }
54
}
55

56
auto thread_pool::spawn(coro::task<void>&& task) noexcept -> bool
200,003✔
57
{
58
    m_size.fetch_add(1, std::memory_order::release);
200,003✔
59
    auto wrapper_task = detail::make_task_self_deleting(std::move(task));
200,003✔
60
    wrapper_task.promise().executor_size(m_size);
200,003✔
61
    return resume(wrapper_task.handle());
200,003✔
62
}
200,003✔
63

64
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
25,746,836✔
65
{
66
    if (handle == nullptr || handle.done())
25,746,836✔
67
    {
68
        return false;
×
69
    }
70

71
    m_size.fetch_add(1, std::memory_order::release);
25,632,220✔
72
    if (m_shutdown_requested.load(std::memory_order::acquire))
25,632,220✔
73
    {
74
        m_size.fetch_sub(1, std::memory_order::release);
×
75
        return false;
×
76
    }
77

78
    schedule_impl(handle);
25,380,248✔
79
    return true;
24,942,678✔
80
}
81

82
auto thread_pool::shutdown() noexcept -> void
146✔
83
{
84
    // Only allow shutdown to occur once.
85
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
146✔
86
    {
87
        for (auto& thread : m_threads)
256✔
88
        {
89
            if (thread.joinable())
166✔
90
            {
91
                thread.join();
166✔
92
            }
93
        }
94
    }
95
}
146✔
96

97
auto thread_pool::executor(std::size_t idx) -> void
166✔
98
{
99
    std::chrono::milliseconds wait_timeout{100};
166✔
100
    if (m_opts.on_thread_start_functor != nullptr)
166✔
101
    {
102
        m_opts.on_thread_start_functor(idx);
4✔
103
    }
104

105
    // Process until shutdown is requested.
106
    while (!m_shutdown_requested.load(std::memory_order::acquire))
55,867,060✔
107
    {
108
        // todo: make this constexpr.
109
        if (m_threads.size() > 1)
55,667,529✔
110
        {
111
            std::coroutine_handle<> handle{nullptr};
11,873,272✔
112
            if (!m_queue.wait_dequeue_timed(handle, wait_timeout))
11,873,272✔
113
            {
114
                continue;
376✔
115
            }
116

117
            handle.resume();
11,866,587✔
118
            m_size.fetch_sub(1, std::memory_order::release);
11,880,541✔
119
        }
120
        else
121
        {
122
            // dequeue multiple items so a continuous yield() on a coroutine doesn't hold the executor thread indefinitly.
123
            std::coroutine_handle<> handles[32] = { nullptr };
43,704,339✔
124
            if (!m_queue.wait_dequeue_bulk_timed(handles, 32, wait_timeout))
43,704,339✔
125
            {
126
                continue;
164✔
127
            }
128

129
            for (auto& handle : handles)
110,550,675✔
130
            {
131
                if (handle == nullptr)
109,928,016✔
132
                {
133
                    break;
43,363,154✔
134
                }
135

136
                handle.resume();
66,397,090✔
137
                m_size.fetch_sub(1, std::memory_order::release);
66,296,648✔
138
            }
139
        }
140
    }
141

142
    // Process until there are no ready tasks left.
143
    while (m_size.load(std::memory_order::acquire) > 0)
319✔
144
    {
145
        if (m_threads.size() > 1)
1✔
146
        {
147
            std::coroutine_handle<> handle{nullptr};
1✔
148
            if (!m_queue.try_dequeue(handle))
1✔
149
            {
150
                break;
1✔
151
            }
152

NEW
153
            handle.resume();
×
NEW
154
            m_size.fetch_sub(1, std::memory_order::release);
×
155
        }
156
        else
157
        {
NEW
158
            std::coroutine_handle<> handles[32] = { nullptr };
×
NEW
159
            if (!m_queue.wait_dequeue_bulk_timed(handles, 32, wait_timeout))
×
160
            {
NEW
161
                break;
×
162
            }
163

NEW
164
            for (auto& handle : handles)
×
165
            {
NEW
166
                if (handle == nullptr)
×
167
                {
NEW
168
                    break;
×
169
                }
170

NEW
171
                handle.resume();
×
NEW
172
                m_size.fetch_sub(1, std::memory_order::release);
×
173
            }
174
        }
175
    }
176

177
    if (m_opts.on_thread_stop_functor != nullptr)
160✔
178
    {
179
        m_opts.on_thread_stop_functor(idx);
×
180
    }
181
}
155✔
182

183
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
71,622,472✔
184
{
185
    if (handle == nullptr || handle.done())
71,622,472✔
186
    {
187
        return;
×
188
    }
189

190
    m_queue.enqueue(handle);
71,915,127✔
191
}
192

193
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc