• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 14100227883

27 Mar 2025 06:23AM UTC coverage: 85.793%. First build
14100227883

Pull #297

github

web-flow
Merge 82cb974a8 into 8fc3e2712
Pull Request #297: Add condition_variable

298 of 354 new or added lines in 10 files covered. (84.18%)

1709 of 1992 relevant lines covered (85.79%)

3511021.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.95
/include/coro/condition_variable.hpp
1
#pragma once
2

3
#include "coro/default_executor.hpp"
4
#ifdef LIBCORO_FEATURE_NETWORKING
5
    #include "coro/io_scheduler.hpp"
6
#else
7
    #include "coro/concepts/executor.hpp"
8
    #include <condition_variable>
9
#endif
10

11
#include "coro/detail/lockfree_object_pool.hpp"
12
#include "coro/mutex.hpp"
13

14
namespace coro
15
{
16

17
namespace concepts
18
{
19
// clang-format off
20

21

22
/**
23
 * Concept of basic capabilities condition_variable
24
 */
25
template<typename strategy_type>
26
concept cv_strategy_base = requires(strategy_type s, scoped_lock& l)
27
{
28
    { s.wait(l) }-> std::same_as<task<void>>;
29
    { s.notify_one() } -> std::same_as<void>;
30
    { s.notify_all() } -> std::same_as<void>;
31
};
32

33
/**
34
 * Concept of full capabilities condition_variable
35
 */
36
template<typename strategy_type>
37
concept cv_strategy = cv_strategy_base<strategy_type> and requires(strategy_type s, coro::scoped_lock l, std::chrono::milliseconds d)
38
{
39
    { s.wait_for_ms(l, d) } -> std::same_as<coro::task<std::cv_status>>;
40
};
41
// clang-format on
42
} // namespace concepts
43

44
namespace detail
45
{
46

47
/**
48
 * The strategy implementing basic features of condition_variable, such as wait(), notify_one(), notify_all(). Does not
49
 * require LIBCORO_FEATURE_NETWORKING for its operation
50
 */
51
class strategy_base
52
{
53
public:
54
    template<concepts::executor executor_type>
55
    struct wait_operation_with_executor;
56

57
    struct wait_operation
58
    {
59
        explicit wait_operation(strategy_base& strategy, scoped_lock&& lock);
60

NEW
61
        auto await_ready() const noexcept -> bool { return false; }
×
62
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool;
NEW
63
        auto await_resume() noexcept -> void {}
×
64

65
    protected:
66
        friend class strategy_base;
67

68
        template<concepts::executor executor_type>
69
        friend struct wait_operation_with_executor;
70

71
        strategy_base&          m_strategy;
72
        std::coroutine_handle<> m_awaiting_coroutine;
73
        scoped_lock             m_lock;
74
    };
75

76
    template<concepts::executor executor_type>
77
    struct wait_operation_with_executor
78
    {
79
        explicit wait_operation_with_executor(strategy_base& strategy, scoped_lock&& lock, executor_type& e)
80
            : m_waiter(strategy, std::move(lock)),
81
              m_executor(e)
82
        {
83
        }
84

85
        auto await_ready() const noexcept -> bool { return false; }
86
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
87
        {
88
            m_waiter.m_awaiting_coroutine = awaiting_coroutine;
89
            m_waiter.m_strategy.m_internal_waiters.push(&m_waiter);
90
            m_waiter.m_lock.unlock(m_executor);
91
            return true;
92
        }
93
        auto await_resume() noexcept -> void {}
94

95
    private:
96
        wait_operation m_waiter;
97
        executor_type& m_executor;
98
    };
99

100
    /**
101
     * Suspend the current coroutine until the condition variable is awakened
102
     * @param lock an lock which must be locked by the calling coroutine
103
     * @return The task to await until the condition variable is awakened.
104
     */
105
    auto wait(scoped_lock& lock) -> task<void>;
106

107
    /**
108
     * Suspend the current coroutine until the condition variable is awakened. This will distribute
109
     * the waiters across the executor's threads.
110
     * @param lock an lock which must be locked by the calling coroutine
111
     * @return The task to await until the condition variable is awakened.
112
     */
113
    template<concepts::executor executor_type>
114
    auto wait(scoped_lock& lock, executor_type& e) -> task<void>
115
    {
116
        auto mtx = lock.mutex();
117

118
        co_await wait_for_notify(std::move(lock), e);
119

120
        auto ulock = co_await mtx->lock();
121
        lock       = std::move(ulock);
122
        co_return;
123
    }
124

125
    /**
126
     * Notifies and resumes one waiter immediately at the moment of the call, the calling coroutine will be forced to
127
     * wait for the switching of the awakened coroutine
128
     */
129
    void notify_one() noexcept;
130

131
    /**
132
     * Notifies and resume one awaiters onto the given executor. This will distribute
133
     * the waiters across the executor's threads.
134
     */
135
    template<concepts::executor executor_type>
136
    void notify_one(executor_type& e)
137
    {
138
        if (auto waiter = m_internal_waiters.pop().value_or(nullptr))
139
        {
140
            e.resume(waiter->m_awaiting_coroutine);
141
        }
142
    }
143

144
    /**
145
     * Notifies and resumes all awaiters immediately at the moment of the call, the calling coroutine will be forced to
146
     * wait for the switching of all awakened coroutines
147
     */
148
    void notify_all() noexcept;
149

150
    /**
151
     * Notifies and resumes all awaiters onto the given executor. This will distribute
152
     * the waiters across the executor's threads.
153
     */
154
    template<concepts::executor executor_type>
155
    void notify_all(executor_type& e)
156
    {
157
        while (auto waiter = m_internal_waiters.pop().value_or(nullptr))
158
        {
159
            e.resume(waiter->m_awaiting_coroutine);
160
        }
161
    }
162

163
    /// Internal helper function to wait for a condition variable
NEW
164
    [[nodiscard]] auto wait_for_notify(scoped_lock&& lock) -> wait_operation
×
165
    {
NEW
166
        return wait_operation{*this, std::move(lock)};
×
167
    };
168

169
    /**
170
     * Internal helper function to wait for a condition variable. This will distribute coro::mutex
171
     * waiters across the executor's threads.
172
     */
173
    template<concepts::executor executor_type>
174
    [[nodiscard]] auto wait_for_notify(scoped_lock&& lock, executor_type& e) -> wait_operation
175
    {
176
        return wait_operation_with_executor{*this, std::move(lock), e};
177
    };
178

179
protected:
180
    friend struct wait_operation;
181

182
    /// A queue of grabbed internal waiters that are only accessed by the notify'er the wait'er
183
    coro::detail::lockfree_queue_based_on_pool<wait_operation*> m_internal_waiters;
184
};
185

186
#ifdef LIBCORO_FEATURE_NETWORKING
187

188
/**
189
 * The strategy fully implements all the capabilities of condition_variable, including such as wait_for(), wait_until().
190
 * Requires LIBCORO_FEATURE_NETWORKING for its operation.
191
 */
192
class strategy_based_on_io_scheduler
193
{
194
protected:
195
    struct wait_operation_link;
196
    using wait_operation_link_unique_ptr =
197
        std::unique_ptr<wait_operation_link, std::function<void(wait_operation_link*)>>;
198

199
public:
200
    explicit strategy_based_on_io_scheduler(
201
        std::shared_ptr<io_scheduler> io_scheduler = coro::default_executor::instance()->get_io_scheduler());
202

203
    ~strategy_based_on_io_scheduler();
204

205
    struct wait_operation
206
    {
207
        explicit wait_operation(strategy_based_on_io_scheduler& strategy, scoped_lock&& lock);
208
        ~wait_operation();
209

210
        auto await_ready() const noexcept -> bool { return false; }
10,115✔
211
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool;
212
        auto await_resume() noexcept -> void {};
10,114✔
213

214
    private:
215
        friend class strategy_based_on_io_scheduler;
216

217
        strategy_based_on_io_scheduler&   m_strategy;
218
        std::coroutine_handle<>           m_awaiting_coroutine;
219
        std::atomic<wait_operation_link*> m_link{nullptr};
220
        scoped_lock                       m_lock;
221
    };
222

223
    auto wait(scoped_lock& lock) -> task<void>;
224

225
    void notify_one() noexcept;
226

227
    void notify_all() noexcept;
228

229
    /// Internal helper function to wait for a condition variable
230
    [[nodiscard]] auto wait_for_notify(scoped_lock&& lock) -> wait_operation
10,064✔
231
    {
232
        return wait_operation{*this, std::move(lock)};
10,064✔
233
    };
234

235
    /// Internal unification version of the function for waiting on a coro::condition_variable with a time limit
236
    [[nodiscard]] auto wait_for_ms(scoped_lock& lock, const std::chrono::milliseconds duration) -> task<std::cv_status>;
237

238
protected:
239
    friend class std::lock_guard<strategy_based_on_io_scheduler>;
240
    friend struct wait_operation;
241

242
    struct wait_operation_link
243
    {
244
        std::atomic<wait_operation*> waiter{nullptr};
245
    };
246

247
    /// A scheduler is needed to suspend coroutines and then wake them up upon notification or timeout.
248
    std::weak_ptr<io_scheduler> m_scheduler;
249

250
    /// A queue of grabbed internal waiters that are only accessed by the notify'er the wait'er
251
    coro::detail::lockfree_queue_based_on_pool<wait_operation_link*> m_internal_waiters;
252

253
    /// A object pool of free wait_operation_link_ptr
254
    coro::detail::lockfree_object_pool<wait_operation_link> m_free_links;
255

256
    /// Insert @ref waiter to @ref m_internal_waiters
257
    void insert_waiter(wait_operation* waiter) noexcept;
258

259
    /// Extract @ref waiter from @ref m_internal_waiters
260
    void extract_waiter(wait_operation* waiter) noexcept;
261

262
    /// Extract one waiter from @ref m_internal_waiters
263
    wait_operation_link_unique_ptr extract_one();
264

265
    /// Internal helper function to wait for a condition variable. This is necessary for the scheduler when he schedules
266
    /// a task with a time limit
267
    [[nodiscard]] auto wait_task(std::shared_ptr<wait_operation> wo, std::stop_source stop_source) -> task<bool>;
268

269
    [[nodiscard]] auto timeout_task(
270
        std::shared_ptr<wait_operation> wo,
271
        std::chrono::milliseconds       timeout,
272
        std::stop_source                stop_source) -> coro::task<timeout_status>;
273
};
274
#endif
275

276
/**
277
 * You can set a custom default strategy for the coro::condition_variable
278
 */
279
#ifdef LIBCORO_CONDITION_VARIABLE_DEFAULT_STRATEGY
280
using default_strategy = LIBCORO_CONDITION_VARIABLE_DEFAULT_STRATEGY;
281
#else
282
    #ifdef LIBCORO_FEATURE_NETWORKING
283
using default_strategy = strategy_based_on_io_scheduler;
284
    #else  // LIBCORO_FEATURE_NETWORKING
285
using default_strategy = strategy_base;
286
    #endif // LIBCORO_FEATURE_NETWORKING
287
#endif     // LIBCORO_CONDITION_VARIABLE_DEFAULT_STRATEGY
288
} // namespace detail
289

290
/**
291
 * The coro::condition_variable_base is a thread safe async tool used with a coro::mutex (coro::scoped_lock)
292
 * to suspend one or more coro::task until another coro::task both modifies a shared variable
293
 *  (the condition) and notifies the coro::condition_variable_base
294
 */
295
template<concepts::cv_strategy_base Strategy>
296
class condition_variable_base : public Strategy
297
{
298
public:
299
    condition_variable_base() = default;
2✔
300

301
    template<typename... Args>
302
    explicit condition_variable_base(Args&&... args) : Strategy(std::forward<Args>(args)...)
2✔
303
    {
304
    }
2✔
305

306
    condition_variable_base(const condition_variable_base&)            = delete;
307
    condition_variable_base& operator=(const condition_variable_base&) = delete;
308

309
    /**
310
     * Notifies one waiting coroutine
311
     */
312
    using Strategy::notify_one;
313

314
    /**
315
     * Notifies all waiting threads
316
     */
317
    using Strategy::notify_all;
318

319
    /**
320
     * Suspend the current coroutine until the condition variable is awakened
321
     * @param lock an lock which must be locked by the calling coroutine
322
     * @return The task to await until the condition variable is awakened.
323
     */
324
    using Strategy::wait;
325

326
    /**
327
     * Suspend the current coroutine until the condition variable is awakened and predicate becomes true
328
     * @param lock an lock which must be locked by the calling coroutine
329
     * @param pred the predicate to check whether the waiting can be completed
330
     * @return The task to await until the condition variable is awakened and predicate becomes true.
331
     */
332
    template<class Predicate>
333
    [[nodiscard]] auto wait(scoped_lock& lock, Predicate pred) -> task<void>;
334

335
    /**
336
     * Causes the current coroutine to suspend until the condition variable is notified, or the given duration has been
337
     * elapsed
338
     * @param lock an lock which must be locked by the calling coroutine
339
     * @param duration the maximum duration to wait
340
     * @return The task to await until the condition variable is notified, or the given duration has been elapsed
341
     */
342
    template<class Rep, class Period>
343
        requires concepts::cv_strategy<Strategy>
344
    [[nodiscard]] auto
345
        wait_for(scoped_lock& lock, const std::chrono::duration<Rep, Period>& duration) -> task<std::cv_status>;
346

347
    /**
348
     * Causes the current coroutine to suspend until the condition variable is notified and predicate becomes true, or
349
     * the given duration has been elapsed
350
     * @param lock an lock which must be locked by the calling coroutine
351
     * @param duration the maximum duration to wait
352
     * @param pred the predicate to check whether the waiting can be completed
353
     * @return The task to await until the condition variable is notified and predicate becomes true, or the given
354
     * duration has been elapsed
355
     */
356
    template<class Rep, class Period, class Predicate>
357
        requires concepts::cv_strategy<Strategy>
358
    [[nodiscard]] auto
359
        wait_for(scoped_lock& lock, const std::chrono::duration<Rep, Period>& duration, Predicate pred) -> task<bool>;
360

361
    /**
362
     * Causes the current coroutine to suspend until the condition variable is notified, or the given time point has
363
     * been reached
364
     * @param lock an lock which must be locked by the calling coroutine
365
     * @param wakeup the time point where waiting expires
366
     * @return The task to await until the condition variable is notified, or the given time point has been reached
367
     */
368
    template<class Clock, class Duration>
369
        requires concepts::cv_strategy<Strategy>
370
    [[nodiscard]] auto
371
        wait_until(scoped_lock& lock, const std::chrono::time_point<Clock, Duration>& wakeup) -> task<std::cv_status>;
372

373
    /**
374
     * Causes the current coroutine to suspend until the condition variable is notified and predicate becomes true, or
375
     * the given time point has been reached
376
     * @param lock an lock which must be locked by the calling coroutine
377
     * @param wakeup the time point where waiting expires
378
     * @param pred the predicate to check whether the waiting can be completed
379
     * @return The task to await until the condition variable is notified and predicate becomes true, or the given time
380
     * point has been reached
381
     */
382
    template<class Clock, class Duration, class Predicate>
383
        requires concepts::cv_strategy<Strategy>
384
    [[nodiscard]] auto wait_until(
385
        scoped_lock& lock, const std::chrono::time_point<Clock, Duration>& wakeup, Predicate pred) -> task<bool>;
386
};
387

388
template<concepts::cv_strategy_base Strategy>
389
template<class Predicate>
390
inline auto condition_variable_base<Strategy>::wait(scoped_lock& lock, Predicate pred) -> task<void>
10,064✔
391
{
392
    while (!pred())
393
    {
394
        co_await wait(lock);
395
    }
396
}
20,128✔
397

398
template<concepts::cv_strategy_base Strategy>
399
template<class Clock, class Duration, class Predicate>
400
    requires concepts::cv_strategy<Strategy>
401
inline auto condition_variable_base<Strategy>::wait_until(
6✔
402
    scoped_lock& lock, const std::chrono::time_point<Clock, Duration>& wakeup, Predicate pred) -> task<bool>
403
{
404
    while (!pred())
405
    {
406
        if (co_await wait_until(lock, wakeup) == std::cv_status::timeout)
407
        {
408
            co_return pred();
409
        }
410
    }
411
    co_return true;
412
}
12✔
413

414
template<concepts::cv_strategy_base Strategy>
415
template<class Clock, class Duration>
416
    requires concepts::cv_strategy<Strategy>
417
inline auto condition_variable_base<Strategy>::wait_until(
9✔
418
    scoped_lock& lock, const std::chrono::time_point<Clock, Duration>& wakeup) -> task<std::cv_status>
419
{
420
    using namespace std::chrono;
421

422
    auto msec = duration_cast<milliseconds>(wakeup - Clock::now());
423

424
    if (msec.count() <= 0)
425
    {
426
        msec = 1ms; // prevent infinity wait
427
    }
428

429
    co_return co_await Strategy::wait_for_ms(lock, msec);
430
}
18✔
431

432
template<concepts::cv_strategy_base Strategy>
433
template<class Rep, class Period, class Predicate>
434
    requires concepts::cv_strategy<Strategy>
435
inline auto condition_variable_base<Strategy>::wait_for(
6✔
436
    scoped_lock& lock, const std::chrono::duration<Rep, Period>& duration, Predicate pred) -> task<bool>
437
{
438
    while (!pred())
439
    {
440
        if (co_await wait_for(lock, duration) == std::cv_status::timeout)
441
        {
442
            co_return pred();
443
        }
444
    }
445
    co_return true;
446
}
12✔
447

448
template<concepts::cv_strategy_base Strategy>
449
template<class Rep, class Period>
450
    requires concepts::cv_strategy<Strategy>
451
inline auto condition_variable_base<Strategy>::wait_for(
42✔
452
    scoped_lock& lock, const std::chrono::duration<Rep, Period>& duration) -> task<std::cv_status>
453
{
454
    using namespace std::chrono;
455

456
    auto msec = duration_cast<milliseconds>(duration);
457
    if (msec.count() <= 0)
458
    {
459
        // infinity wait
460
        co_await wait(lock);
461
        co_return std::cv_status::no_timeout;
462
    }
463
    else
464
    {
465
        co_return co_await Strategy::wait_for_ms(lock, msec);
466
    }
467
}
84✔
468

469
/**
470
 * coro::condition_variable is coro::condition_variable_base instantiated with the default strategy
 * (coro::detail::default_strategy).
471
 */
472
using condition_variable = condition_variable_base<detail::default_strategy>;
473

474
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc