• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28356348514

29 Jun 2026 07:40AM UTC coverage: 52.174% (-0.1%) from 52.278%
28356348514

Pull #83

github

web-flow
Merge 278203630 into 2efed6649
Pull Request #83: refactor and improve code QoL

37276 of 92891 branches covered (40.13%)

Branch coverage included in aggregate %.

671 of 1173 new or added lines in 58 files covered. (57.2%)

66 existing lines in 30 files now uncovered.

33619 of 42991 relevant lines covered (78.2%)

20387.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.33
/src/dftracer/utils/core/runtime.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/platform_compat.h>
3
#include <dftracer/utils/core/env.h>
4
#include <dftracer/utils/core/runtime.h>
5

6
#include <algorithm>
7
#include <cstdlib>
8
#include <stdexcept>
9
#include <string>
10
#include <string_view>
11
#include <thread>
12

13
namespace dftracer::utils {
14

15
namespace {
16
// Resolve the worker-thread count. DFTRACER_UTILS_THREADS overrides everything
17
// (a debugging lever: force a fixed count, e.g. 1 for a single-threaded async
18
// loop). Otherwise 0 means hardware_concurrency.
19
std::size_t resolve_threads(std::size_t requested) {
860✔
20
    if (auto env = Env::get<std::string_view>("DFTRACER_UTILS_THREADS");
1,720!
21
        env.has_value()) {
860✔
22
        const long long n =
NEW
23
            std::strtoll(std::string(*env).c_str(), nullptr, 10);
×
NEW
24
        if (n > 0) return static_cast<std::size_t>(n);
×
25
    }
26
    return requested == 0 ? dftracer_utils_hardware_concurrency() : requested;
860✔
27
}
430✔
28
}  // namespace
29

30
// Builds a runtime around a default-constructed ExecutorConfig whose thread
// count is the value produced by resolve_threads(threads). The executor is
// started immediately, and a Watchdog is always created and handed a pointer
// to that executor.
Runtime::Runtime(std::size_t threads) : threads_(resolve_threads(threads)) {
    ExecutorConfig executor_cfg;
    executor_cfg.num_threads = threads_;

    executor_ = std::make_unique<Executor>(executor_cfg);
    executor_->start();

    // Created after the executor has been started.
    watchdog_ = std::make_unique<Watchdog>();
    watchdog_->set_executor(executor_.get());
}
435✔
39

40
// Builds a runtime from a caller-supplied executor configuration. The
// configuration is copied and its thread count replaced by the value from
// resolve_threads(config.num_threads) before the executor is created and
// started. A Watchdog is created only when `enable_watchdog` is true.
Runtime::Runtime(const ExecutorConfig& config, bool enable_watchdog)
    : threads_(resolve_threads(config.num_threads)) {
    ExecutorConfig effective = config;
    effective.num_threads = threads_;

    executor_ = std::make_unique<Executor>(effective);
    executor_->start();

    if (!enable_watchdog) {
        return;  // watchdog_ stays empty
    }
    watchdog_ = std::make_unique<Watchdog>();
    watchdog_->set_executor(executor_.get());
}
234✔
52

53
// Builds a runtime from a caller-supplied executor configuration and takes
// ownership of an optional, externally constructed Watchdog. The thread count
// in the copied configuration is replaced by the value from
// resolve_threads(config.num_threads). If `watchdog` is null the runtime has
// no watchdog; otherwise it is given a pointer to the started executor.
Runtime::Runtime(const ExecutorConfig& config,
                 std::unique_ptr<Watchdog> watchdog)
    : threads_(resolve_threads(config.num_threads)) {
    ExecutorConfig effective = config;
    effective.num_threads = threads_;

    executor_ = std::make_unique<Executor>(effective);
    executor_->start();

    watchdog_ = std::move(watchdog);
    if (!watchdog_) {
        return;  // caller passed no watchdog
    }
    watchdog_->set_executor(executor_.get());
}
621✔
66

67
// Destructor: delegates all teardown to shutdown().
Runtime::~Runtime() {
    shutdown();
}
1,290!
68

69
// Submits `task` to the executor and returns a TaskHandle built from the
// completion future, the index returned by enqueue_tracked(), and the task
// name.
//
// - Throws std::runtime_error("Runtime is shut down") when shutdown() has
//   already been called.
// - An empty `name` is replaced by "task-<N>", where N comes from
//   task_name_counter_.
// - The task's completion future is recorded in outstanding_futures_ so that
//   wait_all() can wait on it.
TaskHandle Runtime::submit(coro::CoroTask<void> task, std::string name) {
    const bool already_shut_down =
        shutdown_called_.load(std::memory_order_acquire);
    if (already_shut_down) {
        throw std::runtime_error("Runtime is shut down");
    }
    if (name.empty()) {
        name = "task-" + std::to_string(task_name_counter_++);
    }

    auto completion = std::make_shared<std::promise<void>>();
    auto completion_future = completion->get_future().share();
    // Starts at -1; the same shared slot is handed to enqueue_tracked() below
    // and read back inside the wrapper when the task finishes.
    auto index_slot = std::make_shared<std::atomic<TaskIndex>>(-1);

    // Wrapper coroutine: awaits the user task, reports completion to the
    // executor, then fulfils the promise with either a value or the caught
    // exception. The lambda captures nothing; all state arrives as parameters.
    auto run_and_report =
        [](coro::CoroTask<void> inner,
           std::shared_ptr<std::promise<void>> done, Executor* executor,
           std::shared_ptr<std::atomic<TaskIndex>> index) -> coro::Coro {
        using InnerHandle =
            std::coroutine_handle<coro::CoroTask<void>::promise_type>;
        try {
            co_await std::move(inner);
            // Overwrite the awaited task with one holding a null handle.
            inner = coro::CoroTask<void>{InnerHandle{}};
            executor->mark_coro_completed(
                index->load(std::memory_order_acquire));
        } catch (...) {
            inner = coro::CoroTask<void>{InnerHandle{}};
            executor->mark_coro_completed(
                index->load(std::memory_order_acquire));
            done->set_exception(std::current_exception());
            co_return;
        }
        done->set_value();
    };

    // Set the executor on the task's promise so awaitables (e.g. channels)
    // that capture `get_root_promise()->get_executor()` can schedule
    // resumption. Without this, awaiters end up with executor=nullptr because
    // the wrapping `coro::Coro` doesn't extend PromiseBase and the
    // root-promise chain stops at the user's CoroTask.
    if (task.handle()) {
        task.handle().promise().set_executor(executor_.get());
    }
    auto scheduled = run_and_report(std::move(task), completion,
                                    executor_.get(), index_slot);
    TaskIndex id =
        executor_->enqueue_tracked(std::move(scheduled), name, index_slot);

    {
        std::lock_guard<std::mutex> guard(futures_mutex_);
        // Drop futures that are already ready before recording the new one.
        cleanup_completed_futures();
        outstanding_futures_.push_back(completion_future);
    }

    return TaskHandle{completion_future, id, std::move(name)};
}
970✔
119

120
void Runtime::wait_all() {
198✔
121
    std::vector<std::shared_future<void>> futures;
198✔
122
    {
123
        std::lock_guard<std::mutex> lock(futures_mutex_);
198!
124
        futures = std::move(outstanding_futures_);
198✔
125
        outstanding_futures_.clear();
198✔
126
    }
198✔
127
    for (auto& f : futures) {
269✔
128
        f.wait();
71!
129
    }
130
}
198✔
131

132
void Runtime::cleanup_completed_futures() {
1,140✔
133
    outstanding_futures_.erase(
2,280!
134
        std::remove_if(outstanding_futures_.begin(), outstanding_futures_.end(),
1,140!
135
                       [](const std::shared_future<void>& f) {
828✔
136
                           return f.wait_for(std::chrono::seconds(0)) ==
828!
137
                                  std::future_status::ready;
403✔
138
                       }),
139
        outstanding_futures_.end());
1,140✔
140
}
1,140✔
141

142
// Returns the progress reported by the executor.
ExecutorProgress Runtime::get_progress() const {
    auto& exec = *executor_;
    return exec.get_progress();
}
145

146
// Forwards the executor's own responsiveness check.
bool Runtime::is_responsive() const {
    return executor_->is_responsive();
}
4✔
147

148
void Runtime::set_global_timeout(std::chrono::milliseconds timeout) {
×
149
    if (!watchdog_) {
×
150
        throw std::runtime_error(
×
151
            "Cannot set timeout: Runtime created without watchdog");
152
    }
153
    watchdog_->set_global_timeout(timeout);
×
154
}
×
155

156
void Runtime::set_default_task_timeout(std::chrono::milliseconds timeout) {
×
157
    if (!watchdog_) {
×
158
        throw std::runtime_error(
×
159
            "Cannot set timeout: Runtime created without watchdog");
160
    }
161
    watchdog_->set_default_task_timeout(timeout);
×
162
}
×
163

164
void Runtime::shutdown() {
1,622✔
165
    bool expected = false;
1,622✔
166
    if (!shutdown_called_.compare_exchange_strong(expected, true)) return;
1,622✔
167
    if (watchdog_) watchdog_->stop();
860!
168
    if (executor_) executor_->shutdown();
860!
169
}
811✔
170

171
// Returns the thread count that was resolved when the runtime was constructed.
std::size_t Runtime::threads() const {
    return threads_;
}
306✔
172

173
std::size_t Runtime::io_threads() const {
×
174
    return executor_ ? executor_->get_io_pool_size() : 0;
×
175
}
176

177
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc