• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27171677342

08 Jun 2026 10:43PM UTC coverage: 51.99% (+0.05%) from 51.937%
27171677342

Pull #77

github

web-flow
Merge 3a1432eec into 8045f0be3
Pull Request #77: chore: bump version to 0.0.10

36972 of 92663 branches covered (39.9%)

Branch coverage included in aggregate %.

33405 of 42703 relevant lines covered (78.23%)

20411.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.19
/src/dftracer/utils/core/runtime.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/platform_compat.h>
3
#include <dftracer/utils/core/runtime.h>
4

5
#include <algorithm>
6
#include <stdexcept>
7
#include <thread>
8

9
namespace dftracer::utils {
10

11
// Builds a runtime from a plain thread count.
//
// A `threads` value of 0 is replaced by the result of
// dftracer_utils_hardware_concurrency(). An Executor configured with that
// thread count is created and started, and a Watchdog is always created and
// pointed at the executor.
Runtime::Runtime(std::size_t threads)
    : threads_(threads != 0 ? threads : dftracer_utils_hardware_concurrency()) {
    ExecutorConfig executor_config;
    // Hand the executor the resolved count, not the raw argument.
    executor_config.num_threads = threads_;

    executor_ = std::make_unique<Executor>(executor_config);
    executor_->start();

    watchdog_ = std::make_unique<Watchdog>();
    watchdog_->set_executor(executor_.get());
}
290✔
21

22
// Builds a runtime from a full executor configuration.
//
// `threads_` records config.num_threads, or the result of
// dftracer_utils_hardware_concurrency() when that field is 0; `config` itself
// is handed to the Executor unmodified. A Watchdog is created and attached to
// the executor only when `enable_watchdog` is true.
Runtime::Runtime(const ExecutorConfig& config, bool enable_watchdog)
    : threads_(config.num_threads != 0
                   ? config.num_threads
                   : dftracer_utils_hardware_concurrency()) {
    executor_ = std::make_unique<Executor>(config);
    executor_->start();

    if (!enable_watchdog) {
        return;  // watchdog_ stays empty
    }

    watchdog_ = std::make_unique<Watchdog>();
    watchdog_->set_executor(executor_.get());
}
156✔
33

34
// Builds a runtime from an executor configuration and a caller-supplied
// watchdog.
//
// `threads_` records config.num_threads, or the result of
// dftracer_utils_hardware_concurrency() when that field is 0; `config` itself
// is handed to the Executor unmodified. Ownership of `watchdog` moves into the
// runtime; a null pointer is accepted and leaves the runtime without a
// watchdog.
Runtime::Runtime(const ExecutorConfig& config,
                 std::unique_ptr<Watchdog> watchdog)
    : threads_(config.num_threads != 0
                   ? config.num_threads
                   : dftracer_utils_hardware_concurrency()) {
    executor_ = std::make_unique<Executor>(config);
    executor_->start();

    watchdog_ = std::move(watchdog);
    if (!watchdog_) {
        return;  // nothing to attach
    }
    watchdog_->set_executor(executor_.get());
}
414✔
46

47
// Destroying the runtime runs shutdown(); shutdown() itself returns early if
// it has already been called.
Runtime::~Runtime() {
    shutdown();
}
1,290!
48

49
// Schedules `task` on the executor and returns a handle to it.
//
// - Throws std::runtime_error("Runtime is shut down") when shutdown() has
//   already been called.
// - An empty `name` is replaced by "task-<N>", where N comes from
//   task_name_counter_.
// - The task is wrapped in a coroutine that awaits it, reports completion to
//   the executor, and then fulfils a promise (value on success, the caught
//   exception on failure). The matching shared future is recorded in
//   outstanding_futures_ and also placed in the returned TaskHandle together
//   with the id returned by enqueue_tracked() and the final name.
TaskHandle Runtime::submit(coro::CoroTask<void> task, std::string name) {
    if (shutdown_called_.load(std::memory_order_acquire)) {
        throw std::runtime_error("Runtime is shut down");
    }
    if (name.empty()) {
        name = "task-" + std::to_string(task_name_counter_++);
    }

    using UserTask = coro::CoroTask<void>;
    using UserHandle = std::coroutine_handle<UserTask::promise_type>;
    using SharedPromise = std::shared_ptr<std::promise<void>>;
    using SharedIndex = std::shared_ptr<std::atomic<TaskIndex>>;

    auto completion = std::make_shared<std::promise<void>>();
    auto done = completion->get_future().share();
    // Starts at -1; the same object is handed to enqueue_tracked() below and
    // read back inside the wrapper once the user task has finished.
    auto index_slot = std::make_shared<std::atomic<TaskIndex>>(-1);

    // Capture-less on purpose: everything the coroutine needs is passed as a
    // by-value parameter.
    auto run_and_report = [](UserTask user_task, SharedPromise result,
                             Executor* owner,
                             SharedIndex index) -> coro::Coro {
        try {
            co_await std::move(user_task);
            // Replace the finished task with one built from an empty handle
            // before reporting completion.
            user_task = UserTask{UserHandle{}};
            owner->mark_coro_completed(index->load(std::memory_order_acquire));
        } catch (...) {
            user_task = UserTask{UserHandle{}};
            owner->mark_coro_completed(index->load(std::memory_order_acquire));
            result->set_exception(std::current_exception());
            co_return;
        }
        result->set_value();
    };

    Executor* const exec = executor_.get();

    // Awaitables such as channels look up the executor through
    // `get_root_promise()->get_executor()`. The wrapping `coro::Coro` does not
    // extend PromiseBase, so the root-promise chain ends at the user's
    // CoroTask; unless the executor is stored there, those awaiters would see
    // executor=nullptr and could not schedule resumption.
    if (task.handle()) {
        task.handle().promise().set_executor(exec);
    }
    auto wrapped = run_and_report(std::move(task), completion, exec, index_slot);
    TaskIndex id = exec->enqueue_tracked(std::move(wrapped), name, index_slot);

    {
        std::lock_guard<std::mutex> guard(futures_mutex_);
        // Drop futures that are already ready before recording the new one.
        cleanup_completed_futures();
        outstanding_futures_.push_back(done);
    }

    return TaskHandle{done, id, std::move(name)};
}
970✔
99

100
void Runtime::wait_all() {
198✔
101
    std::vector<std::shared_future<void>> futures;
198✔
102
    {
103
        std::lock_guard<std::mutex> lock(futures_mutex_);
198!
104
        futures = std::move(outstanding_futures_);
198✔
105
        outstanding_futures_.clear();
198✔
106
    }
198✔
107
    for (auto& f : futures) {
272✔
108
        f.wait();
74!
109
    }
110
}
198✔
111

112
void Runtime::cleanup_completed_futures() {
1,140✔
113
    outstanding_futures_.erase(
2,280!
114
        std::remove_if(outstanding_futures_.begin(), outstanding_futures_.end(),
1,140!
115
                       [](const std::shared_future<void>& f) {
825✔
116
                           return f.wait_for(std::chrono::seconds(0)) ==
825!
117
                                  std::future_status::ready;
409✔
118
                       }),
119
        outstanding_futures_.end());
1,140✔
120
}
1,140✔
121

122
// Returns whatever the executor's get_progress() reports.
ExecutorProgress Runtime::get_progress() const {
    Executor* const exec = executor_.get();
    return exec->get_progress();
}
125

126
bool Runtime::is_responsive() const { return executor_->is_responsive(); }
4✔
127

128
void Runtime::set_global_timeout(std::chrono::milliseconds timeout) {
×
129
    if (!watchdog_) {
×
130
        throw std::runtime_error(
×
131
            "Cannot set timeout: Runtime created without watchdog");
132
    }
133
    watchdog_->set_global_timeout(timeout);
×
134
}
×
135

136
void Runtime::set_default_task_timeout(std::chrono::milliseconds timeout) {
×
137
    if (!watchdog_) {
×
138
        throw std::runtime_error(
×
139
            "Cannot set timeout: Runtime created without watchdog");
140
    }
141
    watchdog_->set_default_task_timeout(timeout);
×
142
}
×
143

144
void Runtime::shutdown() {
1,622✔
145
    bool expected = false;
1,622✔
146
    if (!shutdown_called_.compare_exchange_strong(expected, true)) return;
1,622✔
147
    if (watchdog_) watchdog_->stop();
860!
148
    if (executor_) executor_->shutdown();
860!
149
}
811✔
150

151
// Returns the thread count resolved at construction time (threads_).
std::size_t Runtime::threads() const {
    return threads_;
}
306✔
152

153
std::size_t Runtime::io_threads() const {
×
154
    return executor_ ? executor_->get_io_pool_size() : 0;
×
155
}
156

157
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc