• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 13038964835

29 Jan 2025 07:30PM UTC coverage: 84.791%. First build
13038964835

Pull #288

github

web-flow
Merge 2e8e40f6f into 26de94ded
Pull Request #288: coro::task_container gc fix not completing coroutines

39 of 57 new or added lines in 5 files covered. (68.42%)

1377 of 1624 relevant lines covered (84.79%)

4284583.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2

3
namespace coro
4
{
5
// Binds this schedule operation to the pool `tp`. The stored reference is the pool that
// await_suspend() later hands the suspended coroutine to.
thread_pool::operation::operation(thread_pool& tp) noexcept
    : m_thread_pool(tp)
{
}
46,676,087✔
8

9
// Remembers the coroutine that is suspending on this operation and queues it on the owning
// thread pool.
//
// await_suspend returns void, so the awaiting coroutine is always left suspended here and
// control goes back to the caller.  That caller may be sync_wait'ing or may go do something
// else while the coroutine waits to be picked up by the thread pool.
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    m_awaiting_coroutine = awaiting_coroutine;
    m_thread_pool.schedule_impl(awaiting_coroutine);
}
46,685,917✔
18

19
// Stores the options and starts `m_opts.thread_count` threads, each of which runs
// executor() with its own zero-based index.
thread_pool::thread_pool(options opts) : m_opts(std::move(opts))
{
    // Size the thread vector once, before any thread is created.
    m_threads.reserve(m_opts.thread_count);

    uint32_t worker_idx = 0;
    while (worker_idx < m_opts.thread_count)
    {
        m_threads.emplace_back([this, worker_idx]() { executor(worker_idx); });
        ++worker_idx;
    }
}
68✔
28

29
// Destroying the pool requests shutdown; shutdown() only acts on the first request, so an
// earlier explicit call by the user is harmless.
thread_pool::~thread_pool()
{
    shutdown();
}
33✔
33

34
// Creates the operation used to move a coroutine onto this pool.
//
// The task is counted in m_size before the shutdown flag is inspected.  If shutdown has
// already been requested the count is rolled back and std::runtime_error is thrown.
auto thread_pool::schedule() -> operation
{
    m_size.fetch_add(1, std::memory_order::release);

    const bool shutting_down = m_shutdown_requested.load(std::memory_order::acquire);
    if (shutting_down)
    {
        // Undo the optimistic increment before reporting the failure.
        m_size.fetch_sub(1, std::memory_order::release);
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
    }

    return operation{*this};
}
47

48
// Queues an existing coroutine handle on the pool.
//
// Returns false without queueing anything when the handle is null or already done, or when
// shutdown has been requested.  Returns true after the handle was passed to schedule_impl().
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
{
    const bool unusable = handle == nullptr || handle.done();
    if (unusable)
    {
        return false;
    }

    // Count the task first, then take the count back if the pool is shutting down.
    m_size.fetch_add(1, std::memory_order::release);
    if (!m_shutdown_requested.load(std::memory_order::acquire))
    {
        schedule_impl(handle);
        return true;
    }

    m_size.fetch_sub(1, std::memory_order::release);
    return false;
}
65

66
auto thread_pool::shutdown() noexcept -> void
60✔
67
{
68
    // Only allow shutdown to occur once.
69
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
60✔
70
    {
71
        {
72
            // There is a race condition if we are not holding the lock with the executors
73
            // to always receive this.  std::jthread stop token works without this properly.
74
            std::unique_lock<std::mutex> lk{m_wait_mutex};
53✔
75
            m_wait_cv.notify_all();
53✔
76
        }
53✔
77

78
        for (auto& thread : m_threads)
137✔
79
        {
80
            if (thread.joinable())
84✔
81
            {
82
                thread.join();
84✔
83
            }
84
        }
85
    }
86
}
60✔
87

88
auto thread_pool::executor(std::size_t idx) -> void
118✔
89
{
90
    if (m_opts.on_thread_start_functor != nullptr)
118✔
91
    {
92
        m_opts.on_thread_start_functor(idx);
4✔
93
    }
94

95
    // Process until shutdown is requested.
96
    while (!m_shutdown_requested.load(std::memory_order::acquire))
78,419,763✔
97
    {
98
        std::unique_lock<std::mutex> lk{m_wait_mutex};
78,423,131✔
99
        m_wait_cv.wait(lk, [&]() { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });
164,077,452✔
100

101
        if (m_queue.empty())
78,488,048✔
102
        {
103
            continue;
82✔
104
        }
105

106
        auto handle = m_queue.front();
78,488,049✔
107
        m_queue.pop_front();
78,486,760✔
108
        lk.unlock();
78,486,597✔
109

110
        // Release the lock while executing the coroutine.
111
        handle.resume();
78,473,782✔
112
        m_size.fetch_sub(1, std::memory_order::release);
78,412,685✔
113
    }
78,412,767✔
114

115
    // Process until there are no ready tasks left.
116
    while (m_size.load(std::memory_order::acquire) > 0)
176✔
117
    {
118
        std::unique_lock<std::mutex> lk{m_wait_mutex};
4✔
119
        // m_size will only drop to zero once all executing coroutines are finished
120
        // but the queue could be empty for threads that finished early.
121
        if (m_queue.empty())
4✔
122
        {
123
            break;
×
124
        }
125

126
        auto handle = m_queue.front();
4✔
127
        m_queue.pop_front();
4✔
128
        lk.unlock();
4✔
129

130
        // Release the lock while executing the coroutine.
131
        handle.resume();
4✔
132
        m_size.fetch_sub(1, std::memory_order::release);
4✔
133
    }
4✔
134

135
    if (m_opts.on_thread_stop_functor != nullptr)
84✔
136
    {
137
        m_opts.on_thread_stop_functor(idx);
×
138
    }
139
}
82✔
140

141
// Appends `handle` to the work queue and wakes one waiting executor.
// Null handles and handles that are already done are ignored.
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
{
    const bool runnable = !(handle == nullptr || handle.done());
    if (runnable)
    {
        // The queue is modified and the notification is sent under the wait mutex, the same
        // mutex the executors hold while they check the queue.
        std::scoped_lock queue_lock{m_wait_mutex};
        m_queue.emplace_back(handle);
        m_wait_cv.notify_one();
    }
}
154

155
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc