• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 15455559001

05 Jun 2025 12:27AM UTC coverage: 87.595%. First build
15455559001

Pull #336

github

web-flow
Merge 02c038965 into f0cccaaf4
Pull Request #336: coro::ring_buffer use coro::mutex instead of std::mutex

106 of 120 new or added lines in 5 files covered. (88.33%)

1610 of 1838 relevant lines covered (87.6%)

5644127.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.2
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2
#include "coro/detail/task_self_deleting.hpp"
3

4
#include <iostream>
5

6
namespace coro
7
{
8
/// Creates a schedule operation bound to the thread pool `tp` and bumps that
/// pool's `created` counter.
thread_pool::operation::operation(thread_pool& tp) noexcept
    : m_thread_pool(tp)
{
    ++m_thread_pool.created;
}
46,291,178✔
12

13
/// Stores the awaiting coroutine on this operation and hands it to the owning
/// thread pool's schedule_impl(); also bumps the pool's `scheduled` counter.
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    ++m_thread_pool.scheduled;

    // Remember who is waiting on this operation, then enqueue that coroutine on the pool.
    m_awaiting_coroutine = awaiting_coroutine;
    m_thread_pool.schedule_impl(m_awaiting_coroutine);

    // Because await_suspend returns void, _this_ coroutine stays suspended and control goes
    // back to the caller.  The coroutine has already been handed to the thread pool, so the
    // caller is free to sync_wait on it or do other work until a pool thread picks it up.
}
46,273,041✔
23

24
/// Takes ownership of `opts` and starts `m_opts.thread_count` threads, each of
/// which runs executor() with its own zero-based index.
thread_pool::thread_pool(options opts)
    : m_opts(std::move(opts))
{
    m_threads.reserve(m_opts.thread_count);

    uint32_t worker_idx = 0;
    while (worker_idx < m_opts.thread_count)
    {
        // The index is captured by value so each thread keeps its own number.
        m_threads.emplace_back([this, worker_idx]() { executor(worker_idx); });
        ++worker_idx;
    }
}
83✔
33

34
/// Calls shutdown() and then writes a trace line to std::cerr once it has returned.
thread_pool::~thread_pool()
{
    this->shutdown();
    std::cerr << "~thread_pool() exit\n";
}
80✔
39

40
/// Returns an awaitable operation bound to this thread pool.
///
/// @throws std::runtime_error if shutdown has already been requested.
auto thread_pool::schedule() -> operation
{
    // Count the operation first; the increment is rolled back below if the pool is shutting down.
    m_size.fetch_add(1, std::memory_order::release);

    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        m_size.fetch_sub(1, std::memory_order::release);
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
    }

    return operation{*this};
}
53

54
/// Wraps `task` via detail::make_task_self_deleting(), passes the pool's size
/// counter to the wrapper's promise, and queues the wrapper through resume().
///
/// @return Whatever resume() returns for the wrapper's handle.
auto thread_pool::spawn(coro::task<void>&& task) noexcept -> bool
{
    m_size.fetch_add(1, std::memory_order::release);

    auto self_deleting = detail::make_task_self_deleting(std::move(task));

    // The wrapper's promise is handed m_size here.
    auto&& promise = self_deleting.promise();
    promise.executor_size(m_size);

    return resume(self_deleting.handle());
}
200,003✔
61

62
/// Queues `handle` to be resumed on one of the pool's threads.
///
/// @return false if `handle` is null or already done, or if shutdown has been
///         requested; true once the handle has been passed to schedule_impl().
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
{
    // A null handle is rejected before done() is ever called on it.
    if (!handle || handle.done())
    {
        return false;
    }

    // Count the handle, then undo the increment if the pool is shutting down.
    m_size.fetch_add(1, std::memory_order::release);
    const bool shutting_down = m_shutdown_requested.load(std::memory_order::acquire);
    if (shutting_down)
    {
        m_size.fetch_sub(1, std::memory_order::release);
        return false;
    }

    schedule_impl(handle);
    return true;
}
79

80
auto thread_pool::shutdown() noexcept -> void
86✔
81
{
82
    // Only allow shutdown to occur once.
83
    std::cerr << "thread_pool::shutdown()\n";
86✔
84
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
86✔
85
    {
86
        {
87
            // There is a race condition if we are not holding the lock with the executors
88
            // to always receive this.  std::jthread stop token works without this properly.
89
            std::cerr << "thread_pool::shutdown() lock()\n";
59✔
90
            std::unique_lock<std::mutex> lk{m_wait_mutex};
59✔
91
            std::cerr << "thread_pool::shutdown() notify_all()\n";
59✔
92
            m_wait_cv.notify_all();
59✔
93
        }
59✔
94

95
        std::cerr << "thread_pool::shutdown() m_threads.size() = [" << m_threads.size() << "]\n";
59✔
96
        for (auto& thread : m_threads)
165✔
97
        {
98
            std::cerr << "thread_pool::shutdown() thread.joinable()\n";
106✔
99
            if (thread.joinable())
106✔
100
            {
101
                std::cerr << "thread_pool::shutdown() thread.join()\n";
106✔
102
                thread.join();
106✔
103
            }
104
            else
105
            {
NEW
106
                std::cerr << "thread_pool::shutdown() thread is not joinable\n";
×
107
            }
108
        }
109
    }
110
    std::cerr << "thread_pool::shutdown() return\n";
86✔
111
}
86✔
112

113
auto thread_pool::executor(std::size_t idx) -> void
159✔
114
{
115
    if (m_opts.on_thread_start_functor != nullptr)
159✔
116
    {
117
        m_opts.on_thread_start_functor(idx);
4✔
118
    }
119

120
    // Process until shutdown is requested.
121
    while (!m_shutdown_requested.load(std::memory_order::acquire))
78,216,298✔
122
    {
123
        std::unique_lock<std::mutex> lk{m_wait_mutex};
78,193,353✔
124
        m_wait_cv.wait(lk, [&]() { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });
163,441,653✔
125

126
        if (m_queue.empty())
78,260,718✔
127
        {
128
            continue;
101✔
129
        }
130

131
        auto handle = m_queue.front();
78,260,023✔
132
        m_queue.pop_front();
78,252,394✔
133
        lk.unlock();
78,253,413✔
134

135
        // Release the lock while executing the coroutine.
136
        if (handle == nullptr)
78,258,855✔
137
        {
NEW
138
            std::cerr << "handle is nullptr\n";
×
139
        }
140
        else if (handle.done())
78,198,677✔
141
        {
NEW
142
            std::cerr << "handle.done() == true\n";
×
143
        }
144
        else
145
        {
146
            handle.resume();
78,209,807✔
147
            executed++;
78,206,057✔
148
        }
149
        m_size.fetch_sub(1, std::memory_order::release);
78,248,809✔
150
    }
78,248,910✔
151

152
    // Process until there are no ready tasks left.
153
    while (m_size.load(std::memory_order::acquire) > 0)
214✔
154
    {
155
        std::unique_lock<std::mutex> lk{m_wait_mutex};
2✔
156
        // m_size will only drop to zero once all executing coroutines are finished
157
        // but the queue could be empty for threads that finished early.
158
        if (m_queue.empty())
2✔
159
        {
160
            std::cerr << "m_queue.empty() breaking final loop m_size = " << m_size.load(std::memory_order::acquire) << "\n";
2✔
161
            break;
1✔
162
        }
163

164
        auto handle = m_queue.front();
1✔
165
        m_queue.pop_front();
1✔
166
        lk.unlock();
1✔
167

168
        // Release the lock while executing the coroutine.
169
        if (handle == nullptr)
1✔
170
        {
NEW
171
            std::cerr << "handle is nullptr\n";
×
172
        }
173
        else if (handle.done())
1✔
174
        {
NEW
175
            std::cerr << "handle.done() == true\n";
×
176
        }
177
        else
178
        {
179
            std::cerr << "handle.resume()\n";
1✔
180
            handle.resume();
1✔
181
            executed++;
1✔
182
        }
183
        m_size.fetch_sub(1, std::memory_order::release);
1✔
184
    }
2✔
185

186
    if (m_opts.on_thread_stop_functor != nullptr)
106✔
187
    {
188
        m_opts.on_thread_stop_functor(idx);
×
189
    }
190

191
    std::cerr << "thread_pool::executor() return\n";
106✔
192
}
106✔
193

194
/// Appends `handle` to the work queue and notifies one waiter on m_wait_cv.
/// Null or already-done handles are ignored.
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
{
    // A null handle is rejected before done() is ever called on it.
    if (!handle || handle.done())
    {
        return;
    }

    // The wait mutex is held for both the push and the notify.
    std::scoped_lock queue_lock{m_wait_mutex};
    m_queue.emplace_back(handle);
    m_wait_cv.notify_one();
}
207

208
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc