jbaldwin / libcoro, build 20009361012
07 Dec 2025 07:41PM UTC. Coverage: 86.572% (first build 20009361012)

Pull Request #423: executor->spawn(joinable)->task
Merge 81b88315a into e7a183f25 (github / web-flow)

49 of 58 new or added lines in 6 files covered (84.48%)
1702 of 1966 relevant lines covered (86.57%)
5,072,833.6 hits per line

Source file: /src/io_scheduler.cpp (88.11% covered)
#include "coro/io_scheduler.hpp"
#include "coro/detail/task_self_deleting.hpp"

#include <array>
#include <atomic>
#include <cerrno>
#include <cstring>
#include <iostream>
#include <optional>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

using namespace std::chrono_literals;

namespace coro
{

namespace detail
{
static auto
    make_spawned_joinable_wait_task(std::unique_ptr<coro::task_group<coro::io_scheduler>> group_ptr) -> coro::task<void>
{
    co_await *group_ptr;
    co_return;
}

} // namespace detail

io_scheduler::io_scheduler(options&& opts, private_constructor)
    : m_opts(opts),
      m_io_notifier(),
      m_timer(static_cast<const void*>(&m_timer_object), m_io_notifier)
{
    if (!m_io_notifier.watch(m_shutdown_pipe.read_fd(), coro::poll_op::read, const_cast<void*>(m_shutdown_ptr), true))
    {
        throw std::runtime_error("Failed to register m_shutdown_pipe.read_fd() for read events.");
    }

    if (!m_io_notifier.watch(m_schedule_pipe.read_fd(), coro::poll_op::read, const_cast<void*>(m_schedule_ptr), true))
    {
        throw std::runtime_error("Failed to register m_schedule_pipe.read_fd() for read events.");
    }

    m_recent_events.reserve(m_max_events);

    if (m_opts.execution_strategy == execution_strategy_t::process_tasks_on_thread_pool)
    {
        m_thread_pool = thread_pool::make_unique(std::move(m_opts.pool));
    }
}

auto io_scheduler::make_unique(options opts) -> std::unique_ptr<io_scheduler>
{
    auto s = std::make_unique<io_scheduler>(std::move(opts), private_constructor{});

    // Spawn the dedicated event loop thread once the scheduler is fully constructed
    // so it has a full object to work with.
    if (s->m_opts.thread_strategy == thread_strategy_t::spawn)
    {
        s->m_io_thread = std::thread([s = s.get()]() { s->process_events_dedicated_thread(); });
    }
    // else manual mode, the user must call process_events.

    return s;
}
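
// Usage sketch (not part of this file): constructing a scheduler with default
// options. Assuming the header defaults thread_strategy to spawn, this creates
// the dedicated event loop thread; otherwise the caller drives the loop via
// process_events() below.
//
//     auto scheduler = coro::io_scheduler::make_unique(coro::io_scheduler::options{});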

io_scheduler::~io_scheduler()
{
    shutdown();

    if (m_io_thread.joinable())
    {
        m_io_thread.join();
    }

    m_shutdown_pipe.close();
    m_schedule_pipe.close();
}

auto io_scheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t
{
    process_events_manual(timeout);
    return size();
}
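
// Usage sketch (not part of this file): driving the scheduler in manual mode,
// i.e. when no dedicated event loop thread was spawned. process_events() runs
// one pass with the given timeout and returns the number of still-live tasks.
//
//     while (scheduler->process_events(100ms) > 0)
//     {
//         // keep pumping until every task has completed
//     }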

auto io_scheduler::spawn_detached(coro::task<void>&& task) -> bool
{
    m_size.fetch_add(1, std::memory_order::release);
    auto wrapper_task = detail::make_task_self_deleting(std::move(task));
    wrapper_task.promise().user_final_suspend([this]() -> void { m_size.fetch_sub(1, std::memory_order::release); });
    return resume(wrapper_task.handle());
}
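
// Usage sketch (not part of this file): fire-and-forget. The wrapper deletes
// the task at final suspend, so the caller cannot await its result. The lambda
// is illustrative.
//
//     scheduler->spawn_detached([](coro::io_scheduler& s) -> coro::task<void> {
//         co_await s.schedule();  // hop onto the scheduler
//         co_return;
//     }(*scheduler));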

auto io_scheduler::spawn_joinable(coro::task<void>&& task) -> coro::task<void>
{
    auto group_ptr = std::make_unique<coro::task_group<coro::io_scheduler>>(this, std::move(task));
    return detail::make_spawned_joinable_wait_task(std::move(group_ptr));
}
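
// Usage sketch (not part of this file): joinable spawn. Assuming
// coro::task_group starts the task when constructed, the returned task merely
// awaits the group, so the caller can join the spawned work later.
//
//     auto join = scheduler->spawn_joinable(make_work());  // make_work() is illustrative
//     // ... other work ...
//     co_await join;  // completes once the spawned task has finished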

auto io_scheduler::schedule_at(time_point time) -> coro::task<void>
{
    return yield_until(time);
}

auto io_scheduler::yield_until(time_point time) -> coro::task<void>
{
    auto now = clock::now();

    // If the requested time is in the past (or now!) bail out!
    if (time <= now)
    {
        co_await schedule();
    }
    else
    {
        m_size.fetch_add(1, std::memory_order::release);

        auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);

        detail::poll_info pi{};
        add_timer_token(now + amount, pi);
        co_await pi;

        m_size.fetch_sub(1, std::memory_order::release);
    }
    co_return;
}
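
// Usage sketch (not part of this file): deadline-based suspension, using the
// clock/time_point aliases this file uses (assumed to be public on
// io_scheduler). A deadline at or before now degrades to a plain schedule().
//
//     co_await scheduler->schedule_at(coro::io_scheduler::clock::now() + 250ms);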

auto io_scheduler::poll(fd_t fd, coro::poll_op op, std::chrono::milliseconds timeout) -> coro::task<poll_status>
{
    // The scheduler's size would drop while this coroutine is suspended, so every poll
    // must count itself as an active task and undo that when the event loop resumes it.
    m_size.fetch_add(1, std::memory_order::release);

    // Setup two events, a timeout event and the actual poll for op event.
    // Whichever triggers first will delete the other to guarantee only one wins.
    // The resume token will be set by the scheduler to what the event turned out to be.

    bool timeout_requested = (timeout > 0ms);

    auto pi = detail::poll_info{fd, op};

    if (timeout_requested)
    {
        pi.m_timer_pos = add_timer_token(clock::now() + timeout, pi);
    }

    if (!m_io_notifier.watch(pi))
    {
        std::cerr << "Failed to add " << fd << " to watch list\n";
    }

    // The event loop will 'clean up' whichever event didn't win.  Since the coroutine is
    // scheduled onto the thread pool, it's possible the other type of event could trigger
    // while it's waiting to execute again, resuming the coroutine twice, which would be quite bad.
    auto result = co_await pi;
    m_size.fetch_sub(1, std::memory_order::release);
    co_return result;
}
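
// Usage sketch (not part of this file): awaiting readiness on a raw socket fd;
// sock_fd is illustrative and the poll_status enumerator names follow libcoro's
// public enum.
//
//     auto status = co_await scheduler->poll(sock_fd, coro::poll_op::read, 5s);
//     if (status == coro::poll_status::event)
//     {
//         // fd is readable, e.g. call ::recv()
//     }
//     else if (status == coro::poll_status::timeout)
//     {
//         // nothing arrived within 5 seconds
//     }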

auto io_scheduler::resume(std::coroutine_handle<> handle) -> bool
{
    if (handle == nullptr || handle.done())
    {
        return false;
    }

    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        return false;
    }

    if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
    {
        m_size.fetch_add(1, std::memory_order::release);
        {
            std::scoped_lock lk{m_scheduled_tasks_mutex};
            m_scheduled_tasks.emplace_back(handle);
        }

        bool expected{false};
        if (m_schedule_pipe_triggered.compare_exchange_strong(
                expected, true, std::memory_order::release, std::memory_order::relaxed))
        {
            const int value = 1;
            ::write(m_schedule_pipe.write_fd(), reinterpret_cast<const void*>(&value), sizeof(value));
        }

        return true;
    }
    else
    {
        return m_thread_pool->resume(handle);
    }
}

auto io_scheduler::shutdown() noexcept -> void
{
    // Only allow shutdown to occur once.
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
    {
        // Signal the event loop to stop asap.
        const int value{1};
        ::write(m_shutdown_pipe.write_fd(), reinterpret_cast<const void*>(&value), sizeof(value));

        if (m_io_thread.joinable())
        {
            m_io_thread.join();
        }

        if (m_thread_pool != nullptr)
        {
            m_thread_pool->shutdown();
        }
    }
}
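
// Usage sketch (not part of this file): explicit teardown. shutdown() is
// idempotent via the exchange above and is also called by the destructor, so
// an explicit call is optional. Joining the event loop thread drains in-flight
// work, since the dedicated loop keeps running while size() > 0.
//
//     scheduler->spawn_detached(make_work());  // make_work() is illustrative
//     scheduler->shutdown();                   // signals, joins, stops the pool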

auto io_scheduler::yield_for_internal(std::chrono::nanoseconds amount) -> coro::task<void>
{
    if (amount <= 0ms)
    {
        co_await schedule();
    }
    else
    {
        // Yield/timeout tasks are considered live in the scheduler and must be accounted for. Note
        // that if the user gives an invalid amount and schedule() is directly called it will account
        // for the scheduled task there.
        m_size.fetch_add(1, std::memory_order::release);

        // Yielding does not require setting the timer position on the poll info since
        // it doesn't have a corresponding 'event' that can trigger, it always waits for
        // the timeout to occur before resuming.

        detail::poll_info pi{};
        add_timer_token(clock::now() + amount, pi);
        co_await pi;

        m_size.fetch_sub(1, std::memory_order::release);
    }
    co_return;
}
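
// Usage sketch (not part of this file): a coroutine sleeping on the scheduler
// without blocking a thread, assuming the public yield_for() wrapper in the
// header forwards here.
//
//     co_await scheduler->yield_for(10ms);  // resumed by the timer ~10ms later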

auto io_scheduler::process_events_manual(std::chrono::milliseconds timeout) -> void
{
    bool expected{false};
    if (m_io_processing.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
    {
        process_events_execute(timeout);
        m_io_processing.exchange(false, std::memory_order::release);
    }
}

auto io_scheduler::process_events_dedicated_thread() -> void
{
    if (m_opts.on_io_thread_start_functor != nullptr)
    {
        m_opts.on_io_thread_start_functor();
    }

    m_io_processing.exchange(true, std::memory_order::release);
    // Execute tasks until stopped or there are no more tasks to complete.
    while (!m_shutdown_requested.load(std::memory_order::acquire) || size() > 0)
    {
        process_events_execute(m_default_timeout);
    }
    m_io_processing.exchange(false, std::memory_order::release);

    if (m_opts.on_io_thread_stop_functor != nullptr)
    {
        m_opts.on_io_thread_stop_functor();
    }
}

auto io_scheduler::process_events_execute(std::chrono::milliseconds timeout) -> void
{
    // Clear the recent events without decreasing the allocated capacity to reduce allocations.
    m_recent_events.clear();
    m_io_notifier.next_events(m_recent_events, timeout);

    for (auto& [handle_ptr, poll_status] : m_recent_events)
    {
        if (handle_ptr == m_timer_ptr)
        {
            // Process all events that have timed out.
            process_timeout_execute();
        }
        else if (handle_ptr == m_schedule_ptr)
        {
            // Process scheduled coroutines.
            process_scheduled_execute_inline();
        }
        else if (handle_ptr == m_shutdown_ptr) [[unlikely]]
        {
            // Nothing to do, just needed to wake-up and smell the flowers
        }
        else
        {
            // Individual poll task wake-up.
            process_event_execute(static_cast<detail::poll_info*>(handle_ptr), poll_status);
        }
    }

    // It's important to not resume any handles until the full set is accounted for.  If a timeout
    // and an event for the same handle happen in the same epoll_wait() call then inline processing
    // will destruct the poll_info object before the second event is handled.  This is also possible
    // with thread pool processing, but probably has an extremely low chance of occurring due to
    // the thread switch required.  If m_max_events == 1 this would be unnecessary.

    if (!m_handles_to_resume.empty())
    {
        if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
        {
            for (auto& handle : m_handles_to_resume)
            {
                handle.resume();
            }
        }
        else
        {
            m_thread_pool->resume(m_handles_to_resume);
        }

        m_handles_to_resume.clear();
    }
}

auto io_scheduler::process_scheduled_execute_inline() -> void
{
    std::vector<std::coroutine_handle<>> tasks{};
    {
        // Acquire the entire list, and then reset it.
        std::scoped_lock lk{m_scheduled_tasks_mutex};
        tasks.swap(m_scheduled_tasks);

        // Clear the notification by reading until the pipe is empty.
        while (true)
        {
            constexpr std::size_t       READ_COUNT{4};
            constexpr ssize_t           READ_COUNT_BYTES = READ_COUNT * sizeof(int);
            std::array<int, READ_COUNT> control{};
            const ssize_t               result =
                ::read(m_schedule_pipe.read_fd(), reinterpret_cast<void*>(control.data()), READ_COUNT_BYTES);
            if (result == READ_COUNT_BYTES)
            {
                continue;
            }

            // If we got nothing, or we got a partial read, break the loop since the pipe is empty.
            if (result >= 0)
            {
                break;
            }

            // The pipe is set to O_NONBLOCK so ignore would-block reads on an empty pipe.
            if (errno == EAGAIN)
            {
                break;
            }

            // Not much we can do here, we're in a very bad state, let's report to stderr.
            std::cerr << "::read(m_schedule_pipe.read_fd()) error[" << errno << "] " << ::strerror(errno) << " fd=["
                      << m_schedule_pipe.read_fd() << "]" << std::endl;
            break;
        }

        // Clear the in-memory flag to reduce pipe write() calls on scheduling.
        m_schedule_pipe_triggered.exchange(false, std::memory_order::release);
    }

    // This set of handles can be safely resumed now since they do not have a corresponding timeout event.
    for (auto& task : tasks)
    {
        task.resume();
    }
    m_size.fetch_sub(tasks.size(), std::memory_order::release);
}

auto io_scheduler::process_event_execute(detail::poll_info* pi, poll_status status) -> void
{
    if (!pi->m_processed)
    {
        std::atomic_thread_fence(std::memory_order::acquire);
        // It's possible the event and the timeout occurred in the same epoll, make sure only one
        // is ever processed, the other is discarded.
        pi->m_processed = true;

        // Given a valid fd always remove it from epoll so the next poll can blindly EPOLL_CTL_ADD.
        if (pi->m_fd != -1)
        {
            m_io_notifier.unwatch(*pi);
        }

        // Since this event triggered, remove its corresponding timeout if it has one.
        if (pi->m_timer_pos.has_value())
        {
            remove_timer_token(pi->m_timer_pos.value());
        }

        pi->m_poll_status = status;

        while (pi->m_awaiting_coroutine == nullptr)
        {
            std::atomic_thread_fence(std::memory_order::acquire);
        }

        m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
    }
}

auto io_scheduler::process_timeout_execute() -> void
{
    std::vector<detail::poll_info*> poll_infos{};
    auto                            now = clock::now();

    {
        std::scoped_lock lk{m_timed_events_mutex};
        while (!m_timed_events.empty())
        {
            auto first    = m_timed_events.begin();
            auto [tp, pi] = *first;

            if (tp <= now)
            {
                m_timed_events.erase(first);
                poll_infos.emplace_back(pi);
            }
            else
            {
                break;
            }
        }
    }

    for (auto pi : poll_infos)
    {
        if (!pi->m_processed)
        {
            // It's possible the event and the timeout occurred in the same epoll, make sure only one
            // is ever processed, the other is discarded.
            pi->m_processed = true;

            // Since this timed out, remove its corresponding event if it has one.
            if (pi->m_fd != -1)
            {
                m_io_notifier.unwatch(*pi);
            }

            while (pi->m_awaiting_coroutine == nullptr)
            {
                std::atomic_thread_fence(std::memory_order::acquire);
            }

            m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
            pi->m_poll_status = coro::poll_status::timeout;
        }
    }

    // Update the timer to the next smallest time point; re-take the current now time
    // since updating and resuming tasks could shift the time.
    update_timeout(clock::now());
}

auto io_scheduler::add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator
{
    std::scoped_lock lk{m_timed_events_mutex};
    auto             pos = m_timed_events.emplace(tp, &pi);

    // If this item was inserted as the smallest time point, update the timeout.
    if (pos == m_timed_events.begin())
    {
        update_timeout(clock::now());
    }

    return pos;
}
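
// Sketch (not part of this file) of the structure this relies on: timed_events
// is assumed to be an ordered multimap keyed by time_point, so begin() is the
// nearest deadline and the emplace() iterator doubles as a cancellation token;
// deadline and pi below are illustrative.
//
//     std::multimap<time_point, detail::poll_info*> timed_events;
//     auto pos = timed_events.emplace(deadline, &pi);  // pos == begin() => new soonest deadline
//     timed_events.erase(pos);                         // what remove_timer_token() does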

auto io_scheduler::remove_timer_token(timed_events::iterator pos) -> void
{
    {
        std::scoped_lock lk{m_timed_events_mutex};
        auto             is_first = (m_timed_events.begin() == pos);

        m_timed_events.erase(pos);

        // If this was the first item, update the timeout.  It would be acceptable to let the stale
        // timeout fire, since the event loop will ignore it when nothing has actually timed out,
        // but updating to the correct timeout value is the right thing to do.
        if (is_first)
        {
            update_timeout(clock::now());
        }
    }
}

auto io_scheduler::update_timeout(time_point now) -> void
{
    if (!m_timed_events.empty())
    {
        auto& [tp, pi] = *m_timed_events.begin();

        auto amount = tp - now;

        if (!m_io_notifier.watch_timer(m_timer, amount))
        {
            std::cerr << "Failed to set timerfd errno=[" << std::string{strerror(errno)} << "].";
        }
    }
    else
    {
        m_io_notifier.unwatch_timer(m_timer);
    }
}

} // namespace coro