jbaldwin / libcoro, build 19783664285

29 Nov 2025 12:09PM UTC. Coverage: 86.934% (first build).

Pull Request #422: Ref/394 kqueue socket shutdown
Merge 7d51c87ed into f671edb4c (via web-flow on GitHub)

79 of 81 new or added lines in 10 files covered (97.53%)
1710 of 1967 relevant lines covered (86.93%)
5076104.52 hits per line

Source file: /src/io_scheduler.cpp (91.16% covered)

#include "coro/io_scheduler.hpp"
#include "coro/detail/task_self_deleting.hpp"

#include <atomic>
#include <cstring>
#include <iostream>
#include <optional>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

using namespace std::chrono_literals;

namespace coro
{

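// Private constructor, reached via make_unique: registers the shutdown and
// schedule pipes with the io notifier so the event loop can be woken up, and
// creates the backing thread pool when that execution strategy is selected.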
17
io_scheduler::io_scheduler(options&& opts, private_constructor)
89✔
18
    : m_opts(opts),
89✔
19
      m_io_notifier(),
89✔
20
      m_timer(static_cast<const void*>(&m_timer_object), m_io_notifier)
178✔
21
{
22
    if (!m_io_notifier.watch(m_shutdown_pipe.read_fd(), coro::poll_op::read, const_cast<void*>(m_shutdown_ptr), true))
89✔
23
    {
24
        throw std::runtime_error("Failed to register m_shutdown_pipe.read_fd() for read events.");
×
25
    }
26

27
    if (!m_io_notifier.watch(m_schedule_pipe.read_fd(), coro::poll_op::read, const_cast<void*>(m_schedule_ptr), true))
89✔
28
    {
29
        throw std::runtime_error("Failed to register m_schedule.pipe.read_rd() for read events.");
×
30
    }
31

32
    m_recent_events.reserve(m_max_events);
89✔
33

34
    if (m_opts.execution_strategy == execution_strategy_t::process_tasks_on_thread_pool)
89✔
35
    {
36
        m_thread_pool = thread_pool::make_unique(std::move(m_opts.pool));
46✔
37
    }
38
}
89✔
39

auto io_scheduler::make_unique(options opts) -> std::unique_ptr<io_scheduler>
{
    auto s = std::make_unique<io_scheduler>(std::move(opts), private_constructor{});

    // Spawn the dedicated event loop thread only once the scheduler is fully
    // constructed so the thread has a complete object to work with.
    if (s->m_opts.thread_strategy == thread_strategy_t::spawn)
    {
        s->m_io_thread = std::thread([s = s.get()]() { s->process_events_dedicated_thread(); });
    }
    // else manual mode: the user must call process_events.

    return s;
}

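// A minimal usage sketch (hypothetical caller code; assumes io_scheduler::options
// is default-constructible and that a coro::task<void> coroutine is available,
// as elsewhere in libcoro):
//
//     auto scheduler = io_scheduler::make_unique(io_scheduler::options{});
//     scheduler->spawn([]() -> coro::task<void> { co_return; }());
//     scheduler->shutdown();
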
io_scheduler::~io_scheduler()
{
    shutdown();

    if (m_io_thread.joinable())
    {
        m_io_thread.join();
    }

    m_shutdown_pipe.close();
    m_schedule_pipe.close();
}

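// Manual-mode entry point: runs a single pass of the event loop (guarded by
// process_events_manual) and returns how many tasks remain in the scheduler.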
auto io_scheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t
{
    process_events_manual(timeout);
    return size();
}

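// Detaches the given task: it is wrapped in a self-deleting task whose final
// suspend decrements the scheduler's size, so the scheduler can tell when all
// spawned work has completed.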
auto io_scheduler::spawn(coro::task<void>&& task) -> bool
{
    m_size.fetch_add(1, std::memory_order::release);
    auto wrapper_task = detail::make_task_self_deleting(std::move(task));
    wrapper_task.promise().user_final_suspend([this]() -> void
    {
        m_size.fetch_sub(1, std::memory_order::release);
    });
    return resume(wrapper_task.handle());
}

auto io_scheduler::schedule_at(time_point time) -> coro::task<void>
{
    return yield_until(time);
}

auto io_scheduler::yield_until(time_point time) -> coro::task<void>
{
    auto now = clock::now();

    // If the requested time is in the past (or now!) bail out!
    if (time <= now)
    {
        co_await schedule();
    }
    else
    {
        m_size.fetch_add(1, std::memory_order::release);

        auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);

        detail::poll_info pi{};
        add_timer_token(now + amount, pi);
        co_await pi;

        m_size.fetch_sub(1, std::memory_order::release);
    }
    co_return;
}

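// Suspends the awaiting coroutine until fd becomes ready for op or the timeout
// expires (the cancel_trigger, when provided, presumably allows stopping the
// wait early); the returned poll_status reports which outcome won.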
auto io_scheduler::poll(
    fd_t fd, coro::poll_op op, std::chrono::milliseconds timeout, std::optional<poll_stop_token> cancel_trigger)
    -> coro::task<poll_status>
{
    // Polls count as live tasks while suspended, so increment the scheduler's
    // size here; the matching decrement happens once the event loop resumes
    // this coroutine.
    m_size.fetch_add(1, std::memory_order::release);

    // Set up two events: a timeout event and the actual poll-for-op event.
    // Whichever triggers first will delete the other to guarantee only one wins.
    // The scheduler sets the resume token to whichever event occurred.

    bool timeout_requested = (timeout > 0ms);

    auto pi = detail::poll_info{fd, op, cancel_trigger};

    if (timeout_requested)
    {
        pi.m_timer_pos = add_timer_token(clock::now() + timeout, pi);
    }

    if (!m_io_notifier.watch(pi))
    {
        std::cerr << "Failed to add " << fd << " to watch list\n";
    }

    // The event loop will 'clean up' whichever event didn't win. Since the
    // coroutine is scheduled onto the thread pool, it's possible the other type
    // of event could trigger while it is waiting to execute again; that would
    // resume the coroutine twice, which would be quite bad.
    auto result = co_await pi;
    m_size.fetch_sub(1, std::memory_order::release);
    co_return result;
}

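// Resumes a coroutine handle on this scheduler. With the inline strategy the
// handle is queued and the event loop is woken through the schedule pipe
// (written at most once per wake-up via m_schedule_pipe_triggered); otherwise
// the handle is handed straight to the thread pool.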
auto io_scheduler::resume(std::coroutine_handle<> handle) -> bool
{
    if (handle == nullptr || handle.done())
    {
        return false;
    }

    if (m_shutdown_requested.load(std::memory_order::acquire))
    {
        return false;
    }

    if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
    {
        m_size.fetch_add(1, std::memory_order::release);
        {
            std::scoped_lock lk{m_scheduled_tasks_mutex};
            m_scheduled_tasks.emplace_back(handle);
        }

        bool expected{false};
        if (m_schedule_pipe_triggered.compare_exchange_strong(
                expected, true, std::memory_order::release, std::memory_order::relaxed))
        {
            const int value = 1;
            ::write(m_schedule_pipe.write_fd(), reinterpret_cast<const void*>(&value), sizeof(value));
        }

        return true;
    }
    else
    {
        return m_thread_pool->resume(handle);
    }
}

auto io_scheduler::shutdown() noexcept -> void
{
    // Only allow shutdown to occur once.
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
    {
        // Signal the event loop to stop asap.
        const int value{1};
        ::write(m_shutdown_pipe.write_fd(), reinterpret_cast<const void*>(&value), sizeof(value));

        if (m_io_thread.joinable())
        {
            m_io_thread.join();
        }

        if (m_thread_pool != nullptr)
        {
            m_thread_pool->shutdown();
        }
    }
}

auto io_scheduler::yield_for_internal(std::chrono::nanoseconds amount) -> coro::task<void>
{
    if (amount <= 0ms)
    {
        co_await schedule();
    }
    else
    {
        // Yield/timeout tasks are considered live in the scheduler and must be
        // accounted for. Note that if the user gives an invalid amount, schedule()
        // is called directly and accounts for the scheduled task there.
        m_size.fetch_add(1, std::memory_order::release);

        // Yielding does not require setting the timer position on the poll info since
        // it doesn't have a corresponding 'event' that can trigger; it always waits for
        // the timeout to occur before resuming.

        detail::poll_info pi{};
        add_timer_token(clock::now() + amount, pi);
        co_await pi;

        m_size.fetch_sub(1, std::memory_order::release);
    }
    co_return;
}

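// Guard so that only one caller at a time drives the event loop; a concurrent
// call is simply dropped while another thread is already processing.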
auto io_scheduler::process_events_manual(std::chrono::milliseconds timeout) -> void
{
    bool expected{false};
    if (m_io_processing.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
    {
        process_events_execute(timeout);
        m_io_processing.exchange(false, std::memory_order::release);
    }
}

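// Body of the dedicated event loop thread spawned in make_unique; loops until
// shutdown has been requested and no live tasks remain.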
auto io_scheduler::process_events_dedicated_thread() -> void
{
    if (m_opts.on_io_thread_start_functor != nullptr)
    {
        m_opts.on_io_thread_start_functor();
    }

    m_io_processing.exchange(true, std::memory_order::release);
    // Execute tasks until stopped or there are no more tasks to complete.
    while (!m_shutdown_requested.load(std::memory_order::acquire) || size() > 0)
    {
        process_events_execute(m_default_timeout);
    }
    m_io_processing.exchange(false, std::memory_order::release);

    if (m_opts.on_io_thread_stop_functor != nullptr)
    {
        m_opts.on_io_thread_stop_functor();
    }
}

auto io_scheduler::process_events_execute(std::chrono::milliseconds timeout) -> void
{
    // Clear the recent events without decreasing the allocated capacity to reduce allocations.
    m_recent_events.clear();
    m_io_notifier.next_events(m_recent_events, timeout);

    for (auto& [handle_ptr, poll_status] : m_recent_events)
    {
        if (handle_ptr == m_timer_ptr)
        {
            // Process all events that have timed out.
            process_timeout_execute();
        }
        else if (handle_ptr == m_schedule_ptr)
        {
            // Process scheduled coroutines.
            process_scheduled_execute_inline();
        }
        else if (handle_ptr == m_shutdown_ptr) [[unlikely]]
        {
            // Nothing to do, just needed to wake up and smell the flowers.
        }
        else
        {
            // Individual poll task wake-up.
            process_event_execute(static_cast<detail::poll_info*>(handle_ptr), poll_status);
        }
    }

    // It's important not to resume any handles until the full set is accounted for.
    // If a timeout and an event for the same handle arrive in the same epoll_wait()
    // call, inline processing would destruct the poll_info object before the second
    // event is handled. This is also possible with thread pool processing, but the
    // chance of it occurring is extremely low due to the thread switch required.
    // If m_max_events == 1 this would be unnecessary.

    if (!m_handles_to_resume.empty())
    {
        if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
        {
            for (auto& handle : m_handles_to_resume)
            {
                handle.resume();
            }
        }
        else
        {
            m_thread_pool->resume(m_handles_to_resume);
        }

        m_handles_to_resume.clear();
    }
}

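// Inline-strategy path: swap out the queued handles under the lock, drain the
// schedule pipe so its notification is cleared, then resume everything after
// the lock is released.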
auto io_scheduler::process_scheduled_execute_inline() -> void
{
    std::vector<std::coroutine_handle<>> tasks{};
    {
        // Acquire the entire list, and then reset it.
        std::scoped_lock lk{m_scheduled_tasks_mutex};
        tasks.swap(m_scheduled_tasks);

        // Clear the notification by reading until the pipe is empty.
        while (true)
        {
            constexpr std::size_t       READ_COUNT{4};
            constexpr ssize_t           READ_COUNT_BYTES = READ_COUNT * sizeof(int);
            std::array<int, READ_COUNT> control{};
            const ssize_t               result =
                ::read(m_schedule_pipe.read_fd(), reinterpret_cast<void*>(control.data()), READ_COUNT_BYTES);
            if (result == READ_COUNT_BYTES)
            {
                continue;
            }

            // If we got nothing, or only a partial read, break the loop since the pipe is empty.
            if (result >= 0)
            {
                break;
            }

            // The pipe is set to O_NONBLOCK, so treat EAGAIN as an empty pipe.
            if (errno == EAGAIN)
            {
                break;
            }

            // Not much we can do here; we're in a very bad state, so report to stderr.
            std::cerr << "::read(m_schedule_pipe.read_fd()) error[" << errno << "] " << ::strerror(errno) << " fd=["
                      << m_schedule_pipe.read_fd() << "]" << std::endl;
            break;
        }

        // Clear the in-memory flag to reduce schedule pipe writes on scheduling.
        m_schedule_pipe_triggered.exchange(false, std::memory_order::release);
    }

    // This set of handles can be safely resumed now since they do not have a corresponding timeout event.
    for (auto& task : tasks)
    {
        task.resume();
    }
    m_size.fetch_sub(tasks.size(), std::memory_order::release);
}

auto io_scheduler::process_event_execute(detail::poll_info* pi, poll_status status) -> void
{
    if (!pi->m_processed)
    {
        std::atomic_thread_fence(std::memory_order::acquire);
        // It's possible the event and the timeout occurred in the same epoll call;
        // make sure only one is ever processed and the other is discarded.
        pi->m_processed = true;

        // Given a valid fd, always remove it from epoll so the next poll can blindly EPOLL_CTL_ADD.
        if (pi->m_fd != -1)
        {
            m_io_notifier.unwatch(*pi);
        }

        // Since this event triggered, remove its corresponding timeout if it has one.
        if (pi->m_timer_pos.has_value())
        {
            remove_timer_token(pi->m_timer_pos.value());
        }

        pi->m_poll_status = status;

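        // The suspending coroutine publishes m_awaiting_coroutine as it suspends;
        // if this event fired before that store became visible, spin with acquire
        // fences until the handle appears.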
        while (pi->m_awaiting_coroutine == nullptr)
        {
            std::atomic_thread_fence(std::memory_order::acquire);
        }

        m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
    }
}

auto io_scheduler::process_timeout_execute() -> void
{
    std::vector<detail::poll_info*> poll_infos{};
    auto                            now = clock::now();

    {
        std::scoped_lock lk{m_timed_events_mutex};
        while (!m_timed_events.empty())
        {
            auto first    = m_timed_events.begin();
            auto [tp, pi] = *first;

            if (tp <= now)
            {
                m_timed_events.erase(first);
                poll_infos.emplace_back(pi);
            }
            else
            {
                break;
            }
        }
    }

    for (auto pi : poll_infos)
    {
        if (!pi->m_processed)
        {
            // It's possible the event and the timeout occurred in the same epoll call;
            // make sure only one is ever processed and the other is discarded.
            pi->m_processed = true;

            // Since this timed out, remove its corresponding event if it has one.
            if (pi->m_fd != -1)
            {
                m_io_notifier.unwatch(*pi);
            }

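            // Same publication race as in process_event_execute: wait until the
            // suspending coroutine's handle is visible before queueing it.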
            while (pi->m_awaiting_coroutine == nullptr)
            {
                std::atomic_thread_fence(std::memory_order::acquire);
            }

            m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
            pi->m_poll_status = coro::poll_status::timeout;
        }
    }

    // Update the time to the next smallest time point; re-take the current now time
    // since updating and resuming tasks could shift the time.
    update_timeout(clock::now());
}

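// Inserts pi into the timed events container and returns its position so the
// caller (e.g. poll) can cancel the timeout if the I/O event wins the race.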
auto io_scheduler::add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator
{
    std::scoped_lock lk{m_timed_events_mutex};
    auto             pos = m_timed_events.emplace(tp, &pi);

    // If this item was inserted as the smallest time point, update the timeout.
    if (pos == m_timed_events.begin())
    {
        update_timeout(clock::now());
    }

    return pos;
}

auto io_scheduler::remove_timer_token(timed_events::iterator pos) -> void
{
    {
        std::scoped_lock lk{m_timed_events_mutex};
        auto             is_first = (m_timed_events.begin() == pos);

        m_timed_events.erase(pos);

        // If this was the first item, update the timeout. It would be acceptable to
        // let the stale timeout fire anyway, since the event loop would ignore it when
        // nothing has actually timed out, but updating to the correct timeout value
        // is the cleaner behavior.
        if (is_first)
        {
            update_timeout(clock::now());
        }
    }
}

auto io_scheduler::update_timeout(time_point now) -> void
{
    if (!m_timed_events.empty())
    {
        auto& [tp, pi] = *m_timed_events.begin();

        auto amount = tp - now;

        if (!m_io_notifier.watch_timer(m_timer, amount))
        {
            std::cerr << "Failed to set timerfd errno=[" << std::string{strerror(errno)} << "].";
        }
    }
    else
    {
        m_io_notifier.unwatch_timer(m_timer);
    }
}

} // namespace coro