• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28423703495

30 Jun 2026 05:59AM UTC coverage: 51.998% (-0.3%) from 52.278%
28423703495

Pull #83

github

web-flow
Merge fb542a938 into 2efed6649
Pull Request #83: refactor and improve code QoL

37282 of 93303 branches covered (39.96%)

Branch coverage included in aggregate %.

801 of 1525 new or added lines in 78 files covered. (52.52%)

98 existing lines in 37 files now uncovered.

33674 of 43157 relevant lines covered (78.03%)

20306.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.38
/src/dftracer/utils/core/runtime.cpp
1
#include <dftracer/utils/core/common/error.h>
2
#include <dftracer/utils/core/common/logging.h>
3
#include <dftracer/utils/core/common/platform_compat.h>
4
#include <dftracer/utils/core/env.h>
5
#include <dftracer/utils/core/runtime.h>
6

7
#include <algorithm>
8
#include <cstdlib>
9
#include <stdexcept>
10
#include <string>
11
#include <string_view>
12
#include <thread>
13

14
namespace dftracer::utils {
15

16
namespace {
17
// Resolve the worker-thread count. DFTRACER_UTILS_THREADS overrides everything
18
// (a debugging lever: force a fixed count, e.g. 1 for a single-threaded async
19
// loop). Otherwise 0 means hardware_concurrency.
20
std::size_t resolve_threads(std::size_t requested) {
860✔
21
    if (auto env = Env::get<std::string_view>("DFTRACER_UTILS_THREADS");
1,720!
22
        env.has_value()) {
860✔
23
        const long long n =
NEW
24
            std::strtoll(std::string(*env).c_str(), nullptr, 10);
×
NEW
25
        if (n > 0) return static_cast<std::size_t>(n);
×
26
    }
27
    return requested == 0 ? dftracer_utils_hardware_concurrency() : requested;
860✔
28
}
430✔
29
}  // namespace
30

31
Runtime::Runtime(std::size_t threads) : threads_(resolve_threads(threads)) {
435!
32
    ExecutorConfig config;
290!
33
    config.num_threads = threads_;
290✔
34
    executor_ = std::make_unique<Executor>(config);
290!
35
    executor_->start();
290!
36

37
    watchdog_ = std::make_unique<Watchdog>();
290!
38
    watchdog_->set_executor(executor_.get());
290!
39
}
435✔
40

41
Runtime::Runtime(const ExecutorConfig& config, bool enable_watchdog)
234✔
42
    : threads_(resolve_threads(config.num_threads)) {
234!
43
    ExecutorConfig cfg = config;
156✔
44
    cfg.num_threads = threads_;
156✔
45
    executor_ = std::make_unique<Executor>(cfg);
156!
46
    executor_->start();
156!
47

48
    if (enable_watchdog) {
156!
49
        watchdog_ = std::make_unique<Watchdog>();
156!
50
        watchdog_->set_executor(executor_.get());
156!
51
    }
78✔
52
}
234✔
53

54
Runtime::Runtime(const ExecutorConfig& config,
621✔
55
                 std::unique_ptr<Watchdog> watchdog)
207✔
56
    : threads_(resolve_threads(config.num_threads)) {
621!
57
    ExecutorConfig cfg = config;
414✔
58
    cfg.num_threads = threads_;
414✔
59
    executor_ = std::make_unique<Executor>(cfg);
414!
60
    executor_->start();
414!
61

62
    watchdog_ = std::move(watchdog);
414✔
63
    if (watchdog_) {
414✔
64
        watchdog_->set_executor(executor_.get());
162!
65
    }
81✔
66
}
621✔
67

68
Runtime::~Runtime() { shutdown(); }
1,290!
69

70
TaskHandle Runtime::submit(coro::CoroTask<void> task, std::string name) {
974✔
71
    if (shutdown_called_.load(std::memory_order_acquire)) {
974✔
72
        throw DFTUtilsException(ErrorCode::PIPELINE, "Runtime is shut down");
4!
73
    }
74
    if (name.empty()) {
970✔
75
        name = "task-" + std::to_string(task_name_counter_++);
6!
76
    }
3✔
77

78
    auto promise = std::make_shared<std::promise<void>>();
970!
79
    auto future = promise->get_future().share();
970!
80
    auto tid = std::make_shared<std::atomic<TaskIndex>>(-1);
970!
81

82
    auto wrapper =
485✔
83
        [](coro::CoroTask<void> t, std::shared_ptr<std::promise<void>> p,
3,874!
84
           Executor* exec,
85
           std::shared_ptr<std::atomic<TaskIndex>> task_id) -> coro::Coro {
485!
86
        try {
87
            co_await std::move(t);
2,419!
88
            t = coro::CoroTask<void>{
964!
89
                std::coroutine_handle<coro::CoroTask<void>::promise_type>{}};
482✔
90
            exec->mark_coro_completed(task_id->load(std::memory_order_acquire));
482!
91
        } catch (...) {
485✔
92
            t = coro::CoroTask<void>{
6!
93
                std::coroutine_handle<coro::CoroTask<void>::promise_type>{}};
3✔
94
            exec->mark_coro_completed(task_id->load(std::memory_order_acquire));
3!
95
            p->set_exception(std::current_exception());
3!
96
            co_return;
3✔
97
        }
3!
98
        p->set_value();
482!
99
    };
1,937!
100

101
    // Set the executor on the task's promise so awaitables (e.g. channels)
102
    // that capture `get_root_promise()->get_executor()` can schedule
103
    // resumption. Without this, awaiters end up with executor=nullptr because
104
    // the wrapping `coro::Coro` doesn't extend PromiseBase and the
105
    // root-promise chain stops at the user's CoroTask.
106
    if (task.handle()) {
970✔
107
        task.handle().promise().set_executor(executor_.get());
970!
108
    }
485✔
109
    auto coro = wrapper(std::move(task), promise, executor_.get(), tid);
1,455!
110
    TaskIndex id = executor_->enqueue_tracked(std::move(coro), name, tid);
970!
111

112
    {
113
        std::lock_guard<std::mutex> lock(futures_mutex_);
970!
114
        cleanup_completed_futures();
970!
115
        outstanding_futures_.push_back(future);
970!
116
    }
970✔
117

118
    return TaskHandle{future, id, std::move(name)};
1,455!
119
}
972✔
120

121
void Runtime::wait_all() {
198✔
122
    std::vector<std::shared_future<void>> futures;
198✔
123
    {
124
        std::lock_guard<std::mutex> lock(futures_mutex_);
198!
125
        futures = std::move(outstanding_futures_);
198✔
126
        outstanding_futures_.clear();
198✔
127
    }
198✔
128
    for (auto& f : futures) {
269✔
129
        f.wait();
71!
130
    }
131
}
198✔
132

133
void Runtime::cleanup_completed_futures() {
1,140✔
134
    outstanding_futures_.erase(
2,280!
135
        std::remove_if(outstanding_futures_.begin(), outstanding_futures_.end(),
1,140!
136
                       [](const std::shared_future<void>& f) {
807✔
137
                           return f.wait_for(std::chrono::seconds(0)) ==
807!
138
                                  std::future_status::ready;
402✔
139
                       }),
140
        outstanding_futures_.end());
1,140✔
141
}
1,140✔
142

143
ExecutorProgress Runtime::get_progress() const {
38✔
144
    return executor_->get_progress();
38✔
145
}
146

147
bool Runtime::is_responsive() const { return executor_->is_responsive(); }
4✔
148

149
void Runtime::set_global_timeout(std::chrono::milliseconds timeout) {
×
150
    if (!watchdog_) {
×
NEW
151
        throw DFTUtilsException(
×
152
            ErrorCode::PIPELINE,
UNCOV
153
            "Cannot set timeout: Runtime created without watchdog");
×
154
    }
155
    watchdog_->set_global_timeout(timeout);
×
156
}
×
157

158
void Runtime::set_default_task_timeout(std::chrono::milliseconds timeout) {
×
159
    if (!watchdog_) {
×
NEW
160
        throw DFTUtilsException(
×
161
            ErrorCode::PIPELINE,
UNCOV
162
            "Cannot set timeout: Runtime created without watchdog");
×
163
    }
164
    watchdog_->set_default_task_timeout(timeout);
×
165
}
×
166

167
void Runtime::shutdown() {
1,622✔
168
    bool expected = false;
1,622✔
169
    if (!shutdown_called_.compare_exchange_strong(expected, true)) return;
1,622✔
170
    if (watchdog_) watchdog_->stop();
860!
171
    if (executor_) executor_->shutdown();
860!
172
}
811✔
173

174
std::size_t Runtime::threads() const { return threads_; }
306✔
175

176
std::size_t Runtime::io_threads() const {
×
177
    return executor_ ? executor_->get_io_pool_size() : 0;
×
178
}
179

180
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc