• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.31
/src/dftracer/utils/server/trace_index.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/common/logging.h>
3
#include <dftracer/utils/core/coro/channel.h>
4
#include <dftracer/utils/core/io/io_backend.h>
5
#include <dftracer/utils/core/pipeline/pipeline.h>
6
#include <dftracer/utils/core/pipeline/pipeline_config.h>
7
#include <dftracer/utils/core/tasks/coro_scope.h>
8
#include <dftracer/utils/core/tasks/task.h>
9
#include <dftracer/utils/server/router.h>
10
#include <dftracer/utils/server/trace_index.h>
11
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
12
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
13
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
14
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
15
#include <dftracer/utils/utilities/indexer/index_database.h>
16
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
17

18
#include <cinttypes>
19
#include <limits>
20

21
namespace dftracer::utils::server {
22

23
using namespace dftracer::utils::utilities::composites::dft;
24
using namespace dftracer::utils::utilities::composites::dft::indexing;
25
using namespace dftracer::utils::utilities::filesystem;
26
namespace indexer = dftracer::utils::utilities::indexer;
27

28
TraceIndex::TraceIndex(const std::string& directory,
32!
29
                       const std::string& index_dir, std::size_t max_concurrent)
8✔
30
    : directory_(directory),
16✔
31
      index_dir_(index_dir),
16!
32
      max_concurrent_(max_concurrent == 0 ? 8 : max_concurrent) {}
24!
33

34
coro::CoroTask<void> TraceIndex::initialize() {
56!
35
    PatternDirectoryScannerUtility scanner;
21!
36
    PatternDirectoryScannerUtilityInput scan_input{
42!
37
        directory_, {".pfw", ".pfw.gz"}, false};
21!
38
    auto entries = co_await scanner.process(scan_input);
28!
39

40
    files_.clear();
7✔
41
    path_to_index_.clear();
7✔
42
    files_.reserve(entries.size());
7!
43

44
    global_min_ts_ = std::numeric_limits<std::uint64_t>::max();
7✔
45
    global_max_ts_ = 0;
7✔
46

47
    std::vector<std::size_t> needs_build;
7✔
48
    std::vector<std::size_t> large_files;
7✔
49

50
    for (const auto& entry : entries) {
14✔
51
        FileInfo info;
7✔
52
        info.path = entry.path.string();
7!
53
        info.index_path = internal::determine_index_path(info.path, index_dir_);
7!
54

55
        std::error_code ec;
7✔
56
        auto fsize = fs::file_size(info.path, ec);
7!
57
        info.compressed_size = (!ec && fsize > 0) ? fsize : 0;
7!
58

59
        std::size_t idx = files_.size();
7✔
60
        path_to_index_[info.path] = idx;
7!
61

62
        info.has_bloom_data = fs::exists(info.index_path);
7!
63
        info.has_checkpoint_index = fs::exists(info.index_path);
7!
64
        if (!info.has_bloom_data) {
7!
65
            needs_build.push_back(idx);
7!
66
        } else {
7✔
67
            large_files.push_back(idx);
×
68
        }
69

70
        files_.push_back(std::move(info));
7!
71
    }
7✔
72

73
    if (!needs_build.empty() || !large_files.empty()) {
7!
74
        auto pipeline_config =
6✔
75
            PipelineConfig()
12!
76
                .with_name("TraceIndex Init")
6!
77
                .with_compute_threads(max_concurrent_)
6!
78
                .with_watchdog(false)
6!
79
                .with_global_timeout(std::chrono::seconds(0))
6!
80
                .with_task_timeout(std::chrono::seconds(0))
6!
81
                .with_io_backend(io::IoBackendType::THREADPOOL)
6!
82
                .with_io_batch_size(1);
6!
83

84
        Pipeline pipeline(pipeline_config);
6!
85

86
        auto* files_ptr = &files_;
6✔
87
        auto* needs_build_ptr = &needs_build;
6✔
88
        auto* large_files_ptr = &large_files;
6✔
89
        auto* global_min_ts_ptr = &global_min_ts_;
6✔
90
        auto* global_max_ts_ptr = &global_max_ts_;
6✔
91
        std::string index_dir = index_dir_;
6!
92
        std::size_t max_concurrent = max_concurrent_;
6✔
93

94
        auto init_task = make_task(
6!
95
            [files_ptr, needs_build_ptr, large_files_ptr, global_min_ts_ptr,
72!
96
             global_max_ts_ptr, index_dir,
12!
97
             max_concurrent](CoroScope& ctx) -> coro::CoroTask<void> {
12!
98
                if (!needs_build_ptr->empty()) {
30✔
99
                    DFTRACER_UTILS_LOG_INFO(
18!
100
                        "TraceIndex: building index for %zu file(s) ...",
101
                        needs_build_ptr->size());
102

103
                    auto file_chan =
18✔
104
                        coro::make_channel<std::size_t>(max_concurrent * 2);
18!
105

106
                    const auto* index_dir_ptr = &index_dir;
18✔
107
                    co_await ctx.scope([&file_chan, files_ptr, needs_build_ptr,
102!
108
                                        index_dir_ptr,
18✔
109
                                        max_concurrent](CoroScope& scope)
18✔
110
                                           -> coro::CoroTask<void> {
6!
111
                        scope.spawn(
6!
112
                            [ch = file_chan->producer(), needs_build_ptr](
70!
113
                                CoroScope&) mutable -> coro::CoroTask<void> {
6!
114
                                auto guard = ch.guard();
6!
115
                                for (auto idx : *needs_build_ptr) {
41✔
116
                                    if (!co_await ch.send(idx)) co_return;
34!
117
                                }
7✔
118
                                co_return;
6✔
119
                            });
60!
120

121
                        for (std::size_t w = 0; w < max_concurrent; ++w) {
42✔
122
                            scope.spawn([ch = file_chan->consumer(), files_ptr,
294!
123
                                         index_dir_ptr](CoroScope&)
36✔
124
                                            -> coro::CoroTask<void> {
36!
125
                                while (auto fi_opt = co_await ch.receive()) {
133!
126
                                    std::size_t fi = *fi_opt;
21✔
127
                                    auto* info = &(*files_ptr)[fi];
21✔
128

129
                                    indexer::IndexBuilderUtility builder;
21!
130
                                    auto config =
21✔
131
                                        indexer::IndexBuildConfig::for_file(
63!
132
                                            info->path)
21✔
133
                                            .with_index_dir(*index_dir_ptr);
21✔
134
                                    auto result =
21✔
135
                                        co_await builder.process(config);
28!
136

137
                                    if (result.success) {
7!
138
                                        info->index_path =
7✔
139
                                            internal::determine_index_path(
14!
140
                                                info->path, *index_dir_ptr);
7✔
141
                                        info->has_bloom_data = true;
7✔
142
                                        info->has_checkpoint_index =
7✔
143
                                            fs::exists(info->index_path);
7!
144
                                    } else {
7✔
145
                                        DFTRACER_UTILS_LOG_WARN(
×
146
                                            "TraceIndex: failed to "
147
                                            "index %s: %s",
148
                                            info->path.c_str(),
149
                                            result.error_message.c_str());
150
                                    }
151
                                }
57!
152
                                co_return;
36✔
153
                            });
183!
154
                        }
36✔
155
                        co_return;
12✔
156
                    });
12!
157

158
                    for (auto idx : *needs_build_ptr) {
13✔
159
                        if ((*files_ptr)[idx].has_bloom_data) {
7!
160
                            large_files_ptr->push_back(idx);
7!
161
                        }
7✔
162
                    }
7✔
163
                }
6!
164

165
                if (!large_files_ptr->empty()) {
18!
166
                    auto meta_chan =
18✔
167
                        coro::make_channel<std::size_t>(max_concurrent * 2);
18!
168

169
                    co_await ctx.scope([&meta_chan, files_ptr, large_files_ptr,
78!
170
                                        max_concurrent](CoroScope& scope)
18✔
171
                                           -> coro::CoroTask<void> {
6!
172
                        scope.spawn(
6!
173
                            [ch = meta_chan->producer(), large_files_ptr](
70!
174
                                CoroScope&) mutable -> coro::CoroTask<void> {
6!
175
                                auto guard = ch.guard();
6!
176
                                for (auto idx : *large_files_ptr) {
41✔
177
                                    if (!co_await ch.send(idx)) co_return;
34!
178
                                }
7✔
179
                                co_return;
6✔
180
                            });
60!
181

182
                        for (std::size_t w = 0; w < max_concurrent; ++w) {
42✔
183
                            scope.spawn([ch = meta_chan->consumer(),
291!
184
                                         files_ptr](CoroScope&)
36✔
185
                                            -> coro::CoroTask<void> {
36!
186
                                while (auto fi_opt = co_await ch.receive()) {
130!
187
                                    std::size_t fi = *fi_opt;
21✔
188
                                    auto* info = &(*files_ptr)[fi];
21✔
189

190
                                    if (info->has_bloom_data) {
21✔
191
                                        try {
192
                                            indexer::IndexDatabase idx_db(
14!
193
                                                info->index_path);
7✔
194
                                            auto logical = indexer::internal::
14!
195
                                                get_logical_path(info->path);
7✔
196
                                            int fid = idx_db.get_file_info_id(
14!
197
                                                logical);
7✔
198
                                            if (fid >= 0) {
7!
199
                                                auto bounds =
7✔
200
                                                    idx_db.query_time_bounds(
7!
201
                                                        fid);
7✔
202
                                                if (bounds.valid) {
7!
203
                                                    info->min_timestamp_us =
7✔
204
                                                        bounds.min_timestamp_us;
7✔
205
                                                    info->max_timestamp_us =
7✔
206
                                                        bounds.max_timestamp_us;
7✔
207
                                                }
7✔
208
                                            }
7✔
209
                                        } catch (const std::exception& e) {
7!
210
                                            DFTRACER_UTILS_LOG_WARN(
×
211
                                                "TraceIndex: failed to "
212
                                                "read time bounds from "
213
                                                "%s: %s",
214
                                                info->index_path.c_str(),
215
                                                e.what());
216
                                        }
×
217
                                    }
7✔
218

219
                                    auto meta_input =
21✔
220
                                        MetadataCollectorUtilityInput::
42!
221
                                            from_file(info->path)
21!
222
                                                .with_index(info->index_path);
21!
223
                                    auto metadata =
21✔
224
                                        co_await MetadataCollectorUtility{}
49!
225
                                            .process(meta_input);
21!
226
                                    if (metadata.success) {
7!
227
                                        info->uncompressed_size =
7✔
228
                                            metadata.uncompressed_size;
7✔
229
                                        info->num_checkpoints =
7✔
230
                                            metadata.num_checkpoints;
7✔
231
                                        info->checkpoint_size =
7✔
232
                                            metadata.checkpoint_size;
7✔
233
                                        info->compressed_size =
7✔
234
                                            metadata.compressed_size;
7✔
235
                                        info->num_lines = metadata.num_lines;
7✔
236
                                        info->size_mb = metadata.size_mb;
7✔
237
                                    }
7✔
238
                                }
57!
239
                                co_return;
36✔
240
                            });
182!
241
                        }
36✔
242
                        co_return;
12✔
243
                    });
12!
244

245
                    for (auto fi : *large_files_ptr) {
13✔
246
                        const auto& info = (*files_ptr)[fi];
7✔
247
                        if (info.min_timestamp_us > 0 &&
7!
248
                            info.min_timestamp_us < *global_min_ts_ptr)
7✔
249
                            *global_min_ts_ptr = info.min_timestamp_us;
6✔
250
                        if (info.max_timestamp_us > *global_max_ts_ptr)
7✔
251
                            *global_max_ts_ptr = info.max_timestamp_us;
6✔
252
                    }
7✔
253
                }
6!
254

255
                co_return;
6✔
256
            },
36!
257
            "TraceIndexInit");
6!
258

259
        pipeline.set_source(init_task);
6!
260
        pipeline.set_destination(init_task);
6!
261
        pipeline.execute();
6!
262
    }
6✔
263

264
    DFTRACER_UTILS_LOG_INFO("TraceIndex: found %zu trace files in %s",
7!
265
                            files_.size(), directory_.c_str());
266
    if (global_max_ts_ > 0) {
7✔
267
        DFTRACER_UTILS_LOG_INFO("TraceIndex: global time range [%" PRIu64
6!
268
                                ", %" PRIu64 "] us",
269
                                global_min_ts_, global_max_ts_);
270
    }
6✔
271
}
63!
272

273
const TraceIndex::FileInfo* TraceIndex::find_file(
6✔
274
    const std::string& path) const {
275
    auto it = path_to_index_.find(path);
6!
276
    if (it == path_to_index_.end()) return nullptr;
6✔
277
    return &files_[it->second];
4✔
278
}
3✔
279

280
const TraceIndex::FileInfo* TraceIndex::file_at(std::size_t index) const {
4✔
281
    if (index >= files_.size()) return nullptr;
4✔
282
    return &files_[index];
2✔
283
}
2✔
284

285
std::vector<const TraceIndex::FileInfo*> collect_candidate_files(
16✔
286
    TraceIndex& index, const QueryParams& params) {
287
    std::vector<const TraceIndex::FileInfo*> files;
16✔
288
    auto file_param = params.get("file");
16!
289
    if (!file_param.empty()) {
16!
290
        auto* f = index.find_file(std::string(file_param));
×
291
        if (f) files.push_back(f);
×
292
    } else {
293
        for (const auto& f : index.files()) {
32✔
294
            files.push_back(&f);
16!
295
        }
296
    }
297
    return files;
24✔
298
}
8!
299

300
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc