llnl / dftracer-utils / 26043728131

18 May 2026 03:37PM UTC coverage: 51.706% (-0.4%) from 52.076%

push

github

hariharan-devarajan
feat(perf): performance improvements for parallel reading, indexing, and aggregation

Indexer
- Streaming parse-and-emit worker pipeline with bounded memory usage
- Concurrent SST artifact ingestion with staging support
- Gzip member slicing for parallel indexing
- Lazy decoding for compressed value counts
- Bypass DOM wrapper for indexer hot path (simdjson on_demand)
- Decoupled write workers from parse workers
- --rebuild-summaries flag and optimized root summary rebuild

Aggregator / MPI
- Task-based DAG execution for aggregator pipeline
- Shared staging for multi-node artifact relocation
- Per-node thread scaling to avoid oversubscription
- Unified distributed aggregation tracking, removed manifest consolidation
- Deterministic aggregation and intra-file parallelism

Trace reader / query
- Compiled predicate evaluation for AND-of-EQ queries
- Uniform-match shortcut for AND-of-EQ queries
- Line-range support for work items and checkpoint processing
- Optimized chunk pruning and checkpoint handling

Replay
- Pipelined replay with coroutines and channels
- JsonParser-based trace processing
- Optimized string handling and I/O buffering

Organize / writer / dft
- Parallel slice creation and merging in organize visitor
- Inline indexer in organize
- Gzip member tracking in writer
- Coroutine-based event dispatcher with extracted parse logic
- Batch flushing in organize visitor

Arrow / call_tree
- Optimized arrow conversion
- Arrow IPC support and improved save/load in call_tree

Build / infrastructure
- zlib-ng option, system simdjson fallback
- cgroup v1/v2 memory limit detection
- Auto-computed per-file memory estimates and batch sizes
- CI: perf branch trigger, formatting

Docs
- Rewritten indexer and trace reader API references

35907 of 90345 branches covered (39.74%)

Branch coverage included in aggregate %.

16869 of 21880 new or added lines in 137 files covered. (77.1%)

273 existing lines in 39 files now uncovered.

32021 of 41028 relevant lines covered (78.05%)

13164.29 hits per line
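
The headline 51.706% is consistent with pooling lines and branches into one ratio: (32021 + 35907) / (41028 + 90345) = 67928 / 131373, which is about 51.706%. The sketch below reproduces that arithmetic. The function name is illustrative, and the formula is inferred from the figures on this page rather than taken from Coveralls documentation.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Percentage of covered items when lines and branches are counted together.
// Assumption: the aggregate is (covered lines + covered branches) divided by
// (relevant lines + total branches), which matches the numbers reported here.
double aggregate_coverage_percent(std::uint64_t covered_lines,
                                  std::uint64_t relevant_lines,
                                  std::uint64_t covered_branches,
                                  std::uint64_t total_branches) {
    const std::uint64_t covered = covered_lines + covered_branches;
    const std::uint64_t total = relevant_lines + total_branches;
    if (total == 0) {
        return 0.0;  // nothing to measure; avoid dividing by zero
    }
    return 100.0 * static_cast<double>(covered) / static_cast<double>(total);
}
```

Calling aggregate_coverage_percent(32021, 41028, 35907, 90345) gives a value that rounds to the reported 51.706, while the line-only ratio 32021 / 41028 rounds to the separately reported 78.05%.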

Source File

62.94
/src/dftracer/utils/core/rocksdb/database.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/env.h>
3
#include <dftracer/utils/core/rocksdb/database.h>
4
#include <dftracer/utils/core/rocksdb/filesystem.h>
5
#include <rocksdb/slice.h>
6
#include <rocksdb/table.h>
7

8
#include <algorithm>
9
#include <atomic>
10
#include <cstdlib>
11
#include <stdexcept>
12
#include <utility>
13

14
namespace dftracer::utils::rocksdb {
15

16
namespace {
17

18
std::atomic<bool>& process_exiting_flag() {
11,101✔
19
    static std::atomic<bool> flag{false};
20
    return flag;
11,101✔
21
}
22

23
const ::rocksdb::ReadOptions& read_options() {
21,872✔
24
    static const ::rocksdb::ReadOptions options;
21,872!
25
    return options;
21,875✔
26
}
27

28
const ::rocksdb::WriteOptions& write_options() {
3,957✔
29
    static const auto options = [] {
2,167!
30
        ::rocksdb::WriteOptions wo;
186✔
31
        wo.disableWAL = true;
186✔
32
        return wo;
186✔
33
    }();
1,976!
34
    return options;
3,957✔
35
}
36

UNCOV
37
void cleanup_failed_open(::rocksdb::DB*& db,
×
38
                         std::vector<::rocksdb::ColumnFamilyHandle*>& handles) {
UNCOV
39
    if (db != nullptr) {
×
40
        for (auto* handle : handles) {
×
41
            if (handle != nullptr) {
×
42
                db->DestroyColumnFamilyHandle(handle);
×
43
            }
44
        }
45
        static_cast<void>(db->Close());
×
46
        delete db;
×
47
        db = nullptr;
×
48
    }
UNCOV
49
    handles.clear();
×
UNCOV
50
}
×
51

52
}  // namespace
53

54
void mark_process_exiting_for_rocksdb() {
6✔
55
    process_exiting_flag().store(true, std::memory_order_relaxed);
6✔
56
}
6✔
57

58
RocksDatabase::RocksDatabase() = default;
16,783✔
59

60
RocksDatabase::RocksDatabase(const std::string& db_path, OpenMode open_mode) {
9✔
61
    open(db_path, open_mode);
6!
62
}
6✔
63

64
RocksDatabase::~RocksDatabase() { close(); }
16,790!
65

66
RocksDatabase::RocksDatabase(RocksDatabase&& other) noexcept
×
67
    : db_path_(std::move(other.db_path_)),
×
68
      open_mode_(other.open_mode_),
×
69
      file_system_(std::move(other.file_system_)),
×
70
      env_(std::move(other.env_)),
×
71
      db_(std::exchange(other.db_, nullptr)),
×
72
      column_families_(std::move(other.column_families_)) {}
×
73

74
RocksDatabase& RocksDatabase::operator=(RocksDatabase&& other) noexcept {
×
75
    if (this != &other) {
×
76
        close();
×
77
        db_path_ = std::move(other.db_path_);
×
78
        open_mode_ = other.open_mode_;
×
79
        file_system_ = std::move(other.file_system_);
×
80
        env_ = std::move(other.env_);
×
81
        db_ = std::exchange(other.db_, nullptr);
×
82
        column_families_ = std::move(other.column_families_);
×
83
    }
84
    return *this;
×
85
}
86

87
const decltype(cf::ALL)& RocksDatabase::default_column_families() {
2,745✔
88
    return cf::ALL;
2,745✔
89
}
90

91
::rocksdb::Options RocksDatabase::default_options() {
11,613✔
92
    ::rocksdb::Options options;
11,613✔
93
    options.create_if_missing = true;
11,613✔
94
    options.create_missing_column_families = true;
11,613✔
95
    options.allow_concurrent_memtable_write = true;
11,613✔
96
    options.enable_pipelined_write = true;
11,613✔
97
    options.max_open_files = Env::rocksdb_max_open_files();
11,613!
98
    options.max_background_jobs = 8;
11,613✔
99
    options.max_subcompactions = 8;
11,613✔
100
    options.write_buffer_size = 256 * 1024 * 1024;
11,613✔
101
    options.max_write_buffer_number = 4;
11,613✔
102
    return options;
11,613✔
103
}
5,888!
104

105
::rocksdb::ColumnFamilyOptions RocksDatabase::default_column_family_options() {
11,612✔
106
    ::rocksdb::ColumnFamilyOptions options;
11,612!
107

108
    ::rocksdb::BlockBasedTableOptions bbt;
11,613✔
109
    bbt.block_size = 32 * 1024;
11,613✔
110
    bbt.format_version = 5;
11,613✔
111
    bbt.index_block_restart_interval = 16;
11,613✔
112
    options.table_factory.reset(::rocksdb::NewBlockBasedTableFactory(bbt));
11,613!
113

114
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
115
    options.compression = ::rocksdb::kZSTD;
116
    options.compression_opts.level = 9;
117
    options.compression_opts.max_dict_bytes = 262144;
118
    options.compression_opts.zstd_max_train_bytes = 1048576;
119
    options.compression_opts.enabled = true;
120
    options.bottommost_compression = ::rocksdb::kZSTD;
121
    options.bottommost_compression_opts.level = 9;
122
    options.bottommost_compression_opts.max_dict_bytes = 262144;
123
    options.bottommost_compression_opts.zstd_max_train_bytes = 1048576;
124
    options.bottommost_compression_opts.enabled = true;
125
#elif defined(DFTRACER_UTILS_ENABLE_LZ4)
126
    options.compression = ::rocksdb::kLZ4Compression;
127
    options.bottommost_compression = ::rocksdb::kZlibCompression;
128
#else
129
    options.compression = ::rocksdb::kZlibCompression;
11,613✔
130
    options.bottommost_compression = ::rocksdb::kZlibCompression;
11,613✔
131
#endif
132
    return options;
17,338✔
133
}
11,613!
134

135
bool RocksDatabase::open(const std::string& db_path, OpenMode open_mode) {
11,140✔
136
    close();
11,140!
137
    db_path_ = db_path;
11,141!
138
    open_mode_ = open_mode;
11,141✔
139

140
    std::error_code ec;
11,141✔
141
    if (open_mode_ == OpenMode::ReadWrite) {
11,141✔
142
        fs::create_directories(fs::path(db_path_), ec);
1,686!
143
    }
832✔
144

145
    auto db_options = default_options();
11,141!
146
    if (open_mode_ == OpenMode::ReadOnly) {
11,141✔
147
        db_options.create_if_missing = false;
9,477✔
148
        db_options.create_missing_column_families = false;
9,477✔
149
    }
4,820✔
150
    file_system_ = make_dftracer_file_system();
11,141!
151
    env_ = make_dftracer_env(file_system_);
11,141!
152
    db_options.env = env_.get();
11,140✔
153
    auto cf_options = default_column_family_options();
11,141!
154

155
    std::vector<std::string> column_family_names;
11,141✔
156
    auto list_status = ::rocksdb::DB::ListColumnFamilies(db_options, db_path_,
11,141!
157
                                                         &column_family_names);
5,489!
158
    if (!list_status.ok()) {
11,139!
159
        if (open_mode_ == OpenMode::ReadOnly) {
1,128✔
160
            throw std::runtime_error(
66!
161
                "Failed to list RocksDB column families at '" + db_path_ +
66!
162
                "': " + list_status.ToString());
110!
163
        }
164
        column_family_names.reserve(default_column_families().size());
1,084!
165
        for (auto name : default_column_families()) {
29,205✔
166
            column_family_names.emplace_back(name);
28,126!
167
        }
168
    } else {
542✔
169
        if (open_mode_ == OpenMode::ReadWrite) {
10,012✔
170
            for (const auto& name : default_column_families()) {
15,651✔
171
                if (std::find(column_family_names.begin(),
22,611!
172
                              column_family_names.end(),
7,540✔
173
                              name) == column_family_names.end()) {
22,602!
NEW
174
                    column_family_names.emplace_back(name);
×
175
                }
176
            }
177
        }
290✔
178
    }
179

180
    std::vector<::rocksdb::ColumnFamilyDescriptor> descriptors;
11,096✔
181
    descriptors.reserve(column_family_names.size());
11,096!
182
    for (const auto& name : column_family_names) {
299,597✔
183
        auto opts = cf_options;
288,498!
184
        if (cf_options_override_) {
288,496✔
185
            cf_options_override_(name, opts);
286,730!
186
        }
145,496✔
187
        descriptors.emplace_back(name, opts);
288,502!
188
    }
288,507✔
189

190
    std::vector<::rocksdb::ColumnFamilyHandle*> handles;
11,096✔
191
    ::rocksdb::Status status;
11,095!
192
    if (open_mode_ == OpenMode::ReadOnly) {
11,097✔
193
        status = ::rocksdb::DB::OpenForReadOnly(
9,433!
194
            db_options, db_path_, descriptors, &handles, &db_, false);
9,433!
195
    } else {
4,798✔
196
        status = ::rocksdb::DB::Open(db_options, db_path_, descriptors,
3,327!
197
                                     &handles, &db_);
1,663✔
198
    }
199
    if (!status.ok()) {
11,096!
UNCOV
200
        cleanup_failed_open(db_, handles);
×
UNCOV
201
        throw std::runtime_error("Failed to open RocksDB at '" + db_path_ +
×
UNCOV
202
                                 "': " + status.ToString());
×
203
    }
204

205
    column_families_.clear();
11,096✔
206
    for (std::size_t i = 0; i < descriptors.size(); ++i) {
299,612✔
207
        column_families_.emplace(descriptors[i].name, handles[i]);
288,515!
208
    }
146,378✔
209

210
    return true;
5,467✔
211
}
11,229✔
212

213
void RocksDatabase::close() {
22,280✔
214
    if (db_ == nullptr) {
22,280✔
215
        column_families_.clear();
11,185✔
216
        return;
11,185✔
217
    }
218

219
    if (process_exiting_flag().load(std::memory_order_relaxed)) {
11,095!
220
        db_ = nullptr;
×
221
        column_families_.clear();
×
222
        env_.reset();
×
223
        file_system_.reset();
×
224
        db_path_.clear();
×
225
        return;
×
226
    }
227

228
    for (auto& entry : column_families_) {
299,613✔
229
        if (entry.second != nullptr) {
288,519✔
230
            db_->DestroyColumnFamilyHandle(entry.second);
288,520!
231
            entry.second = nullptr;
288,519✔
232
        }
146,380✔
233
    }
234
    column_families_.clear();
11,097✔
235

236
    auto* db = db_;
11,096✔
237
    db_ = nullptr;
11,096✔
238
    static_cast<void>(db->Close());
11,096✔
239
    delete db;
11,097✔
240
    env_.reset();
11,096✔
241
    file_system_.reset();
11,097✔
242
    db_path_.clear();
11,097✔
243
}
11,304✔
244

245
bool RocksDatabase::is_open() const noexcept { return db_ != nullptr; }
34✔
246

247
bool RocksDatabase::is_read_only() const noexcept {
1,905✔
248
    return open_mode_ == OpenMode::ReadOnly;
1,905✔
249
}
250

251
const std::string& RocksDatabase::path() const noexcept { return db_path_; }
×
252

253
::rocksdb::DB* RocksDatabase::get() const noexcept { return db_; }
×
254

255
::rocksdb::ColumnFamilyHandle* RocksDatabase::column_family_handle(
72,733✔
256
    std::string_view column_family) const {
257
    const auto name = column_family.empty() ? std::string(cf::DEFAULT)
72,733!
258
                                            : std::string(column_family);
109,139!
259
    const auto it = column_families_.find(name);
72,758✔
260
    if (it == column_families_.end() || it->second == nullptr) {
72,726!
261
        throw std::invalid_argument("Unknown RocksDB column family: " + name);
×
262
    }
263
    return it->second;
109,106✔
264
}
72,762✔
265

266
::rocksdb::Status RocksDatabase::put(std::string_view key,
1,402✔
267
                                     std::string_view value,
268
                                     std::string_view column_family) {
269
    return db_->Put(write_options(), column_family_handle(column_family),
2,103✔
270
                    ::rocksdb::Slice(key.data(), key.size()),
1,402✔
271
                    ::rocksdb::Slice(value.data(), value.size()));
2,804!
272
}
273

274
::rocksdb::Status RocksDatabase::get(std::string_view key, std::string* value,
12,492✔
275
                                     std::string_view column_family) const {
276
    return db_->Get(read_options(), column_family_handle(column_family),
18,745✔
277
                    ::rocksdb::Slice(key.data(), key.size()), value);
18,708!
278
}
279

NEW
280
::rocksdb::Status RocksDatabase::merge(std::string_view key,
×
281
                                       std::string_view value,
282
                                       std::string_view column_family) {
NEW
283
    return db_->Merge(write_options(), column_family_handle(column_family),
×
NEW
284
                      ::rocksdb::Slice(key.data(), key.size()),
×
NEW
285
                      ::rocksdb::Slice(value.data(), value.size()));
×
286
}
287

288
void RocksDatabase::set_cf_options_override(CfOptionsOverride override) {
11,068✔
289
    cf_options_override_ = std::move(override);
11,068✔
290
}
11,073✔
291

292
::rocksdb::Status RocksDatabase::merge(Batch& batch,
1,492✔
293
                                       std::string_view column_family,
294
                                       std::string_view key,
295
                                       std::string_view value) {
296
    return batch.Merge(column_family_handle(column_family),
2,243✔
297
                       ::rocksdb::Slice(key.data(), key.size()),
1,490✔
298
                       ::rocksdb::Slice(value.data(), value.size()));
2,974!
299
}
300

UNCOV
301
::rocksdb::Status RocksDatabase::del(std::string_view key,
×
302
                                     std::string_view column_family) {
UNCOV
303
    return db_->Delete(write_options(), column_family_handle(column_family),
×
UNCOV
304
                       ::rocksdb::Slice(key.data(), key.size()));
×
305
}
306

NEW
307
::rocksdb::Status RocksDatabase::delete_range(std::string_view begin_key,
×
308
                                              std::string_view end_key,
309
                                              std::string_view column_family) {
NEW
310
    return db_->DeleteRange(
×
311
        write_options(), column_family_handle(column_family),
NEW
312
        ::rocksdb::Slice(begin_key.data(), begin_key.size()),
×
NEW
313
        ::rocksdb::Slice(end_key.data(), end_key.size()));
×
314
}
315

316
::rocksdb::Status RocksDatabase::put(Batch& batch,
47,335✔
317
                                     std::string_view column_family,
318
                                     std::string_view key,
319
                                     std::string_view value) {
320
    return batch.Put(column_family_handle(column_family),
71,010✔
321
                     ::rocksdb::Slice(key.data(), key.size()),
47,358✔
322
                     ::rocksdb::Slice(value.data(), value.size()));
94,719!
323
}
324

325
::rocksdb::Status RocksDatabase::del(Batch& batch,
120✔
326
                                     std::string_view column_family,
327
                                     std::string_view key) {
328
    return batch.Delete(column_family_handle(column_family),
180✔
329
                        ::rocksdb::Slice(key.data(), key.size()));
180!
330
}
331

332
RocksDatabase::Batch RocksDatabase::begin_batch() const { return Batch(); }
2,562✔
333

334
::rocksdb::Status RocksDatabase::commit_batch(Batch& batch) {
2,557✔
335
    return db_->Write(write_options(), &batch);
2,557✔
336
}
337

338
std::unique_ptr<::rocksdb::Iterator> RocksDatabase::new_iterator(
9,393✔
339
    std::string_view column_family) const {
340
    return std::unique_ptr<::rocksdb::Iterator>(
4,690✔
341
        db_->NewIterator(read_options(), column_family_handle(column_family)));
9,393✔
342
}
343

344
::rocksdb::Status RocksDatabase::compact(std::string_view column_family) {
36✔
345
    ::rocksdb::CompactRangeOptions opts;
36✔
346
    opts.max_subcompactions = 8;
36✔
347
    return db_->CompactRange(opts, column_family_handle(column_family), nullptr,
36✔
348
                             nullptr);
36!
349
}
350

351
::rocksdb::Status RocksDatabase::ingest_external_files(
470✔
352
    std::string_view column_family,
353
    const std::vector<std::string>& external_files, bool ingest_behind) {
354
    if (external_files.empty()) {
470✔
355
        return ::rocksdb::Status::OK();
26✔
356
    }
357
    ::rocksdb::IngestExternalFileOptions opts;
444✔
358
    opts.move_files = false;
444✔
359
    opts.snapshot_consistency = true;
444✔
360
    opts.allow_global_seqno = true;
444✔
361
    opts.allow_blocking_flush = true;
444✔
362
    opts.ingest_behind = ingest_behind;
444✔
363
    return db_->IngestExternalFile(column_family_handle(column_family),
666✔
364
                                   external_files, opts);
444!
365
}
235✔
366

367
}  // namespace dftracer::utils::rocksdb
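
The column-family selection inside RocksDatabase::open in the listing above can be read as three cases: a failed listing in read-write mode falls back to the defaults, a successful listing in read-write mode appends any missing defaults, and read-only mode opens exactly what exists (or fails if nothing can be listed). A minimal standalone sketch of that decision, assuming plain strings in place of RocksDB types; resolve_column_families and its parameters are hypothetical names and are not part of dftracer-utils:

```cpp
#include <algorithm>
#include <cassert>
#include <stdexcept>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical helper mirroring the branch structure of open().
// listed_ok  : whether listing the existing column families succeeded
// read_only  : whether the database is being opened read-only
// existing   : names returned by the listing (ignored when listed_ok is false)
// defaults   : stand-in for the project's default column-family list
std::vector<std::string> resolve_column_families(
    bool listed_ok, bool read_only, std::vector<std::string> existing,
    const std::vector<std::string_view>& defaults) {
    if (!listed_ok) {
        if (read_only) {
            // A read-only open cannot create anything, so this is fatal.
            throw std::runtime_error("failed to list column families");
        }
        // Treat a failed listing as "no usable names" and use the defaults.
        existing.clear();
        existing.reserve(defaults.size());
        for (std::string_view name : defaults) {
            existing.emplace_back(name);
        }
        return existing;
    }
    if (!read_only) {
        // Keep what is on disk and append defaults that are not there yet.
        for (std::string_view name : defaults) {
            if (std::find(existing.begin(), existing.end(), name) ==
                existing.end()) {
                existing.emplace_back(name);
            }
        }
    }
    return existing;
}
```

In the coverage listing, the append-a-missing-default line is the one marked NEW with no hits, so a test along these lines (existing families that lack one default, opened read-write) would exercise that path.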