• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23529483807

25 Mar 2026 07:17AM UTC coverage: 48.515% (-1.6%) from 50.098%
23529483807

Pull #57

github

web-flow
Merge 5b1e117ad into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18829 of 49412 branches covered (38.11%)

Branch coverage included in aggregate %.

1584 of 1933 new or added lines in 14 files covered. (81.95%)

3552 existing lines in 135 files now uncovered.

18474 of 27477 relevant lines covered (67.23%)

241072.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.0
/src/dftracer/utils/server/trace_index.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/common/logging.h>
3
#include <dftracer/utils/core/coro/channel.h>
4
#include <dftracer/utils/core/io/io_backend.h>
5
#include <dftracer/utils/core/pipeline/pipeline.h>
6
#include <dftracer/utils/core/pipeline/pipeline_config.h>
7
#include <dftracer/utils/core/tasks/coro_scope.h>
8
#include <dftracer/utils/core/tasks/task.h>
9
#include <dftracer/utils/server/trace_index.h>
10
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
11
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
12
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
13
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
14
#include <dftracer/utils/utilities/indexer/index_database.h>
15
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
16

17
#include <cinttypes>
18
#include <limits>
19

20
namespace dftracer::utils::server {
21

22
using namespace dftracer::utils::utilities::composites::dft;
23
using namespace dftracer::utils::utilities::composites::dft::indexing;
24
using namespace dftracer::utils::utilities::filesystem;
25
namespace indexer = dftracer::utils::utilities::indexer;
26

27
/// Builds an index over the trace files under `directory`.
///
/// @param directory       Directory that initialize() later scans for traces.
/// @param index_dir       Directory handed to the index-path helper when
///                        locating or building sidecar indexes.
/// @param max_concurrent  Worker count used by initialize(); a value of 0
///                        selects the built-in default of 8.
TraceIndex::TraceIndex(const std::string& directory,
                       const std::string& index_dir, std::size_t max_concurrent)
    : directory_(directory),
      index_dir_(index_dir),
      max_concurrent_(max_concurrent != 0 ? max_concurrent : 8) {}
8✔
32

33
coro::CoroTask<void> TraceIndex::initialize() {
49!
34
    PatternDirectoryScannerUtility scanner;
21!
35
    PatternDirectoryScannerUtilityInput scan_input{
42!
36
        directory_, {".pfw", ".pfw.gz"}, false};
21!
37
    auto entries = co_await scanner.process(scan_input);
28!
38

39
    files_.clear();
7✔
40
    path_to_index_.clear();
7✔
41
    files_.reserve(entries.size());
7!
42

43
    global_min_ts_ = std::numeric_limits<std::uint64_t>::max();
7✔
44
    global_max_ts_ = 0;
7✔
45

46
    std::vector<std::size_t> needs_build;
7✔
47
    std::vector<std::size_t> large_files;
7✔
48
    std::size_t small_count = 0;
7✔
49

50
    for (const auto& entry : entries) {
14✔
51
        FileInfo info;
7✔
52
        info.path = entry.path.string();
7!
53
        info.idx_path = internal::determine_index_path(info.path, index_dir_);
7!
54

55
        std::error_code ec;
7✔
56
        auto fsize = fs::file_size(info.path, ec);
7!
57
        info.compressed_size = (!ec && fsize > 0) ? fsize : 0;
7!
58
        info.is_small = info.compressed_size > 0 &&
14!
59
                        info.compressed_size < INDEX_SIZE_THRESHOLD;
7✔
60

61
        std::size_t idx = files_.size();
7✔
62
        path_to_index_[info.path] = idx;
7!
63

64
        if (info.is_small) {
7!
65
            info.has_bloom_data = false;
7✔
66
            info.has_checkpoint_index = false;
7✔
67
            info.size_mb =
7✔
68
                static_cast<double>(info.compressed_size) / (1024.0 * 1024.0);
7✔
69
            small_count++;
7✔
70
        } else {
7✔
UNCOV
71
            info.has_bloom_data = fs::exists(info.idx_path);
×
UNCOV
72
            info.has_checkpoint_index = fs::exists(info.idx_path);
×
UNCOV
73
            if (!info.has_bloom_data) {
×
UNCOV
74
                needs_build.push_back(idx);
×
UNCOV
75
            } else {
×
UNCOV
76
                large_files.push_back(idx);
×
77
            }
78
        }
79

80
        files_.push_back(std::move(info));
7!
81
    }
7✔
82

83
    if (small_count > 0) {
7✔
84
        DFTRACER_UTILS_LOG_INFO(
6!
85
            "TraceIndex: %zu small file(s) (< %zu bytes) will be "
86
            "streamed directly (no sidecar indexes)",
87
            small_count, INDEX_SIZE_THRESHOLD);
88
    }
6✔
89

90
    if (!needs_build.empty() || !large_files.empty()) {
7!
UNCOV
91
        auto pipeline_config =
×
UNCOV
92
            PipelineConfig()
×
UNCOV
93
                .with_name("TraceIndex Init")
×
UNCOV
94
                .with_compute_threads(max_concurrent_)
×
UNCOV
95
                .with_watchdog(false)
×
UNCOV
96
                .with_global_timeout(std::chrono::seconds(0))
×
UNCOV
97
                .with_task_timeout(std::chrono::seconds(0))
×
UNCOV
98
                .with_io_backend(io::IoBackendType::THREADPOOL)
×
UNCOV
99
                .with_io_batch_size(1);
×
100

UNCOV
101
        Pipeline pipeline(pipeline_config);
×
102

UNCOV
103
        auto* files_ptr = &files_;
×
UNCOV
104
        auto* needs_build_ptr = &needs_build;
×
UNCOV
105
        auto* large_files_ptr = &large_files;
×
UNCOV
106
        auto* global_min_ts_ptr = &global_min_ts_;
×
UNCOV
107
        auto* global_max_ts_ptr = &global_max_ts_;
×
UNCOV
108
        std::string index_dir = index_dir_;
×
UNCOV
109
        std::size_t max_concurrent = max_concurrent_;
×
110

UNCOV
111
        auto init_task = make_task(
×
112
            [files_ptr, needs_build_ptr, large_files_ptr, global_min_ts_ptr,
×
UNCOV
113
             global_max_ts_ptr, index_dir,
×
UNCOV
114
             max_concurrent](CoroScope& ctx) -> coro::CoroTask<void> {
×
UNCOV
115
                if (!needs_build_ptr->empty()) {
×
UNCOV
116
                    DFTRACER_UTILS_LOG_INFO(
×
117
                        "TraceIndex: building index for %zu file(s) ...",
118
                        needs_build_ptr->size());
119

UNCOV
120
                    auto file_chan =
×
UNCOV
121
                        coro::make_channel<std::size_t>(max_concurrent * 2);
×
122

123
                    co_await ctx.scope([file_chan, files_ptr, needs_build_ptr,
×
UNCOV
124
                                        index_dir,
×
UNCOV
125
                                        max_concurrent](CoroScope& scope)
×
UNCOV
126
                                           -> coro::CoroTask<void> {
×
UNCOV
127
                        scope.spawn(
×
128
                            [ch = file_chan->producer(), needs_build_ptr](
×
UNCOV
129
                                CoroScope&) mutable -> coro::CoroTask<void> {
×
UNCOV
130
                                auto guard = ch.guard();
×
UNCOV
131
                                for (auto idx : *needs_build_ptr) {
×
UNCOV
132
                                    if (!co_await ch.send(idx)) co_return;
×
UNCOV
133
                                }
×
UNCOV
134
                                co_return;
×
135
                            });
×
136

UNCOV
137
                        for (std::size_t w = 0; w < max_concurrent; ++w) {
×
UNCOV
138
                            scope.spawn(
×
139
                                [file_chan, files_ptr, index_dir](
×
UNCOV
140
                                    CoroScope&) -> coro::CoroTask<void> {
×
UNCOV
141
                                    while (auto fi_opt =
×
UNCOV
142
                                               co_await file_chan->receive()) {
×
UNCOV
143
                                        std::size_t fi = *fi_opt;
×
UNCOV
144
                                        auto* info = &(*files_ptr)[fi];
×
145

UNCOV
146
                                        indexer::IndexBuilderUtility builder;
×
UNCOV
147
                                        auto config =
×
UNCOV
148
                                            indexer::IndexBuildConfig::for_file(
×
UNCOV
149
                                                info->path)
×
UNCOV
150
                                                .with_index_dir(index_dir)
×
UNCOV
151
                                                .with_bloom(true)
×
UNCOV
152
                                                .with_index_threshold(0);
×
UNCOV
153
                                        auto result =
×
UNCOV
154
                                            co_await builder.process(config);
×
155

UNCOV
156
                                        if (result.success) {
×
UNCOV
157
                                            info->idx_path =
×
UNCOV
158
                                                internal::determine_index_path(
×
UNCOV
159
                                                    info->path, index_dir);
×
UNCOV
160
                                            info->has_bloom_data = true;
×
UNCOV
161
                                            info->has_checkpoint_index =
×
UNCOV
162
                                                fs::exists(info->idx_path);
×
UNCOV
163
                                        } else {
×
UNCOV
164
                                            DFTRACER_UTILS_LOG_WARN(
×
165
                                                "TraceIndex: failed to "
166
                                                "index %s: %s",
167
                                                info->path.c_str(),
168
                                                result.error_message.c_str());
169
                                        }
UNCOV
170
                                    }
×
UNCOV
171
                                    co_return;
×
172
                                });
×
UNCOV
173
                        }
×
UNCOV
174
                        co_return;
×
175
                    });
×
176

UNCOV
177
                    for (auto idx : *needs_build_ptr) {
×
UNCOV
178
                        if ((*files_ptr)[idx].has_bloom_data) {
×
UNCOV
179
                            large_files_ptr->push_back(idx);
×
UNCOV
180
                        }
×
UNCOV
181
                    }
×
UNCOV
182
                }
×
183

UNCOV
184
                if (!large_files_ptr->empty()) {
×
UNCOV
185
                    auto meta_chan =
×
UNCOV
186
                        coro::make_channel<std::size_t>(max_concurrent * 2);
×
187

188
                    co_await ctx.scope([meta_chan, files_ptr, large_files_ptr,
×
UNCOV
189
                                        max_concurrent](CoroScope& scope)
×
UNCOV
190
                                           -> coro::CoroTask<void> {
×
UNCOV
191
                        scope.spawn(
×
192
                            [ch = meta_chan->producer(), large_files_ptr](
×
UNCOV
193
                                CoroScope&) mutable -> coro::CoroTask<void> {
×
UNCOV
194
                                auto guard = ch.guard();
×
UNCOV
195
                                for (auto idx : *large_files_ptr) {
×
UNCOV
196
                                    if (!co_await ch.send(idx)) co_return;
×
UNCOV
197
                                }
×
UNCOV
198
                                co_return;
×
199
                            });
×
200

UNCOV
201
                        for (std::size_t w = 0; w < max_concurrent; ++w) {
×
202
                            scope.spawn([meta_chan, files_ptr](CoroScope&)
×
UNCOV
203
                                            -> coro::CoroTask<void> {
×
UNCOV
204
                                while (auto fi_opt =
×
UNCOV
205
                                           co_await meta_chan->receive()) {
×
UNCOV
206
                                    std::size_t fi = *fi_opt;
×
UNCOV
207
                                    auto* info = &(*files_ptr)[fi];
×
208

UNCOV
209
                                    if (info->has_bloom_data) {
×
210
                                        try {
UNCOV
211
                                            indexer::IndexDatabase idx_db(
×
UNCOV
212
                                                info->idx_path);
×
UNCOV
213
                                            auto logical = indexer::internal::
×
UNCOV
214
                                                get_logical_path(info->path);
×
UNCOV
215
                                            int fid = idx_db.get_file_info_id(
×
216
                                                logical);
UNCOV
217
                                            if (fid >= 0) {
×
UNCOV
218
                                                auto tb =
×
UNCOV
219
                                                    idx_db.query_time_bounds(
×
UNCOV
220
                                                        fid);
×
UNCOV
221
                                                if (tb.valid) {
×
UNCOV
222
                                                    info->min_timestamp_us =
×
UNCOV
223
                                                        tb.min_timestamp_us;
×
UNCOV
224
                                                    info->max_timestamp_us =
×
UNCOV
225
                                                        tb.max_timestamp_us;
×
UNCOV
226
                                                }
×
UNCOV
227
                                            }
×
UNCOV
228
                                        } catch (const std::exception& e) {
×
UNCOV
229
                                            DFTRACER_UTILS_LOG_WARN(
×
230
                                                "TraceIndex: failed to "
231
                                                "read time bounds from "
232
                                                "%s: %s",
233
                                                info->idx_path.c_str(),
234
                                                e.what());
UNCOV
235
                                        }
×
UNCOV
236
                                    }
×
237

UNCOV
238
                                    auto meta_input =
×
UNCOV
239
                                        MetadataCollectorUtilityInput::
×
UNCOV
240
                                            from_file(info->path)
×
UNCOV
241
                                                .with_index(info->idx_path);
×
UNCOV
242
                                    auto metadata =
×
UNCOV
243
                                        co_await MetadataCollectorUtility{}
×
UNCOV
244
                                            .process(meta_input);
×
UNCOV
245
                                    if (metadata.success) {
×
UNCOV
246
                                        info->uncompressed_size =
×
UNCOV
247
                                            metadata.uncompressed_size;
×
UNCOV
248
                                        info->num_checkpoints =
×
UNCOV
249
                                            metadata.num_checkpoints;
×
UNCOV
250
                                        info->checkpoint_size =
×
UNCOV
251
                                            metadata.checkpoint_size;
×
UNCOV
252
                                        info->compressed_size =
×
UNCOV
253
                                            metadata.compressed_size;
×
UNCOV
254
                                        info->num_lines = metadata.num_lines;
×
UNCOV
255
                                        info->size_mb = metadata.size_mb;
×
UNCOV
256
                                    }
×
UNCOV
257
                                }
×
UNCOV
258
                                co_return;
×
259
                            });
×
UNCOV
260
                        }
×
UNCOV
261
                        co_return;
×
262
                    });
×
263

UNCOV
264
                    for (auto fi : *large_files_ptr) {
×
UNCOV
265
                        const auto& info = (*files_ptr)[fi];
×
UNCOV
266
                        if (info.min_timestamp_us > 0 &&
×
UNCOV
267
                            info.min_timestamp_us < *global_min_ts_ptr)
×
UNCOV
268
                            *global_min_ts_ptr = info.min_timestamp_us;
×
UNCOV
269
                        if (info.max_timestamp_us > *global_max_ts_ptr)
×
UNCOV
270
                            *global_max_ts_ptr = info.max_timestamp_us;
×
UNCOV
271
                    }
×
UNCOV
272
                }
×
273

UNCOV
274
                co_return;
×
275
            },
×
UNCOV
276
            "TraceIndexInit");
×
277

UNCOV
278
        pipeline.set_source(init_task);
×
UNCOV
279
        pipeline.set_destination(init_task);
×
UNCOV
280
        pipeline.execute();
×
UNCOV
281
    }
×
282

283
    DFTRACER_UTILS_LOG_INFO("TraceIndex: found %zu trace files in %s",
7!
284
                            files_.size(), directory_.c_str());
285
    if (global_max_ts_ > 0) {
7!
UNCOV
286
        DFTRACER_UTILS_LOG_INFO("TraceIndex: global time range [%" PRIu64
×
287
                                ", %" PRIu64 "] us",
288
                                global_min_ts_, global_max_ts_);
UNCOV
289
    }
×
290
}
49!
291

292
/// Looks up the FileInfo recorded for `path`.
///
/// @param path  Key to search for in `path_to_index_`; initialize() stores the
///              scanned entry's path string there.
/// @return Pointer to the matching element of `files_`, or nullptr when the
///         path is not present in the lookup table.
const TraceIndex::FileInfo* TraceIndex::find_file(
    const std::string& path) const {
    const auto pos = path_to_index_.find(path);
    return pos != path_to_index_.end() ? &files_[pos->second] : nullptr;
}
3✔
298

299
/// Bounds-checked positional access into `files_`.
///
/// @param index  Zero-based position in `files_`.
/// @return Pointer to the element at `index`, or nullptr when `index` is not
///         less than `files_.size()`.
const TraceIndex::FileInfo* TraceIndex::file_at(std::size_t index) const {
    return index < files_.size() ? &files_[index] : nullptr;
}
2✔
303

304
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc