jbaldwin / libcoro / 15448013898

04 Jun 2025 04:49PM UTC coverage: 87.93%. First build 15448013898

Pull Request #336: coro::ring_buffer use coro::mutex instead of std::mutex
Merge e238fa0b8 into f0cccaaf4 (github / web-flow)

79 of 88 new or added lines in 4 files covered (89.77%)
1610 of 1831 relevant lines covered (87.93%)
4633499.96 hits per line

Source File

/src/thread_pool.cpp: 89.8% of lines covered

#include "coro/thread_pool.hpp"
#include "coro/detail/task_self_deleting.hpp"

#include <iostream>

namespace coro
{
thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp)
{
}

auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    m_awaiting_coroutine = awaiting_coroutine;
    m_thread_pool.schedule_impl(m_awaiting_coroutine);

    // A void return from await_suspend suspends _this_ coroutine, which is now scheduled on the
    // thread pool, and returns control to the caller.  The caller could be sync_wait'ing or doing
    // something else while this coroutine gets picked up by the thread pool.
}

thread_pool::thread_pool(options opts) : m_opts(std::move(opts))
{
    m_threads.reserve(m_opts.thread_count);

    for (uint32_t i = 0; i < m_opts.thread_count; ++i)
    {
        m_threads.emplace_back([this, i]() { executor(i); });
    }
}

thread_pool::~thread_pool()
{
    shutdown();
    std::cerr << "~thread_pool() exit\n";
}

auto thread_pool::schedule() -> operation
{
    m_size.fetch_add(1, std::memory_order::release);
    if (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        return operation{*this};
    }
    else
    {
        m_size.fetch_sub(1, std::memory_order::release);
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
    }
}

auto thread_pool::spawn(coro::task<void>&& task) noexcept -> bool
{
    m_size.fetch_add(1, std::memory_order::release);
    auto wrapper_task = detail::make_task_self_deleting(std::move(task));
    wrapper_task.promise().executor_size(m_size);
    return resume(wrapper_task.handle());
}

auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
{
    if (handle == nullptr || handle.done())
    {
        return false; // not covered
    }

    m_size.fetch_add(1, std::memory_order::release);
    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        m_size.fetch_sub(1, std::memory_order::release); // not covered
        return false;                                    // not covered
    }

    schedule_impl(handle);
    return true;
}

auto thread_pool::shutdown() noexcept -> void
{
    // Only allow shutdown to occur once.
    std::cerr << "thread_pool::shutdown()\n";
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
    {
        {
            // Hold the lock while notifying, otherwise there is a race and the executors are
            // not guaranteed to receive this wakeup.  A std::jthread stop token would work
            // properly without this.
            std::cerr << "thread_pool::shutdown() lock()\n";
            std::unique_lock<std::mutex> lk{m_wait_mutex};
            std::cerr << "thread_pool::shutdown() notify_all()\n";
            m_wait_cv.notify_all();
        }

        std::cerr << "thread_pool::shutdown() m_threads.size() = " << m_threads.size() << "\n";
        for (auto& thread : m_threads)
        {
            std::cerr << "thread_pool::shutdown() thread.joinable()\n";
            if (thread.joinable())
            {
                std::cerr << "thread_pool::shutdown() thread.join()\n";
                thread.join();
            }
            else
            {
                std::cerr << "thread_pool::shutdown() thread is not joinable\n"; // new in PR, not covered
            }
        }
    }
    std::cerr << "thread_pool::shutdown() return\n";
}

auto thread_pool::executor(std::size_t idx) -> void
{
    if (m_opts.on_thread_start_functor != nullptr)
    {
        m_opts.on_thread_start_functor(idx);
    }

    // Process until shutdown is requested.
    while (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        std::unique_lock<std::mutex> lk{m_wait_mutex};
        m_wait_cv.wait(lk, [&]() { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });

        if (m_queue.empty())
        {
            continue;
        }

        auto handle = m_queue.front();
        m_queue.pop_front();
        lk.unlock();

        // Release the lock while executing the coroutine.
        if (handle == nullptr)
        {
            std::cerr << "handle is nullptr\n"; // new in PR, not covered
        }
        else if (handle.done())
        {
            std::cerr << "handle.done() == true\n"; // new in PR, not covered
        }
        else
        {
            handle.resume();
        }
        m_size.fetch_sub(1, std::memory_order::release);
    }

    // Process until there are no ready tasks left.
    while (m_size.load(std::memory_order::acquire) > 0)
    {
        std::unique_lock<std::mutex> lk{m_wait_mutex};
        // m_size will only drop to zero once all executing coroutines are finished,
        // but the queue could be empty for threads that finished early.
        if (m_queue.empty())
        {
            std::cerr << "m_queue.empty() breaking final loop m_size = " << m_size.load(std::memory_order::acquire) << "\n";
            break;
        }

        auto handle = m_queue.front();
        m_queue.pop_front();
        lk.unlock();

        // Release the lock while executing the coroutine.
        if (handle == nullptr)
        {
            std::cerr << "handle is nullptr\n"; // new in PR, not covered
        }
        else if (handle.done())
        {
            std::cerr << "handle.done() == true\n"; // new in PR, not covered
        }
        else
        {
            std::cerr << "handle.resume()\n";
            handle.resume();
        }
        m_size.fetch_sub(1, std::memory_order::release);
    }

    if (m_opts.on_thread_stop_functor != nullptr)
    {
        m_opts.on_thread_stop_functor(idx); // not covered
    }

    std::cerr << "thread_pool::executor() return\n";
}

auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
{
    if (handle == nullptr || handle.done())
    {
        return; // not covered
    }

    {
        std::scoped_lock lk{m_wait_mutex};
        m_queue.emplace_back(handle);
        m_wait_cv.notify_one();
    }
}

} // namespace coro
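
For context on the API this file implements, here is a minimal usage sketch. It is illustrative only and not part of the measured source: the umbrella header <coro/coro.hpp>, coro::task, and coro::sync_wait come from the wider libcoro library and are assumed here, while the thread_pool construction, schedule(), spawn(), and shutdown() calls mirror the definitions in the listing above.

// Minimal sketch, assuming the libcoro public API (coro::task, coro::sync_wait)
// in addition to the thread_pool members shown above. Not part of this file.
#include <coro/coro.hpp>

#include <iostream>

int main()
{
    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 2}};

    // A task that hops onto the pool: co_await tp.schedule() returns the operation
    // awaitable, whose await_suspend() above enqueues this coroutine via schedule_impl().
    auto work = [&tp]() -> coro::task<int>
    {
        co_await tp.schedule(); // suspends here, resumes on a pool thread
        co_return 42;
    };

    // Block the calling thread until the task completes on the pool.
    auto result = coro::sync_wait(work());
    std::cout << "result = " << result << "\n";

    // Fire-and-forget: spawn() wraps the task so it deletes itself on completion.
    bool spawned = tp.spawn([]() -> coro::task<void> { co_return; }());
    std::cout << "spawned = " << std::boolalpha << spawned << "\n";

    // ~thread_pool() calls shutdown(), which notifies and joins all executor threads.
    return 0;
}

coro::sync_wait blocks until the scheduled coroutine finishes, which matches the "sync_wait'ing" caller described in the await_suspend comment; the spawned task is drained by the executors before shutdown() returns.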