• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 9921718395

05 Jul 2024 06:29PM UTC coverage: 84.498% (+0.9%) from 83.609%
9921718395

push

github

web-flow
coro::thread_pool high cpu usage when tasks < threads (#265)

* coro::thread_pool high cpu usage when tasks < threads

The check for m_size > 0 was keeping threads awake in a spin state until
all tasks completed. This correctl now uses m_queue.size() behind the
lock to correctly only wake up threads on the condition variable when
tasks are waiting to be processed.

* Fix deadlock with task_container and tls::client with the client's
  destructor scheduling a tls cleanup task, the task_container's lock
was being locked twice when the cleanup task was being destroyed.

Closes #262

* Adjust when task_container's user_task is deleted

It is now deleted inline in make_user_task so any destructors that get
invoked that possibly schedule more coroutines do not cause a deadlock

* io_scheduler is now std::enable_shared_from_this

51 of 64 new or added lines in 5 files covered. (79.69%)

51 existing lines in 4 files now uncovered.

1379 of 1632 relevant lines covered (84.5%)

4234968.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.16
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2

3
#include <iostream>
4

5
namespace coro
6
{
7
thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp)
46,678,046✔
8
{
9
}
46,675,984✔
10

11
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
46,671,122✔
12
{
13
    m_awaiting_coroutine = awaiting_coroutine;
46,671,122✔
14
    m_thread_pool.schedule_impl(m_awaiting_coroutine);
46,671,122✔
15

16
    // void return on await_suspend suspends the _this_ coroutine, which is now scheduled on the
17
    // thread pool and returns control to the caller.  They could be sync_wait'ing or go do
18
    // something else while this coroutine gets picked up by the thread pool.
19
}
46,686,642✔
20

21
thread_pool::thread_pool(options opts) : m_opts(std::move(opts))
65✔
22
{
23
    m_threads.reserve(m_opts.thread_count);
65✔
24

25
    for (uint32_t i = 0; i < m_opts.thread_count; ++i)
174✔
26
    {
27
        m_threads.emplace_back([this, i]() { executor(i); });
218✔
28
    }
29
}
65✔
30

31
thread_pool::~thread_pool()
30✔
32
{
33
    shutdown();
30✔
34
}
30✔
35

36
auto thread_pool::schedule() -> operation
46,679,306✔
37
{
38
    if (!m_shutdown_requested.load(std::memory_order::acquire))
46,679,306✔
39
    {
40
        m_size.fetch_add(1, std::memory_order::release);
46,683,920✔
41
        return operation{*this};
46,683,920✔
42
    }
43

44
    throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
45
}
46

47
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
25,896,924✔
48
{
49
    if (handle == nullptr)
25,896,924✔
50
    {
NEW
51
        return false;
×
52
    }
53

54
    if (m_shutdown_requested.load(std::memory_order::acquire))
25,867,939✔
55
    {
NEW
56
        return false;
×
57
    }
58

59
    m_size.fetch_add(1, std::memory_order::release);
25,853,739✔
60
    schedule_impl(handle);
25,853,739✔
61
    return true;
25,987,304✔
62
}
63

64
auto thread_pool::shutdown() noexcept -> void
56✔
65
{
66
    // Only allow shutdown to occur once.
67
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
56✔
68
    {
69
        {
70
            // There is a race condition if we are not holding the lock with the executors
71
            // to always receive this.  std::jthread stop token works without this properly.
72
            std::unique_lock<std::mutex> lk{m_wait_mutex};
50✔
73
            m_wait_cv.notify_all();
50✔
74
        }
50✔
75

76
        for (auto& thread : m_threads)
125✔
77
        {
78
            if (thread.joinable())
75✔
79
            {
80
                thread.join();
75✔
81
            }
82
        }
83
    }
84
}
56✔
85

86
auto thread_pool::executor(std::size_t idx) -> void
109✔
87
{
88
    if (m_opts.on_thread_start_functor != nullptr)
109✔
89
    {
90
        m_opts.on_thread_start_functor(idx);
4✔
91
    }
92

93
    // Process until shutdown is requested.
94
    while (!m_shutdown_requested.load(std::memory_order::acquire))
78,209,790✔
95
    {
96
        std::unique_lock<std::mutex> lk{m_wait_mutex};
78,216,345✔
97
        m_wait_cv.wait(lk, [&]() { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });
163,967,923✔
98

99
        if (m_queue.empty())
78,290,103✔
100
        {
101
            continue;
70✔
102
        }
103

104
        auto handle = m_queue.front();
78,290,321✔
105
        m_queue.pop_front();
78,288,872✔
106
        lk.unlock();
78,289,064✔
107

108
        // Release the lock while executing the coroutine.
109
        handle.resume();
78,272,347✔
110
        m_size.fetch_sub(1, std::memory_order::release);
78,194,068✔
111
    }
78,194,138✔
112

113
    // Process until there are no ready tasks left.
114
    while (m_size.load(std::memory_order::acquire) > 0)
150✔
115
    {
NEW
116
        std::unique_lock<std::mutex> lk{m_wait_mutex};
×
117
        // m_size will only drop to zero once all executing coroutines are finished
118
        // but the queue could be empty for threads that finished early.
NEW
119
        if (m_queue.empty())
×
120
        {
NEW
121
            break;
×
122
        }
123

NEW
124
        auto handle = m_queue.front();
×
NEW
125
        m_queue.pop_front();
×
NEW
126
        lk.unlock();
×
127

128
        // Release the lock while executing the coroutine.
NEW
129
        handle.resume();
×
NEW
130
        m_size.fetch_sub(1, std::memory_order::release);
×
UNCOV
131
    }
×
132

133
    if (m_opts.on_thread_stop_functor != nullptr)
75✔
134
    {
135
        m_opts.on_thread_stop_functor(idx);
×
136
    }
137
}
75✔
138

139
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
72,521,357✔
140
{
141
    if (handle == nullptr)
72,521,357✔
142
    {
143
        return;
×
144
    }
145

146
    {
147
        std::scoped_lock lk{m_wait_mutex};
72,535,931✔
148
        m_queue.emplace_back(handle);
72,690,938✔
149
        m_wait_cv.notify_one();
72,690,921✔
150
    }
72,690,907✔
151
}
152

153
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc