• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.38
/src/dftracer/utils/core/runtime.cpp
1
#include <dftracer/utils/core/common/error.h>
2
#include <dftracer/utils/core/common/logging.h>
3
#include <dftracer/utils/core/common/platform_compat.h>
4
#include <dftracer/utils/core/env.h>
5
#include <dftracer/utils/core/runtime.h>
6

7
#include <algorithm>
8
#include <cstdlib>
9
#include <stdexcept>
10
#include <string>
11
#include <string_view>
12
#include <thread>
13

14
namespace dftracer::utils {
15

16
namespace {
17
// Resolve the worker-thread count. DFTRACER_UTILS_THREADS overrides everything
18
// (a debugging lever: force a fixed count, e.g. 1 for a single-threaded async
19
// loop). Otherwise 0 means hardware_concurrency.
20
std::size_t resolve_threads(std::size_t requested) {
874✔
21
    if (auto env = Env::get<std::string_view>("DFTRACER_UTILS_THREADS");
1,748!
22
        env.has_value()) {
874✔
23
        const long long n =
24
            std::strtoll(std::string(*env).c_str(), nullptr, 10);
×
25
        if (n > 0) return static_cast<std::size_t>(n);
×
26
    }
27
    return requested == 0 ? dftracer_utils_hardware_concurrency() : requested;
874✔
28
}
437✔
29
}  // namespace
30

31
// Build a runtime with default executor settings and an attached watchdog.
// `threads` is passed through resolve_threads() before being stored and handed
// to the executor.
Runtime::Runtime(std::size_t threads) : threads_(resolve_threads(threads)) {
    // Only the worker count deviates from the ExecutorConfig defaults.
    ExecutorConfig executor_settings;
    executor_settings.num_threads = threads_;

    executor_ = std::make_unique<Executor>(executor_settings);
    executor_->start();

    // The watchdog is created after the executor has been started and is then
    // pointed at it.
    watchdog_ = std::make_unique<Watchdog>();
    watchdog_->set_executor(executor_.get());
}
438✔
40

41
// Build a runtime from a caller-supplied executor configuration.
// The configured thread count is passed through resolve_threads(); a watchdog
// is created only when `enable_watchdog` is true.
Runtime::Runtime(const ExecutorConfig& config, bool enable_watchdog)
    : threads_(resolve_threads(config.num_threads)) {
    // Work on a copy so the caller's config is left untouched while the
    // resolved thread count is applied.
    ExecutorConfig effective = config;
    effective.num_threads = threads_;

    executor_ = std::make_unique<Executor>(effective);
    executor_->start();

    if (!enable_watchdog) {
        return;
    }

    watchdog_ = std::make_unique<Watchdog>();
    watchdog_->set_executor(executor_.get());
}
234✔
53

54
// Build a runtime from a caller-supplied executor configuration and an
// optional, caller-constructed watchdog. A null `watchdog` leaves the runtime
// without one.
Runtime::Runtime(const ExecutorConfig& config,
                 std::unique_ptr<Watchdog> watchdog)
    : threads_(resolve_threads(config.num_threads)) {
    // Copy the config so the resolved thread count can replace the requested
    // one without modifying the caller's object.
    ExecutorConfig effective = config;
    effective.num_threads = threads_;

    executor_ = std::make_unique<Executor>(effective);
    executor_->start();

    // Take ownership first, then wire the watchdog to the started executor.
    watchdog_ = std::move(watchdog);
    if (!watchdog_) {
        return;
    }
    watchdog_->set_executor(executor_.get());
}
639✔
67

68
// Tear the runtime down via shutdown(), which does nothing if it has already
// run.
Runtime::~Runtime() {
    shutdown();
}
1,311!
69

70
// Schedule `task` on the executor and return a handle to it.
//
// Throws DFTUtilsException(ErrorCode::PIPELINE) once shutdown() has been
// called. An empty `name` is replaced by a generated "task-<counter>" label.
// The returned TaskHandle carries a shared future that becomes ready when the
// task finishes, holding the task's exception if it threw.
TaskHandle Runtime::submit(coro::CoroTask<void> task, std::string name) {
    if (shutdown_called_.load(std::memory_order_acquire)) {
        throw DFTUtilsException(ErrorCode::PIPELINE, "Runtime is shut down");
    }

    if (name.empty()) {
        name = "task-";
        name += std::to_string(task_name_counter_++);
    }

    Executor* const owner = executor_.get();

    auto completion = std::make_shared<std::promise<void>>();
    auto done = completion->get_future().share();
    // Shared slot for the task index; the wrapper coroutine reads it when it
    // reports completion. It starts at -1 and is handed to enqueue_tracked().
    auto index_slot = std::make_shared<std::atomic<TaskIndex>>(-1);

    // Wrapper coroutine: runs the user task, releases it, reports completion
    // to the executor, and only then settles the promise. Everything is taken
    // by value (no captures), so the coroutine frame owns all it touches.
    auto run_and_report =
        [](coro::CoroTask<void> user_task,
           std::shared_ptr<std::promise<void>> result, Executor* exec,
           std::shared_ptr<std::atomic<TaskIndex>> slot) -> coro::Coro {
        using TaskCoroHandle =
            std::coroutine_handle<coro::CoroTask<void>::promise_type>;
        try {
            co_await std::move(user_task);
            // Replace the task with an empty one before marking completion.
            user_task = coro::CoroTask<void>{TaskCoroHandle{}};
            exec->mark_coro_completed(slot->load(std::memory_order_acquire));
        } catch (...) {
            user_task = coro::CoroTask<void>{TaskCoroHandle{}};
            exec->mark_coro_completed(slot->load(std::memory_order_acquire));
            result->set_exception(std::current_exception());
            co_return;
        }
        result->set_value();
    };

    // Set the executor on the task's promise so awaitables (e.g. channels)
    // that capture `get_root_promise()->get_executor()` can schedule
    // resumption. Without this, awaiters end up with executor=nullptr because
    // the wrapping `coro::Coro` doesn't extend PromiseBase and the
    // root-promise chain stops at the user's CoroTask.
    if (task.handle()) {
        task.handle().promise().set_executor(owner);
    }

    auto driver = run_and_report(std::move(task), completion, owner, index_slot);
    const TaskIndex id =
        executor_->enqueue_tracked(std::move(driver), name, index_slot);

    {
        // Drop futures that are already ready before recording the new one.
        std::lock_guard<std::mutex> guard(futures_mutex_);
        cleanup_completed_futures();
        outstanding_futures_.push_back(done);
    }

    return TaskHandle{done, id, std::move(name)};
}
974✔
120

121
void Runtime::wait_all() {
200✔
122
    std::vector<std::shared_future<void>> futures;
200✔
123
    {
124
        std::lock_guard<std::mutex> lock(futures_mutex_);
200!
125
        futures = std::move(outstanding_futures_);
200✔
126
        outstanding_futures_.clear();
200✔
127
    }
200✔
128
    for (auto& f : futures) {
276✔
129
        f.wait();
76!
130
    }
131
}
200✔
132

133
void Runtime::cleanup_completed_futures() {
1,144✔
134
    outstanding_futures_.erase(
2,288!
135
        std::remove_if(outstanding_futures_.begin(), outstanding_futures_.end(),
1,144!
136
                       [](const std::shared_future<void>& f) {
827✔
137
                           return f.wait_for(std::chrono::seconds(0)) ==
827!
138
                                  std::future_status::ready;
413✔
139
                       }),
140
        outstanding_futures_.end());
1,144✔
141
}
1,144✔
142

143
// Return the executor's progress report unchanged.
ExecutorProgress Runtime::get_progress() const {
    ExecutorProgress progress = executor_->get_progress();
    return progress;
}
146

147
// Return the executor's own responsiveness check.
bool Runtime::is_responsive() const {
    return executor_->is_responsive();
}
4✔
148

149
void Runtime::set_global_timeout(std::chrono::milliseconds timeout) {
×
150
    if (!watchdog_) {
×
151
        throw DFTUtilsException(
×
152
            ErrorCode::PIPELINE,
153
            "Cannot set timeout: Runtime created without watchdog");
×
154
    }
155
    watchdog_->set_global_timeout(timeout);
×
156
}
×
157

158
void Runtime::set_default_task_timeout(std::chrono::milliseconds timeout) {
×
159
    if (!watchdog_) {
×
160
        throw DFTUtilsException(
×
161
            ErrorCode::PIPELINE,
162
            "Cannot set timeout: Runtime created without watchdog");
×
163
    }
164
    watchdog_->set_default_task_timeout(timeout);
×
165
}
×
166

167
void Runtime::shutdown() {
1,650✔
168
    bool expected = false;
1,650✔
169
    if (!shutdown_called_.compare_exchange_strong(expected, true)) return;
1,650✔
170
    if (watchdog_) watchdog_->stop();
874!
171
    if (executor_) executor_->shutdown();
874!
172
}
825✔
173

174
// Return the thread count resolved at construction time.
std::size_t Runtime::threads() const {
    return threads_;
}
308✔
175

176
std::size_t Runtime::io_threads() const {
×
177
    return executor_ ? executor_->get_io_pool_size() : 0;
×
178
}
179

180
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc