• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28521653886

01 Jul 2026 01:36PM UTC coverage: 50.92% (-1.4%) from 52.278%
28521653886

Pull #83

github

web-flow
Merge 9bdedb1e9 into 2efed6649
Pull Request #83: refactor and improve code QoL

31893 of 80049 branches covered (39.84%)

Branch coverage included in aggregate %.

789 of 1613 new or added lines in 87 files covered. (48.92%)

5007 existing lines in 181 files now uncovered.

32812 of 47024 relevant lines covered (69.78%)

9905.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.7
/src/dftracer/utils/server/trace_index.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/common/logging.h>
3
#include <dftracer/utils/core/coro/channel.h>
4
#include <dftracer/utils/core/io/io_backend.h>
5
#include <dftracer/utils/core/pipeline/pipeline.h>
6
#include <dftracer/utils/core/pipeline/pipeline_config.h>
7
#include <dftracer/utils/core/tasks/coro_scope.h>
8
#include <dftracer/utils/core/tasks/task.h>
9
#include <dftracer/utils/server/trace_index.h>
10
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
11
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
12
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
13
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
14
#include <dftracer/utils/utilities/indexer/index_database.h>
15
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
16

17
#include <cinttypes>
18
#include <limits>
19

20
namespace dftracer::utils::server {
21

22
using namespace dftracer::utils::utilities::composites::dft;
23
using namespace dftracer::utils::utilities::composites::dft::indexing;
24
using namespace dftracer::utils::utilities::filesystem;
25
namespace indexer = dftracer::utils::utilities::indexer;
26

27
// Stores the trace directory, the index directory and the worker count.
// A max_concurrent of 0 is replaced by 8.
TraceIndex::TraceIndex(const std::string& directory,
                       const std::string& index_dir, std::size_t max_concurrent)
    : directory_(directory),
      index_dir_(index_dir),
      max_concurrent_(max_concurrent != 0 ? max_concurrent : 8) {}
16!
32

33
coro::CoroTask<void> TraceIndex::initialize() {
49!
34
    PatternDirectoryScannerUtility scanner;
21!
35
    PatternDirectoryScannerUtilityInput scan_input{
42!
36
        directory_, {".pfw", ".pfw.gz"}, false};
21!
37
    auto entries = co_await scanner.process(scan_input);
28!
38

39
    files_.clear();
7✔
40
    path_to_index_.clear();
7✔
41
    files_.reserve(entries.size());
7!
42

43
    global_min_ts_ = std::numeric_limits<std::uint64_t>::max();
7✔
44
    global_max_ts_ = 0;
7✔
45

46
    std::vector<std::size_t> needs_build;
7✔
47
    std::vector<std::size_t> large_files;
7✔
48

49
    for (const auto& entry : entries) {
14✔
50
        FileInfo info;
7✔
51
        info.path = entry.path.string();
7!
52
        info.index_path = internal::determine_index_path(info.path, index_dir_);
7!
53

54
        std::error_code ec;
7✔
55
        auto fsize = fs::file_size(info.path, ec);
7!
56
        info.compressed_size = (!ec && fsize > 0) ? fsize : 0;
7!
57

58
        std::size_t idx = files_.size();
7✔
59
        path_to_index_[info.path] = idx;
7!
60

61
        info.has_bloom_data = fs::exists(info.index_path);
7!
62
        info.has_checkpoint_index = fs::exists(info.index_path);
7!
63
        if (!info.has_bloom_data) {
7!
64
            needs_build.push_back(idx);
7!
65
        } else {
7✔
UNCOV
66
            large_files.push_back(idx);
×
67
        }
68

69
        files_.push_back(std::move(info));
7!
70
    }
7✔
71

72
    if (!needs_build.empty() || !large_files.empty()) {
7!
73
        auto pipeline_config =
6✔
74
            PipelineConfig()
12!
75
                .with_name("TraceIndex Init")
6!
76
                .with_compute_threads(max_concurrent_)
6!
77
                .with_watchdog(false)
6!
78
                .with_global_timeout(std::chrono::seconds(0))
6!
79
                .with_task_timeout(std::chrono::seconds(0))
6!
80
                .with_io_backend(io::IoBackendType::THREADPOOL)
6!
81
                .with_io_batch_size(1);
6!
82

83
        Pipeline pipeline(pipeline_config);
6!
84

85
        auto* files_ptr = &files_;
6✔
86
        auto* needs_build_ptr = &needs_build;
6✔
87
        auto* large_files_ptr = &large_files;
6✔
88
        auto* global_min_ts_ptr = &global_min_ts_;
6✔
89
        auto* global_max_ts_ptr = &global_max_ts_;
6✔
90
        std::string index_dir = index_dir_;
6!
91
        std::size_t max_concurrent = max_concurrent_;
6✔
92

93
        auto init_task = make_task(
6!
94
            [files_ptr, needs_build_ptr, large_files_ptr, global_min_ts_ptr,
66!
95
             global_max_ts_ptr, index_dir,
12!
96
             max_concurrent](CoroScope& ctx) -> coro::CoroTask<void> {
12!
97
                if (!needs_build_ptr->empty()) {
30✔
98
                    DFTRACER_UTILS_LOG_INFO(
18!
99
                        "TraceIndex: building index for %zu file(s) ...",
100
                        needs_build_ptr->size());
101

102
                    auto file_chan =
18✔
103
                        coro::make_channel<std::size_t>(max_concurrent * 2);
18!
104

105
                    const auto* index_dir_ptr = &index_dir;
18✔
106
                    co_await ctx.scope([&file_chan, files_ptr, needs_build_ptr,
96!
107
                                        index_dir_ptr,
18✔
108
                                        max_concurrent](CoroScope& scope)
18✔
109
                                           -> coro::CoroTask<void> {
6!
110
                        scope.spawn(
6!
111
                            [ch = file_chan->producer(), needs_build_ptr](
64!
112
                                CoroScope&) mutable -> coro::CoroTask<void> {
6!
113
                                auto guard = ch.guard();
6!
114
                                for (auto idx : *needs_build_ptr) {
41✔
115
                                    if (!co_await ch.send(idx)) co_return;
34!
116
                                }
7✔
117
                                co_return;
6✔
118
                            });
48✔
119

120
                        for (std::size_t w = 0; w < max_concurrent; ++w) {
42✔
121
                            scope.spawn([ch = file_chan->consumer(), files_ptr,
260!
122
                                         index_dir_ptr](CoroScope&)
36✔
123
                                            -> coro::CoroTask<void> {
36!
124
                                while (auto fi_opt = co_await ch.receive()) {
133!
125
                                    std::size_t fi = *fi_opt;
21✔
126
                                    auto* info = &(*files_ptr)[fi];
21✔
127

128
                                    indexer::IndexBuilderUtility builder;
21!
129
                                    auto config =
21✔
130
                                        indexer::IndexBuildConfig::for_file(
63!
131
                                            info->path)
21✔
132
                                            .with_index_dir(*index_dir_ptr);
21✔
133
                                    auto result =
21✔
134
                                        co_await builder.process(config);
28!
135

136
                                    if (result.success) {
7✔
137
                                        info->index_path =
6✔
138
                                            internal::determine_index_path(
12!
139
                                                info->path, *index_dir_ptr);
6✔
140
                                        info->has_bloom_data = true;
6✔
141
                                        info->has_checkpoint_index =
6✔
142
                                            fs::exists(info->index_path);
6!
143
                                    } else {
6✔
144
                                        DFTRACER_UTILS_LOG_WARN(
1!
145
                                            "TraceIndex: failed to "
146
                                            "index %s: %s",
147
                                            info->path.c_str(),
148
                                            result.error_message.c_str());
149
                                    }
150
                                }
57!
151
                                co_return;
36✔
152
                            });
114✔
153
                        }
36✔
154
                        co_return;
12✔
UNCOV
155
                    });
×
156

157
                    for (auto idx : *needs_build_ptr) {
13✔
158
                        if ((*files_ptr)[idx].has_bloom_data) {
7✔
159
                            large_files_ptr->push_back(idx);
6!
160
                        }
6✔
161
                    }
7✔
162
                }
6!
163

164
                if (!large_files_ptr->empty()) {
18!
165
                    auto meta_chan =
18✔
166
                        coro::make_channel<std::size_t>(max_concurrent * 2);
18!
167

168
                    co_await ctx.scope([&meta_chan, files_ptr, large_files_ptr,
72!
169
                                        max_concurrent](CoroScope& scope)
18✔
170
                                           -> coro::CoroTask<void> {
6!
171
                        scope.spawn(
6!
172
                            [ch = meta_chan->producer(), large_files_ptr](
60!
173
                                CoroScope&) mutable -> coro::CoroTask<void> {
6!
174
                                auto guard = ch.guard();
6!
175
                                for (auto idx : *large_files_ptr) {
36✔
176
                                    if (!co_await ch.send(idx)) co_return;
30!
177
                                }
6✔
178
                                co_return;
6✔
179
                            });
42✔
180

181
                        for (std::size_t w = 0; w < max_concurrent; ++w) {
42✔
182
                            scope.spawn([ch = meta_chan->consumer(),
220!
183
                                         files_ptr](CoroScope&)
36✔
184
                                            -> coro::CoroTask<void> {
36!
185
                                while (auto fi_opt = co_await ch.receive()) {
99!
186
                                    std::size_t fi = *fi_opt;
18✔
187
                                    auto* info = &(*files_ptr)[fi];
18✔
188

189
                                    if (info->has_bloom_data) {
18✔
190
                                        try {
191
                                            indexer::IndexDatabase idx_db(
12!
192
                                                info->index_path);
6✔
193
                                            auto logical = indexer::internal::
12!
194
                                                get_logical_path(info->path);
6✔
195
                                            int fid = idx_db.get_file_info_id(
12!
196
                                                logical);
6✔
197
                                            if (fid >= 0) {
6!
198
                                                auto bounds =
6✔
199
                                                    idx_db.query_time_bounds(
6!
200
                                                        fid);
6✔
201
                                                if (bounds.valid) {
6!
202
                                                    info->min_timestamp_us =
6✔
203
                                                        bounds.min_timestamp_us;
6✔
204
                                                    info->max_timestamp_us =
6✔
205
                                                        bounds.max_timestamp_us;
6✔
206
                                                }
6✔
207
                                            }
6✔
208
                                        } catch (const std::exception& e) {
6!
UNCOV
209
                                            DFTRACER_UTILS_LOG_WARN(
×
210
                                                "TraceIndex: failed to "
211
                                                "read time bounds from "
212
                                                "%s: %s",
213
                                                info->index_path.c_str(),
214
                                                e.what());
UNCOV
215
                                        }
×
216
                                    }
6✔
217

218
                                    auto meta_input =
18✔
219
                                        MetadataCollectorUtilityInput::
36!
220
                                            from_file(info->path)
18!
221
                                                .with_index(info->index_path);
18!
222
                                    auto metadata =
18✔
223
                                        co_await MetadataCollectorUtility{}
42!
224
                                            .process(meta_input);
18!
225
                                    if (metadata.success) {
6!
226
                                        info->uncompressed_size =
6✔
227
                                            metadata.uncompressed_size;
6✔
228
                                        info->num_checkpoints =
6✔
229
                                            metadata.num_checkpoints;
6✔
230
                                        info->checkpoint_size =
6✔
231
                                            metadata.checkpoint_size;
6✔
232
                                        info->compressed_size =
6✔
233
                                            metadata.compressed_size;
6✔
234
                                        info->num_lines = metadata.num_lines;
6✔
235
                                        info->size_mb = metadata.size_mb;
6✔
236
                                    }
6✔
237
                                }
54!
238
                                co_return;
36✔
239
                            });
64✔
240
                        }
36✔
241
                        co_return;
12✔
UNCOV
242
                    });
×
243

244
                    for (auto fi : *large_files_ptr) {
12✔
245
                        const auto& info = (*files_ptr)[fi];
6✔
246
                        if (info.min_timestamp_us > 0 &&
6!
247
                            info.min_timestamp_us < *global_min_ts_ptr)
6✔
248
                            *global_min_ts_ptr = info.min_timestamp_us;
6✔
249
                        if (info.max_timestamp_us > *global_max_ts_ptr)
6!
250
                            *global_max_ts_ptr = info.max_timestamp_us;
6✔
251
                    }
6✔
252
                }
6!
253

254
                co_return;
6✔
255
            },
24✔
256
            "TraceIndexInit");
6!
257

258
        pipeline.set_source(init_task);
6!
259
        pipeline.set_destination(init_task);
6!
260
        pipeline.execute();
6!
261
    }
6✔
262

263
    DFTRACER_UTILS_LOG_INFO("TraceIndex: found %zu trace files in %s",
7!
264
                            files_.size(), directory_.c_str());
265
    if (global_max_ts_ > 0) {
7✔
266
        DFTRACER_UTILS_LOG_INFO("TraceIndex: global time range [%" PRIu64
6!
267
                                ", %" PRIu64 "] us",
268
                                global_min_ts_, global_max_ts_);
269
    }
6✔
270
}
49!
271

272
// Looks up a file entry by its path string.
// Returns a pointer into files_, or nullptr when the path is not in
// path_to_index_.
const TraceIndex::FileInfo* TraceIndex::find_file(
    const std::string& path) const {
    if (const auto hit = path_to_index_.find(path);
        hit != path_to_index_.end()) {
        return &files_[hit->second];
    }
    return nullptr;
}
3✔
278

279
// Returns a pointer to the entry at position `index` in files_, or nullptr
// when `index` is out of range.
const TraceIndex::FileInfo* TraceIndex::file_at(std::size_t index) const {
    return index < files_.size() ? &files_[index] : nullptr;
}
2✔
283

284
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc