• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 13041658666

29 Jan 2025 10:28PM UTC coverage: 84.577%. First build
13041658666

Pull #288

github

web-flow
Merge f299a9da1 into 26de94ded
Pull Request #288: coro::task_container gc fix not completing coroutines

41 of 59 new or added lines in 6 files covered. (69.49%)

1371 of 1621 relevant lines covered (84.58%)

4304824.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2

3
namespace coro
4
{
5
thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp)
46,678,831✔
6
{
7
}
46,676,897✔
8

9
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
46,672,823✔
10
{
11
    m_awaiting_coroutine = awaiting_coroutine;
46,672,823✔
12
    m_thread_pool.schedule_impl(m_awaiting_coroutine);
46,672,823✔
13

14
    // void return on await_suspend suspends the _this_ coroutine, which is now scheduled on the
15
    // thread pool and returns control to the caller.  They could be sync_wait'ing or go do
16
    // something else while this coroutine gets picked up by the thread pool.
17
}
46,685,700✔
18

19
thread_pool::thread_pool(options opts) : m_opts(std::move(opts))
69✔
20
{
21
    m_threads.reserve(m_opts.thread_count);
69✔
22

23
    for (uint32_t i = 0; i < m_opts.thread_count; ++i)
188✔
24
    {
25
        m_threads.emplace_back([this, i]() { executor(i); });
238✔
26
    }
27
}
69✔
28

29
thread_pool::~thread_pool()
33✔
30
{
31
    shutdown();
33✔
32
}
33✔
33

34
auto thread_pool::schedule() -> operation
46,679,745✔
35
{
36
    m_size.fetch_add(1, std::memory_order::release);
46,679,745✔
37
    if (!m_shutdown_requested.load(std::memory_order::acquire))
46,679,745✔
38
    {
39
        return operation{*this};
46,681,213✔
40
    }
41
    else
42
    {
43
        m_size.fetch_sub(1, std::memory_order::release);
3✔
44
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
3✔
45
    }
46
}
47

48
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
26,308,356✔
49
{
50
    if (handle == nullptr || handle.done())
26,308,356✔
51
    {
52
        return false;
×
53
    }
54

55
    m_size.fetch_add(1, std::memory_order::release);
26,265,809✔
56
    if (m_shutdown_requested.load(std::memory_order::acquire))
26,265,809✔
57
    {
NEW
58
        m_size.fetch_sub(1, std::memory_order::release);
×
59
        return false;
×
60
    }
61

62
    schedule_impl(handle);
26,249,686✔
63
    return true;
26,374,540✔
64
}
65

66
auto thread_pool::shutdown() noexcept -> void
61✔
67
{
68
    // Only allow shutdown to occur once.
69
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
61✔
70
    {
71
        {
72
            // There is a race condition if we are not holding the lock with the executors
73
            // to always receive this.  std::jthread stop token works without this properly.
74
            std::unique_lock<std::mutex> lk{m_wait_mutex};
54✔
75
            m_wait_cv.notify_all();
54✔
76
        }
54✔
77

78
        for (auto& thread : m_threads)
139✔
79
        {
80
            if (thread.joinable())
85✔
81
            {
82
                thread.join();
85✔
83
            }
84
        }
85
    }
86
}
61✔
87

88
auto thread_pool::executor(std::size_t idx) -> void
119✔
89
{
90
    if (m_opts.on_thread_start_functor != nullptr)
119✔
91
    {
92
        m_opts.on_thread_start_functor(idx);
4✔
93
    }
94

95
    // Process until shutdown is requested.
96
    while (!m_shutdown_requested.load(std::memory_order::acquire))
78,624,599✔
97
    {
98
        std::unique_lock<std::mutex> lk{m_wait_mutex};
78,624,309✔
99
        m_wait_cv.wait(lk, [&]() { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });
164,643,248✔
100

101
        if (m_queue.empty())
78,688,262✔
102
        {
103
            continue;
82✔
104
        }
105

106
        auto handle = m_queue.front();
78,687,965✔
107
        m_queue.pop_front();
78,686,547✔
108
        lk.unlock();
78,686,332✔
109

110
        // Release the lock while executing the coroutine.
111
        handle.resume();
78,672,682✔
112
        m_size.fetch_sub(1, std::memory_order::release);
78,613,989✔
113
    }
78,614,071✔
114

115
    // Process until there are no ready tasks left.
116
    while (m_size.load(std::memory_order::acquire) > 0)
188✔
117
    {
118
        std::unique_lock<std::mutex> lk{m_wait_mutex};
9✔
119
        // m_size will only drop to zero once all executing coroutines are finished
120
        // but the queue could be empty for threads that finished early.
121
        if (m_queue.empty())
9✔
122
        {
123
            break;
×
124
        }
125

126
        auto handle = m_queue.front();
9✔
127
        m_queue.pop_front();
9✔
128
        lk.unlock();
9✔
129

130
        // Release the lock while executing the coroutine.
131
        handle.resume();
9✔
132
        m_size.fetch_sub(1, std::memory_order::release);
9✔
133
    }
9✔
134

135
    if (m_opts.on_thread_stop_functor != nullptr)
85✔
136
    {
137
        m_opts.on_thread_stop_functor(idx);
×
138
    }
139
}
85✔
140

141
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
72,894,185✔
142
{
143
    if (handle == nullptr || handle.done())
72,894,185✔
144
    {
145
        return;
×
146
    }
147

148
    {
149
        std::scoped_lock lk{m_wait_mutex};
72,837,974✔
150
        m_queue.emplace_back(handle);
73,090,187✔
151
        m_wait_cv.notify_one();
73,089,454✔
152
    }
73,089,583✔
153
}
154

155
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc