
llnl / dftracer-utils / 26043728131

18 May 2026, 03:37 PM UTC · coverage: 51.706% (-0.4%) from 52.076%

push · github · hariharan-devarajan
feat(perf): performance improvements for parallel reading, indexing, and aggregation

Indexer
- Streaming parse-and-emit worker pipeline with bounded memory usage
- Concurrent SST artifact ingestion with staging support
- Gzip member slicing for parallel indexing
- Lazy decoding for compressed value counts
- Bypass DOM wrapper for indexer hot path (simdjson on_demand)
- Decoupled write workers from parse workers
- --rebuild-summaries flag and optimized root summary rebuild
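The Indexer bullets describe parse workers that stream results to decoupled write workers while keeping memory bounded. One common way to get that property is a bounded blocking queue between the two stages, and the sketch below assumes that design. `BoundedQueue`, `run_pipeline`, and the `"parsed:"` transform are hypothetical stand-ins, not dftracer-utils APIs. The sketch also uses plain `std::thread` rather than the project's executor.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical bounded channel: push() blocks while the queue is full, so a
// fast producer never buffers more than `capacity` items (capacity must be >= 1).
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    void push(T item) {
        std::unique_lock<std::mutex> lock(mu_);
        not_full_.wait(lock, [&] { return q_.size() < capacity_; });
        q_.push(std::move(item));
        if (q_.size() > high_water_) high_water_ = q_.size();  // track peak depth
        not_empty_.notify_one();
    }

    // Returns std::nullopt once close() has been called and the queue is drained.
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(mu_);
        not_empty_.wait(lock, [&] { return !q_.empty() || closed_; });
        if (q_.empty()) return std::nullopt;
        T item = std::move(q_.front());
        q_.pop();
        not_full_.notify_one();
        return item;
    }

    void close() {
        std::lock_guard<std::mutex> lock(mu_);
        closed_ = true;
        not_empty_.notify_all();
    }

    std::size_t high_water() const {
        std::lock_guard<std::mutex> lock(mu_);
        return high_water_;
    }

private:
    mutable std::mutex mu_;
    std::condition_variable not_full_, not_empty_;
    std::queue<T> q_;
    std::size_t capacity_;
    std::size_t high_water_ = 0;
    bool closed_ = false;
};

// The parse stage (this thread) emits each record as soon as it is produced.
// A separate write thread consumes records, so parsing and writing overlap.
std::vector<std::string> run_pipeline(const std::vector<std::string>& lines,
                                      std::size_t capacity,
                                      std::size_t* high_water) {
    BoundedQueue<std::string> queue(capacity);
    std::vector<std::string> written;

    std::thread writer([&] {
        while (auto rec = queue.pop()) written.push_back(*rec);
    });
    for (const auto& line : lines) queue.push("parsed:" + line);  // stand-in for real parsing
    queue.close();
    writer.join();

    if (high_water) *high_water = queue.high_water();
    return written;
}
```

With a single writer, output order matches input order. Peak memory is governed by `capacity` rather than by the input size.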

Aggregator / MPI
- Task-based DAG execution for aggregator pipeline
- Shared staging for multi-node artifact relocation
- Per-node thread scaling to avoid oversubscription
- Unified distributed aggregation tracking, removed manifest consolidation
- Deterministic aggregation and intra-file parallelism
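Task-based DAG execution is commonly built on a topological order, in which each task is released once all of its dependencies have finished. The sketch below assumes that approach and uses Kahn's algorithm, grouped into waves of tasks that could run concurrently. Sorting each wave is one simple way to keep the schedule deterministic. The `Dag` type, `schedule_waves`, and the task names are hypothetical and do not reflect the aggregator's actual types.

```cpp
#include <algorithm>
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical DAG description: task name -> names of the tasks it depends on.
using Dag = std::map<std::string, std::vector<std::string>>;

// Kahn's algorithm grouped into waves. Every task in a wave depends only on
// tasks from earlier waves, so tasks within one wave could run concurrently.
std::vector<std::vector<std::string>> schedule_waves(const Dag& dag) {
    std::map<std::string, std::size_t> indegree;
    std::map<std::string, std::vector<std::string>> dependents;
    for (const auto& [task, deps] : dag) {
        indegree[task];  // make sure tasks with no dependencies are present
        for (const auto& dep : deps) {
            indegree[dep];  // a dependency need not appear as a key itself
            ++indegree[task];
            dependents[dep].push_back(task);
        }
    }

    std::vector<std::string> ready;
    for (const auto& [task, degree] : indegree)
        if (degree == 0) ready.push_back(task);  // std::map iterates in sorted order

    std::vector<std::vector<std::string>> waves;
    std::size_t scheduled = 0;
    while (!ready.empty()) {
        scheduled += ready.size();
        std::vector<std::string> next;
        for (const auto& task : ready) {
            auto it = dependents.find(task);
            if (it == dependents.end()) continue;
            for (const auto& child : it->second)
                if (--indegree[child] == 0) next.push_back(child);
        }
        std::sort(next.begin(), next.end());  // deterministic order within a wave
        waves.push_back(std::move(ready));
        ready = std::move(next);
    }
    if (scheduled != indegree.size())
        throw std::invalid_argument("dependency cycle detected");
    return waves;
}
```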

Trace reader / query
- Compiled predicate evaluation for AND-of-EQ queries
- Uniform-match shortcut for AND-of-EQ queries
- Line-range support for work items and checkpoint processing
- Optimized chunk pruning and checkpoint handling
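The sketch below shows a minimal version of compiling an AND-of-EQ query and of a uniform-match shortcut. It assumes rows are vectors of string fields and that each chunk carries per-column min/max statistics. `AndOfEq`, `ColumnStats`, `ChunkVerdict`, and `count_matches` are hypothetical; the trace reader's real predicate and chunk-index types are not shown in this report.

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using Row = std::vector<std::string>;            // hypothetical: one string per column
struct ColumnStats { std::string min, max; };    // hypothetical per-chunk column stats
enum class ChunkVerdict { Skip, AllMatch, Scan };

// Field names are resolved to column indices once ("compiled"), so checking a
// row is a tight loop of index lookups and string comparisons.
class AndOfEq {
public:
    AndOfEq(const std::vector<std::string>& schema,
            const std::vector<std::pair<std::string, std::string>>& terms) {
        std::unordered_map<std::string, std::size_t> index;
        for (std::size_t i = 0; i < schema.size(); ++i) index[schema[i]] = i;
        for (const auto& [field, value] : terms)
            terms_.emplace_back(index.at(field), value);  // throws on unknown field
    }

    bool matches(const Row& row) const {
        for (const auto& [col, value] : terms_)
            if (row[col] != value) return false;
        return true;
    }

    // `stats` holds one entry per schema column, with lexicographic min/max.
    // A value outside [min, max] rules out the whole chunk (pruning). If every
    // constrained column holds exactly the expected value (min == max == value),
    // every row matches without being inspected (uniform match).
    ChunkVerdict classify(const std::vector<ColumnStats>& stats) const {
        bool uniform = true;
        for (const auto& [col, value] : terms_) {
            const auto& s = stats[col];
            if (value < s.min || value > s.max) return ChunkVerdict::Skip;
            if (!(s.min == value && s.max == value)) uniform = false;
        }
        return uniform ? ChunkVerdict::AllMatch : ChunkVerdict::Scan;
    }

private:
    std::vector<std::pair<std::size_t, std::string>> terms_;
};

std::size_t count_matches(const AndOfEq& q, const std::vector<ColumnStats>& stats,
                          const std::vector<Row>& chunk) {
    switch (q.classify(stats)) {
        case ChunkVerdict::Skip: return 0;
        case ChunkVerdict::AllMatch: return chunk.size();  // no row is inspected
        case ChunkVerdict::Scan: break;
    }
    std::size_t n = 0;
    for (const auto& row : chunk) n += q.matches(row) ? 1 : 0;
    return n;
}
```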

Replay
- Pipelined replay with coroutines and channels
- JsonParser-based trace processing
- Optimized string handling and I/O buffering

Organize / writer / dft
- Parallel slice creation and merging in organize visitor
- Inline indexer in organize
- Gzip member tracking in writer
- Coroutine-based event dispatcher with extracted parse logic
- Batch flushing in organize visitor

Arrow / call_tree
- Optimized arrow conversion
- Arrow IPC support and improved save/load in call_tree

Build / infrastructure
- zlib-ng option, system simdjson fallback
- cgroup v1/v2 memory limit detection
- Auto-computed per-file memory estimates and batch sizes
- CI: perf branch trigger, formatting
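cgroup memory-limit detection usually reads `memory.max` (cgroup v2) or `memory/memory.limit_in_bytes` (cgroup v1) under `/sys/fs/cgroup`. The sketch below assumes those conventional mount points and skips resolving the process's own cgroup path through `/proc/self/cgroup`, which real code generally needs. The 2^62 "unlimited" threshold, the 50% budget fraction, and all function names are hypothetical. The batch-size rule only illustrates how a per-file memory estimate could be turned into a batch size.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <fstream>
#include <initializer_list>
#include <optional>
#include <sstream>
#include <string>

// cgroup v2 memory.max holds a byte count or the literal "max" (no limit).
// cgroup v1 memory.limit_in_bytes holds a byte count, where a huge value
// effectively means unlimited.
std::optional<std::uint64_t> parse_cgroup_limit(const std::string& text) {
    std::istringstream in(text);
    std::string token;
    if (!(in >> token) || token == "max") return std::nullopt;
    try {
        std::uint64_t v = std::stoull(token);
        // Hypothetical cutoff: treat anything >= 2^62 bytes as "no real limit".
        if (v >= (std::uint64_t{1} << 62)) return std::nullopt;
        return v;
    } catch (...) {  // std::stoull throws on non-numeric or out-of-range input
        return std::nullopt;
    }
}

std::optional<std::uint64_t> read_cgroup_memory_limit() {
    for (const char* path : {"/sys/fs/cgroup/memory.max",                      // v2
                             "/sys/fs/cgroup/memory/memory.limit_in_bytes"}) { // v1
        std::ifstream f(path);
        if (!f) continue;
        std::stringstream buf;
        buf << f.rdbuf();
        return parse_cgroup_limit(buf.str());
    }
    return std::nullopt;  // no cgroup limit found at the conventional paths
}

// Hypothetical sizing rule: number of files whose estimated footprint fits in
// `fraction` of the memory budget, never less than one.
std::size_t batch_size(std::uint64_t budget_bytes, std::uint64_t per_file_bytes,
                       double fraction = 0.5) {
    if (per_file_bytes == 0) return 1;
    auto usable = static_cast<std::uint64_t>(static_cast<double>(budget_bytes) * fraction);
    return static_cast<std::size_t>(std::max<std::uint64_t>(1, usable / per_file_bytes));
}
```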

Docs
- Rewritten indexer and trace reader API references

35907 of 90345 branches covered (39.74%)

Branch coverage included in aggregate %.

16869 of 21880 new or added lines in 137 files covered (77.1%).

273 existing lines in 39 files now uncovered.

32021 of 41028 relevant lines covered (78.05%)

13164.29 hits per line
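Branch coverage is included in the aggregate. The headline figure is consistent with pooling covered lines and covered branches over their combined totals. This is an assumption about how Coveralls combines the two, checked here only against this report's own numbers:

```latex
\frac{32021 + 35907}{41028 + 90345} = \frac{67928}{131373} \approx 0.51706 = 51.706\%,
\qquad
51.706\% - 52.076\% = -0.370 \text{ percentage points (shown as } -0.4\%\text{)}
```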

Source file: /src/dftracer/utils/core/runtime.cpp · coverage 63.19%
```cpp
#include <dftracer/utils/core/common/logging.h>
#include <dftracer/utils/core/common/platform_compat.h>
#include <dftracer/utils/core/runtime.h>

#include <algorithm>
#include <stdexcept>
#include <thread>

namespace dftracer::utils {

Runtime::Runtime(std::size_t threads)  // 435✔
    : threads_(threads == 0 ? dftracer_utils_hardware_concurrency() : threads) {  // 435!
    ExecutorConfig config;  // 290!
    config.num_threads = threads_;  // 290✔
    executor_ = std::make_unique<Executor>(config);  // 290!
    executor_->start();  // 290!

    watchdog_ = std::make_unique<Watchdog>();  // 290!
    watchdog_->set_executor(executor_.get());  // 290!
}  // 290✔

Runtime::Runtime(const ExecutorConfig& config, bool enable_watchdog)  // 234✔
    : threads_(config.num_threads == 0 ? dftracer_utils_hardware_concurrency()  // 156!
                                       : config.num_threads) {  // 311✔
    executor_ = std::make_unique<Executor>(config);  // 156!
    executor_->start();  // 156!

    if (enable_watchdog) {  // 156!
        watchdog_ = std::make_unique<Watchdog>();  // 156!
        watchdog_->set_executor(executor_.get());  // 156!
    }  // 78✔
}  // 156✔

Runtime::Runtime(const ExecutorConfig& config,  // 615✔
                 std::unique_ptr<Watchdog> watchdog)  // 205✔
    : threads_(config.num_threads == 0 ? dftracer_utils_hardware_concurrency()  // 410!
                                       : config.num_threads) {  // 818✔
    executor_ = std::make_unique<Executor>(config);  // 410!
    executor_->start();  // 410!

    watchdog_ = std::move(watchdog);  // 410✔
    if (watchdog_) {  // 410✔
        watchdog_->set_executor(executor_.get());  // 162!
    }  // 81✔
}  // 410✔

Runtime::~Runtime() { shutdown(); }  // 1,284!

TaskHandle Runtime::submit(coro::CoroTask<void> task, std::string name) {  // 946✔
    if (shutdown_called_.load(std::memory_order_acquire)) {  // 946✔
        throw std::runtime_error("Runtime is shut down");  // 4!
    }
    if (name.empty()) {  // 942✔
        name = "task-" + std::to_string(task_name_counter_++);  // 6!
    }  // 3✔

    auto promise = std::make_shared<std::promise<void>>();  // 942!
    auto future = promise->get_future().share();  // 942!
    auto tid = std::make_shared<std::atomic<TaskIndex>>(-1);  // 942!

    auto wrapper =  // 471✔
        [](coro::CoroTask<void> t, std::shared_ptr<std::promise<void>> p,  // 3,762!
           Executor* exec,
           std::shared_ptr<std::atomic<TaskIndex>> task_id) -> coro::Coro {  // 471!
        try {
            co_await std::move(t);  // 2,349!
            t = coro::CoroTask<void>{  // 936!
                std::coroutine_handle<coro::CoroTask<void>::promise_type>{}};  // 468✔
            exec->mark_coro_completed(task_id->load(std::memory_order_acquire));  // 468!
        } catch (...) {  // 471✔
            t = coro::CoroTask<void>{  // 6!
                std::coroutine_handle<coro::CoroTask<void>::promise_type>{}};  // 3✔
            exec->mark_coro_completed(task_id->load(std::memory_order_acquire));  // 3!
            p->set_exception(std::current_exception());  // 3!
            co_return;  // 3✔
        }  // 3!
        p->set_value();  // 468!
    };  // 1,881!

    // Set the executor on the task's promise so awaitables (e.g. channels)
    // that capture `get_root_promise()->get_executor()` can schedule
    // resumption. Without this, awaiters end up with executor=nullptr because
    // the wrapping `coro::Coro` doesn't extend PromiseBase and the
    // root-promise chain stops at the user's CoroTask.
    if (task.handle()) {  // 942✔
        task.handle().promise().set_executor(executor_.get());  // 942!
    }  // 471✔
    auto coro = wrapper(std::move(task), promise, executor_.get(), tid);  // 1,413!
    TaskIndex id = executor_->enqueue_tracked(std::move(coro), name, tid);  // 942!

    {
        std::lock_guard<std::mutex> lock(futures_mutex_);  // 942!
        cleanup_completed_futures();  // 942!
        outstanding_futures_.push_back(future);  // 942!
    }  // 942✔

    return TaskHandle{future, id, std::move(name)};  // 1,413!
}  // 942✔

void Runtime::wait_all() {  // 198✔
    std::vector<std::shared_future<void>> futures;  // 198✔
    {
        std::lock_guard<std::mutex> lock(futures_mutex_);  // 198!
        futures = std::move(outstanding_futures_);  // 198✔
        outstanding_futures_.clear();  // 198✔
    }  // 198✔
    for (auto& f : futures) {  // 276✔
        f.wait();  // 78!
    }
}  // 198✔

void Runtime::cleanup_completed_futures() {  // 1,112✔
    outstanding_futures_.erase(  // 2,224!
        std::remove_if(outstanding_futures_.begin(), outstanding_futures_.end(),  // 1,112!
                       [](const std::shared_future<void>& f) {  // 836✔
                           return f.wait_for(std::chrono::seconds(0)) ==  // 836!
                                  std::future_status::ready;  // 424✔
                       }),
        outstanding_futures_.end());  // 1,112✔
}  // 1,112✔

ExecutorProgress Runtime::get_progress() const {  // 38✔
    return executor_->get_progress();  // 38✔
}

bool Runtime::is_responsive() const { return executor_->is_responsive(); }  // 4✔

void Runtime::set_global_timeout(std::chrono::milliseconds timeout) {  // ×
    if (!watchdog_) {  // ×
        throw std::runtime_error(  // ×
            "Cannot set timeout: Runtime created without watchdog");
    }
    watchdog_->set_global_timeout(timeout);  // ×
}  // ×

void Runtime::set_default_task_timeout(std::chrono::milliseconds timeout) {  // ×
    if (!watchdog_) {  // ×
        throw std::runtime_error(  // ×
            "Cannot set timeout: Runtime created without watchdog");
    }
    watchdog_->set_default_task_timeout(timeout);  // ×
}  // ×

void Runtime::shutdown() {  // 1,614✔
    bool expected = false;  // 1,614✔
    if (!shutdown_called_.compare_exchange_strong(expected, true)) return;  // 1,614✔
    if (watchdog_) watchdog_->stop();  // 856!
    if (executor_) executor_->shutdown();  // 856!
}  // 807✔

std::size_t Runtime::threads() const { return threads_; }  // 304✔

std::size_t Runtime::io_threads() const {  // NEW ×
    return executor_ ? executor_->get_io_pool_size() : 0;  // NEW ×
}

}  // namespace dftracer::utils
```
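The future bookkeeping in `submit`, `cleanup_completed_futures`, `wait_all`, and `shutdown` follows a reusable pattern:

- Prune ready `std::shared_future`s under a mutex on each submit.
- Move the pending list out before waiting, so the lock is not held while blocking.
- Gate `shutdown` with `compare_exchange_strong` so its body runs once.

The sketch below reproduces only that pattern. It swaps the coroutine executor for `std::async` threads, and `MiniRuntime` and its methods are hypothetical.

```cpp
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <functional>
#include <future>
#include <mutex>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical thread-based stand-in for the Runtime's future bookkeeping.
class MiniRuntime {
public:
    ~MiniRuntime() { shutdown(); }

    std::shared_future<void> submit(std::function<void()> fn) {
        if (shutdown_called_.load(std::memory_order_acquire))
            throw std::runtime_error("Runtime is shut down");
        std::shared_future<void> fut =
            std::async(std::launch::async, std::move(fn)).share();
        std::lock_guard<std::mutex> lock(mu_);
        prune_ready();  // keeps the list from growing with finished tasks
        outstanding_.push_back(fut);
        return fut;
    }

    // Takes the pending futures under the lock, then waits without holding it.
    void wait_all() {
        std::vector<std::shared_future<void>> pending;
        {
            std::lock_guard<std::mutex> lock(mu_);
            pending.swap(outstanding_);
        }
        for (auto& f : pending) f.wait();
    }

    // Only the first caller wins the compare-exchange; later calls return at once.
    void shutdown() {
        bool expected = false;
        if (!shutdown_called_.compare_exchange_strong(expected, true)) return;
        ++shutdown_runs_;
        wait_all();
    }

    std::size_t outstanding() const {
        std::lock_guard<std::mutex> lock(mu_);
        return outstanding_.size();
    }
    int shutdown_runs() const { return shutdown_runs_.load(); }

private:
    // Drops futures whose tasks have already finished (zero-timeout poll).
    void prune_ready() {
        outstanding_.erase(
            std::remove_if(outstanding_.begin(), outstanding_.end(),
                           [](const std::shared_future<void>& f) {
                               return f.wait_for(std::chrono::seconds(0)) ==
                                      std::future_status::ready;
                           }),
            outstanding_.end());
    }

    mutable std::mutex mu_;
    std::vector<std::shared_future<void>> outstanding_;
    std::atomic<bool> shutdown_called_{false};
    std::atomic<int> shutdown_runs_{0};
};
```

Because `wait_all` waits outside the lock, tasks submitted while it is blocked go into the fresh list. The next call waits on them.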

© 2026 Coveralls, Inc