• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 22550233967

01 Mar 2026 06:56PM UTC coverage: 86.144%. First build
22550233967

Pull #444

github

web-flow
Merge 0efc86c28 into 0161911f2
Pull Request #444: scheduler remove lock for scheduled|resumed tasks

34 of 47 new or added lines in 4 files covered. (72.34%)

1890 of 2194 relevant lines covered (86.14%)

4780638.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.3
/src/scheduler.cpp
1
#include "coro/scheduler.hpp"
2
#include "coro/detail/task_self_deleting.hpp"
3

4
#include <atomic>
5
#include <cstring>
6
#include <iostream>
7
#include <optional>
8
#include <sys/socket.h>
9
#include <sys/types.h>
10
#include <unistd.h>
11

12
using namespace std::chrono_literals;
13

14
namespace coro
15
{
16

17
namespace detail
18
{
19
// Creates a coroutine that waits on the given task_group until every task in it
// has completed.  The unique_ptr is moved into this coroutine's frame, so the
// group stays alive for the full duration of the wait even though the caller's
// scope has exited.
static auto
    make_spawned_joinable_wait_task(std::unique_ptr<coro::task_group<coro::scheduler>> group_ptr) -> coro::task<void>
{
    // Suspends until the group reports all of its tasks are done.
    co_await *group_ptr;
    co_return;
}
25

26
} // namespace detail
27

28
/// Constructs the scheduler: registers the shutdown and schedule wake-up pipes
/// with the io notifier and, for the thread-pool execution strategy, creates the
/// worker pool.  The private_constructor tag restricts construction to
/// make_unique().
/// @param opts Scheduler options (thread/execution strategy, pool options).
/// @throws std::runtime_error if either wake-up pipe cannot be registered.
scheduler::scheduler(options&& opts, private_constructor)
    : m_opts(opts),
      m_io_notifier(),
      m_timer(static_cast<const void*>(&m_timer_object), m_io_notifier)
{
    // The shutdown pipe wakes the event loop so it can observe m_shutdown_requested.
    if (!m_io_notifier.watch(m_shutdown_pipe.read_fd(), coro::poll_op::read, const_cast<void*>(m_shutdown_ptr), true))
    {
        throw std::runtime_error("Failed to register m_shutdown_pipe.read_fd() for read events.");
    }

    // The schedule pipe wakes the event loop when new coroutines are scheduled inline.
    if (!m_io_notifier.watch(m_schedule_pipe.read_fd(), coro::poll_op::read, const_cast<void*>(m_schedule_ptr), true))
    {
        // Fixed: message previously read "m_schedule.pipe.read_rd()".
        throw std::runtime_error("Failed to register m_schedule_pipe.read_fd() for read events.");
    }

    // Pre-allocate the event buffer once so the hot loop doesn't reallocate.
    m_recent_events.reserve(m_max_events);

    if (m_opts.execution_strategy == execution_strategy_t::process_tasks_on_thread_pool)
    {
        m_thread_pool = thread_pool::make_unique(std::move(m_opts.pool));
    }
}
50

51
/// Factory for schedulers.  Construction happens first; the dedicated io thread
/// (if requested via thread_strategy_t::spawn) is only started afterwards so
/// that the thread always observes a fully constructed scheduler object.
/// @param opts Scheduler options, consumed by the constructor.
/// @return Owning pointer to the new scheduler.
auto scheduler::make_unique(options opts) -> std::unique_ptr<scheduler>
{
    auto instance = std::make_unique<scheduler>(std::move(opts), private_constructor{});

    if (instance->m_opts.thread_strategy == thread_strategy_t::spawn)
    {
        auto* raw           = instance.get();
        instance->m_io_thread = std::thread([raw]() { raw->process_events_dedicated_thread(); });
    }
    // Otherwise (manual mode) the user drives the loop via process_events().

    return instance;
}
65

66
/// Destructor: requests shutdown, joins the io thread if it is still running,
/// then closes both wake-up pipes.
scheduler::~scheduler()
{
    // No-op if the user already called shutdown(); shutdown() itself joins the
    // io thread on the first invocation.
    shutdown();

    // Covers the path where shutdown was initiated elsewhere and the thread was
    // not joined by the shutdown() call above.
    if (m_io_thread.joinable())
    {
        m_io_thread.join();
    }

    m_shutdown_pipe.close();
    m_schedule_pipe.close();
}
78

79
/// Runs one iteration of the event loop on the calling thread (manual thread
/// strategy).
/// @param timeout Maximum time to block waiting for io events.
/// @return The number of tasks still tracked by the scheduler afterwards.
auto scheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t
{
    process_events_manual(timeout);
    return size();
}
84

85
/// Spawns a fire-and-forget task.  The task is wrapped in a self-deleting task
/// so its frame is destroyed when it finishes.
/// @param task The task to run; ownership is transferred.
/// @return True if the wrapped task was successfully resumed onto the scheduler.
auto scheduler::spawn_detached(coro::task<void>&& task) -> bool
{
    // Count the task as live up-front; the final-suspend callback below undoes
    // this when the wrapper completes, so size() tracks in-flight detached tasks.
    m_size.fetch_add(1, std::memory_order::release);
    auto wrapper_task = detail::make_task_self_deleting(std::move(task));
    wrapper_task.promise().user_final_suspend([this]() -> void { m_size.fetch_sub(1, std::memory_order::release); });
    return resume(wrapper_task.handle());
}
92

93
/// Spawns a task whose completion can be awaited by the caller.
/// The task is placed in a heap-allocated task_group; ownership of the group is
/// moved into the returned wait task's coroutine frame so it lives until joined.
/// @param task The task to run; ownership is transferred.
/// @return A task that completes once the spawned task has finished.
auto scheduler::spawn_joinable(coro::task<void>&& task) -> coro::task<void>
{
    return detail::make_spawned_joinable_wait_task(
        std::make_unique<coro::task_group<coro::scheduler>>(this, std::move(task)));
}
98

99
/// Suspends the awaiting coroutine until the given absolute time point.
/// Thin alias for yield_until(); resumes on the next loop pass if the time is
/// not in the future.
auto scheduler::schedule_at(time_point time) -> coro::task<void>
{
    return yield_until(time);
}
103

104
/// Suspends the awaiting coroutine until the given absolute time point.
/// @param time Absolute wake-up time; a past (or current) time degrades to a
///             plain reschedule onto the event loop.
auto scheduler::yield_until(time_point time) -> coro::task<void>
{
    auto now = clock::now();

    // If the requested time is in the past (or now!) bail out!
    if (time <= now)
    {
        co_await schedule();
    }
    else
    {
        // Timed-out tasks are considered live in the scheduler; schedule() does
        // its own accounting, so only this branch increments the size.
        m_size.fetch_add(1, std::memory_order::release);

        auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);

        // The poll_info lives on this coroutine's frame; the timer callback
        // resumes us through it when the deadline fires.
        detail::poll_info pi{};
        add_timer_token(now + amount, pi);
        co_await pi;
    }
    co_return;
}
125

126
/// Polls the given file descriptor for the requested operation, optionally with
/// a timeout and a cancellation token.
/// @param fd             File descriptor to watch.
/// @param op             Read/write operation to poll for.
/// @param timeout        If > 0ms, also arms a timer; whichever fires first wins.
/// @param cancel_trigger Optional token that can cancel the poll.
/// @return The resulting poll_status (event, timeout, etc.).
auto scheduler::poll(
    fd_t                           fd,
    coro::poll_op                  op,
    std::chrono::milliseconds      timeout,
    std::optional<poll_stop_token> cancel_trigger) -> coro::task<poll_status>
{
    // Because the size will drop when this coroutine suspends every poll needs to undo the subtraction
    // on the number of active tasks in the scheduler.  When this task is resumed by the event loop.
    m_size.fetch_add(1, std::memory_order::release);

    // Setup two events, a timeout event and the actual poll for op event.
    // Whichever triggers first will delete the other to guarantee only one wins.
    // The resume token will be set by the scheduler to what the event turned out to be.

    bool timeout_requested = (timeout > 0ms);

    auto pi = detail::poll_info{fd, op, cancel_trigger};

    if (timeout_requested)
    {
        // Remember the timer position so the io event can cancel the timeout.
        pi.m_timer_pos = add_timer_token(clock::now() + timeout, pi);
    }

    if (!m_io_notifier.watch(pi))
    {
        // Best-effort report; the co_await below will still resolve via timeout
        // (if armed) or hang the poll — NOTE(review): consider surfacing an error status.
        std::cerr << "Failed to add " << fd << " to watch list\n";
    }

    // The event loop will 'clean-up' whichever event didn't win since the coroutine is scheduled
    // onto the thread pool its possible the other type of event could trigger while its waiting
    // to execute again, thus restarting the coroutine twice, that would be quite bad.
    auto result = co_await pi;
    co_return result;
}
160

161
/// Resumes the given coroutine handle on this scheduler.
/// @param handle Coroutine to resume; null/completed handles are rejected.
/// @return True if the handle was accepted for resumption, false if it was
///         invalid or the scheduler is shutting down.
auto scheduler::resume(std::coroutine_handle<> handle) -> bool
{
    // Reject invalid handles and anything arriving after shutdown has begun.
    if (handle == nullptr || handle.done() || m_shutdown_requested.load(std::memory_order::acquire))
    {
        return false;
    }

    if (m_opts.execution_strategy != execution_strategy_t::process_tasks_inline)
    {
        return m_thread_pool->resume(handle);
    }

    // Inline strategy: hand the handle to the event loop through a heap
    // allocated schedule_operation; m_allocated tells the loop to delete it
    // once it has been consumed.
    auto* op        = new schedule_operation{*this};
    op->m_allocated = true;
    op->await_suspend(handle);
    return true;
}
185

186
/// Requests the scheduler to stop.  Idempotent: only the first caller performs
/// the shutdown sequence (wake the event loop, join the io thread, stop the
/// thread pool); subsequent calls return immediately.
auto scheduler::shutdown() noexcept -> void
{
    // Only allow shutdown to occur once.
    if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
    {
        // Signal the event loop to stop asap by writing to the shutdown pipe.
        const constexpr int value{1};
        ssize_t             written = m_shutdown_pipe.write(&value, sizeof(value));
        if (written != sizeof(value))
        {
            // Can't throw (noexcept); report and continue — the loop will still
            // exit once it observes m_shutdown_requested and drains its work.
            std::cerr << "libcoro::scheduler::shutdown() failed to write to shutdown pipe, bytes written=" << written
                      << "\n";
        }

        // Wait for the dedicated io thread (spawn strategy) to finish draining.
        if (m_io_thread.joinable())
        {
            m_io_thread.join();
        }

        // Only allocated for the thread-pool execution strategy.
        if (m_thread_pool != nullptr)
        {
            m_thread_pool->shutdown();
        }
    }
}
211

212
/// Suspends the awaiting coroutine for the given duration.
/// @param amount Time to sleep; zero or negative degrades to a plain reschedule.
auto scheduler::yield_for_internal(std::chrono::nanoseconds amount) -> coro::task<void>
{
    if (amount <= 0ms)
    {
        co_await schedule();
    }
    else
    {
        // Yield/timeout tasks are considered live in the scheduler and must be accounted for. Note
        // that if the user gives an invalid amount and schedule() is directly called it will account
        // for the scheduled task there.
        m_size.fetch_add(1, std::memory_order::release);

        // Yielding does not require setting the timer position on the poll info since
        // it doesn't have a corresponding 'event' that can trigger, it always waits for
        // the timeout to occur before resuming.

        detail::poll_info pi{};
        add_timer_token(clock::now() + amount, pi);
        co_await pi;
    }
    co_return;
}
235

236
/// Executes one pass of the event loop on the calling thread, guarded so only
/// one thread at a time can be processing events.
/// @param timeout Maximum time to block waiting for io events.
auto scheduler::process_events_manual(std::chrono::milliseconds timeout) -> void
{
    bool expected{false};
    // CAS guard: if another thread is already inside the loop this call is a no-op.
    if (m_io_processing.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
    {
        process_events_execute(timeout);
        m_io_processing.exchange(false, std::memory_order::release);
    }
}
245

246
/// Body of the dedicated io thread (spawn thread strategy).  Runs the event
/// loop until shutdown has been requested AND all tracked tasks have drained,
/// invoking the optional start/stop hooks around the loop.
auto scheduler::process_events_dedicated_thread() -> void
{
    if (m_opts.on_io_thread_start_functor != nullptr)
    {
        m_opts.on_io_thread_start_functor();
    }

    m_io_processing.exchange(true, std::memory_order::release);
    // Execute tasks until stopped or there are no more tasks to complete.
    while (!m_shutdown_requested.load(std::memory_order::acquire) || size() > 0)
    {
        process_events_execute(m_default_timeout);
    }
    m_io_processing.exchange(false, std::memory_order::release);

    if (m_opts.on_io_thread_stop_functor != nullptr)
    {
        m_opts.on_io_thread_stop_functor();
    }
}
266

267
/// One pass of the event loop: waits for io events (up to timeout), dispatches
/// each event to the appropriate handler, then resumes every collected handle
/// either inline or on the thread pool.
/// @param timeout Maximum time to block in the io notifier.
auto scheduler::process_events_execute(std::chrono::milliseconds timeout) -> void
{
    // Clear the recent events without decreasing the allocated capacity to reduce allocations
    m_recent_events.clear();
    m_io_notifier.next_events(m_recent_events, timeout);

    // handle_ptr is the sentinel registered with the notifier: the timer, the
    // schedule pipe, the shutdown pipe, or a per-poll poll_info.
    for (auto& [handle_ptr, poll_status] : m_recent_events)
    {
        if (handle_ptr == m_timer_ptr)
        {
            // Process all events that have timed out.
            process_timeout_execute();
        }
        else if (handle_ptr == m_schedule_ptr)
        {
            // Process scheduled coroutines.
            process_scheduled_execute_inline();
        }
        else if (handle_ptr == m_shutdown_ptr) [[unlikely]]
        {
            // Nothing to do, just needed to wake-up and smell the flowers
        }
        else
        {
            // Individual poll task wake-up.
            process_event_execute(static_cast<detail::poll_info*>(handle_ptr), poll_status);
        }
    }

    // Its important to not resume any handles until the full set is accounted for.  If a timeout
    // and an event for the same handle happen in the same epoll_wait() call then inline processing
    // will destruct the poll_info object before the second event is handled.  This is also possible
    // with thread pool processing, but probably has an extremely low chance of occuring due to
    // the thread switch required.  If m_max_events == 1 this would be unnecessary.

    if (!m_handles_to_resume.empty())
    {
        if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
        {
            // Resume directly on this thread, then decrement the live-task count
            // by however many handles actually ran.
            std::size_t resumed{0};
            for (auto& handle : m_handles_to_resume)
            {
                handle.resume();
                ++resumed;
            }
            if (resumed > 0)
            {
                m_size.fetch_sub(resumed, std::memory_order::release);
            }
        }
        else
        {
            // Batch-hand the handles to the worker pool.
            m_thread_pool->resume(m_handles_to_resume);
            m_size.fetch_sub(m_handles_to_resume.size(), std::memory_order::release);
        }

        m_handles_to_resume.clear();
    }
}
326

327
auto scheduler::process_scheduled_execute_inline() -> void
41✔
328
{
329
    // Clear the notification by reading until the pipe is cleared, this is done before
330
    // resetting the flag that writes to the pipe need to happen.
331
    while (true)
332
    {
333
        constexpr std::size_t       READ_COUNT{4};
41✔
334
        constexpr ssize_t           READ_COUNT_BYTES = READ_COUNT * sizeof(int);
41✔
335
        std::array<int, READ_COUNT> control{};
41✔
336
        const ssize_t               read_bytes = m_schedule_pipe.read(control.data(), READ_COUNT_BYTES);
41✔
337
        if (read_bytes == READ_COUNT_BYTES)
41✔
338
        {
NEW
339
            continue;
×
340
        }
341

342
        // If we got nothing, or we got a partial read break the loop since the pipe is empty.
343
        if (read_bytes >= 0)
41✔
344
        {
345
            break;
41✔
346
        }
347

348
        // pipe is set to O_NONBLOCK so ignore empty blocking reads.
NEW
349
        if (errno == EAGAIN)
×
350
        {
351
            break;
×
352
        }
353

354
        // Not much we can do here, we're in a very bad state, lets report to stderr.
NEW
355
        std::cerr << "::read(m_schedule_pipe.read_fd()) error[" << errno << "] " << ::strerror(errno) << " fd=["
×
NEW
356
                  << m_schedule_pipe.read_fd() << "]" << std::endl;
×
NEW
357
        break;
×
358
    }
×
359

360
    // Note to all producers that the pipe is cleared and any new additions need to trigger the pipe.
361
    m_schedule_pipe_triggered.exchange(false, std::memory_order::release);
41✔
362

363
    // Now it is safe to acquire all scheduled ops.
364
    auto* ops = detail::awaiter_list_pop_all(m_scheduled_ops);
41✔
365

366
    if (ops != nullptr)
41✔
367
    {
368
        ops = detail::awaiter_list_reverse(ops);
41✔
369

370
        while (ops != nullptr)
277✔
371
        {
372
            auto* next = ops->m_next;
236✔
373
            m_handles_to_resume.emplace_back(ops->m_awaiting_coroutine);
236✔
374

375
            if (ops->m_allocated)
236✔
376
            {
377
                delete ops;
101✔
378
            }
379

380
            ops = next;
236✔
381
        }
382
    }
383
}
41✔
384

385
/// Handles an io event for a single poll: marks the poll_info processed,
/// removes the fd and any paired timeout, records the status, and queues the
/// awaiting coroutine for resumption.
/// @param pi     The poll bookkeeping object registered with the notifier.
/// @param status The io result to deliver to the awaiting coroutine.
auto scheduler::process_event_execute(detail::poll_info* pi, poll_status status) -> void
{
    if (!pi->m_processed)
    {
        // Synchronize-with the thread that published this poll_info.
        std::atomic_thread_fence(std::memory_order::acquire);
        // Its possible the event and the timeout occurred in the same epoll, make sure only one
        // is ever processed, the other is discarded.
        pi->m_processed = true;

        // Given a valid fd always remove it from epoll so the next poll can blindly EPOLL_CTL_ADD.
        if (pi->m_fd != -1)
        {
            m_io_notifier.unwatch(*pi);
        }

        // Since this event triggered, remove its corresponding timeout if it has one.
        if (pi->m_timer_pos.has_value())
        {
            remove_timer_token(pi->m_timer_pos.value());
        }

        pi->m_poll_status = status;

        // Spin until the awaiting coroutine handle is published; the event can
        // race ahead of the awaiter finishing its suspend on another thread.
        while (pi->m_awaiting_coroutine == nullptr)
        {
            std::atomic_thread_fence(std::memory_order::acquire);
        }

        m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
    }
}
416

417
/// Fires every timer whose deadline has passed: collects expired entries under
/// the lock, then (outside the lock) marks each as timed out and queues its
/// coroutine, and finally re-arms the timer for the next deadline.
auto scheduler::process_timeout_execute() -> void
{
    std::vector<detail::poll_info*> poll_infos{};
    auto                            now = clock::now();

    // Harvest expired entries under the lock; processing happens afterwards so
    // the lock is held as briefly as possible.
    {
        std::scoped_lock lk{m_timed_events_mutex};
        while (!m_timed_events.empty())
        {
            auto first    = m_timed_events.begin();
            auto [tp, pi] = *first;

            if (tp <= now)
            {
                m_timed_events.erase(first);
                poll_infos.emplace_back(pi);
            }
            else
            {
                // The multimap is ordered; the first non-expired entry ends the scan.
                break;
            }
        }
    }

    for (auto pi : poll_infos)
    {
        if (!pi->m_processed)
        {
            // Its possible the event and the timeout occurred in the same epoll, make sure only one
            // is ever processed, the other is discarded.
            pi->m_processed = true;

            // Since this timed out, remove its corresponding event if it has one.
            if (pi->m_fd != -1)
            {
                m_io_notifier.unwatch(*pi);
            }

            // Spin until the awaiting coroutine handle is published by the
            // suspending thread.
            while (pi->m_awaiting_coroutine == nullptr)
            {
                std::atomic_thread_fence(std::memory_order::acquire);
            }

            m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
            pi->m_poll_status = coro::poll_status::timeout;
        }
    }

    // Update the time to the next smallest time point, re-take the current now time
    // since updating and resuming tasks could shift the time.
    update_timeout(clock::now());
}
469

470
/// Registers a timeout for the given poll_info.
/// @param tp Absolute deadline.
/// @param pi Poll bookkeeping object to resume when the deadline fires.
/// @return Iterator into m_timed_events, used later to cancel the timeout.
auto scheduler::add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator
{
    std::scoped_lock lk{m_timed_events_mutex};
    auto             pos = m_timed_events.emplace(tp, &pi);

    // If this item was inserted as the smallest time point, update the timeout.
    if (pos == m_timed_events.begin())
    {
        update_timeout(clock::now());
    }

    return pos;
}
483

484
/// Cancels a previously registered timeout.
/// @param pos Iterator returned by add_timer_token().
auto scheduler::remove_timer_token(timed_events::iterator pos) -> void
{
    std::scoped_lock lk{m_timed_events_mutex};

    const bool was_first = (m_timed_events.begin() == pos);
    m_timed_events.erase(pos);

    // When the smallest deadline was removed, re-arm the timer for the new head.
    // Letting the stale timeout fire would also be acceptable (the event loop
    // would find nothing expired), but keeping the timer accurate is cleaner.
    if (was_first)
    {
        update_timeout(clock::now());
    }
}
501

502
/// Re-arms the notifier timer for the smallest pending deadline, or disarms it
/// when no timed events remain.  Caller must hold m_timed_events_mutex (all
/// call sites do) or otherwise guarantee exclusive access to m_timed_events.
/// @param now Current time used to compute the relative delay.
auto scheduler::update_timeout(time_point now) -> void
{
    if (!m_timed_events.empty())
    {
        auto& [tp, pi] = *m_timed_events.begin();

        auto amount = tp - now;

        if (!m_io_notifier.watch_timer(m_timer, amount))
        {
            // Fixed: message said "errorno"; also stream strerror() directly
            // instead of allocating a temporary std::string, and end the line.
            std::cerr << "Failed to set timerfd errno=[" << strerror(errno) << "].\n";
        }
    }
    else
    {
        m_io_notifier.unwatch_timer(m_timer);
    }
}
520

521
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc