• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / b3fb243086f752d5358cb072f986ec410e86a3c8

01 Dec 2023 05:49PM UTC coverage: 83.609% (+0.3%) from 83.333%
b3fb243086f752d5358cb072f986ec410e86a3c8

push

github

web-flow
Changes to be able to build to webassembly with emscripten. (#201)

* Changes to enable libcoro to be built with emscripten
* Removed the TL::expected dependency from submodules
* Tweaked a size test in test_task.cpp that failed in wasm as it
em++ seems to be adding padding.

556 of 665 relevant lines covered (83.61%)

6116968.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.44
/src/io_scheduler.cpp
#include "coro/io_scheduler.hpp"

#include <atomic>
#include <cstring>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <sys/types.h>
#include <unistd.h>
12

13
using namespace std::chrono_literals;
14

15
namespace coro
16
{
17
/// Constructs the scheduler: creates the epoll instance, the three internal
/// wake-up file descriptors (shutdown, timer, schedule), optionally a thread
/// pool, and optionally spawns the dedicated io thread.
io_scheduler::io_scheduler(options opts)
    : m_opts(std::move(opts)),
      m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
      m_shutdown_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
      m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)),
      m_schedule_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
      m_owned_tasks(new coro::task_container<coro::io_scheduler>(*this))
{
    // FIX: the original read `opts.execution_strategy` here, but `opts` was
    // already moved into `m_opts` in the initializer list above.  Reading a
    // moved-from object only happened to work because the member is trivially
    // copyable; consult the owned copy instead.
    if (m_opts.execution_strategy == execution_strategy_t::process_tasks_on_thread_pool)
    {
        m_thread_pool = std::make_unique<thread_pool>(std::move(m_opts.pool));
    }

    // Register the three internal fds with epoll.  Each carries a sentinel
    // pointer so the event loop can distinguish them from user poll events.
    epoll_event e{};
    e.events = EPOLLIN;

    e.data.ptr = const_cast<void*>(m_shutdown_ptr);
    epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_shutdown_fd, &e);

    e.data.ptr = const_cast<void*>(m_timer_ptr);
    epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_timer_fd, &e);

    e.data.ptr = const_cast<void*>(m_schedule_ptr);
    epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_schedule_fd, &e);

    if (m_opts.thread_strategy == thread_strategy_t::spawn)
    {
        m_io_thread = std::thread([this]() { process_events_dedicated_thread(); });
    }
    // else manual mode, the user must call process_events.
}
48

49
/// Shuts the scheduler down, joins the io thread if it is still running, and
/// releases every owned resource.
io_scheduler::~io_scheduler()
{
    shutdown();

    if (m_io_thread.joinable())
    {
        m_io_thread.join();
    }

    // Close each owned fd and mark it closed so a recycled fd can never be
    // closed twice.
    auto release_fd = [](auto& fd) {
        if (fd != -1)
        {
            close(fd);
            fd = -1;
        }
    };
    release_fd(m_epoll_fd);
    release_fd(m_timer_fd);
    release_fd(m_schedule_fd);
    // FIX: the original never closed m_shutdown_fd (created via eventfd() in
    // the constructor), leaking one fd per scheduler instance.
    release_fd(m_shutdown_fd);

    // m_owned_tasks is stored type-erased; restore the concrete type to delete.
    if (m_owned_tasks != nullptr)
    {
        delete static_cast<coro::task_container<coro::io_scheduler>*>(m_owned_tasks);
        m_owned_tasks = nullptr;
    }
}
80

81
auto io_scheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t
4✔
82
{
83
    process_events_manual(timeout);
4✔
84
    return size();
4✔
85
}
86

87
/// Suspends the awaiting coroutine for at least `amount` milliseconds.
/// Thin alias over yield_for().
auto io_scheduler::schedule_after(std::chrono::milliseconds amount) -> coro::task<void>
{
    return yield_for(amount);
}
91

92
/// Suspends the awaiting coroutine until the given time point is reached.
/// Thin alias over yield_until().
auto io_scheduler::schedule_at(time_point time) -> coro::task<void>
{
    return yield_until(time);
}
96

97
/// Suspends the caller for at least `amount` milliseconds via the timer fd.
/// Non-positive amounts degenerate to a plain reschedule.
auto io_scheduler::yield_for(std::chrono::milliseconds amount) -> coro::task<void>
{
    if (amount <= 0ms)
    {
        // An invalid/zero duration is just a reschedule; schedule() accounts
        // for the task itself.
        co_await schedule();
        co_return;
    }

    // Yield/timeout tasks count as live in the scheduler while suspended.
    m_size.fetch_add(1, std::memory_order::release);

    // No timer position is recorded: a pure yield has no competing 'event'
    // that could fire first — it always waits out the full timeout.
    detail::poll_info pi{};
    add_timer_token(clock::now() + amount, pi);
    co_await pi;

    m_size.fetch_sub(1, std::memory_order::release);
    co_return;
}
122

123
/// Suspends the caller until the given time point.  Deadlines at or before
/// now degenerate to a plain reschedule.
auto io_scheduler::yield_until(time_point time) -> coro::task<void>
{
    const auto now = clock::now();

    if (time <= now)
    {
        co_await schedule();
        co_return;
    }

    m_size.fetch_add(1, std::memory_order::release);

    // The timer operates at millisecond resolution; truncate the remainder.
    const auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);

    detail::poll_info pi{};
    add_timer_token(now + amount, pi);
    co_await pi;

    m_size.fetch_sub(1, std::memory_order::release);
    co_return;
}
146

147
/// Polls `fd` for the requested operation, optionally bounded by `timeout`.
/// @return The poll outcome (event, timeout, error or closed).
auto io_scheduler::poll(fd_t fd, coro::poll_op op, std::chrono::milliseconds timeout) -> coro::task<poll_status>
{
    // Count this poll as a live task; the matching decrement happens once the
    // event loop resumes this coroutine.
    m_size.fetch_add(1, std::memory_order::release);

    // Two competing events may be armed: an optional timeout and the actual
    // fd poll.  Whichever fires first deletes the other so only one wins, and
    // the winner records the resulting status.
    detail::poll_info info{};
    info.m_fd = fd;

    if (timeout > 0ms)
    {
        info.m_timer_pos = add_timer_token(clock::now() + timeout, info);
    }

    epoll_event e{};
    e.events   = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLRDHUP;
    e.data.ptr = &info;
    if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e) == -1)
    {
        std::cerr << "epoll ctl error on fd " << fd << "\n";
    }

    // The event loop cleans up whichever event lost before resuming this
    // coroutine, so the coroutine can never be restarted twice.
    auto result = co_await info;
    m_size.fetch_sub(1, std::memory_order::release);
    co_return result;
}
182

183
auto io_scheduler::shutdown() noexcept -> void
83✔
184
{
185
    // Only allow shutdown to occur once.
186
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
83✔
187
    {
188
        if (m_thread_pool != nullptr)
52✔
189
        {
190
            m_thread_pool->shutdown();
31✔
191
        }
192

193
        // Signal the event loop to stop asap, triggering the event fd is safe.
194
        uint64_t value{1};
52✔
195
        auto     written = ::write(m_shutdown_fd, &value, sizeof(value));
52✔
196
        (void)written;
197

198
        if (m_io_thread.joinable())
52✔
199
        {
200
            m_io_thread.join();
50✔
201
        }
202
    }
203
}
83✔
204

205
auto io_scheduler::process_events_manual(std::chrono::milliseconds timeout) -> void
4✔
206
{
207
    bool expected{false};
4✔
208
    if (m_io_processing.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
4✔
209
    {
210
        process_events_execute(timeout);
4✔
211
        m_io_processing.exchange(false, std::memory_order::release);
4✔
212
    }
213
}
4✔
214

215
auto io_scheduler::process_events_dedicated_thread() -> void
52✔
216
{
217
    if (m_opts.on_io_thread_start_functor != nullptr)
52✔
218
    {
219
        m_opts.on_io_thread_start_functor();
×
220
    }
221

222
    m_io_processing.exchange(true, std::memory_order::release);
52✔
223
    // Execute tasks until stopped or there are no more tasks to complete.
224
    while (!m_shutdown_requested.load(std::memory_order::acquire) || size() > 0)
898,204✔
225
    {
226
        process_events_execute(m_default_timeout);
898,075✔
227
    }
228
    m_io_processing.exchange(false, std::memory_order::release);
83✔
229

230
    if (m_opts.on_io_thread_stop_functor != nullptr)
50✔
231
    {
232
        m_opts.on_io_thread_stop_functor();
×
233
    }
234
}
50✔
235

236
/// Core event-loop pass: waits on epoll, dispatches each ready event to the
/// appropriate handler, then resumes (or hands to the thread pool) every
/// coroutine collected into m_handles_to_resume during dispatch.
auto io_scheduler::process_events_execute(std::chrono::milliseconds timeout) -> void
{
    auto event_count = epoll_wait(m_epoll_fd, m_events.data(), m_max_events, timeout.count());
    if (event_count > 0)
    {
        for (std::size_t i = 0; i < static_cast<std::size_t>(event_count); ++i)
        {
            epoll_event& event      = m_events[i];
            void*        handle_ptr = event.data.ptr;

            // The data pointer doubles as a tag: the three sentinel pointers
            // identify internal fds, anything else is a user poll_info.
            if (handle_ptr == m_timer_ptr)
            {
                // Process all events that have timed out.
                process_timeout_execute();
            }
            else if (handle_ptr == m_schedule_ptr)
            {
                // Process scheduled coroutines.
                process_scheduled_execute_inline();
            }
            else if (handle_ptr == m_shutdown_ptr) [[unlikely]]
            {
                // Nothing to do, just needed to wake-up and smell the flowers
            }
            else
            {
                // Individual poll task wake-up.
                process_event_execute(static_cast<detail::poll_info*>(handle_ptr), event_to_poll_status(event.events));
            }
        }
    }

    // Its important to not resume any handles until the full set is accounted for.  If a timeout
    // and an event for the same handle happen in the same epoll_wait() call then inline processing
    // will destruct the poll_info object before the second event is handled.  This is also possible
    // with thread pool processing, but probably has an extremely low chance of occurring due to
    // the thread switch required.  If m_max_events == 1 this would be unnecessary.

    if (!m_handles_to_resume.empty())
    {
        if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
        {
            for (auto& handle : m_handles_to_resume)
            {
                handle.resume();
            }
        }
        else
        {
            m_thread_pool->resume(m_handles_to_resume);
        }

        m_handles_to_resume.clear();
    }
}
291

292
/// Maps a raw epoll event bitmask to the scheduler's poll_status.
/// Readiness (in/out) takes precedence over error, which takes precedence
/// over hang-up — same ordering as the original checks.
/// @throws std::runtime_error if no recognized bit is set.
auto io_scheduler::event_to_poll_status(uint32_t events) -> poll_status
{
    if (events & (EPOLLIN | EPOLLOUT))
    {
        return poll_status::event;
    }
    if (events & EPOLLERR)
    {
        return poll_status::error;
    }
    if (events & (EPOLLRDHUP | EPOLLHUP))
    {
        return poll_status::closed;
    }

    throw std::runtime_error{"invalid epoll state"};
}
309

310
auto io_scheduler::process_scheduled_execute_inline() -> void
120✔
311
{
312
    std::vector<std::coroutine_handle<>> tasks{};
241✔
313
    {
314
        // Acquire the entire list, and then reset it.
315
        std::scoped_lock lk{m_scheduled_tasks_mutex};
241✔
316
        tasks.swap(m_scheduled_tasks);
120✔
317

318
        // Clear the schedule eventfd if this is a scheduled task.
319
        eventfd_t value{0};
120✔
320
        eventfd_read(m_schedule_fd, &value);
120✔
321

322
        // Clear the in memory flag to reduce eventfd_* calls on scheduling.
323
        m_schedule_fd_triggered.exchange(false, std::memory_order::release);
121✔
324
    }
325

326
    // This set of handles can be safely resumed now since they do not have a corresponding timeout event.
327
    for (auto& task : tasks)
332✔
328
    {
329
        task.resume();
209✔
330
    }
331
    m_size.fetch_sub(tasks.size(), std::memory_order::release);
121✔
332
}
120✔
333

334
/// Handles a user poll event that fired: claims the poll_info (so a timeout
/// firing in the same epoll batch is discarded), removes the fd from epoll
/// and any pending timer, records the status, and queues the awaiting
/// coroutine for resumption.
auto io_scheduler::process_event_execute(detail::poll_info* pi, poll_status status) -> void
{
    if (!pi->m_processed)
    {
        std::atomic_thread_fence(std::memory_order::acquire);
        // Its possible the event and the timeout occurred in the same epoll, make sure only one
        // is ever processed, the other is discarded.
        pi->m_processed = true;

        // Given a valid fd always remove it from epoll so the next poll can blindly EPOLL_CTL_ADD.
        if (pi->m_fd != -1)
        {
            epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, pi->m_fd, nullptr);
        }

        // Since this event triggered, remove its corresponding timeout if it has one.
        if (pi->m_timer_pos.has_value())
        {
            remove_timer_token(pi->m_timer_pos.value());
        }

        pi->m_poll_status = status;

        // Spin until the awaiting coroutine has stored its handle: the poller
        // may still be mid-suspension on another thread when this event fires.
        while (pi->m_awaiting_coroutine == nullptr)
        {
            std::atomic_thread_fence(std::memory_order::acquire);
        }

        m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
    }
}
365

366
/// Handles the timer fd firing: collects every timed event whose deadline has
/// passed (under the lock), then — outside the lock — claims each poll_info
/// not already won by its fd event, queues its coroutine with a timeout
/// status, and finally re-arms the timer for the next deadline.
auto io_scheduler::process_timeout_execute() -> void
{
    std::vector<detail::poll_info*> poll_infos{};
    auto                            now = clock::now();

    {
        // m_timed_events is ordered by deadline; pop from the front until a
        // deadline in the future is reached.
        std::scoped_lock lk{m_timed_events_mutex};
        while (!m_timed_events.empty())
        {
            auto first    = m_timed_events.begin();
            auto [tp, pi] = *first;

            if (tp <= now)
            {
                m_timed_events.erase(first);
                poll_infos.emplace_back(pi);
            }
            else
            {
                break;
            }
        }
    }

    for (auto pi : poll_infos)
    {
        if (!pi->m_processed)
        {
            // Its possible the event and the timeout occurred in the same epoll, make sure only one
            // is ever processed, the other is discarded.
            pi->m_processed = true;

            // Since this timed out, remove its corresponding event if it has one.
            if (pi->m_fd != -1)
            {
                epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, pi->m_fd, nullptr);
            }

            // Spin until the awaiting coroutine has stored its handle; it may
            // still be mid-suspension on another thread.
            while (pi->m_awaiting_coroutine == nullptr)
            {
                std::atomic_thread_fence(std::memory_order::acquire);
            }

            m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
            pi->m_poll_status = coro::poll_status::timeout;
        }
    }

    // Update the time to the next smallest time point, re-take the current now time
    // since updating and resuming tasks could shift the time.
    update_timeout(clock::now());
}
419

420
/// Registers a timed event at `tp` for the given poll_info.
/// @return Iterator to the inserted entry, used later for removal.
auto io_scheduler::add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator
{
    std::scoped_lock lk{m_timed_events_mutex};
    auto             inserted = m_timed_events.emplace(tp, &pi);

    // A new earliest deadline means the timerfd must be re-armed sooner.
    if (inserted == m_timed_events.begin())
    {
        update_timeout(clock::now());
    }

    return inserted;
}
433

434
/// Unregisters a previously added timed event.
auto io_scheduler::remove_timer_token(timed_events::iterator pos) -> void
{
    std::scoped_lock lk{m_timed_events_mutex};

    const bool was_first = (m_timed_events.begin() == pos);
    m_timed_events.erase(pos);

    // If the earliest deadline was removed, re-arm the timer for the new
    // front.  Letting the stale timeout fire would also be harmless (the loop
    // would find nothing expired), but keeping the timer accurate is cleaner.
    if (was_first)
    {
        update_timeout(clock::now());
    }
}
451

452
/// Re-arms the timerfd for the earliest pending deadline, or disarms it when
/// no timed events remain (an all-zero itimerspec disarms per timerfd(2)).
auto io_scheduler::update_timeout(time_point now) -> void
{
    // Zero-initialized: if there are no timed events this disables the timer.
    itimerspec ts{};

    if (!m_timed_events.empty())
    {
        const auto& [tp, pi] = *m_timed_events.begin();

        auto remaining = tp - now;
        auto secs      = std::chrono::duration_cast<std::chrono::seconds>(remaining);
        auto nanos     = std::chrono::duration_cast<std::chrono::nanoseconds>(remaining - secs);

        // Safeguard: zero disarms the timer and negative values are an error,
        // so an already-due deadline is armed as "1ns from now" instead.
        if (secs <= 0s)
        {
            secs = 0s;
            if (nanos <= 0ns)
            {
                nanos = 1ns;
            }
        }

        ts.it_value.tv_sec  = secs.count();
        ts.it_value.tv_nsec = nanos.count();
    }

    if (timerfd_settime(m_timer_fd, 0, &ts, nullptr) == -1)
    {
        std::cerr << "Failed to set timerfd errorno=[" << std::string{strerror(errno)} << "].";
    }
}
498

499
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc