• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 16fa75ad4ee639bb3856ae80d247e65dd179c774-PR-221

29 Nov 2023 10:11PM UTC coverage: 82.938%. First build
16fa75ad4ee639bb3856ae80d247e65dd179c774-PR-221

Pull #221

github

web-flow
Merge 9cdbb8a76 into b652e387b
Pull Request #221: Use std::thread for coro::thread_pool

20 of 24 new or added lines in 2 files covered. (83.33%)

559 of 674 relevant lines covered (82.94%)

7195474.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2

3
#include <iostream>
4

5
namespace coro
6
{
7
thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp)
46,672,114✔
8
{
9
}
46,668,357✔
10

11
// Records the awaiting coroutine and hands it to the owning pool's queue.
//
// Because await_suspend returns void, the awaiting coroutine stays suspended and control goes
// back to the caller, who may be sync_wait'ing or may go on to do other work while a pool
// thread picks the coroutine up.
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    m_awaiting_coroutine = awaiting_coroutine;
    m_thread_pool.schedule_impl(m_awaiting_coroutine);
}
46,688,339✔
20

21
// Stores the options and starts m_opts.thread_count worker threads.  Each worker runs
// executor() and is given its own zero-based index.
thread_pool::thread_pool(options opts) : m_opts(std::move(opts))
{
    const auto worker_count = m_opts.thread_count;
    m_threads.reserve(worker_count);

    for (uint32_t worker_idx = 0; worker_idx < worker_count; ++worker_idx)
    {
        m_threads.emplace_back([this, worker_idx] { executor(worker_idx); });
    }
}
62✔
30

31
// Destruction goes through shutdown(), which joins the worker threads the first time it runs
// and does nothing on later calls.
thread_pool::~thread_pool()
{
    this->shutdown();
}
60✔
35

36
// Returns an awaitable that, when co_await'ed, moves the awaiting coroutine onto this pool.
//
// Throws std::runtime_error if shutdown has already been requested.
auto thread_pool::schedule() -> operation
{
    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
    }

    // The task is counted here, before its handle is queued by operation::await_suspend().
    m_size.fetch_add(1, std::memory_order::release);
    return operation{*this};
}
46

47
// Queues an already-suspended coroutine to be resumed on one of the pool's threads.
// A null handle is ignored and leaves the task count untouched.
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> void
{
    if (handle != nullptr)
    {
        // Count the task first, then place it on the queue.
        m_size.fetch_add(1, std::memory_order::release);
        schedule_impl(handle);
    }
}
57

58
auto thread_pool::shutdown() noexcept -> void
97✔
59
{
60
    // Only allow shutdown to occur once.
61
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
97✔
62
    {
63
        {
64
            // There is a race condition if we are not holding the lock with the executors
65
            // to always receive this.  std::jthread stop token works without this properly.
66
            std::unique_lock<std::mutex> lk{m_wait_mutex};
120✔
67
            m_wait_cv.notify_all();
60✔
68
        }
69

70
        for (auto& thread : m_threads)
153✔
71
        {
72
            if (thread.joinable())
93✔
73
            {
74
                thread.join();
93✔
75
            }
76
        }
77
    }
78
}
97✔
79

80
auto thread_pool::executor(std::size_t idx) -> void
98✔
81
{
82
    if (m_opts.on_thread_start_functor != nullptr)
98✔
83
    {
84
        m_opts.on_thread_start_functor(idx);
4✔
85
    }
86

87
    auto drain_queue = [this](std::unique_lock<std::mutex>& lk) -> void
527,623,305✔
88
    {
89
        // Process this batch until the queue is empty.
90
        while (!m_queue.empty())
294,013,860✔
91
        {
92
            auto handle = m_queue.front();
77,889,461✔
93
            m_queue.pop_front();
77,889,779✔
94

95
            // Release the lock while executing the coroutine.
96
            lk.unlock();
77,889,257✔
97
            handle.resume();
77,874,769✔
98
            m_size.fetch_sub(1, std::memory_order::release);
77,830,205✔
99
            lk.lock();
77,830,205✔
100
        }
101
    };
216,126,898✔
102

103
    // Process until shutdown is requested and the total number of tasks reaches zero.
104
    while (!m_shutdown_requested.load(std::memory_order::acquire))
215,817,545✔
105
    {
106
        std::unique_lock<std::mutex> lk{m_wait_mutex};
431,712,371✔
107
        m_wait_cv.wait(
216,130,061✔
108
            lk,
109
            [&] {
224,132,496✔
110
                return m_size.load(std::memory_order::acquire) > 0 ||
456,274,728✔
111
                       m_shutdown_requested.load(std::memory_order::acquire);
232,142,232✔
112
            });
113

114
        drain_queue(lk);
216,124,466✔
115
    }
116

117
    // Shutdown has been requested, we need to drain until no more tasks remain.
118
    while (m_size.load(std::memory_order::acquire) > 0)
×
119
    {
NEW
120
        std::unique_lock<std::mutex> lk{m_wait_mutex};
×
NEW
121
        m_wait_cv.wait_for(
×
NEW
122
            lk, std::chrono::milliseconds{10}, [&] { return m_size.load(std::memory_order::acquire) > 0; });
×
123

NEW
124
        drain_queue(lk);
×
125
    }
126

127
    if (m_opts.on_thread_stop_functor != nullptr)
93✔
128
    {
129
        m_opts.on_thread_stop_functor(idx);
×
130
    }
131
}
93✔
132

133
// Appends a coroutine handle to the work queue and signals one waiting worker.
// Null handles are dropped without touching the queue.
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
{
    if (handle != nullptr)
    {
        {
            // The queue is only modified while the wait mutex is held.
            std::scoped_lock queue_guard{m_wait_mutex};
            m_queue.emplace_back(handle);
        }

        // Signal after the lock has been released.
        m_wait_cv.notify_one();
    }
}
147

148
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc