• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28398085302

29 Jun 2026 07:43PM UTC coverage: 50.067% (-2.2%) from 52.278%
28398085302

Pull #83

github

web-flow
Merge 9a615085e into 2efed6649
Pull Request #83: refactor and improve code QoL

16342 of 44293 branches covered (36.9%)

Branch coverage included in aggregate %.

613 of 1132 new or added lines in 52 files covered. (54.15%)

687 existing lines in 116 files now uncovered.

21698 of 31685 relevant lines covered (68.48%)

12958.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.88
/src/dftracer/utils/core/rocksdb/database.cpp
1
#include <dftracer/utils/core/common/constants.h>
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/core/env.h>
4
#include <dftracer/utils/core/rocksdb/database.h>
5
#include <dftracer/utils/core/rocksdb/filesystem.h>
6
#include <rocksdb/slice.h>
7
#include <rocksdb/table.h>
8

9
#include <algorithm>
10
#include <atomic>
11
#include <cstdlib>
12
#include <stdexcept>
13
#include <utility>
14

15
namespace dftracer::utils::rocksdb {
16

17
namespace {
18

19
// Returns the single flag that records whether the process has been marked
// as exiting. It is a function-local static, so every caller shares one
// instance, and it starts out false.
std::atomic<bool>& process_exiting_flag() {
    static std::atomic<bool> exiting{false};
    return exiting;
}
23

24
// Returns one shared, default-constructed ReadOptions instance that the read
// paths in this file pass to RocksDB.
const ::rocksdb::ReadOptions& read_options() {
    static const ::rocksdb::ReadOptions shared_read_options;
    return shared_read_options;
}
28

29
// Returns one shared WriteOptions instance, built once on first use, with
// the write-ahead log disabled (disableWAL = true).
const ::rocksdb::WriteOptions& write_options() {
    static const ::rocksdb::WriteOptions shared_write_options = [] {
        ::rocksdb::WriteOptions configured;
        configured.disableWAL = true;
        return configured;
    }();
    return shared_write_options;
}
37

38
void cleanup_failed_open(::rocksdb::DB*& db,
×
39
                         std::vector<::rocksdb::ColumnFamilyHandle*>& handles) {
40
    if (db != nullptr) {
×
41
        for (auto* handle : handles) {
×
42
            if (handle != nullptr) {
×
43
                db->DestroyColumnFamilyHandle(handle);
×
44
            }
45
        }
46
        static_cast<void>(db->Close());
×
47
        delete db;
×
48
        db = nullptr;
×
49
    }
50
    handles.clear();
×
51
}
×
52

53
}  // namespace
54

55
void mark_process_exiting_for_rocksdb() {
3✔
56
    process_exiting_flag().store(true, std::memory_order_relaxed);
3✔
57
}
3✔
58

59
// Default construction: no open() call is made here.
RocksDatabase::RocksDatabase() = default;
5,533✔
60

61
// Constructs the wrapper and immediately opens the database at `db_path` in
// the requested mode by delegating to open(). Any exception thrown by open()
// propagates out of the constructor.
RocksDatabase::RocksDatabase(const std::string& db_path, OpenMode open_mode) {
    this->open(db_path, open_mode);
}
3✔
64

65
// Releases the database, if one is open, by delegating to close().
RocksDatabase::~RocksDatabase() {
    close();
}
5,541✔
66

67
// Move constructor: takes over the path, open mode, file system, env,
// database pointer and column-family handles of `other`, and leaves
// `other.db_` null so its destructor will not touch the database.
//
// The per-column-family options hook is transferred as well. Without that,
// the new object would lose the hook and a later open() on it would silently
// use the default column-family options.
RocksDatabase::RocksDatabase(RocksDatabase&& other) noexcept
    : db_path_(std::move(other.db_path_)),
      open_mode_(other.open_mode_),
      file_system_(std::move(other.file_system_)),
      env_(std::move(other.env_)),
      db_(std::exchange(other.db_, nullptr)),
      column_families_(std::move(other.column_families_)) {
    // Assigned in the body rather than the initializer list so this does not
    // depend on where the member is declared in the class.
    cf_options_override_ = std::move(other.cf_options_override_);
    // A moved-from container is only guaranteed to be valid, not empty; make
    // sure `other` cannot hand out handles that now belong to this object.
    other.column_families_.clear();
}
×
74

75
// Move assignment: closes whatever this object currently holds, then takes
// over the path, open mode, file system, env, database pointer and
// column-family handles of `other`. Self-assignment is a no-op.
//
// The per-column-family options hook is transferred as well. Without that,
// this object would keep its own previous hook and a later open() would
// apply the wrong column-family options.
//
// Returns *this.
RocksDatabase& RocksDatabase::operator=(RocksDatabase&& other) noexcept {
    if (this == &other) {
        return *this;
    }

    close();
    db_path_ = std::move(other.db_path_);
    open_mode_ = other.open_mode_;
    file_system_ = std::move(other.file_system_);
    env_ = std::move(other.env_);
    db_ = std::exchange(other.db_, nullptr);
    column_families_ = std::move(other.column_families_);
    cf_options_override_ = std::move(other.cf_options_override_);
    // A moved-from container is only guaranteed to be valid, not empty; make
    // sure `other` cannot hand out handles that now belong to this object.
    other.column_families_.clear();
    return *this;
}
87

88
// Returns a reference to cf::ALL, the column-family names that open() falls
// back to (or tops up with) when preparing a database.
const decltype(cf::ALL)& RocksDatabase::default_column_families() {
    const auto& all_families = cf::ALL;
    return all_families;
}
91

92
// Builds the database-wide options that open() starts from.
//
// Missing databases and column families are created, concurrent memtable
// writes and pipelined writes are enabled, the open-file limit comes from
// Env::rocksdb_max_open_files(), and background work and write buffers use
// the fixed sizes below. open() turns the two create_* flags back off for
// read-only opens.
::rocksdb::Options RocksDatabase::default_options() {
    ::rocksdb::Options db_opts;

    // Creation behaviour.
    db_opts.create_if_missing = true;
    db_opts.create_missing_column_families = true;

    // Write-path concurrency.
    db_opts.allow_concurrent_memtable_write = true;
    db_opts.enable_pipelined_write = true;

    // Resource limits.
    db_opts.max_open_files = Env::rocksdb_max_open_files();
    db_opts.max_background_jobs = 8;
    db_opts.max_subcompactions = 8;

    // Memtable sizing: 256 MiB per buffer, up to four buffers.
    db_opts.write_buffer_size = 256 * 1024 * 1024;
    db_opts.max_write_buffer_number = 4;

    return db_opts;
}
×
105

106
// Builds the column-family options applied to every column family before
// any per-family override runs.
//
// Tables are block-based with 32 KiB blocks, format version 5 and an index
// restart interval of 16. Compression is chosen at compile time:
//   - DFTRACER_UTILS_ENABLE_ZSTD: ZSTD for all levels, with level, dictionary
//     and training sizes taken from constants::rocksdb;
//   - DFTRACER_UTILS_ENABLE_LZ4:  LZ4, with zlib for the bottommost level;
//   - otherwise:                  zlib for all levels.
::rocksdb::ColumnFamilyOptions RocksDatabase::default_column_family_options() {
    ::rocksdb::ColumnFamilyOptions cf_opts;

    ::rocksdb::BlockBasedTableOptions table_opts;
    table_opts.block_size = 32 * 1024;
    table_opts.format_version = 5;
    table_opts.index_block_restart_interval = 16;
    cf_opts.table_factory.reset(
        ::rocksdb::NewBlockBasedTableFactory(table_opts));

#ifdef DFTRACER_UTILS_ENABLE_ZSTD
    // The general and bottommost settings receive identical ZSTD tuning.
    const auto tune_zstd = [](auto& zstd_opts) {
        zstd_opts.level = constants::rocksdb::ZSTD_COMPRESSION_LEVEL;
        zstd_opts.max_dict_bytes = constants::rocksdb::ZSTD_MAX_DICT_BYTES;
        zstd_opts.zstd_max_train_bytes =
            constants::rocksdb::ZSTD_MAX_TRAIN_BYTES;
        zstd_opts.enabled = true;
    };
    cf_opts.compression = ::rocksdb::kZSTD;
    tune_zstd(cf_opts.compression_opts);
    cf_opts.bottommost_compression = ::rocksdb::kZSTD;
    tune_zstd(cf_opts.bottommost_compression_opts);
#elif defined(DFTRACER_UTILS_ENABLE_LZ4)
    cf_opts.compression = ::rocksdb::kLZ4Compression;
    cf_opts.bottommost_compression = ::rocksdb::kZlibCompression;
#else
    cf_opts.compression = ::rocksdb::kZlibCompression;
    cf_opts.bottommost_compression = ::rocksdb::kZlibCompression;
#endif

    return cf_opts;
}
5,777✔
140

141
bool RocksDatabase::open(const std::string& db_path, OpenMode open_mode) {
5,540✔
142
    close();
5,540!
143
    db_path_ = db_path;
5,541!
144
    open_mode_ = open_mode;
5,540✔
145

146
    std::error_code ec;
5,540✔
147
    if (open_mode_ == OpenMode::ReadWrite) {
5,541✔
148
        fs::create_directories(fs::path(db_path_), ec);
838!
149
    }
150

151
    auto db_options = default_options();
5,541!
152
    if (open_mode_ == OpenMode::ReadOnly) {
5,541✔
153
        db_options.create_if_missing = false;
4,703✔
154
        db_options.create_missing_column_families = false;
4,703✔
155
    }
156
    file_system_ = make_dftracer_file_system();
5,541!
157
    env_ = make_dftracer_env(file_system_);
5,541!
158
    db_options.env = env_.get();
5,538✔
159
    auto cf_options = default_column_family_options();
5,539!
160

161
    std::vector<std::string> column_family_names;
5,540✔
162
    auto list_status = ::rocksdb::DB::ListColumnFamilies(db_options, db_path_,
5,541✔
163
                                                         &column_family_names);
5,541!
164
    if (!list_status.ok()) {
5,540!
165
        if (open_mode_ == OpenMode::ReadOnly) {
567✔
166
            throw std::runtime_error(
22!
167
                "Failed to list RocksDB column families at '" + db_path_ +
44!
168
                "': " + list_status.ToString());
88!
169
        }
170
        column_family_names.reserve(default_column_families().size());
545!
171
        for (auto name : default_column_families()) {
14,739!
172
            column_family_names.emplace_back(name);
14,194!
173
        }
174
    } else {
175
        if (open_mode_ == OpenMode::ReadWrite) {
4,972✔
176
            for (const auto& name : default_column_families()) {
7,857!
177
                if (std::find(column_family_names.begin(),
7,564!
178
                              column_family_names.end(),
179
                              name) == column_family_names.end()) {
15,131!
180
                    column_family_names.emplace_back(name);
×
181
                }
182
            }
183
        }
184
    }
185

186
    std::vector<::rocksdb::ColumnFamilyDescriptor> descriptors;
5,519✔
187
    descriptors.reserve(column_family_names.size());
5,518!
188
    for (const auto& name : column_family_names) {
148,984✔
189
        auto opts = cf_options;
143,475!
190
        if (cf_options_override_) {
143,459✔
191
            cf_options_override_(name, opts);
142,577!
192
        }
193
        descriptors.emplace_back(name, opts);
143,466!
194
    }
143,468✔
195

196
    std::vector<::rocksdb::ColumnFamilyHandle*> handles;
5,518✔
197
    ::rocksdb::Status status;
5,519✔
198
    if (open_mode_ == OpenMode::ReadOnly) {
5,519✔
199
        status = ::rocksdb::DB::OpenForReadOnly(
4,680✔
200
            db_options, db_path_, descriptors, &handles, &db_, false);
4,681!
201
    } else {
202
        status = ::rocksdb::DB::Open(db_options, db_path_, descriptors,
1,675!
203
                                     &handles, &db_);
837✔
204
    }
205
    if (!status.ok()) {
5,516!
206
        cleanup_failed_open(db_, handles);
×
207
        throw std::runtime_error("Failed to open RocksDB at '" + db_path_ +
×
208
                                 "': " + status.ToString());
×
209
    }
210

211
    column_families_.clear();
5,517✔
212
    for (std::size_t i = 0; i < descriptors.size(); ++i) {
148,986✔
213
        column_families_.emplace(descriptors[i].name, handles[i]);
143,467!
214
    }
215

216
    return true;
5,519✔
217
}
5,603✔
218

219
void RocksDatabase::close() {
11,080✔
220
    if (db_ == nullptr) {
11,080✔
221
        column_families_.clear();
5,562✔
222
        return;
5,562✔
223
    }
224

225
    if (process_exiting_flag().load(std::memory_order_relaxed)) {
5,518!
226
        db_ = nullptr;
×
227
        column_families_.clear();
×
228
        env_.reset();
×
229
        file_system_.reset();
×
230
        db_path_.clear();
×
231
        return;
×
232
    }
233

234
    for (auto& entry : column_families_) {
148,956✔
235
        if (entry.second != nullptr) {
143,447!
236
            db_->DestroyColumnFamilyHandle(entry.second);
143,469!
237
            entry.second = nullptr;
143,461✔
238
        }
239
    }
240
    column_families_.clear();
5,519✔
241

242
    auto* db = db_;
5,519✔
243
    db_ = nullptr;
5,519✔
244
    static_cast<void>(db->Close());
5,519✔
245
    delete db;
5,519!
246
    env_.reset();
5,518✔
247
    file_system_.reset();
5,519✔
248
    db_path_.clear();
5,519✔
249
}
250

251
// True while this object holds a database pointer.
bool RocksDatabase::is_open() const noexcept {
    return db_ != nullptr;
}
18✔
252

253
bool RocksDatabase::is_read_only() const noexcept {
1,037✔
254
    return open_mode_ == OpenMode::ReadOnly;
1,037✔
255
}
256

257
// Returns the stored database path. close() clears this string.
const std::string& RocksDatabase::path() const noexcept {
    return db_path_;
}
×
258

259
// Returns the raw RocksDB pointer held by this object, or nullptr when no
// database is open. The pointer is deleted by close().
::rocksdb::DB* RocksDatabase::get() const noexcept {
    return db_;
}
×
260

261
// Looks up the handle for `column_family`; an empty name selects cf::DEFAULT.
//
// Returns the non-null handle registered under that name.
// Throws std::invalid_argument if the name is not registered or its handle
// is null. The handle map is empty while no database is open, so every
// lookup throws in that state.
::rocksdb::ColumnFamilyHandle* RocksDatabase::column_family_handle(
    std::string_view column_family) const {
    std::string lookup_name(column_family);
    if (lookup_name.empty()) {
        lookup_name = std::string(cf::DEFAULT);
    }

    const auto found = column_families_.find(lookup_name);
    const bool usable =
        found != column_families_.end() && found->second != nullptr;
    if (!usable) {
        throw std::invalid_argument("Unknown RocksDB column family: " +
                                    lookup_name);
    }
    return found->second;
}
37,393✔
271

272
// Writes `key` -> `value` into `column_family` using the shared write
// options. Returns the RocksDB status of the write; an unknown column
// family raises std::invalid_argument from column_family_handle().
::rocksdb::Status RocksDatabase::put(std::string_view key,
                                     std::string_view value,
                                     std::string_view column_family) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return db_->Put(write_options(), handle, key_slice, value_slice);
}
279

280
// Reads the value stored under `key` in `column_family` into `*value` using
// the shared read options. Returns the RocksDB status of the lookup; an
// unknown column family raises std::invalid_argument from
// column_family_handle().
::rocksdb::Status RocksDatabase::get(std::string_view key, std::string* value,
                                     std::string_view column_family) const {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return db_->Get(read_options(), handle, key_slice, value);
}
285

286
// Issues a RocksDB merge of `value` into `key` within `column_family` using
// the shared write options. Returns the RocksDB status; an unknown column
// family raises std::invalid_argument from column_family_handle().
::rocksdb::Status RocksDatabase::merge(std::string_view key,
                                       std::string_view value,
                                       std::string_view column_family) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return db_->Merge(write_options(), handle, key_slice, value_slice);
}
293

294
// Installs the hook that open() calls for each column family, passing the
// family name and a mutable copy of the default column-family options. It is
// only consulted inside open(), so it must be set before opening to have any
// effect on that database.
void RocksDatabase::set_cf_options_override(CfOptionsOverride override) {
    // Taken by value and moved into place, so the caller's copy is untouched.
    cf_options_override_ = std::move(override);
}
5,506✔
297

298
// Appends a merge of `value` into `key` for `column_family` to `batch`.
// Nothing reaches the database until the batch is committed. Returns the
// status reported by the batch; an unknown column family raises
// std::invalid_argument from column_family_handle().
::rocksdb::Status RocksDatabase::merge(Batch& batch,
                                       std::string_view column_family,
                                       std::string_view key,
                                       std::string_view value) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return batch.Merge(handle, key_slice, value_slice);
}
306

307
// Deletes `key` from `column_family` using the shared write options.
// Returns the RocksDB status; an unknown column family raises
// std::invalid_argument from column_family_handle().
::rocksdb::Status RocksDatabase::del(std::string_view key,
                                     std::string_view column_family) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return db_->Delete(write_options(), handle, key_slice);
}
312

313
// Forwards `begin_key` and `end_key` to RocksDB's DeleteRange for
// `column_family`, using the shared write options. Returns the RocksDB
// status; an unknown column family raises std::invalid_argument from
// column_family_handle().
::rocksdb::Status RocksDatabase::delete_range(std::string_view begin_key,
                                              std::string_view end_key,
                                              std::string_view column_family) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice begin_slice(begin_key.data(), begin_key.size());
    const ::rocksdb::Slice end_slice(end_key.data(), end_key.size());
    return db_->DeleteRange(write_options(), handle, begin_slice, end_slice);
}
321

322
// Appends a put of `key` -> `value` for `column_family` to `batch`. Nothing
// reaches the database until the batch is committed. Returns the status
// reported by the batch; an unknown column family raises
// std::invalid_argument from column_family_handle().
::rocksdb::Status RocksDatabase::put(Batch& batch,
                                     std::string_view column_family,
                                     std::string_view key,
                                     std::string_view value) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return batch.Put(handle, key_slice, value_slice);
}
330

331
// Appends a delete of `key` for `column_family` to `batch`. Nothing reaches
// the database until the batch is committed. Returns the status reported by
// the batch; an unknown column family raises std::invalid_argument from
// column_family_handle().
::rocksdb::Status RocksDatabase::del(Batch& batch,
                                     std::string_view column_family,
                                     std::string_view key) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return batch.Delete(handle, key_slice);
}
337

338
// Returns a new, default-constructed batch. The batch is independent of this
// object until it is passed to commit_batch().
RocksDatabase::Batch RocksDatabase::begin_batch() const {
    return Batch();
}
1,314✔
339

340
// Writes every operation queued in `batch` to the database using the shared
// write options.
//
// Returns the RocksDB status of the write. If no database is open, returns
// an InvalidArgument status instead of dereferencing a null pointer. The
// other operations in this file reach the same state through
// column_family_handle(), which throws; this call has no column-family
// lookup to catch it.
::rocksdb::Status RocksDatabase::commit_batch(Batch& batch) {
    if (db_ == nullptr) {
        return ::rocksdb::Status::InvalidArgument(
            "RocksDB database is not open");
    }
    return db_->Write(write_options(), &batch);
}
343

344
std::unique_ptr<::rocksdb::Iterator> RocksDatabase::new_iterator(
4,773✔
345
    std::string_view column_family) const {
346
    return std::unique_ptr<::rocksdb::Iterator>(
347
        db_->NewIterator(read_options(), column_family_handle(column_family)));
4,773✔
348
}
349

350
// Requests a compaction of `column_family` with null begin and end keys and
// up to 8 subcompactions. Returns the RocksDB status; an unknown column
// family raises std::invalid_argument from column_family_handle().
::rocksdb::Status RocksDatabase::compact(std::string_view column_family) {
    ::rocksdb::CompactRangeOptions range_opts;
    range_opts.max_subcompactions = 8;

    auto* const handle = column_family_handle(column_family);
    return db_->CompactRange(range_opts, handle, nullptr, nullptr);
}
356

357
// Ingests the files named in `external_files` into `column_family`.
//
// An empty file list returns OK without touching the database or looking up
// the column family. Otherwise the ingest runs with move_files off, snapshot
// consistency, global sequence numbers and blocking flushes on, and with
// `ingest_behind` passed through as given.
//
// Returns the RocksDB status of the ingest; an unknown column family raises
// std::invalid_argument from column_family_handle().
::rocksdb::Status RocksDatabase::ingest_external_files(
    std::string_view column_family,
    const std::vector<std::string>& external_files, bool ingest_behind) {
    if (external_files.empty()) {
        return ::rocksdb::Status::OK();
    }

    ::rocksdb::IngestExternalFileOptions ingest_opts;
    ingest_opts.move_files = false;
    ingest_opts.snapshot_consistency = true;
    ingest_opts.allow_global_seqno = true;
    ingest_opts.allow_blocking_flush = true;
    ingest_opts.ingest_behind = ingest_behind;

    auto* const handle = column_family_handle(column_family);
    return db_->IngestExternalFile(handle, external_files, ingest_opts);
}
372

373
}  // namespace dftracer::utils::rocksdb
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc