• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 14100227883

27 Mar 2025 06:23AM UTC coverage: 85.793%. First build
14100227883

Pull #297

github

web-flow
Merge 82cb974a8 into 8fc3e2712
Pull Request #297: Add condition_variable

298 of 354 new or added lines in 10 files covered. (84.18%)

1709 of 1992 relevant lines covered (85.79%)

3511021.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.72
/src/condition_variable.cpp
1
#include "coro/condition_variable.hpp"
2
#include <cassert>
3

4
// Upper bound on a single sleep slice in timeout_task: the task sleeps for
// min(remaining time, this interval) and re-checks its stop source between slices.
static constexpr const auto s_stop_source_check_interval = std::chrono::milliseconds(200);
5

6
namespace coro
7
{
8

9
#ifdef LIBCORO_FEATURE_NETWORKING
10

11
// Waits for a notification for at most `duration`.
// Moves `lock` into a shared wait_operation, awaits when_any over wait_task and
// timeout_task (both receive the same stop_source), then re-locks the mutex and
// stores the new lock back into `lock`.
// co_returns std::cv_status::timeout when the when_any result holds a
// timeout_status, otherwise std::cv_status::no_timeout.
auto detail::strategy_based_on_io_scheduler::wait_for_ms(scoped_lock& lock, const std::chrono::milliseconds duration)
51✔
12
    -> task<std::cv_status>
13
{
14
    // Fetch the mutex now; `lock` is moved from in the next statement.
    auto mtx = lock.mutex();
15

16
    auto             wo = std::make_shared<wait_operation>(*this, std::move(lock));
17
    std::stop_source stop_source;
18
    auto             result = co_await when_any(wait_task(wo, stop_source), timeout_task(wo, duration, stop_source));
19

20
    // cancel a late task: wait_task has a stop callback on this source and
    // timeout_task polls it
21
    stop_source.request_stop();
22

23
    // Re-acquire the mutex and hand ownership back through the caller's lock object.
    auto ulock = co_await mtx->lock();
24
    lock       = std::move(ulock);
25
    co_return std::holds_alternative<timeout_status>(result) ? std::cv_status::timeout : std::cv_status::no_timeout;
26
}
102✔
27

28
// Coroutine that suspends on the shared wait_operation `wo` and co_returns true
// once it has been resumed.
// While it is suspended, a stop callback registered on `stop_source` resumes the
// coroutine stored in `wo` when a stop is requested.
auto detail::strategy_based_on_io_scheduler::wait_task(
51✔
29
    std::shared_ptr<detail::strategy_based_on_io_scheduler::wait_operation> wo,
30
    std::stop_source                                                        stop_source) -> task<bool>
31
{
32
    // Stop handler: resumes the coroutine stored in wo unless it reports done().
    auto stop = [wo, stop_source]()
17✔
33
    {
34
        if (!wo->m_awaiting_coroutine.done())
17✔
35
            wo->m_awaiting_coroutine.resume();
17✔
36
    };
17✔
37
    // Registers `stop` on the token for the lifetime of this local object.
    std::stop_callback<decltype(stop)> stop_callback(stop_source.get_token(), stop);
38

39
    // GCC older than 11 (and not clang) awaits through a local proxy instead of *wo.
    // NOTE(review): presumably a compiler workaround — confirm.
    #if !defined(__clang__) && defined(__GNUC__) && __GNUC__ < 11
40
    // Local awaitable that is never ready and forwards await_suspend to the
    // shared wait_operation.
    struct wait_operation_proxy
41
    {
42
        explicit wait_operation_proxy(std::shared_ptr<detail::strategy_based_on_io_scheduler::wait_operation> wo)
43
            : m_wo(wo)
44
        {
45
        }
46

47
        auto await_ready() const noexcept -> bool { return false; }
48
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
49
        {
50
            return m_wo->await_suspend(awaiting_coroutine);
51
        }
52
        auto await_resume() noexcept -> void {};
53

54
    private:
55
        std::shared_ptr<detail::strategy_based_on_io_scheduler::wait_operation> m_wo;
56
    };
57

58
    co_await wait_operation_proxy(wo);
59
    #else
60
    co_await *wo;
61
    #endif
62

63
    co_return true;
64
}
102✔
65

66
// Coroutine that sleeps on the scheduler until `timeout` has elapsed or a stop is
// requested on `stop_source`, then co_returns timeout_status::timeout (the same
// value in both cases).
// If no stop has been requested when the loop ends, `wo` is detached from its
// link via extract_waiter.
auto detail::strategy_based_on_io_scheduler::timeout_task(
51✔
67
    std::shared_ptr<detail::strategy_based_on_io_scheduler::wait_operation> wo,
68
    std::chrono::milliseconds                                               timeout,
69
    std::stop_source                                                        stop_source) -> coro::task<timeout_status>
70
{
71
    using namespace std::chrono;
72
    // NOTE(review): no chrono literal is used in this function; this directive looks unused.
    using namespace std::chrono_literals;
73

74
    assert(!m_scheduler.expired());
75
    auto deadline   = steady_clock::now() + timeout;
76
    auto stop_token = stop_source.get_token();
77

78
    // Sleep in slices of at most s_stop_source_check_interval so that a stop
    // request is noticed between slices.
    while ((steady_clock::now() < deadline) && !stop_source.stop_requested())
79
    {
80
        auto remain       = duration_cast<milliseconds>(deadline - steady_clock::now());
81
        auto next_timeout = std::min(remain, s_stop_source_check_interval);
82
        // NOTE(review): when m_scheduler.lock() fails nothing is awaited, so the
        // loop re-iterates immediately until the deadline or a stop request.
        if (auto sched = m_scheduler.lock())
83
        {
84
            co_await sched->schedule_after(next_timeout);
85
        }
86
    }
87

88
    // No stop requested: clear the waiter <-> link pointers so that
    // notify_one/notify_all skip this operation.
    if (!stop_token.stop_requested())
89
    {
90
        extract_waiter(wo.get());
91
    }
92
    co_return timeout_status::timeout;
93
}
102✔
94

95
// Registers `waiter`: takes a link from m_free_links, points the link and the
// waiter at each other, and pushes the link onto m_internal_waiters.
void detail::strategy_based_on_io_scheduler::insert_waiter(wait_operation* waiter) noexcept
10,115✔
96
{
97
    auto ptr = m_free_links.acquire();
10,115✔
98
    ptr->waiter.store(waiter, std::memory_order::relaxed);
10,115✔
99
    waiter->m_link.store(ptr.get(), std::memory_order::relaxed);
10,115✔
100
    // The raw link pointer is handed to m_internal_waiters; extract_one() re-wraps
    // it with the pool deleter and the destructor passes leftovers to m_free_links.release().
    m_internal_waiters.push(ptr.release());
10,115✔
101
}
10,115✔
102

103
// Detaches `waiter` from its link: atomically clears waiter->m_link and, if a
// link was set, clears that link's waiter pointer.
// The link itself is not popped from m_internal_waiters here.
void detail::strategy_based_on_io_scheduler::extract_waiter(wait_operation* waiter) noexcept
10,132✔
104
{
105
    auto link = waiter->m_link.exchange(nullptr, std::memory_order::acq_rel);
10,132✔
106
    // m_link was already null: nothing left to detach.
    if (!link)
10,132✔
107
        return;
10,115✔
108

109
    link->waiter.store(nullptr, std::memory_order::release);
17✔
110
}
111

112
// Constructs the strategy from an io_scheduler, which must be non-null (asserted).
// m_free_links is built from two callables: a factory that creates a fresh
// wait_operation_link, and a callable that clears a link's waiter pointer.
// NOTE(review): the second callable presumably runs when a link goes back to the
// pool — confirm against the pool type.
detail::strategy_based_on_io_scheduler::strategy_based_on_io_scheduler(std::shared_ptr<io_scheduler> io_scheduler)
4✔
113
    : m_scheduler(io_scheduler),
4✔
114
      m_free_links(
12✔
115
          std::function<std::unique_ptr<wait_operation_link>()>([]()
8✔
116
                                                                { return std::make_unique<wait_operation_link>(); }),
112✔
117
          [](wait_operation_link* ptr) { ptr->waiter.store(nullptr, std::memory_order::relaxed); })
10,119✔
118
{
119
    assert(io_scheduler);
4✔
120
}
4✔
121

122
// Drains m_internal_waiters, passing every remaining link to m_free_links.release().
detail::strategy_based_on_io_scheduler::~strategy_based_on_io_scheduler()
19✔
123
{
124
    // pop() yields an empty value once the container is exhausted, which ends the loop.
    while (auto opt = m_internal_waiters.pop())
19✔
125
    {
126
        m_free_links.release(opt.value());
15✔
127
    }
15✔
128
}
4✔
129

130
// Waits for a notification with no timeout.
// Moves `lock` into wait_for_notify, and once that completes re-locks the mutex
// and stores the new lock back into `lock`.
task<void> detail::strategy_based_on_io_scheduler::wait(scoped_lock& lock)
10,064✔
131
{
132
    // Fetch the mutex first; `lock` is moved from below.
    auto mtx = lock.mutex();
133

134
    co_await wait_for_notify(std::move(lock));
135

136
    auto ulock = co_await mtx->lock();
137
    lock       = std::move(ulock);
138
    co_return;
139
}
20,128✔
140

141
// Resumes at most one waiter on the scheduler.
// Pops links one at a time and stops at the first one whose waiter and back-link
// are both still set; links that were already detached are skipped.
void detail::strategy_based_on_io_scheduler::notify_one() noexcept
10,004✔
142
{
143
    assert(!m_scheduler.expired());
10,004✔
144

145
    while (auto waiter_link = extract_one())
10,006✔
146
    {
147
        // NOTE(review): if m_scheduler.lock() fails, the popped link is dropped
        // without resuming its waiter and the loop continues with the next link.
        if (auto sched = m_scheduler.lock())
10,003✔
148
        {
149
            // Claim the waiter from the link; nullptr means it was already detached.
            if (auto* waiter = waiter_link->waiter.exchange(nullptr, std::memory_order::acq_rel))
10,003✔
150
            {
151
                // Claim the back-link as well; only resume if it was still set.
                if (waiter->m_link.exchange(nullptr, std::memory_order::acq_rel))
10,001✔
152
                {
153
                    sched->resume(waiter->m_awaiting_coroutine);
10,001✔
154
                    break;
10,001✔
155
                }
156
            }
157
        }
10,003✔
158
    }
10,008✔
159
}
10,004✔
160

161
// Resumes every waiter that is still linked, on the scheduler.
// Does nothing beyond the assert when m_scheduler.lock() fails; in that case no
// link is popped.
void detail::strategy_based_on_io_scheduler::notify_all() noexcept
4✔
162
{
163
    assert(!m_scheduler.expired());
4✔
164

165
    if (auto sched = m_scheduler.lock())
4✔
166
    {
167
        // Drain all links; extract_one() yields a null pointer when none are left.
        while (auto waiter_link = extract_one())
101✔
168
        {
169
            // Claim the waiter from the link; nullptr means it was already detached.
            if (auto* waiter = waiter_link->waiter.exchange(nullptr, std::memory_order::acq_rel))
97✔
170
            {
171
                // Claim the back-link as well; only resume if it was still set.
                if (waiter->m_link.exchange(nullptr, std::memory_order::acq_rel))
97✔
172
                {
173
                    sched->resume(waiter->m_awaiting_coroutine);
97✔
174
                }
175
            }
176
        }
198✔
177
    }
4✔
178
}
4✔
179

180
// Pops one link from m_internal_waiters and wraps it in a
// wait_operation_link_unique_ptr that uses m_free_links.pool_deleter().
// The wrapped pointer is nullptr when pop() yields no value.
detail::strategy_based_on_io_scheduler::wait_operation_link_unique_ptr
181
    detail::strategy_based_on_io_scheduler::extract_one()
10,107✔
182
{
183
    return {m_internal_waiters.pop().value_or(nullptr), m_free_links.pool_deleter()};
10,107✔
184
}
185

186
// Initialises m_strategy from `strategy` and move-constructs m_lock from `lock`.
detail::strategy_based_on_io_scheduler::wait_operation::wait_operation(
10,115✔
187
    detail::strategy_based_on_io_scheduler& strategy, scoped_lock&& lock)
10,115✔
188
    : m_strategy(strategy),
10,115✔
189
      m_lock(std::move(lock))
10,115✔
190
{
191
}
10,115✔
192

193
// Calls extract_waiter(this) on the strategy, which clears the waiter <-> link
// pointers if this operation is still linked.
detail::strategy_based_on_io_scheduler::wait_operation::~wait_operation()
10,115✔
194
{
195
    m_strategy.extract_waiter(this);
10,115✔
196
}
10,114✔
197

198
// Records the awaiting coroutine, registers this operation with the strategy via
// insert_waiter, then unlocks m_lock through the scheduler.
// Always returns true, so the awaiting coroutine stays suspended.
auto detail::strategy_based_on_io_scheduler::wait_operation::await_suspend(
10,115✔
199
    std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
200
{
201
    m_awaiting_coroutine = awaiting_coroutine;
10,115✔
202
    // Registration happens before the lock is released below.
    m_strategy.insert_waiter(this);
10,115✔
203
    // NOTE(review): if m_scheduler.lock() fails, m_lock is not unlocked here.
    if (auto sched = m_strategy.m_scheduler.lock())
10,115✔
204
    {
205
        m_lock.unlock(*sched);
10,115✔
206
    }
10,115✔
207
    return true;
10,115✔
208
}
209

210
#endif
211

NEW
212
// Initialises m_strategy from `strategy` and move-constructs m_lock from `lock`.
detail::strategy_base::wait_operation::wait_operation(detail::strategy_base& strategy, scoped_lock&& lock)
×
NEW
213
    : m_strategy(strategy),
×
NEW
214
      m_lock(std::move(lock))
×
215
{
NEW
216
}
×
217

NEW
218
// Records the awaiting coroutine, pushes this operation onto the strategy's
// m_internal_waiters, then unlocks m_lock.
// Always returns true, so the awaiting coroutine stays suspended.
bool detail::strategy_base::wait_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept
×
219
{
NEW
220
    m_awaiting_coroutine = awaiting_coroutine;
×
NEW
221
    // Registration happens before the lock is released below.
    m_strategy.m_internal_waiters.push(this);
×
NEW
222
    m_lock.unlock();
×
NEW
223
    return true;
×
224
}
225

NEW
226
// Waits for a notification with no timeout.
// Moves `lock` into wait_for_notify, and once that completes re-locks the mutex
// and stores the new lock back into `lock`.
auto detail::strategy_base::wait(scoped_lock& lock) -> task<void>
×
227
{
228
    // Fetch the mutex first; `lock` is moved from below.
    auto mtx = lock.mutex();
229

230
    co_await wait_for_notify(std::move(lock));
231

232
    auto ulock = co_await mtx->lock();
233
    lock       = std::move(ulock);
234
    co_return;
NEW
235
}
×
236

NEW
237
// Pops one waiter from m_internal_waiters, if any, and resumes its coroutine by
// calling m_awaiting_coroutine.resume() directly.
void detail::strategy_base::notify_one() noexcept
×
238
{
NEW
239
    // value_or(nullptr) turns an empty pop() result into a null waiter.
    if (auto waiter = m_internal_waiters.pop().value_or(nullptr))
×
240
    {
NEW
241
        waiter->m_awaiting_coroutine.resume();
×
242
    }
NEW
243
}
×
244

NEW
245
// Pops waiters from m_internal_waiters until none are left, resuming each
// coroutine by calling m_awaiting_coroutine.resume() directly.
void detail::strategy_base::notify_all() noexcept
×
246
{
NEW
247
    // value_or(nullptr) turns an empty pop() result into a null waiter, ending the loop.
    while (auto waiter = m_internal_waiters.pop().value_or(nullptr))
×
248
    {
NEW
249
        waiter->m_awaiting_coroutine.resume();
×
NEW
250
    }
×
NEW
251
}
×
252

253
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc