jbaldwin / libcoro / build 15455100314

04 Jun 2025 11:51PM UTC · coverage: 86.834% · first build

Pull Request #336: coro::ring_buffer use coro::mutex instead of std::mutex
Merge 6adf0e26b into f0cccaaf4

100 of 120 new or added lines in 5 files covered (83.33%).
1596 of 1838 relevant lines covered (86.83%).
5,640,982.73 hits per line.
Source File: /src/thread_pool.cpp (76.47% of lines covered)

#include "coro/thread_pool.hpp"
#include "coro/detail/task_self_deleting.hpp"

#include <iostream>

namespace coro
{
thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp)
{
    m_thread_pool.created++;
}

auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    m_thread_pool.scheduled++;
    m_awaiting_coroutine = awaiting_coroutine;
    m_thread_pool.schedule_impl(m_awaiting_coroutine);

    // A void return from await_suspend suspends _this_ coroutine, which is now scheduled on the
    // thread pool, and returns control to the caller.  The caller could be sync_wait'ing or off
    // doing something else while this coroutine gets picked up by the thread pool.
}

thread_pool::thread_pool(options opts) : m_opts(std::move(opts))
{
    m_threads.reserve(m_opts.thread_count);

    for (uint32_t i = 0; i < m_opts.thread_count; ++i)
    {
        m_threads.emplace_back([this, i]() { executor(i); });
    }
}

thread_pool::~thread_pool()
{
    shutdown();
    std::cerr << "~thread_pool() exit\n";
}

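// Returns an awaitable that, when co_await'ed, suspends the caller and resumes it on one of the
// pool's executor threads.  m_size is incremented before the shutdown check so the executor
// drain loop accounts for the pending task.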
auto thread_pool::schedule() -> operation
{
    m_size.fetch_add(1, std::memory_order::release);
    if (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        return operation{*this};
    }
    else
    {
        m_size.fetch_sub(1, std::memory_order::release);
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
    }
}

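// Fire-and-forget: wraps the task so it deletes itself upon completion, then resumes it on the
// pool.  Returns false if the pool has begun shutting down or the wrapped handle is invalid.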
auto thread_pool::spawn(coro::task<void>&& task) noexcept -> bool
{
    m_size.fetch_add(1, std::memory_order::release);
    auto wrapper_task = detail::make_task_self_deleting(std::move(task));
    wrapper_task.promise().executor_size(m_size);
    return resume(wrapper_task.handle());
}

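// Resumes an externally owned coroutine handle on the pool.  Returns false if the handle is
// null, already done, or the pool has begun shutting down.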
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
{
    if (handle == nullptr || handle.done())
    {
        return false;
    }

    m_size.fetch_add(1, std::memory_order::release);
    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        m_size.fetch_sub(1, std::memory_order::release);
        return false;
    }

    schedule_impl(handle);
    return true;
}

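// Signals all executor threads to finish their remaining work and joins them; only the first
// caller performs the notification and join.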
auto thread_pool::shutdown() noexcept -> void
{
    // Only allow shutdown to occur once.
    std::cerr << "thread_pool::shutdown()\n";
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
    {
        {
            // There is a race condition where the executors can miss this notification if we
            // are not holding the lock while calling notify_all().  std::jthread's stop token
            // works properly without this.
            std::cerr << "thread_pool::shutdown() lock()\n";
            std::unique_lock<std::mutex> lk{m_wait_mutex};
            std::cerr << "thread_pool::shutdown() notify_all()\n";
            m_wait_cv.notify_all();
        }

        std::cerr << "thread_pool::shutdown() m_threads.size() = [" << m_threads.size() << "]\n";
        for (auto& thread : m_threads)
        {
            std::cerr << "thread_pool::shutdown() thread.joinable()\n";
            if (thread.joinable())
            {
                std::cerr << "thread_pool::shutdown() thread.join()\n";
                thread.join();
            }
            else
            {
                std::cerr << "thread_pool::shutdown() thread is not joinable\n";
            }
        }
    }
    std::cerr << "thread_pool::shutdown() return\n";
}

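// Worker loop for a single executor thread: waits on the condition variable for queued
// coroutines and resumes them, then after shutdown drains whatever ready work remains.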
auto thread_pool::executor(std::size_t idx) -> void
{
    if (m_opts.on_thread_start_functor != nullptr)
    {
        m_opts.on_thread_start_functor(idx);
    }

    // Process until shutdown is requested.
    while (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        std::unique_lock<std::mutex> lk{m_wait_mutex};
        m_wait_cv.wait(lk, [&]() { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });

        if (m_queue.empty())
        {
            continue;
        }

        auto handle = m_queue.front();
        m_queue.pop_front();
        lk.unlock();

        // Release the lock while executing the coroutine.
        if (handle == nullptr)
        {
            std::cerr << "handle is nullptr\n";
        }
        else if (handle.done())
        {
            std::cerr << "handle.done() == true\n";
        }
        else
        {
            handle.resume();
            executed++;
        }
        m_size.fetch_sub(1, std::memory_order::release);
    }

    // Process until there are no ready tasks left.
    while (m_size.load(std::memory_order::acquire) > 0)
    {
        std::unique_lock<std::mutex> lk{m_wait_mutex};
        // m_size will only drop to zero once all executing coroutines are finished,
        // but the queue could be empty for threads that finished early.
        if (m_queue.empty())
        {
            std::cerr << "m_queue.empty() breaking final loop m_size = " << m_size.load(std::memory_order::acquire) << "\n";
            break;
        }

        auto handle = m_queue.front();
        m_queue.pop_front();
        lk.unlock();

        // Release the lock while executing the coroutine.
        if (handle == nullptr)
        {
            std::cerr << "handle is nullptr\n";
        }
        else if (handle.done())
        {
            std::cerr << "handle.done() == true\n";
        }
        else
        {
            std::cerr << "handle.resume()\n";
            handle.resume();
            executed++;
        }
        m_size.fetch_sub(1, std::memory_order::release);
    }

    if (m_opts.on_thread_stop_functor != nullptr)
    {
        m_opts.on_thread_stop_functor(idx);
    }

    std::cerr << "thread_pool::executor() return\n";
}

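// Enqueues a valid coroutine handle under the wait mutex and wakes one executor thread.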
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
{
    if (handle == nullptr || handle.done())
    {
        return;
    }

    {
        std::scoped_lock lk{m_wait_mutex};
        m_queue.emplace_back(handle);
        m_wait_cv.notify_one();
    }
}

} // namespace coro
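For context, this is roughly how the pool is driven from user code: schedule() returns the operation awaiter defined above, and co_await'ing it hands the coroutine to an executor thread. A minimal sketch, based on libcoro's documented public API (the <coro/coro.hpp> umbrella header, coro::task, and coro::sync_wait); treat the exact option fields as assumptions:

#include <coro/coro.hpp>
#include <iostream>

int main()
{
    // Assumed options layout; thread_count maps to m_opts.thread_count above.
    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 4}};

    auto make_task = [&tp]() -> coro::task<uint64_t>
    {
        // co_await hits operation::await_suspend(), which enqueues this coroutine
        // via schedule_impl(); an executor thread picks it up and resumes it.
        co_await tp.schedule();
        co_return 42;
    };

    // sync_wait blocks the calling thread until the task completes on the pool.
    std::cout << coro::sync_wait(make_task()) << "\n";
    return 0;
}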