• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26195612357

20 May 2026 11:19PM UTC coverage: 49.859% (-2.3%) from 52.2%
26195612357

push

github

hariharan-devarajan
feat(aggregator): improve system metrics scanning and persistence error handling

16041 of 43831 branches covered (36.6%)

Branch coverage included in aggregate %.

6 of 17 new or added lines in 2 files covered. (35.29%)

1072 existing lines in 104 files now uncovered.

21423 of 31309 relevant lines covered (68.42%)

13054.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.32
/src/dftracer/utils/core/runtime.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/platform_compat.h>
3
#include <dftracer/utils/core/runtime.h>
4

5
#include <algorithm>
6
#include <stdexcept>
7
#include <thread>
8

9
namespace dftracer::utils {
10

11
// Builds a runtime from a bare thread count. This overload always creates
// a watchdog.
//
// threads: requested thread count; 0 selects the value returned by
//          dftracer_utils_hardware_concurrency().
Runtime::Runtime(std::size_t threads)
    : threads_(threads != 0 ? threads
                            : dftracer_utils_hardware_concurrency()) {
    // The executor receives the already-resolved count, i.e. the same value
    // that threads() reports.
    ExecutorConfig executor_config;
    executor_config.num_threads = threads_;

    executor_ = std::make_unique<Executor>(executor_config);
    executor_->start();

    // The watchdog is created and attached after the executor was started.
    watchdog_ = std::make_unique<Watchdog>();
    watchdog_->set_executor(executor_.get());
}
145✔
21

22
// Builds a runtime from a full executor configuration.
//
// config:          forwarded unchanged to the Executor. Only threads_ gets
//                  the hardware-concurrency fallback when num_threads is 0.
// enable_watchdog: when false, watchdog_ is left unset.
Runtime::Runtime(const ExecutorConfig& config, bool enable_watchdog)
    : threads_(config.num_threads != 0
                   ? config.num_threads
                   : dftracer_utils_hardware_concurrency()) {
    executor_ = std::make_unique<Executor>(config);
    executor_->start();

    if (!enable_watchdog) {
        return;
    }

    // Attach a fresh watchdog to the executor that was just started.
    watchdog_ = std::make_unique<Watchdog>();
    watchdog_->set_executor(executor_.get());
}
78✔
33

34
// Builds a runtime from a full executor configuration and a caller-supplied
// watchdog.
//
// config:   forwarded unchanged to the Executor. Only threads_ gets the
//           hardware-concurrency fallback when num_threads is 0.
// watchdog: ownership is taken; may be null, in which case the runtime has
//           no watchdog.
Runtime::Runtime(const ExecutorConfig& config,
                 std::unique_ptr<Watchdog> watchdog)
    : threads_(config.num_threads != 0
                   ? config.num_threads
                   : dftracer_utils_hardware_concurrency()) {
    executor_ = std::make_unique<Executor>(config);
    executor_->start();

    // Adopt the watchdog (if any) and point it at the started executor.
    watchdog_ = std::move(watchdog);
    if (watchdog_ != nullptr) {
        watchdog_->set_executor(executor_.get());
    }
}
207✔
46

47
// Destroying the runtime shuts it down.
Runtime::~Runtime() {
    shutdown();
}
430✔
48

49
// Schedules `task` on the executor and returns a handle for it.
//
// The task is wrapped in a coroutine that awaits it, reports completion to
// the executor and then settles a promise: with a value on success, or with
// the caught exception if awaiting the task threw. The matching shared
// future is stored in outstanding_futures_ and also placed in the returned
// handle.
//
// task: coroutine to run; it is moved into the wrapper.
// name: label passed to the executor and the handle; when empty, a
//       "task-<n>" label is generated from task_name_counter_.
// Throws std::runtime_error if shutdown() has already been called.
TaskHandle Runtime::submit(coro::CoroTask<void> task, std::string name) {
    using TaskCoroHandle =
        std::coroutine_handle<coro::CoroTask<void>::promise_type>;

    if (shutdown_called_.load(std::memory_order_acquire)) {
        throw std::runtime_error("Runtime is shut down");
    }
    if (name.empty()) {
        name = "task-" + std::to_string(task_name_counter_++);
    }

    auto completion = std::make_shared<std::promise<void>>();
    auto completion_future = completion->get_future().share();
    // Starts at -1 and is shared with enqueue_tracked() below; the wrapper
    // reads it back when reporting completion.
    auto tracked_index = std::make_shared<std::atomic<TaskIndex>>(-1);

    // Everything the wrapper needs is passed as a parameter; the lambda
    // itself captures nothing.
    auto run_and_signal =
        [](coro::CoroTask<void> inner,
           std::shared_ptr<std::promise<void>> done, Executor* owner,
           std::shared_ptr<std::atomic<TaskIndex>> index) -> coro::Coro {
        try {
            co_await std::move(inner);
            // Swap the finished task for an empty handle before telling the
            // executor that this coroutine is complete.
            inner = coro::CoroTask<void>{TaskCoroHandle{}};
            owner->mark_coro_completed(
                index->load(std::memory_order_acquire));
        } catch (...) {
            // Same teardown on the failure path, then forward the exception
            // to whoever holds the future.
            inner = coro::CoroTask<void>{TaskCoroHandle{}};
            owner->mark_coro_completed(
                index->load(std::memory_order_acquire));
            done->set_exception(std::current_exception());
            co_return;
        }
        done->set_value();
    };

    // Give the task's promise an executor so that awaitables (e.g. channels)
    // which capture `get_root_promise()->get_executor()` are able to
    // schedule resumption. Otherwise awaiters would see executor=nullptr:
    // the wrapping `coro::Coro` doesn't extend PromiseBase, so the
    // root-promise chain ends at the user's CoroTask.
    if (task.handle()) {
        task.handle().promise().set_executor(executor_.get());
    }

    auto wrapped = run_and_signal(std::move(task), completion,
                                  executor_.get(), tracked_index);
    const TaskIndex id =
        executor_->enqueue_tracked(std::move(wrapped), name, tracked_index);

    {
        // Prune already-finished futures before recording the new one.
        std::lock_guard<std::mutex> guard(futures_mutex_);
        cleanup_completed_futures();
        outstanding_futures_.push_back(completion_future);
    }

    return TaskHandle{completion_future, id, std::move(name)};
}
471✔
99

100
void Runtime::wait_all() {
99✔
101
    std::vector<std::shared_future<void>> futures;
99✔
102
    {
103
        std::lock_guard<std::mutex> lock(futures_mutex_);
99!
104
        futures = std::move(outstanding_futures_);
99✔
105
        outstanding_futures_.clear();
99✔
106
    }
99✔
107
    for (auto& f : futures) {
138✔
108
        f.wait();
39!
109
    }
110
}
99✔
111

112
void Runtime::cleanup_completed_futures() {
556✔
113
    outstanding_futures_.erase(
1,112!
114
        std::remove_if(outstanding_futures_.begin(), outstanding_futures_.end(),
556!
115
                       [](const std::shared_future<void>& f) {
411✔
116
                           return f.wait_for(std::chrono::seconds(0)) ==
411!
117
                                  std::future_status::ready;
411✔
118
                       }),
119
        outstanding_futures_.end());
556✔
120
}
556✔
121

122
// Forwards to the executor's get_progress() and returns its result.
ExecutorProgress Runtime::get_progress() const {
    Executor* const exec = executor_.get();
    return exec->get_progress();
}
125

126
bool Runtime::is_responsive() const { return executor_->is_responsive(); }
2✔
127

128
void Runtime::set_global_timeout(std::chrono::milliseconds timeout) {
×
129
    if (!watchdog_) {
×
130
        throw std::runtime_error(
×
UNCOV
131
            "Cannot set timeout: Runtime created without watchdog");
×
132
    }
133
    watchdog_->set_global_timeout(timeout);
×
134
}
×
135

136
void Runtime::set_default_task_timeout(std::chrono::milliseconds timeout) {
×
137
    if (!watchdog_) {
×
138
        throw std::runtime_error(
×
UNCOV
139
            "Cannot set timeout: Runtime created without watchdog");
×
140
    }
141
    watchdog_->set_default_task_timeout(timeout);
×
142
}
×
143

144
void Runtime::shutdown() {
811✔
145
    bool expected = false;
811✔
146
    if (!shutdown_called_.compare_exchange_strong(expected, true)) return;
811✔
147
    if (watchdog_) watchdog_->stop();
430!
148
    if (executor_) executor_->shutdown();
430!
149
}
150

151
// Returns the thread count resolved at construction time.
std::size_t Runtime::threads() const {
    return threads_;
}
152✔
152

153
std::size_t Runtime::io_threads() const {
×
154
    return executor_ ? executor_->get_io_pool_size() : 0;
×
155
}
156

157
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc