• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / b3fb243086f752d5358cb072f986ec410e86a3c8

01 Dec 2023 05:49PM UTC coverage: 83.609% (+0.3%) from 83.333%
b3fb243086f752d5358cb072f986ec410e86a3c8

push

github

web-flow
Changes to be able to build to webassembly with emscripten. (#201)

* Changes to enable libcoro to be built with emscripten
* Removed the TL::expected dependency from submodules
* Tweaked a size test in test_task.cpp that failed in wasm, as
em++ seems to be adding padding.

556 of 665 relevant lines covered (83.61%)

6116968.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.64
/src/thread_pool.cpp
1
#include "coro/thread_pool.hpp"
2

3
#include <iostream>
4

5
namespace coro
6
{
7
thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp)
46,674,560✔
8
{
9
}
46,670,853✔
10

11
/// Stores the suspended coroutine on this operation and queues it on the thread pool.
///
/// await_suspend returns void, so the awaiting coroutine stays suspended and control goes
/// back to the caller.  The caller may be sync_wait'ing or may go do something else while
/// the coroutine waits to be picked up by the thread pool.
///
/// @param awaiting_coroutine Handle of the coroutine that is awaiting this operation.
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
    // Remember the handle first, then hand the very same handle over to the pool's queue.
    m_awaiting_coroutine = awaiting_coroutine;
    m_thread_pool.schedule_impl(awaiting_coroutine);
}
20

21
/// Takes ownership of the options and starts `thread_count` worker threads.
///
/// Each worker runs executor() and is given its zero-based index within the pool.
///
/// @param opts Pool configuration; moved into the pool.
thread_pool::thread_pool(options opts) : m_opts(std::move(opts))
{
    const auto worker_count = m_opts.thread_count;

    // Allocate the thread storage once, up front.
    m_threads.reserve(worker_count);

    for (uint32_t worker_idx = 0; worker_idx < worker_count; ++worker_idx)
    {
        m_threads.emplace_back([this, worker_idx]() { this->executor(worker_idx); });
    }
}
30

31
/// Shuts the pool down on destruction; shutdown() joins every worker thread and is a
/// no-op if a shutdown was already requested.
thread_pool::~thread_pool()
{
    this->shutdown();
}
35

36
/// Creates an awaitable that, when co_await'ed, queues the awaiting coroutine on this pool.
///
/// @return An operation bound to this thread pool.
/// @throws std::runtime_error If shutdown has already been requested.
auto thread_pool::schedule() -> operation
{
    // Count the task *before* looking at the shutdown flag.  Checking the flag first and
    // incrementing afterwards leaves a window in which shutdown() is requested and the
    // workers observe a zero task count and exit, while this caller still receives an
    // operation that no worker is left to service.
    m_size.fetch_add(1, std::memory_order::release);

    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        // The task was never handed out, so take it back out of the pending count to let
        // the workers finish draining.
        m_size.fetch_sub(1, std::memory_order::release);
        throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
    }

    return operation{*this};
}
46

47
/// Queues an already suspended coroutine so that a worker thread resumes it.
///
/// A null handle is ignored and leaves the task count untouched.
///
/// @param handle The coroutine to resume on the pool.
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> void
{
    if (handle != nullptr)
    {
        // Account for the task first, then place it on the queue.
        m_size.fetch_add(1, std::memory_order::release);
        schedule_impl(handle);
    }
}
57

58
auto thread_pool::shutdown() noexcept -> void
97✔
59
{
60
    // Only allow shutdown to occur once.
61
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
97✔
62
    {
63
        {
64
            // There is a race condition if we are not holding the lock with the executors
65
            // to always receive this.  std::jthread stop token works without this properly.
66
            std::unique_lock<std::mutex> lk{m_wait_mutex};
120✔
67
            m_wait_cv.notify_all();
60✔
68
        }
69

70
        for (auto& thread : m_threads)
153✔
71
        {
72
            if (thread.joinable())
93✔
73
            {
74
                thread.join();
93✔
75
            }
76
        }
77
    }
78
}
97✔
79

80
auto thread_pool::executor(std::size_t idx) -> void
98✔
81
{
82
    if (m_opts.on_thread_start_functor != nullptr)
98✔
83
    {
84
        m_opts.on_thread_start_functor(idx);
4✔
85
    }
86

87
    // Process until shutdown is requested and the total number of tasks reaches zero.
88
    while (!m_shutdown_requested.load(std::memory_order::acquire) || m_size.load(std::memory_order::acquire) > 0)
235,552,258✔
89
    {
90
        std::unique_lock<std::mutex> lk{m_wait_mutex};
471,070,040✔
91
        m_wait_cv.wait(
235,845,934✔
92
            lk,
93
            [&] {
244,032,643✔
94
                return m_size.load(std::memory_order::acquire) > 0 ||
496,256,794✔
95
                       m_shutdown_requested.load(std::memory_order::acquire);
252,224,151✔
96
            });
97
        // Process this batch until the queue is empty.
98
        while (!m_queue.empty())
313,732,320✔
99
        {
100
            auto handle = m_queue.front();
77,890,755✔
101
            m_queue.pop_front();
77,890,778✔
102

103
            // Release the lock while executing the coroutine.
104
            lk.unlock();
77,890,243✔
105
            handle.resume();
77,874,426✔
106

107
            m_size.fetch_sub(1, std::memory_order::release);
77,823,608✔
108
            lk.lock();
77,823,608✔
109
        }
110
    }
111

112
    if (m_opts.on_thread_stop_functor != nullptr)
79,085✔
113
    {
114
        m_opts.on_thread_stop_functor(idx);
×
115
    }
116
}
92✔
117

118
/// Appends a coroutine handle to the work queue and wakes one worker thread.
///
/// A null handle is ignored.  The task count is not touched here; callers adjust it.
///
/// @param handle The coroutine to place on the queue.
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
{
    if (handle != nullptr)
    {
        {
            // Hold the wait mutex only for the push itself.
            std::scoped_lock queue_guard{m_wait_mutex};
            m_queue.emplace_back(handle);
        }

        // Notify after the lock has been released.
        m_wait_cv.notify_one();
    }
}
132

133
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc