
jbaldwin / libcoro / build 20542218297

27 Dec 2025 05:36PM UTC coverage: 86.825%. First build 20542218297.

Pull Request #433: Fix latest gcc and clang warnings (github / web-flow)
Merge c06ef8ce5 into b24feabaf

9 of 13 new or added lines in 4 files covered (69.23%)
1753 of 2019 relevant lines covered (86.83%)
4,945,590.21 hits per line

Source File

/src/io_scheduler.cpp (87.3% covered)
#include "coro/io_scheduler.hpp"
#include "coro/detail/task_self_deleting.hpp"

#include <atomic>
#include <cstring>
#include <iostream>
#include <optional>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

using namespace std::chrono_literals;

namespace coro
{

namespace detail
{
static auto
    make_spawned_joinable_wait_task(std::unique_ptr<coro::task_group<coro::io_scheduler>> group_ptr) -> coro::task<void>
{
    co_await *group_ptr;
    co_return;
}

} // namespace detail

io_scheduler::io_scheduler(options&& opts, private_constructor)
    : m_opts(opts),
      m_io_notifier(),
      m_timer(static_cast<const void*>(&m_timer_object), m_io_notifier)
{
    if (!m_io_notifier.watch(m_shutdown_pipe.read_fd(), coro::poll_op::read, const_cast<void*>(m_shutdown_ptr), true))
    {
        throw std::runtime_error("Failed to register m_shutdown_pipe.read_fd() for read events.");
    }

    if (!m_io_notifier.watch(m_schedule_pipe.read_fd(), coro::poll_op::read, const_cast<void*>(m_schedule_ptr), true))
    {
        throw std::runtime_error("Failed to register m_schedule_pipe.read_fd() for read events.");
    }

    m_recent_events.reserve(m_max_events);

    if (m_opts.execution_strategy == execution_strategy_t::process_tasks_on_thread_pool)
    {
        m_thread_pool = thread_pool::make_unique(std::move(m_opts.pool));
    }
}

auto io_scheduler::make_unique(options opts) -> std::unique_ptr<io_scheduler>
{
    auto s = std::make_unique<io_scheduler>(std::move(opts), private_constructor{});

    // Spawn the dedicated event loop thread once the scheduler is fully constructed
    // so it has a full object to work with.
    if (s->m_opts.thread_strategy == thread_strategy_t::spawn)
    {
        s->m_io_thread = std::thread([s = s.get()]() { s->process_events_dedicated_thread(); });
    }
    // else manual mode, the user must call process_events.

    return s;
}

io_scheduler::~io_scheduler()
{
    shutdown();

    if (m_io_thread.joinable())
    {
        m_io_thread.join();
    }

    m_shutdown_pipe.close();
    m_schedule_pipe.close();
}

auto io_scheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t
{
    process_events_manual(timeout);
    return size();
}
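
// Usage note (not part of the original source): with the manual thread strategy the
// owner drives the event loop itself via process_events(). A minimal, hypothetical
// sketch of that pattern, assuming options exposes thread_strategy as shown above:
//
//     coro::io_scheduler::options opts{};
//     opts.thread_strategy = coro::io_scheduler::thread_strategy_t::manual;
//     auto scheduler = coro::io_scheduler::make_unique(std::move(opts));
//     while (scheduler->process_events(std::chrono::milliseconds{8}) > 0)
//     {
//         // Each call pumps ready I/O, timers, and scheduled coroutines once.
//     }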

auto io_scheduler::spawn_detached(coro::task<void>&& task) -> bool
{
    m_size.fetch_add(1, std::memory_order::release);
    auto wrapper_task = detail::make_task_self_deleting(std::move(task));
    wrapper_task.promise().user_final_suspend([this]() -> void { m_size.fetch_sub(1, std::memory_order::release); });
    return resume(wrapper_task.handle());
}
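
// Usage note (not part of the original source): spawn_detached() is fire-and-forget;
// the self-deleting wrapper destroys the coroutine frame at final suspend, and the
// user_final_suspend callback keeps size() accurate. A minimal, hypothetical sketch:
//
//     scheduler->spawn_detached([]() -> coro::task<void>
//     {
//         // ... do work on the scheduler's execution context ...
//         co_return;
//     }());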

auto io_scheduler::spawn_joinable(coro::task<void>&& task) -> coro::task<void>
{
    auto group_ptr = std::make_unique<coro::task_group<coro::io_scheduler>>(this, std::move(task));
    return detail::make_spawned_joinable_wait_task(std::move(group_ptr));
}

auto io_scheduler::schedule_at(time_point time) -> coro::task<void>
{
    return yield_until(time);
}

auto io_scheduler::yield_until(time_point time) -> coro::task<void>
{
    auto now = clock::now();

    // If the requested time is in the past (or is right now) just schedule immediately.
    if (time <= now)
    {
        co_await schedule();
    }
    else
    {
        m_size.fetch_add(1, std::memory_order::release);

        auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);

        detail::poll_info pi{};
        add_timer_token(now + amount, pi);
        co_await pi;

        m_size.fetch_sub(1, std::memory_order::release);
    }
    co_return;
}

auto io_scheduler::poll(
    fd_t fd, coro::poll_op op, std::chrono::milliseconds timeout, std::optional<poll_stop_token> cancel_trigger)
    -> coro::task<poll_status>
{
    // Account for this poll as a live task in the scheduler; the matching fetch_sub
    // happens below once the event loop resumes this coroutine.
    m_size.fetch_add(1, std::memory_order::release);

    // Set up two events, a timeout event and the actual poll for op event.
    // Whichever triggers first will delete the other to guarantee only one wins.
    // The resume token will be set by the scheduler to what the event turned out to be.

    bool timeout_requested = (timeout > 0ms);

    auto pi = detail::poll_info{fd, op, cancel_trigger};

    if (timeout_requested)
    {
        pi.m_timer_pos = add_timer_token(clock::now() + timeout, pi);
    }

    if (!m_io_notifier.watch(pi))
    {
        std::cerr << "Failed to add " << fd << " to watch list\n";
    }

    // The event loop will clean up whichever event didn't win. Since the coroutine is
    // scheduled onto the thread pool it's possible the other event type could trigger
    // while it is waiting to execute again; resuming the coroutine twice would be quite bad.
    auto result = co_await pi;
    m_size.fetch_sub(1, std::memory_order::release);
    co_return result;
}
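
// Usage note (not part of the original source): poll() is awaited from within a task
// running on this scheduler. A minimal, hypothetical sketch for a readable socket:
//
//     auto status = co_await scheduler->poll(sock_fd, coro::poll_op::read, 1s);
//     if (status == coro::poll_status::event)
//     {
//         // sock_fd is readable; ::recv() will not block.
//     }
//     // Any other status (e.g. coro::poll_status::timeout) means no data arrived in time.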

auto io_scheduler::resume(std::coroutine_handle<> handle) -> bool
{
    if (handle == nullptr || handle.done())
    {
        return false;
    }

    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        return false;
    }

    if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
    {
        m_size.fetch_add(1, std::memory_order::release);
        {
            std::scoped_lock lk{m_scheduled_tasks_mutex};
            m_scheduled_tasks.emplace_back(handle);
        }

        bool expected{false};
        if (m_schedule_pipe_triggered.compare_exchange_strong(
                expected, true, std::memory_order::release, std::memory_order::relaxed))
        {
            const int value = 1;
            ssize_t written = ::write(m_schedule_pipe.write_fd(), reinterpret_cast<const void*>(&value), sizeof(value));
            if (written != sizeof(value))
            {
                std::cerr << "libcoro::io_scheduler::resume() failed to write to schedule pipe, bytes written=" << written << "\n";
            }
        }

        return true;
    }
    else
    {
        return m_thread_pool->resume(handle);
    }
}

auto io_scheduler::shutdown() noexcept -> void
{
    // Only allow shutdown to occur once.
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
    {
        // Signal the event loop to stop asap.
        const int value{1};
        ssize_t written = ::write(m_shutdown_pipe.write_fd(), reinterpret_cast<const void*>(&value), sizeof(value));
        if (written != sizeof(value))
        {
            std::cerr << "libcoro::io_scheduler::shutdown() failed to write to shutdown pipe, bytes written=" << written << "\n";
        }

        if (m_io_thread.joinable())
        {
            m_io_thread.join();
        }

        if (m_thread_pool != nullptr)
        {
            m_thread_pool->shutdown();
        }
    }
}

auto io_scheduler::yield_for_internal(std::chrono::nanoseconds amount) -> coro::task<void>
{
    if (amount <= 0ms)
    {
        co_await schedule();
    }
    else
    {
        // Yield/timeout tasks are considered live in the scheduler and must be accounted for. Note
        // that if the user gives an invalid amount and schedule() is directly called it will account
        // for the scheduled task there.
        m_size.fetch_add(1, std::memory_order::release);

        // Yielding does not require setting the timer position on the poll info since
        // it doesn't have a corresponding 'event' that can trigger, it always waits for
        // the timeout to occur before resuming.

        detail::poll_info pi{};
        add_timer_token(clock::now() + amount, pi);
        co_await pi;

        m_size.fetch_sub(1, std::memory_order::release);
    }
    co_return;
}

auto io_scheduler::process_events_manual(std::chrono::milliseconds timeout) -> void
{
    bool expected{false};
    if (m_io_processing.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
    {
        process_events_execute(timeout);
        m_io_processing.exchange(false, std::memory_order::release);
    }
}

auto io_scheduler::process_events_dedicated_thread() -> void
{
    if (m_opts.on_io_thread_start_functor != nullptr)
    {
        m_opts.on_io_thread_start_functor();
    }

    m_io_processing.exchange(true, std::memory_order::release);
    // Execute tasks until stopped or there are no more tasks to complete.
    while (!m_shutdown_requested.load(std::memory_order::acquire) || size() > 0)
    {
        process_events_execute(m_default_timeout);
    }
    m_io_processing.exchange(false, std::memory_order::release);

    if (m_opts.on_io_thread_stop_functor != nullptr)
    {
        m_opts.on_io_thread_stop_functor();
    }
}

auto io_scheduler::process_events_execute(std::chrono::milliseconds timeout) -> void
{
    // Clear the recent events without decreasing the allocated capacity to reduce allocations.
    m_recent_events.clear();
    m_io_notifier.next_events(m_recent_events, timeout);

    for (auto& [handle_ptr, poll_status] : m_recent_events)
    {
        if (handle_ptr == m_timer_ptr)
        {
            // Process all events that have timed out.
            process_timeout_execute();
        }
        else if (handle_ptr == m_schedule_ptr)
        {
            // Process scheduled coroutines.
            process_scheduled_execute_inline();
        }
        else if (handle_ptr == m_shutdown_ptr) [[unlikely]]
        {
            // Nothing to do, just needed to wake-up and smell the flowers.
        }
        else
        {
            // Individual poll task wake-up.
            process_event_execute(static_cast<detail::poll_info*>(handle_ptr), poll_status);
        }
    }

    // It's important not to resume any handles until the full set is accounted for.  If a timeout
    // and an event for the same handle happen in the same epoll_wait() call then inline processing
    // will destruct the poll_info object before the second event is handled.  This is also possible
    // with thread pool processing, but probably has an extremely low chance of occurring due to
    // the thread switch required.  If m_max_events == 1 this would be unnecessary.

    if (!m_handles_to_resume.empty())
    {
        if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
        {
            for (auto& handle : m_handles_to_resume)
            {
                handle.resume();
            }
        }
        else
        {
            m_thread_pool->resume(m_handles_to_resume);
        }

        m_handles_to_resume.clear();
    }
}

auto io_scheduler::process_scheduled_execute_inline() -> void
{
    std::vector<std::coroutine_handle<>> tasks{};
    {
        // Acquire the entire list, and then reset it.
        std::scoped_lock lk{m_scheduled_tasks_mutex};
        tasks.swap(m_scheduled_tasks);

        // Clear the notification by reading until the pipe is empty.
        while (true)
        {
            constexpr std::size_t       READ_COUNT{4};
            constexpr ssize_t           READ_COUNT_BYTES = READ_COUNT * sizeof(int);
            std::array<int, READ_COUNT> control{};
            const ssize_t               result =
                ::read(m_schedule_pipe.read_fd(), reinterpret_cast<void*>(control.data()), READ_COUNT_BYTES);
            if (result == READ_COUNT_BYTES)
            {
                continue;
            }

            // If we got nothing, or only a partial read, break the loop since the pipe is empty.
            if (result >= 0)
            {
                break;
            }

            // The pipe is set to O_NONBLOCK, so treat EAGAIN as an empty pipe.
            if (errno == EAGAIN)
            {
                break;
            }

            // Not much we can do here, we're in a very bad state, let's report to stderr.
            std::cerr << "::read(m_schedule_pipe.read_fd()) error[" << errno << "] " << ::strerror(errno) << " fd=["
                      << m_schedule_pipe.read_fd() << "]" << std::endl;
            break;
        }

        // Clear the in-memory flag to reduce wake-up pipe writes on subsequent scheduling.
        m_schedule_pipe_triggered.exchange(false, std::memory_order::release);
    }

    // This set of handles can be safely resumed now since they do not have a corresponding timeout event.
    for (auto& task : tasks)
    {
        task.resume();
    }
    m_size.fetch_sub(tasks.size(), std::memory_order::release);
}

auto io_scheduler::process_event_execute(detail::poll_info* pi, poll_status status) -> void
{
    if (!pi->m_processed)
    {
        std::atomic_thread_fence(std::memory_order::acquire);
        // It's possible the event and the timeout occurred in the same epoll call; make sure
        // only one is ever processed, the other is discarded.
        pi->m_processed = true;

        // Given a valid fd always remove it from epoll so the next poll can blindly EPOLL_CTL_ADD.
        if (pi->m_fd != -1)
        {
            m_io_notifier.unwatch(*pi);
        }

        // Since this event triggered, remove its corresponding timeout if it has one.
        if (pi->m_timer_pos.has_value())
        {
            remove_timer_token(pi->m_timer_pos.value());
        }

        pi->m_poll_status = status;

        // Spin until the awaiting coroutine handle has been published by the suspending thread.
        while (pi->m_awaiting_coroutine == nullptr)
        {
            std::atomic_thread_fence(std::memory_order::acquire);
        }

        m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
    }
}
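
// Usage note (not part of the original source): the spin loop above assumes the awaiter
// publishes its coroutine handle last, after registering the fd/timer. A hypothetical
// sketch of the awaiter shape this relies on (the real awaiter is defined alongside
// detail::poll_info elsewhere in libcoro):
//
//     auto await_suspend(std::coroutine_handle<> awaiting) noexcept -> void
//     {
//         m_awaiting_coroutine = awaiting; // published; the event loop spins until non-null
//         std::atomic_thread_fence(std::memory_order::release);
//     }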

auto io_scheduler::process_timeout_execute() -> void
{
    std::vector<detail::poll_info*> poll_infos{};
    auto                            now = clock::now();

    {
        std::scoped_lock lk{m_timed_events_mutex};
        while (!m_timed_events.empty())
        {
            auto first    = m_timed_events.begin();
            auto [tp, pi] = *first;

            if (tp <= now)
            {
                m_timed_events.erase(first);
                poll_infos.emplace_back(pi);
            }
            else
            {
                break;
            }
        }
    }

    for (auto pi : poll_infos)
    {
        if (!pi->m_processed)
        {
            // It's possible the event and the timeout occurred in the same epoll call; make sure
            // only one is ever processed, the other is discarded.
            pi->m_processed = true;

            // Since this timed out, remove its corresponding event if it has one.
            if (pi->m_fd != -1)
            {
                m_io_notifier.unwatch(*pi);
            }

            // Spin until the awaiting coroutine handle has been published by the suspending thread.
            while (pi->m_awaiting_coroutine == nullptr)
            {
                std::atomic_thread_fence(std::memory_order::acquire);
            }

            m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
            pi->m_poll_status = coro::poll_status::timeout;
        }
    }

    // Update the timer to the next smallest time point; re-take the current now time
    // since updating and resuming tasks could shift the time.
    update_timeout(clock::now());
}

auto io_scheduler::add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator
{
    std::scoped_lock lk{m_timed_events_mutex};
    auto             pos = m_timed_events.emplace(tp, &pi);

    // If this item was inserted as the smallest time point, update the timeout.
    if (pos == m_timed_events.begin())
    {
        update_timeout(clock::now());
    }

    return pos;
}

auto io_scheduler::remove_timer_token(timed_events::iterator pos) -> void
{
    {
        std::scoped_lock lk{m_timed_events_mutex};
        auto             is_first = (m_timed_events.begin() == pos);

        m_timed_events.erase(pos);

        // If this was the first item, update the timeout.  It would be acceptable to just let it
        // also fire the timeout as the event loop will ignore it since nothing will have timed
        // out but it feels like the right thing to do to update it to the correct timeout value.
        if (is_first)
        {
            update_timeout(clock::now());
        }
    }
}

auto io_scheduler::update_timeout(time_point now) -> void
{
    if (!m_timed_events.empty())
    {
        auto& [tp, pi] = *m_timed_events.begin();

        auto amount = tp - now;

        if (!m_io_notifier.watch_timer(m_timer, amount))
        {
            std::cerr << "Failed to set timerfd errno=[" << strerror(errno) << "].\n";
        }
    }
    else
    {
        m_io_notifier.unwatch_timer(m_timer);
    }
}

} // namespace coro
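
For context, here is a minimal driver for this scheduler. It is a hedged sketch, not part of the file above: it assumes libcoro's umbrella header <coro/coro.hpp>, the public schedule() and yield_for() wrappers around the internals shown here, and coro::sync_wait from the same library.

#include <coro/coro.hpp>

int main()
{
    // Default options spawn a dedicated event loop thread (thread_strategy_t::spawn).
    auto scheduler = coro::io_scheduler::make_unique(coro::io_scheduler::options{});

    auto make_task = [&]() -> coro::task<void>
    {
        // Hop onto the scheduler's execution context.
        co_await scheduler->schedule();
        // Non-blocking sleep, serviced by the add_timer_token()/process_timeout_execute() path above.
        co_await scheduler->yield_for(std::chrono::milliseconds{10});
        co_return;
    };

    // Block the main thread until the task completes.
    coro::sync_wait(make_task());
    return 0;
}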