• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28496595030

01 Jul 2026 05:50AM UTC coverage: 50.727% (-1.6%) from 52.278%
28496595030

Pull #83

github

web-flow
Merge 8f1ff4df5 into 2efed6649
Pull Request #83: refactor and improve code QoL

31872 of 80367 branches covered (39.66%)

Branch coverage included in aggregate %.

770 of 1591 new or added lines in 85 files covered. (48.4%)

5070 existing lines in 182 files now uncovered.

32742 of 47009 relevant lines covered (69.65%)

9887.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.62
/src/dftracer/utils/core/runtime.cpp
1
#include <dftracer/utils/core/common/error.h>
2
#include <dftracer/utils/core/common/logging.h>
3
#include <dftracer/utils/core/common/platform_compat.h>
4
#include <dftracer/utils/core/env.h>
5
#include <dftracer/utils/core/runtime.h>
6

7
#include <algorithm>
8
#include <cstdlib>
9
#include <stdexcept>
10
#include <string>
11
#include <string_view>
12
#include <thread>
13

14
namespace dftracer::utils {
15

16
namespace {
17
// Resolve the worker-thread count. DFTRACER_UTILS_THREADS overrides everything
18
// (a debugging lever: force a fixed count, e.g. 1 for a single-threaded async
19
// loop). Otherwise 0 means hardware_concurrency.
20
// @param requested  Caller-supplied thread count; 0 selects the value
//                   returned by dftracer_utils_hardware_concurrency().
// @return The positive value parsed from DFTRACER_UTILS_THREADS when that
//         variable is set and parses to > 0; otherwise `requested`, or the
//         hardware-concurrency value when `requested` is 0.
std::size_t resolve_threads(std::size_t requested) {
430✔
21
    if (auto env = Env::get<std::string_view>("DFTRACER_UTILS_THREADS");
860!
22
        env.has_value()) {
430✔
NEW
23
        // strtoll needs a NUL-terminated buffer, so the string_view is copied
        // into a temporary std::string before parsing (base 10).
        // NOTE(review): the end pointer is nullptr and no range check is done,
        // so trailing garbage ("4abc" -> 4) is accepted and an out-of-range
        // value saturates to LLONG_MAX and is returned as-is. Confirm this is
        // acceptable for the override.
        const long long n =
×
NEW
24
            std::strtoll(std::string(*env).c_str(), nullptr, 10);
×
NEW
25
        // Zero, negative, or unparsable input (strtoll yields 0) is ignored
        // and falls through to the non-override path below.
        if (n > 0) return static_cast<std::size_t>(n);
×
NEW
26
    }
×
27
    return requested == 0 ? dftracer_utils_hardware_concurrency() : requested;
430✔
28
}
430✔
29
}  // namespace
30

31
// Constructs a Runtime from a bare thread count.
// `threads` is passed through resolve_threads() and the result is stored in
// threads_. A default-constructed ExecutorConfig with that num_threads is used
// to create the Executor, start() is called on it, and a Watchdog is always
// created and pointed at the executor.
Runtime::Runtime(std::size_t threads) : threads_(resolve_threads(threads)) {
290!
32
    ExecutorConfig config;
145!
33
    config.num_threads = threads_;
145✔
34
    executor_ = std::make_unique<Executor>(config);
145!
35
    executor_->start();
145!
36

37
    // This overload has no opt-out: the watchdog is created unconditionally.
    watchdog_ = std::make_unique<Watchdog>();
145!
38
    watchdog_->set_executor(executor_.get());
145!
39
}
145✔
40

41
// Constructs a Runtime from a caller-supplied ExecutorConfig.
// @param config           Copied; only num_threads is overwritten, with the
//                         value produced by resolve_threads().
// @param enable_watchdog  When true a Watchdog is created and pointed at the
//                         executor; when false watchdog_ is not assigned here.
Runtime::Runtime(const ExecutorConfig& config, bool enable_watchdog)
156✔
42
    : threads_(resolve_threads(config.num_threads)) {
156!
43
    // Work on a copy so the caller's config is left untouched.
    ExecutorConfig cfg = config;
78✔
44
    cfg.num_threads = threads_;
78✔
45
    executor_ = std::make_unique<Executor>(cfg);
78!
46
    executor_->start();
78!
47

48
    if (enable_watchdog) {
78!
49
        watchdog_ = std::make_unique<Watchdog>();
78!
50
        watchdog_->set_executor(executor_.get());
78!
51
    }
78✔
52
}
78✔
53

54
// Constructs a Runtime from a caller-supplied ExecutorConfig and an
// externally created Watchdog.
// @param config    Copied; only num_threads is overwritten, with the value
//                  produced by resolve_threads().
// @param watchdog  Ownership is moved into watchdog_. May be null, in which
//                  case set_executor() is skipped.
Runtime::Runtime(const ExecutorConfig& config,
414✔
55
                 std::unique_ptr<Watchdog> watchdog)
56
    : threads_(resolve_threads(config.num_threads)) {
414!
57
    // Work on a copy so the caller's config is left untouched.
    ExecutorConfig cfg = config;
207✔
58
    cfg.num_threads = threads_;
207✔
59
    executor_ = std::make_unique<Executor>(cfg);
207!
60
    executor_->start();
207!
61

62
    watchdog_ = std::move(watchdog);
207✔
63
    // Only a non-null watchdog is pointed at the freshly created executor.
    if (watchdog_) {
207✔
64
        watchdog_->set_executor(executor_.get());
81!
65
    }
81✔
66
}
207✔
67

68
// Destructor: delegates to shutdown(), which returns immediately if
// shutdown_called_ was already set by an earlier call.
Runtime::~Runtime() { shutdown(); }
860!
69

70
// Submits a coroutine task to the executor and returns a handle to it.
//
// The task is wrapped in a coro::Coro that awaits it, reports completion to
// the executor, and fulfils a std::promise<void>. The promise's shared future
// is both recorded in outstanding_futures_ and placed in the returned
// TaskHandle.
//
// @param task  Coroutine to run; moved into the wrapper coroutine.
// @param name  Task name; when empty, "task-<N>" is generated from
//              task_name_counter_.
// @return TaskHandle built from the shared future, the index returned by
//         enqueue_tracked(), and the (possibly generated) name.
// @throws DFTUtilsException (ErrorCode::PIPELINE) if shutdown_called_ is
//         already set when this function is entered.
TaskHandle Runtime::submit(coro::CoroTask<void> task, std::string name) {
487✔
71
    if (shutdown_called_.load(std::memory_order_acquire)) {
487✔
72
        throw DFTUtilsException(ErrorCode::PIPELINE, "Runtime is shut down");
2!
73
    }
74
    if (name.empty()) {
485✔
75
        name = "task-" + std::to_string(task_name_counter_++);
3!
76
    }
3✔
77

78
    // The promise is shared with the wrapper coroutine; the future is shared
    // so it can be stored here and also handed to the caller.
    auto promise = std::make_shared<std::promise<void>>();
485✔
79
    auto future = promise->get_future().share();
485!
80
    // Shared slot for the task index, initialised to -1. It is passed to both
    // the wrapper and enqueue_tracked().
    // NOTE(review): presumably enqueue_tracked() stores the assigned index
    // here before the wrapper reads it; confirm against Executor.
    auto tid = std::make_shared<std::atomic<TaskIndex>>(-1);
485!
81

82
    // Captureless coroutine lambda: everything it needs arrives as by-value
    // parameters rather than through the closure object.
    auto wrapper =
485✔
83
        [](coro::CoroTask<void> t, std::shared_ptr<std::promise<void>> p,
3,389!
84
           Executor* exec,
85
           std::shared_ptr<std::atomic<TaskIndex>> task_id) -> coro::Coro {
485!
86
        try {
87
            co_await std::move(t);
2,419!
88
            // Overwrite `t` with a CoroTask built from a null handle before
            // reporting completion.
            // NOTE(review): presumably this releases the finished task's
            // coroutine frame early; confirm against CoroTask's assignment.
            t = coro::CoroTask<void>{
964!
89
                std::coroutine_handle<coro::CoroTask<void>::promise_type>{}};
482✔
90
            exec->mark_coro_completed(task_id->load(std::memory_order_acquire));
482!
91
        } catch (...) {
485✔
92
            // Failure path mirrors the success path (reset `t`, mark the
            // task completed) and then forwards the in-flight exception to
            // the promise instead of letting it escape the coroutine.
            t = coro::CoroTask<void>{
6!
93
                std::coroutine_handle<coro::CoroTask<void>::promise_type>{}};
3✔
94
            exec->mark_coro_completed(task_id->load(std::memory_order_acquire));
3!
95
            p->set_exception(std::current_exception());
3!
96
            co_return;
3✔
97
        }
3!
98
        // Reached only on success: the catch block above ends in co_return.
        p->set_value();
482!
99
    };
967✔
100

101
    // Set the executor on the task's promise so awaitables (e.g. channels)
102
    // that capture `get_root_promise()->get_executor()` can schedule
103
    // resumption. Without this, awaiters end up with executor=nullptr because
104
    // the wrapping `coro::Coro` doesn't extend PromiseBase and the
105
    // root-promise chain stops at the user's CoroTask.
106
    if (task.handle()) {
485!
107
        task.handle().promise().set_executor(executor_.get());
485!
108
    }
485✔
109
    auto coro = wrapper(std::move(task), promise, executor_.get(), tid);
485!
110
    // `name` is passed without std::move here because it is still needed for
    // the TaskHandle constructed at the end.
    TaskIndex id = executor_->enqueue_tracked(std::move(coro), name, tid);
485!
111

112
    {
113
        // Under the futures lock: drop futures that are already ready, then
        // record the new one.
        std::lock_guard<std::mutex> lock(futures_mutex_);
485!
114
        cleanup_completed_futures();
485!
115
        outstanding_futures_.push_back(future);
485!
116
    }
485✔
117

118
    return TaskHandle{future, id, std::move(name)};
485!
119
}
487✔
120

121
// Blocks until every future recorded in outstanding_futures_ at the time of
// the call is ready. The list is emptied as part of the call.
// wait() is used rather than get(), so an exception stored in a future is not
// rethrown here.
void Runtime::wait_all() {
99✔
122
    std::vector<std::shared_future<void>> futures;
99✔
123
    {
124
        // Take the whole list under the lock so the waiting below happens
        // without futures_mutex_ held.
        std::lock_guard<std::mutex> lock(futures_mutex_);
99!
125
        futures = std::move(outstanding_futures_);
99✔
126
        // Leave the moved-from vector in a known empty state.
        outstanding_futures_.clear();
99✔
127
    }
99✔
128
    for (auto& f : futures) {
135✔
129
        f.wait();
36!
130
    }
131
}
99✔
132

133
// Removes every future from outstanding_futures_ that is already ready.
// This function does not lock futures_mutex_ itself; submit() calls it with
// that mutex held.
void Runtime::cleanup_completed_futures() {
570✔
134
    outstanding_futures_.erase(
1,140✔
135
        std::remove_if(outstanding_futures_.begin(), outstanding_futures_.end(),
570✔
136
                       [](const std::shared_future<void>& f) {
403✔
137
                           // A zero-length wait_for is a non-blocking
                           // readiness poll.
                           return f.wait_for(std::chrono::seconds(0)) ==
403✔
138
                                  std::future_status::ready;
139
                       }),
140
        outstanding_futures_.end());
570✔
141
}
570✔
142

143
// Returns the executor's progress report by forwarding to
// Executor::get_progress(). executor_ is dereferenced without a null check.
ExecutorProgress Runtime::get_progress() const {
19✔
144
    return executor_->get_progress();
19✔
145
}
146

147
// Forwards to Executor::is_responsive(). executor_ is dereferenced without a
// null check.
bool Runtime::is_responsive() const { return executor_->is_responsive(); }
2✔
148

149
// Forwards `timeout` to Watchdog::set_global_timeout().
// @throws DFTUtilsException (ErrorCode::PIPELINE) if this Runtime has no
//         watchdog (watchdog_ is null).
void Runtime::set_global_timeout(std::chrono::milliseconds timeout) {
×
150
    if (!watchdog_) {
×
NEW
151
        throw DFTUtilsException(
×
152
            ErrorCode::PIPELINE,
UNCOV
153
            "Cannot set timeout: Runtime created without watchdog");
×
154
    }
155
    watchdog_->set_global_timeout(timeout);
×
156
}
×
157

158
// Forwards `timeout` to Watchdog::set_default_task_timeout().
// @throws DFTUtilsException (ErrorCode::PIPELINE) if this Runtime has no
//         watchdog (watchdog_ is null).
void Runtime::set_default_task_timeout(std::chrono::milliseconds timeout) {
×
159
    if (!watchdog_) {
×
NEW
160
        throw DFTUtilsException(
×
161
            ErrorCode::PIPELINE,
UNCOV
162
            "Cannot set timeout: Runtime created without watchdog");
×
163
    }
164
    watchdog_->set_default_task_timeout(timeout);
×
165
}
×
166

167
// Stops the watchdog (if any) and then shuts the executor down (if any).
// Only the first call does the work: later calls find shutdown_called_
// already true and return immediately.
void Runtime::shutdown() {
811✔
168
    bool expected = false;
811✔
169
    // Atomically flip false -> true; a failed exchange means another call
    // already claimed the shutdown.
    if (!shutdown_called_.compare_exchange_strong(expected, true)) return;
811✔
170
    // The watchdog is stopped before the executor it was pointed at.
    if (watchdog_) watchdog_->stop();
430✔
171
    if (executor_) executor_->shutdown();
430!
172
}
811✔
173

174
// Returns threads_, the count produced by resolve_threads() at construction.
std::size_t Runtime::threads() const { return threads_; }
153✔
175

176
// Returns the executor's I/O pool size via Executor::get_io_pool_size(), or 0
// when executor_ is null.
std::size_t Runtime::io_threads() const {
×
177
    return executor_ ? executor_->get_io_pool_size() : 0;
×
178
}
179

180
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc