• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 15447416393

04 Jun 2025 04:19PM UTC coverage: 87.206%. First build
15447416393

Pull #336

github

web-flow
Merge ed2eec896 into f0cccaaf4
Pull Request #336: coro::ring_buffer use coro::mutex instead of std::mutex

72 of 86 new or added lines in 4 files covered. (83.72%)

1595 of 1829 relevant lines covered (87.21%)

4633307.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.04
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2
#include "coro/detail/task_self_deleting.hpp"
3

4
#include <iostream>
5

6
namespace coro
7
{
8
thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp)
46,251,028✔
9
{
10
}
46,252,801✔
11

12
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
46,248,086✔
13
{
14
    m_awaiting_coroutine = awaiting_coroutine;
46,248,086✔
15
    m_thread_pool.schedule_impl(m_awaiting_coroutine);
46,248,086✔
16

17
    // void return on await_suspend suspends the _this_ coroutine, which is now scheduled on the
18
    // thread pool and returns control to the caller.  They could be sync_wait'ing or go do
19
    // something else while this coroutine gets picked up by the thread pool.
20
}
46,276,277✔
21

22
thread_pool::thread_pool(options opts) : m_opts(std::move(opts))
83✔
23
{
24
    m_threads.reserve(m_opts.thread_count);
83✔
25

26
    for (uint32_t i = 0; i < m_opts.thread_count; ++i)
242✔
27
    {
28
        m_threads.emplace_back([this, i]() { executor(i); });
318✔
29
    }
30
}
83✔
31

32
thread_pool::~thread_pool()
80✔
33
{
34
    shutdown();
59✔
35
    std::cerr << "~thread_pool() exit\n";
59✔
36
}
80✔
37

38
auto thread_pool::schedule() -> operation
46,269,055✔
39
{
40
    m_size.fetch_add(1, std::memory_order::release);
46,269,055✔
41
    if (!m_shutdown_requested.load(std::memory_order::acquire))
46,269,055✔
42
    {
43
        return operation{*this};
46,268,313✔
44
    }
45
    else
46
    {
47
        m_size.fetch_sub(1, std::memory_order::release);
1✔
48
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
1✔
49
    }
50
}
51

52
auto thread_pool::spawn(coro::task<void>&& task) noexcept -> bool
200,003✔
53
{
54
    m_size.fetch_add(1, std::memory_order::release);
200,003✔
55
    auto wrapper_task = detail::make_task_self_deleting(std::move(task));
200,003✔
56
    wrapper_task.promise().executor_size(m_size);
200,003✔
57
    return resume(wrapper_task.handle());
200,003✔
58
}
200,003✔
59

60
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
26,297,203✔
61
{
62
    if (handle == nullptr || handle.done())
26,297,203✔
63
    {
64
        return false;
×
65
    }
66

67
    m_size.fetch_add(1, std::memory_order::release);
26,235,230✔
68
    if (m_shutdown_requested.load(std::memory_order::acquire))
26,235,230✔
69
    {
70
        m_size.fetch_sub(1, std::memory_order::release);
×
71
        return false;
×
72
    }
73

74
    schedule_impl(handle);
26,152,828✔
75
    return true;
26,390,828✔
76
}
77

78
auto thread_pool::shutdown() noexcept -> void
86✔
79
{
80
    // Only allow shutdown to occur once.
81
    std::cerr << "thread_pool::shutdown()\n";
86✔
82
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
86✔
83
    {
84
        {
85
            // There is a race condition if we are not holding the lock with the executors
86
            // to always receive this.  std::jthread stop token works without this properly.
87
            std::cerr << "thread_pool::shutdown() lock()\n";
59✔
88
            std::unique_lock<std::mutex> lk{m_wait_mutex};
59✔
89
            std::cerr << "thread_pool::shutdown() notify_all()\n";
59✔
90
            m_wait_cv.notify_all();
59✔
91
        }
59✔
92

93
        for (auto& thread : m_threads)
165✔
94
        {
95
            if (thread.joinable())
106✔
96
            {
97
                std::cerr << "thread_pool::shutdown() thread.join()\n";
106✔
98
                thread.join();
106✔
99
            }
100
            else
101
            {
NEW
102
                std::cerr << "thread_pool::shutdown() thread is not joinable\n";
×
103
            }
104
        }
105
    }
106
    std::cerr << "thread_pool::shutdown() return\n";
86✔
107
}
86✔
108

109
auto thread_pool::executor(std::size_t idx) -> void
159✔
110
{
111
    if (m_opts.on_thread_start_functor != nullptr)
159✔
112
    {
113
        m_opts.on_thread_start_functor(idx);
4✔
114
    }
115

116
    // Process until shutdown is requested.
117
    while (!m_shutdown_requested.load(std::memory_order::acquire))
78,214,585✔
118
    {
119
        std::unique_lock<std::mutex> lk{m_wait_mutex};
78,205,263✔
120
        m_wait_cv.wait(lk, [&]() { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });
163,363,132✔
121

122
        if (m_queue.empty())
78,269,929✔
123
        {
124
            continue;
100✔
125
        }
126

127
        auto handle = m_queue.front();
78,272,877✔
128
        m_queue.pop_front();
78,273,398✔
129
        lk.unlock();
78,270,276✔
130

131
        // Release the lock while executing the coroutine.
132
        if (handle == nullptr)
78,251,132✔
133
        {
NEW
134
            std::cerr << "handle is nullptr\n";
×
135
        }
136
        else if (handle.done())
78,226,021✔
137
        {
NEW
138
            std::cerr << "handle.done() == true\n";
×
139
        }
140
        else
141
        {
142
            handle.resume();
78,240,055✔
143
        }
144
        m_size.fetch_sub(1, std::memory_order::release);
78,223,480✔
145
    }
78,223,580✔
146

147
    // Process until there are no ready tasks left.
148
    while (m_size.load(std::memory_order::acquire) > 0)
212✔
149
    {
150
        std::unique_lock<std::mutex> lk{m_wait_mutex};
×
151
        // m_size will only drop to zero once all executing coroutines are finished
152
        // but the queue could be empty for threads that finished early.
153
        if (m_queue.empty())
×
154
        {
NEW
155
            std::cerr << "m_queue.empty() breaking final loop m_size = " << m_size.load(std::memory_order::acquire) << "\n";
×
156
            break;
×
157
        }
158

159
        auto handle = m_queue.front();
×
160
        m_queue.pop_front();
×
161
        lk.unlock();
×
162

163
        // Release the lock while executing the coroutine.
NEW
164
        if (handle == nullptr)
×
165
        {
NEW
166
            std::cerr << "handle is nullptr\n";
×
167
        }
NEW
168
        else if (handle.done())
×
169
        {
NEW
170
            std::cerr << "handle.done() == true\n";
×
171
        }
172
        else
173
        {
NEW
174
            std::cerr << "handle.resume()\n";
×
NEW
175
            handle.resume();
×
176
        }
177
        m_size.fetch_sub(1, std::memory_order::release);
×
178
    }
×
179

180
    if (m_opts.on_thread_stop_functor != nullptr)
106✔
181
    {
182
        m_opts.on_thread_stop_functor(idx);
×
183
    }
184

185
    std::cerr << "thread_pool::executor() return\n";
106✔
186
}
106✔
187

188
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
72,383,126✔
189
{
190
    if (handle == nullptr || handle.done())
72,383,126✔
191
    {
192
        return;
×
193
    }
194

195
    {
196
        std::scoped_lock lk{m_wait_mutex};
72,401,435✔
197
        m_queue.emplace_back(handle);
72,688,261✔
198
        m_wait_cv.notify_one();
72,688,063✔
199
    }
72,684,852✔
200
}
201

202
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc