• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 24057299873

07 Apr 2026 12:01AM UTC coverage: 52.076% (+0.8%) from 51.228%
24057299873

push

github

rayandrew
feat(rocksdb): migrate SQLite indexing to RocksDB

Replace SQLite-backed indexing and provenance storage with RocksDB-backed stores.

  Key changes:
  - add RocksDB async/database/db-manager/filesystem/key-codec layers
  - migrate index and provenance databases from SQLite to RocksDB
  - update index builder, trace reader, reorganize, view, stats, and comparator paths for RocksDB
  - harden transaction atomicity and rollback behavior with TransactionScope
  - add iterator status checking for prefix scans
  - harden gzip/tar indexer cache state and metadata handling
  - capture executor context in RocksDB awaitables
  - clean up failed RocksDB open paths and manager lifecycle behavior
  - vendor CPM 0.42.1 and update CI/build integration
  - refresh docs, Python bindings, and C++/Python test coverage for the new backend

  Validation:
  - full test suite passed
  - Ubuntu 22.04 Docker run passed
  - focused RocksDB/indexer regression tests passed.

24097 of 59624 branches covered (40.41%)

Branch coverage included in aggregate %.

2516 of 3144 new or added lines in 75 files covered (80.03%)

72 existing lines in 15 files now uncovered.

20858 of 26701 relevant lines covered (78.12%)

14113.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.75
/src/dftracer/utils/server/trace_index.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/common/logging.h>
3
#include <dftracer/utils/core/coro/channel.h>
4
#include <dftracer/utils/core/io/io_backend.h>
5
#include <dftracer/utils/core/pipeline/pipeline.h>
6
#include <dftracer/utils/core/pipeline/pipeline_config.h>
7
#include <dftracer/utils/core/rocksdb/async.h>
8
#include <dftracer/utils/core/tasks/coro_scope.h>
9
#include <dftracer/utils/core/tasks/task.h>
10
#include <dftracer/utils/server/trace_index.h>
11
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
12
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
13
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
14
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
15
#include <dftracer/utils/utilities/indexer/index_database.h>
16
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
17

18
#include <cinttypes>
19
#include <limits>
20

21
namespace dftracer::utils::server {
22

23
using namespace dftracer::utils::utilities::composites::dft;
24
using namespace dftracer::utils::utilities::composites::dft::indexing;
25
using namespace dftracer::utils::utilities::filesystem;
26
namespace indexer = dftracer::utils::utilities::indexer;
27

28
// Creates an index over the trace files in `directory`; index databases are
// resolved relative to `index_dir`. A `max_concurrent` of zero is treated as
// "unspecified" and replaced by a default of 8 workers. No I/O happens here;
// discovery is deferred to initialize().
TraceIndex::TraceIndex(const std::string& directory,
                       const std::string& index_dir, std::size_t max_concurrent)
    : directory_(directory),
      index_dir_(index_dir),
      max_concurrent_(max_concurrent != 0 ? max_concurrent : std::size_t(8)) {}
33

34
coro::CoroTask<void> TraceIndex::initialize() {
56!
35
    PatternDirectoryScannerUtility scanner;
21!
36
    PatternDirectoryScannerUtilityInput scan_input{
42!
37
        directory_, {".pfw", ".pfw.gz"}, false};
21!
38
    auto entries = co_await scanner.process(scan_input);
28!
39

40
    files_.clear();
7✔
41
    path_to_index_.clear();
7✔
42
    files_.reserve(entries.size());
7!
43

44
    global_min_ts_ = std::numeric_limits<std::uint64_t>::max();
7✔
45
    global_max_ts_ = 0;
7✔
46

47
    std::vector<std::size_t> needs_build;
7✔
48
    std::vector<std::size_t> large_files;
7✔
49
    std::size_t small_count = 0;
7✔
50

51
    for (const auto& entry : entries) {
14✔
52
        FileInfo info;
7✔
53
        info.path = entry.path.string();
7!
54
        info.index_path = internal::determine_index_path(info.path, index_dir_);
7!
55

56
        std::error_code ec;
7✔
57
        auto fsize = fs::file_size(info.path, ec);
7!
58
        info.compressed_size = (!ec && fsize > 0) ? fsize : 0;
7!
59
        info.is_small = info.compressed_size > 0 &&
14!
60
                        info.compressed_size < INDEX_SIZE_THRESHOLD;
7✔
61

62
        std::size_t idx = files_.size();
7✔
63
        path_to_index_[info.path] = idx;
7!
64

65
        if (info.is_small) {
7!
66
            info.has_bloom_data = false;
7✔
67
            info.has_checkpoint_index = false;
7✔
68
            info.size_mb =
7✔
69
                static_cast<double>(info.compressed_size) / (1024.0 * 1024.0);
7✔
70
            small_count++;
7✔
71
        } else {
7✔
72
            info.has_bloom_data = fs::exists(info.index_path);
×
73
            info.has_checkpoint_index = fs::exists(info.index_path);
×
74
            if (!info.has_bloom_data) {
×
75
                needs_build.push_back(idx);
×
76
            } else {
77
                large_files.push_back(idx);
×
78
            }
79
        }
80

81
        files_.push_back(std::move(info));
7!
82
    }
7✔
83

84
    if (small_count > 0) {
7✔
85
        DFTRACER_UTILS_LOG_INFO(
6!
86
            "TraceIndex: %zu small file(s) (< %zu bytes) will be "
87
            "streamed directly (no .dftindex database)",
88
            small_count, INDEX_SIZE_THRESHOLD);
89
    }
6✔
90

91
    if (!needs_build.empty() || !large_files.empty()) {
7!
92
        auto pipeline_config =
93
            PipelineConfig()
×
94
                .with_name("TraceIndex Init")
×
95
                .with_compute_threads(max_concurrent_)
×
96
                .with_watchdog(false)
×
97
                .with_global_timeout(std::chrono::seconds(0))
×
98
                .with_task_timeout(std::chrono::seconds(0))
×
99
                .with_io_backend(io::IoBackendType::THREADPOOL)
×
100
                .with_io_batch_size(1);
×
101

102
        Pipeline pipeline(pipeline_config);
×
103

104
        auto* files_ptr = &files_;
105
        auto* needs_build_ptr = &needs_build;
106
        auto* large_files_ptr = &large_files;
107
        auto* global_min_ts_ptr = &global_min_ts_;
108
        auto* global_max_ts_ptr = &global_max_ts_;
109
        std::string index_dir = index_dir_;
×
110
        std::size_t max_concurrent = max_concurrent_;
111

112
        auto init_task = make_task(
×
113
            [files_ptr, needs_build_ptr, large_files_ptr, global_min_ts_ptr,
×
114
             global_max_ts_ptr, index_dir,
×
115
             max_concurrent](CoroScope& ctx) -> coro::CoroTask<void> {
×
116
                if (!needs_build_ptr->empty()) {
×
117
                    DFTRACER_UTILS_LOG_INFO(
×
118
                        "TraceIndex: building index for %zu file(s) ...",
119
                        needs_build_ptr->size());
120

121
                    auto file_chan =
122
                        coro::make_channel<std::size_t>(max_concurrent * 2);
×
123

124
                    const auto* index_dir_ptr = &index_dir;
125
                    co_await ctx.scope([file_chan, files_ptr, needs_build_ptr,
×
126
                                        index_dir_ptr,
127
                                        max_concurrent](CoroScope& scope)
128
                                           -> coro::CoroTask<void> {
×
129
                        scope.spawn(
×
130
                            [ch = file_chan->producer(), needs_build_ptr](
×
131
                                CoroScope&) mutable -> coro::CoroTask<void> {
×
132
                                auto guard = ch.guard();
×
133
                                for (auto idx : *needs_build_ptr) {
×
134
                                    if (!co_await ch.send(idx)) co_return;
×
135
                                }
×
136
                                co_return;
137
                            });
×
138

139
                        for (std::size_t w = 0; w < max_concurrent; ++w) {
×
140
                            scope.spawn(
×
141
                                [file_chan, files_ptr, index_dir_ptr](
×
142
                                    CoroScope&) -> coro::CoroTask<void> {
×
143
                                    while (auto fi_opt =
×
144
                                               co_await file_chan->receive()) {
×
145
                                        std::size_t fi = *fi_opt;
146
                                        auto* info = &(*files_ptr)[fi];
147

148
                                        indexer::IndexBuilderUtility builder;
×
149
                                        auto config =
150
                                            indexer::IndexBuildConfig::for_file(
×
151
                                                info->path)
152
                                                .with_index_dir(*index_dir_ptr)
×
153
                                                .with_bloom(true)
×
154
                                                .with_index_threshold(0);
×
155
                                        auto result =
156
                                            co_await builder.process(config);
×
157

158
                                        if (result.success) {
×
159
                                            info->index_path =
160
                                                internal::determine_index_path(
×
161
                                                    info->path, *index_dir_ptr);
162
                                            info->has_bloom_data = true;
163
                                            info->has_checkpoint_index =
164
                                                fs::exists(info->index_path);
×
165
                                        } else {
166
                                            DFTRACER_UTILS_LOG_WARN(
×
167
                                                "TraceIndex: failed to "
168
                                                "index %s: %s",
169
                                                info->path.c_str(),
170
                                                result.error_message.c_str());
171
                                        }
172
                                    }
×
173
                                    co_return;
174
                                });
×
175
                        }
176
                        co_return;
177
                    });
×
178

179
                    for (auto idx : *needs_build_ptr) {
×
180
                        if ((*files_ptr)[idx].has_bloom_data) {
×
181
                            large_files_ptr->push_back(idx);
×
182
                        }
183
                    }
184
                }
×
185

186
                if (!large_files_ptr->empty()) {
×
187
                    auto meta_chan =
188
                        coro::make_channel<std::size_t>(max_concurrent * 2);
×
189

190
                    co_await ctx.scope([meta_chan, files_ptr, large_files_ptr,
×
191
                                        max_concurrent](CoroScope& scope)
192
                                           -> coro::CoroTask<void> {
×
193
                        scope.spawn(
×
194
                            [ch = meta_chan->producer(), large_files_ptr](
×
195
                                CoroScope&) mutable -> coro::CoroTask<void> {
×
196
                                auto guard = ch.guard();
×
197
                                for (auto idx : *large_files_ptr) {
×
198
                                    if (!co_await ch.send(idx)) co_return;
×
199
                                }
×
200
                                co_return;
201
                            });
×
202

203
                        for (std::size_t w = 0; w < max_concurrent; ++w) {
×
204
                            scope.spawn([meta_chan, files_ptr](CoroScope&)
×
205
                                            -> coro::CoroTask<void> {
×
206
                                while (auto fi_opt =
×
207
                                           co_await meta_chan->receive()) {
×
208
                                    std::size_t fi = *fi_opt;
209
                                    auto* info = &(*files_ptr)[fi];
210

211
                                    if (info->has_bloom_data) {
×
212
                                        try {
213
                                            const std::string path = info->path;
×
214
                                            const std::string index_path =
215
                                                info->index_path;
×
216
                                            const auto* path_ptr = &path;
217
                                            const auto* index_path_ptr =
218
                                                &index_path;
219
                                            auto bounds = co_await rocksdb::run(
×
NEW
220
                                                [path_ptr, index_path_ptr] {
×
221
                                                    indexer::IndexDatabase
NEW
222
                                                        idx_db(*index_path_ptr);
×
223
                                                    auto logical =
224
                                                        indexer::internal::
×
225
                                                            get_logical_path(
NEW
226
                                                                *path_ptr);
×
227
                                                    int fid =
NEW
228
                                                        idx_db.get_file_info_id(
×
229
                                                            logical);
NEW
230
                                                    if (fid < 0) {
×
231
                                                        return indexer::
232
                                                            IndexDatabase::
NEW
233
                                                                TimeBounds{};
×
234
                                                    }
235
                                                    return idx_db
NEW
236
                                                        .query_time_bounds(fid);
×
NEW
237
                                                });
×
238
                                            if (bounds.valid) {
×
239
                                                info->min_timestamp_us =
240
                                                    bounds.min_timestamp_us;
241
                                                info->max_timestamp_us =
242
                                                    bounds.max_timestamp_us;
243
                                            }
244
                                        } catch (const std::exception& e) {
×
245
                                            DFTRACER_UTILS_LOG_WARN(
×
246
                                                "TraceIndex: failed to "
247
                                                "read time bounds from "
248
                                                "%s: %s",
249
                                                info->index_path.c_str(),
250
                                                e.what());
251
                                        }
×
252
                                    }
253

254
                                    auto meta_input =
255
                                        MetadataCollectorUtilityInput::
×
256
                                            from_file(info->path)
×
257
                                                .with_index(info->index_path);
×
258
                                    auto metadata =
259
                                        co_await MetadataCollectorUtility{}
×
260
                                            .process(meta_input);
×
261
                                    if (metadata.success) {
×
262
                                        info->uncompressed_size =
263
                                            metadata.uncompressed_size;
264
                                        info->num_checkpoints =
265
                                            metadata.num_checkpoints;
266
                                        info->checkpoint_size =
267
                                            metadata.checkpoint_size;
268
                                        info->compressed_size =
269
                                            metadata.compressed_size;
270
                                        info->num_lines = metadata.num_lines;
271
                                        info->size_mb = metadata.size_mb;
272
                                    }
273
                                }
×
274
                                co_return;
275
                            });
×
276
                        }
277
                        co_return;
278
                    });
×
279

280
                    for (auto fi : *large_files_ptr) {
×
281
                        const auto& info = (*files_ptr)[fi];
282
                        if (info.min_timestamp_us > 0 &&
×
283
                            info.min_timestamp_us < *global_min_ts_ptr)
284
                            *global_min_ts_ptr = info.min_timestamp_us;
285
                        if (info.max_timestamp_us > *global_max_ts_ptr)
×
286
                            *global_max_ts_ptr = info.max_timestamp_us;
287
                    }
288
                }
×
289

290
                co_return;
291
            },
×
292
            "TraceIndexInit");
×
293

294
        pipeline.set_source(init_task);
×
295
        pipeline.set_destination(init_task);
×
296
        pipeline.execute();
×
297
    }
298

299
    DFTRACER_UTILS_LOG_INFO("TraceIndex: found %zu trace files in %s",
7!
300
                            files_.size(), directory_.c_str());
301
    if (global_max_ts_ > 0) {
7!
302
        DFTRACER_UTILS_LOG_INFO("TraceIndex: global time range [%" PRIu64
×
303
                                ", %" PRIu64 "] us",
304
                                global_min_ts_, global_max_ts_);
305
    }
306
}
63!
307

308
// Looks up a discovered trace file by the exact path string recorded for it
// during initialize(). Returns a pointer into the index's own storage, or
// nullptr when no file with that path was discovered.
const TraceIndex::FileInfo* TraceIndex::find_file(
    const std::string& path) const {
    const auto found = path_to_index_.find(path);
    return found != path_to_index_.end() ? &files_[found->second] : nullptr;
}
314

315
// Returns the file at position `index` in discovery order, or nullptr when
// `index` is past the end of the discovered files.
const TraceIndex::FileInfo* TraceIndex::file_at(std::size_t index) const {
    return index < files_.size() ? &files_[index] : nullptr;
}
319

320
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc