• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 20009361012

07 Dec 2025 07:41PM UTC coverage: 86.572%. First build
20009361012

Pull #423

github

web-flow
Merge 81b88315a into e7a183f25
Pull Request #423: executor->spawn(joinable)->task

49 of 58 new or added lines in 6 files covered. (84.48%)

1702 of 1966 relevant lines covered (86.57%)

5072833.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.33
/include/coro/condition_variable.hpp
1
#pragma once
2

3
#include "coro/concepts/executor.hpp"
4
#include "coro/detail/awaiter_list.hpp"
5
#include "coro/detail/task_self_deleting.hpp"
6
#include "coro/event.hpp"
7
#include "coro/mutex.hpp"
8
#include "coro/task.hpp"
9
#include "coro/when_any.hpp"
10

11
#include <atomic>
12
#include <chrono>
13
#include <condition_variable>
14
#include <functional>
15
#include <optional>
16

17
#ifdef LIBCORO_FEATURE_NETWORKING
18
    #include <stop_token>
19
#endif
20

21
namespace coro
22
{
23

24
class condition_variable
25
{
26
public:
27
    using predicate_type = std::function<bool()>;
28

29
private:
30
    /// @brief The result an awaiter reports from its on_notify() call.
    enum notify_status_t
    {
        /// @brief The waiter is ready to be resumed, either the predicate passed or it has been requested to stop.
        ready,
        /// @brief The waiter is not ready to be resumed.
        not_ready,
        /// @brief The waiter is a hook and has timed out and is dead.
        awaiter_dead,
    };
39

40
    struct awaiter_base
41
    {
42
        awaiter_base(coro::condition_variable& cv, coro::scoped_lock& l);
43
        virtual ~awaiter_base() = default;
68✔
44

45
        awaiter_base(const awaiter_base&)                    = delete;
46
        awaiter_base(awaiter_base&&)                         = delete;
47
        auto operator=(const awaiter_base&) -> awaiter_base& = delete;
48
        auto operator=(awaiter_base&&) -> awaiter_base&      = delete;
49

50
        /// @brief The next waiting awaiter.
51
        awaiter_base* m_next{nullptr};
52
        /// @brief The coroutine to resume the waiter.
53
        std::coroutine_handle<> m_awaiting_coroutine{nullptr};
54
        /// @brief The condition variable this waiter is waiting on.
55
        coro::condition_variable& m_condition_variable;
56
        /// @brief The lock that the wait() was called with.
57
        coro::scoped_lock& m_lock;
58

59
        /// @brief Each awaiter type defines its own notify behavior.
60
        /// @return The status of if the waiter's notify result.
61
        virtual auto on_notify() -> coro::task<notify_status_t> = 0;
62
    };
63

64
    struct awaiter : public awaiter_base
65
    {
66
        awaiter(coro::condition_variable& cv, coro::scoped_lock& l) noexcept;
67
        ~awaiter() override = default;
22✔
68

69
        awaiter(const awaiter&)                    = delete;
70
        awaiter(awaiter&&)                         = delete;
71
        auto operator=(const awaiter&) -> awaiter& = delete;
72
        auto operator=(awaiter&&) -> awaiter&      = delete;
73

74
        auto await_ready() const noexcept -> bool;
75
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool;
76
        auto await_resume() noexcept {}
22✔
77

78
        auto on_notify() -> coro::task<notify_status_t> override;
79
    };
80

81
    struct awaiter_with_predicate : public awaiter_base
82
    {
83
        awaiter_with_predicate(coro::condition_variable& cv, coro::scoped_lock& l, predicate_type p) noexcept;
84
        ~awaiter_with_predicate() override = default;
9✔
85

86
        awaiter_with_predicate(const awaiter_with_predicate&)                    = delete;
87
        awaiter_with_predicate(awaiter_with_predicate&&)                         = delete;
88
        auto operator=(const awaiter_with_predicate&) -> awaiter_with_predicate& = delete;
89
        auto operator=(awaiter_with_predicate&&) -> awaiter_with_predicate&      = delete;
90

91
        auto await_ready() const noexcept -> bool;
92
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool;
93
        auto await_resume() noexcept {}
9✔
94

95
        auto on_notify() -> coro::task<notify_status_t> override;
96

97
        /// @brief The wait predicate to execute on notify.
98
        predicate_type m_predicate;
99
    };
100

101
#ifndef EMSCRIPTEN
102

103
    struct awaiter_with_predicate_stop_token : public awaiter_base
104
    {
105
        awaiter_with_predicate_stop_token(
106
            coro::condition_variable& cv, coro::scoped_lock& l, predicate_type p, std::stop_token stop_token) noexcept;
107
        ~awaiter_with_predicate_stop_token() override = default;
1✔
108

109
        awaiter_with_predicate_stop_token(const awaiter_with_predicate_stop_token&)                    = delete;
110
        awaiter_with_predicate_stop_token(awaiter_with_predicate_stop_token&&)                         = delete;
111
        auto operator=(const awaiter_with_predicate_stop_token&) -> awaiter_with_predicate_stop_token& = delete;
112
        auto operator=(awaiter_with_predicate_stop_token&&) -> awaiter_with_predicate_stop_token&      = delete;
113

114
        auto await_ready() noexcept -> bool;
115
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool;
116
        auto await_resume() noexcept -> bool { return m_predicate_result; }
1✔
117

118
        auto on_notify() -> coro::task<notify_status_t> override;
119

120
        /// @brief The wait predicate to execute on notify.
121
        predicate_type m_predicate;
122
        /// @brief The stop token that will guarantee the next notify will wake the awaiter regardless of the predicate.
123
        std::stop_token m_stop_token;
124
        /// @brief The last predicate's call result.
125
        bool m_predicate_result{false};
126
    };
127

128
#endif
129

130
#ifdef LIBCORO_FEATURE_NETWORKING
131

132
    /// @brief This structure encapsulates the data from the controller task.
133
    struct controller_data
134
    {
135
        controller_data(
136
            std::optional<std::cv_status>&       status,
137
            bool&                                predicate_result,
138
            std::optional<predicate_type>        predicate,
139
            std::optional<const std::stop_token> stop_token) noexcept;
140
        ~controller_data() = default;
18✔
141

142
        controller_data(const controller_data&)                    = delete;
143
        controller_data(controller_data&&)                         = delete;
144
        auto operator=(const controller_data&) -> controller_data& = delete;
145
        auto operator=(controller_data&&) -> controller_data&      = delete;
146

147
        /// @brief Mutex for notify or timeout mutual exclusion.
148
        coro::mutex m_event_mutex{};
149
        /// @brief Event to notify the no timeout task.
150
        coro::event m_notify_callback{};
151
        /// @brief Flag to notify if the awaiter has completed via no_timeout or timeout.
152
        std::atomic<bool> m_awaiter_completed{false};
153
        /// @brief The status of the wait() call, this is _ONLY_ valid if the m_awaiter_completed flag was obtained,
154
        /// otherwise its a dangling reference.
155
        std::optional<std::cv_status>& m_status;
156
        /// @brief The last result of the predicate call.
157
        bool& m_predicate_result;
158
        /// @brief The predicate, this can be no predicate, default predicate or stop token predicate.
159
        std::optional<predicate_type> m_predicate{std::nullopt};
160
        /// @brief The stop token.
161
        std::optional<const std::stop_token> m_stop_token{std::nullopt};
162
    };
163

164
    /**
165
     * @brief This awaiter is a facade to hook in between the wait_[for|unitl]() caller and the actual awaiter object.
166
     * This allows the hook to either call the no_timeout or timeout results correctly based on which one completes
167
     * first.
168
     *
169
     * This class mimics the awaiter_base class so it can silently sit in the coro:condition_variable.m_awaiters to look
170
     * like its the actual awaiter, but just proxies to the real awaiter when appropriate.
171
     *
172
     * If the wait_[for|until] does timeout, then this will do nothing except delete itself from the list on a
173
     * notify_[one|all] call.
174
     */
175
    struct awaiter_with_wait_hook : public awaiter_base
176
    {
177
        awaiter_with_wait_hook(coro::condition_variable& cv, coro::scoped_lock& l, controller_data& data) noexcept;
178
        ~awaiter_with_wait_hook() override = default;
18✔
179

180
        auto on_notify() -> coro::task<notify_status_t> override;
181

182
        controller_data& m_data;
183
    };
184

185
    template<concepts::io_executor io_executor_type, typename return_type>
186
    struct awaiter_with_wait : public awaiter_base
187
    {
188
        awaiter_with_wait(
18✔
189
            std::unique_ptr<io_executor_type>& executor,
190
            coro::condition_variable&          cv,
191
            coro::scoped_lock&                 l,
192
            const std::chrono::nanoseconds     wait_for,
193
            std::optional<predicate_type>      predicate  = std::nullopt,
194
            std::optional<std::stop_token>     stop_token = std::nullopt) noexcept
195
            : awaiter_base(cv, l),
196
              m_executor(executor),
18✔
197
              m_wait_for(wait_for),
18✔
198
              m_predicate(std::move(predicate)),
18✔
199
              m_stop_token(std::move(stop_token))
36✔
200
        {
201
        }
18✔
202
        ~awaiter_with_wait() override = default;
18✔
203

204
        awaiter_with_wait(const awaiter_with_wait&)                    = delete;
205
        awaiter_with_wait(awaiter_with_wait&&)                         = delete;
206
        auto operator=(const awaiter_with_wait&) -> awaiter_with_wait& = delete;
207
        auto operator=(awaiter_with_wait&&) -> awaiter_with_wait&      = delete;
208

209
        /**
210
         * @brief Task to handle the no_timeout case, however it is resumed even after a timeout since it needs to exit
211
         * for the controller task.
212
         *
213
         * @param data The controller task's data.
214
         * @return coro::task<void>
215
         */
216
        auto make_on_notify_callback_task(controller_data& data) -> coro::task<void>
18✔
217
        {
218
            co_await data.m_notify_callback;
219

220
            // If this is the condition and not a timeout resume from this task.
221
            if (m_status.value() == std::cv_status::no_timeout)
222
            {
223
                // The condition coroutine is resumed from here instead of the awaiter hook task to
224
                // guarantee the controller is still alive until the caller's coroutine is completed.
225
                m_awaiting_coroutine.resume();
226
            }
227

228
            // This was a timeout, do nothing but exit to let the controller task know it can safely exit now.
229
            co_return;
230
        }
36✔
231

232
        /**
233
         * @brief Task to handle the timeout case, this will always wait the duration of the timeout before exiting.
234
         *
235
         * @param data The controller task data.
236
         * @return coro::task<void>
237
         */
238
        auto make_timeout_task(controller_data& data) -> coro::task<void>
18✔
239
        {
240
            co_await m_executor->schedule_after(m_wait_for);
241
            auto lock = co_await data.m_event_mutex.scoped_lock();
242
            bool expected{false};
243
            if (data.m_awaiter_completed.compare_exchange_strong(
244
                    expected, true, std::memory_order::release, std::memory_order::relaxed))
245
            {
246
                m_status = {std::cv_status::timeout};
247
                lock.unlock();
248

249
                // This means the timeout has occurred first. Before resuming the wait_[for|until]() caller the lock
250
                // must be re-acquired.
251
                co_await m_lock.m_mutex->lock();
252
                m_predicate_result = data.m_predicate.has_value() ? data.m_predicate.value()() : true;
253
                m_awaiting_coroutine.resume();
254
                co_return;
255
            }
256

257
            // The no_timeout has occurred first, exit so the controller task can complete.
258
            co_return;
259
        }
36✔
260

261
        /**
262
         * @brief Task to manage the no_timeout and timeout tasks, this holds the state for both tasks since they need
263
         * to reference the actual wait_[for|until]() awaiter, but need to do so safely while it is still alive. To
264
         * access the true awaiter the awaiter_completed atomic bool must be acquired, if it is not acquired the calling
265
         * awaiter is invalid since it has already been resumed with the first event of timeout or no_timeout.
266
         *
267
         * @return coro::detail::task_self_deleting This task is self deleting since it has an indeterminate lifetime.
268
         */
269
        auto make_controller_task() -> coro::detail::task_self_deleting
18✔
270
        {
271
            controller_data data{m_status, m_predicate_result, std::move(m_predicate), std::move(m_stop_token)};
272
            // We enqueue the hook_task since we can make it live until the notify occurs and will properly resume the
273
            // actual coroutine only once.
274
            awaiter_with_wait_hook hook_task{m_condition_variable, m_lock, data};
275
            detail::awaiter_list_push(m_condition_variable.m_awaiters, static_cast<awaiter_base*>(&hook_task));
276
            m_lock.m_mutex->unlock(); // Unlock the actual lock now that we are setup, not the fake hook task.
277

278
            co_await coro::when_all(make_on_notify_callback_task(data), make_timeout_task(data));
279
            co_return;
280
        }
36✔
281

282
        auto await_ready() noexcept -> bool
18✔
283
        {
284
            // If there is no predicate then we are not ready.
285
            if (!m_predicate.has_value())
18✔
286
            {
287
                return false;
10✔
288
            }
289

290
            m_predicate_result = m_predicate.value()();
8✔
291
            if (m_predicate_result && m_stop_token.has_value() && m_stop_token.value().stop_requested())
8✔
292
            {
293
                m_status = std::cv_status::no_timeout;
×
294
            }
295
            return m_predicate_result;
8✔
296
        }
297

298
        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
18✔
299
        {
300
            m_awaiting_coroutine = awaiting_coroutine; // This is the real coroutine to resume.
18✔
301

302
            // Make the background controller which proxies between the notify task and the timeout task.
303
            auto controller = make_controller_task();
18✔
304
            controller.resume();
18✔
305
            return true;
18✔
306
        }
307

308
        auto await_resume() noexcept -> return_type
18✔
309
        {
310
            if constexpr (std::is_same_v<return_type, bool>)
311
            {
312
                return m_predicate_result;
8✔
313
            }
314
            else
315
            {
316
                return m_status.value();
10✔
317
            }
318
        }
319

NEW
320
        auto on_notify() -> coro::task<notify_status_t> override { throw std::runtime_error("should never be called"); }
×
321

322
        /// @brief The io_executor used to wait for the timeout.
323
        std::unique_ptr<io_executor_type>& m_executor;
324
        /// @brief The amount of time to wait for before timing out.
325
        const std::chrono::nanoseconds m_wait_for;
326
        /// @brief If the condition timed out or not.
327
        std::optional<std::cv_status> m_status{std::nullopt};
328
        /// @brief The last m_predicate() call result.
329
        bool m_predicate_result{false};
330
        /// @brief The predicate, this can be no predicate, default predicate or stop token predicate. This value is
331
        /// only valid until the awaiter data object takes ownership.
332
        std::optional<predicate_type> m_predicate{std::nullopt};
333
        /// @brief The stop token. This value is only valid until the awater data object takes onwership.
334
        std::optional<const std::stop_token> m_stop_token{std::nullopt};
335
    };
336

337
#endif
338

339
public:
340
    condition_variable()  = default;
1✔
341
    ~condition_variable() = default;
342

343
    condition_variable(const condition_variable&)                    = delete;
344
    condition_variable(condition_variable&&)                         = delete;
345
    auto operator=(const condition_variable&) -> condition_variable& = delete;
346
    auto operator=(condition_variable&&) -> condition_variable&      = delete;
347

348
    /**
349
     * @brief Notifies a single waiter.
350
     */
351
    auto notify_one() -> coro::task<void>;
352

353
    /**
354
     * @brief Notifies a single waiter and resumes the waiter on the given executor.
355
     *
356
     * @tparam executor_type The type of executor that the waiter will be resumed on.
357
     * @param executor The executor that the waiter will be resumed on.
358
     */
359
    template<coro::concepts::executor executor_type>
360
    auto notify_one(std::unique_ptr<executor_type>& executor) -> void
5✔
361
    {
362
        executor->spawn_detached(notify_one());
5✔
363
    }
5✔
364

365
    /**
366
     * @brief Notifies all waiters.
367
     */
368
    auto notify_all() -> coro::task<void>;
369

370
    /**
371
     * @brief Notifies all waiters and resumes them on the given executor. Note that each waiter must be notified
372
     * synchronously so this is useful if the task is long lived and can be immediately parallelized after the condition
373
     * is ready. This does not need to be co_await'ed like `notify_all()` since this will execute the notify on the
374
     * given executor.
375
     *
376
     * @tparam executor_type The type of executor that the waiters will be resumed on.
377
     * @param executor The executor that each waiter will be resumed on.
378
     * @return void
379
     */
380
    template<coro::concepts::executor executor_type>
381
    auto notify_all(std::unique_ptr<executor_type>& executor) -> void
1✔
382
    {
383
        auto* waiter = detail::awaiter_list_pop_all(m_awaiters);
1✔
384

385
        while (waiter != nullptr)
11✔
386
        {
387
            // Need to grab next before notifying since the notifier will self destruct after completing.
388
            awaiter_base* next = waiter->m_next;
10✔
389
            // This will kick off each task in parallel on the scheduler, they will fight over the lock
390
            // but will give the best parallelism scheduling them immediately.
391
            executor->spawn_detached(make_notify_all_executor_individual_task(waiter));
10✔
392
            waiter = next;
10✔
393
        }
394

395
        return;
1✔
396
    }
397

398
    /**
399
     * @brief Waits until notified.
400
     *
401
     * @param lock A lock that must be locked by the caller.
402
     * @return awaiter
403
     */
404
    [[nodiscard]] auto wait(coro::scoped_lock& lock) -> awaiter;
405

406
    /**
407
     * @brief Waits until notified but only wakes up if the predicate passes.
408
     *
409
     * @param lock A lock that must be locked by the caller.
410
     * @param predicate The predicate to check whether the waiting can be completed.
411
     * @return awaiter_with_predicate
412
     */
413
    [[nodiscard]] auto wait(coro::scoped_lock& lock, predicate_type predicate) -> awaiter_with_predicate;
414

415
#ifndef EMSCRIPTEN
416
    /**
417
     * @brief Waits until notified and wakes up if a stop is requested or the predicate passes.
418
     *
419
     * @param lock A lock which must be locked by the caller.
420
     * @param stop_token A stop token to register interruption for.
421
     * @param predicate The predicate to check whether the waiting can be completed.
422
     * @return awaiter_with_predicate_stop_token The final predicate call result.
423
     */
424
    [[nodiscard]] auto wait(coro::scoped_lock& lock, std::stop_token stop_token, predicate_type predicate)
425
        -> awaiter_with_predicate_stop_token;
426
#endif
427

428
#ifdef LIBCORO_FEATURE_NETWORKING
429

430
    template<concepts::io_executor io_executor_type, class rep_type, class period_type>
431
    [[nodiscard]] auto wait_for(
8✔
432
        std::unique_ptr<io_executor_type>&                 executor,
433
        coro::scoped_lock&                                 lock,
434
        const std::chrono::duration<rep_type, period_type> wait_for)
435
        -> awaiter_with_wait<io_executor_type, std::cv_status>
436
    {
437
        return awaiter_with_wait<io_executor_type, std::cv_status>{
438
            executor, *this, lock, std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for)};
8✔
439
    }
440

441
    template<concepts::io_executor io_executor_type, class rep_type, class period_type>
442
    [[nodiscard]] auto wait_for(
3✔
443
        std::unique_ptr<io_executor_type>&                 executor,
444
        coro::scoped_lock&                                 lock,
445
        const std::chrono::duration<rep_type, period_type> wait_for,
446
        predicate_type                                     predicate) -> awaiter_with_wait<io_executor_type, bool>
447
    {
448
        return awaiter_with_wait<io_executor_type, bool>{
449
            executor,
450
            *this,
451
            lock,
452
            std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for),
3✔
453
            std::move(predicate)};
3✔
454
    }
455

456
    template<concepts::io_executor io_executor_type, class rep_type, class period_type>
457
    [[nodiscard]] auto wait_for(
1✔
458
        std::unique_ptr<io_executor_type>&                 executor,
459
        coro::scoped_lock&                                 lock,
460
        std::stop_token                                    stop_token,
461
        const std::chrono::duration<rep_type, period_type> wait_for,
462
        predicate_type                                     predicate) -> awaiter_with_wait<io_executor_type, bool>
463
    {
464
        return awaiter_with_wait<io_executor_type, bool>{
465
            executor,
466
            *this,
467
            lock,
468
            std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for),
1✔
469
            std::move(predicate),
1✔
470
            std::move(stop_token)};
1✔
471
    }
472

473
    template<concepts::io_executor io_executor_type, class clock_type, class duration_type>
474
    auto wait_until(
2✔
475
        std::unique_ptr<io_executor_type>&                       executor,
476
        coro::scoped_lock&                                       lock,
477
        const std::chrono::time_point<clock_type, duration_type> wait_until_time)
478
        -> awaiter_with_wait<io_executor_type, std::cv_status>
479
    {
480
        auto now      = std::chrono::time_point<clock_type, duration_type>::clock::now();
2✔
481
        auto wait_for = (now < wait_until_time) ? (wait_until_time - now) : std::chrono::nanoseconds{1};
2✔
482
        return awaiter_with_wait<io_executor_type, std::cv_status>{
483
            executor, *this, lock, std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for)};
2✔
484
    }
485

486
    template<concepts::io_executor io_executor_type, class clock_type, class duration_type>
487
    auto wait_until(
2✔
488
        std::unique_ptr<io_executor_type>&                       executor,
489
        coro::scoped_lock&                                       lock,
490
        const std::chrono::time_point<clock_type, duration_type> wait_until_time,
491
        predicate_type                                           predicate) -> awaiter_with_wait<io_executor_type, bool>
492
    {
493
        auto now      = std::chrono::time_point<clock_type, duration_type>::clock::now();
2✔
494
        auto wait_for = (now < wait_until_time) ? (wait_until_time - now) : std::chrono::nanoseconds{1};
2✔
495
        return awaiter_with_wait<io_executor_type, bool>{
496
            executor,
497
            *this,
498
            lock,
499
            std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for),
2✔
500
            std::move(predicate)};
2✔
501
    }
502

503
    template<concepts::io_executor io_executor_type, class clock_type, class duration_type>
504
    auto wait_until(
2✔
505
        std::unique_ptr<io_executor_type>&                       executor,
506
        coro::scoped_lock&                                       lock,
507
        std::stop_token                                          stop_token,
508
        const std::chrono::time_point<clock_type, duration_type> wait_until_time,
509
        predicate_type                                           predicate) -> awaiter_with_wait<io_executor_type, bool>
510
    {
511
        auto now      = std::chrono::time_point<clock_type, duration_type>::clock::now();
2✔
512
        auto wait_for = (now < wait_until_time) ? (wait_until_time - now) : std::chrono::nanoseconds{1};
2✔
513
        return awaiter_with_wait<io_executor_type, bool>{
514
            executor,
515
            *this,
516
            lock,
517
            std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for),
2✔
518
            std::move(predicate),
2✔
519
            std::move(stop_token)};
2✔
520
    }
521
#endif
522

523
private:
524
    /// @brief The list of waiters.
525
    std::atomic<awaiter_base*> m_awaiters{nullptr};
526

527
    /**
     * @brief Notifies a single waiter on behalf of notify_all(executor) and puts it back on the waiter list if it
     * reports that it is not ready.
     *
     * @param waiter The waiter to notify.
     * @return coro::task<void>
     */
    auto make_notify_all_executor_individual_task(awaiter_base* waiter) -> coro::task<void>
    {
        switch (co_await waiter->on_notify())
        {
            case notify_status_t::not_ready:
                // Re-enqueue since the predicate isn't ready and return since the notify has been satisfied.
                detail::awaiter_list_push(m_awaiters, waiter);
                break;
            case notify_status_t::ready:
            case notify_status_t::awaiter_dead:
                // Don't re-enqueue any awaiters that are ready or dead.
                break;
        }
    }
541
};
542

543
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc