• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28521653886

01 Jul 2026 01:36PM UTC coverage: 50.92% (-1.4%) from 52.278%
28521653886

Pull #83

github

web-flow
Merge 9bdedb1e9 into 2efed6649
Pull Request #83: refactor and improve code QoL

31893 of 80049 branches covered (39.84%)

Branch coverage included in aggregate %.

789 of 1613 new or added lines in 87 files covered. (48.92%)

5007 existing lines in 181 files now uncovered.

32812 of 47024 relevant lines covered (69.78%)

9905.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.78
/src/dftracer/utils/core/rocksdb/database.cpp
1
#include <dftracer/utils/core/common/constants.h>
2
#include <dftracer/utils/core/common/error.h>
3
#include <dftracer/utils/core/common/filesystem.h>
4
#include <dftracer/utils/core/env.h>
5
#include <dftracer/utils/core/rocksdb/database.h>
6
#include <dftracer/utils/core/rocksdb/filesystem.h>
7
#include <rocksdb/slice.h>
8
#include <rocksdb/table.h>
9

10
#include <algorithm>
11
#include <atomic>
12
#include <cstdlib>
13
#include <stdexcept>
14
#include <utility>
15

16
namespace dftracer::utils::rocksdb {
17

18
namespace {
19

20
// Returns the single process-wide "exiting" flag. It is a function-local
// static, so it is created on first use and starts out false.
// mark_process_exiting_for_rocksdb() sets it; RocksDatabase::close() reads it.
std::atomic<bool>& process_exiting_flag() {
    static std::atomic<bool> exiting{false};
    return exiting;
}
24

25
// Shared, default-constructed ReadOptions handed to every read issued from
// this file (point lookups and iterators). Built once on first use.
const ::rocksdb::ReadOptions& read_options() {
    static const ::rocksdb::ReadOptions shared_read_options;
    return shared_read_options;
}
×
29

30
// Shared WriteOptions handed to every write issued from this file. The only
// deviation from the RocksDB defaults is disableWAL = true. Built once on
// first use.
const ::rocksdb::WriteOptions& write_options() {
    static const ::rocksdb::WriteOptions shared_write_options = [] {
        ::rocksdb::WriteOptions configured;
        configured.disableWAL = true;
        return configured;
    }();
    return shared_write_options;
}
×
38

39
void cleanup_failed_open(::rocksdb::DB*& db,
×
40
                         std::vector<::rocksdb::ColumnFamilyHandle*>& handles) {
41
    if (db != nullptr) {
×
42
        for (auto* handle : handles) {
×
43
            if (handle != nullptr) {
×
44
                db->DestroyColumnFamilyHandle(handle);
×
UNCOV
45
            }
×
46
        }
47
        static_cast<void>(db->Close());
×
48
        delete db;
×
49
        db = nullptr;
×
UNCOV
50
    }
×
51
    handles.clear();
×
52
}
×
53

54
}  // namespace
55

56
void mark_process_exiting_for_rocksdb() {
3✔
57
    process_exiting_flag().store(true, std::memory_order_relaxed);
3✔
58
}
3✔
59

60
// Creates a database object with nothing opened.
RocksDatabase::RocksDatabase() = default;

// Creates the object and immediately opens `db_path` in `open_mode`.
// open() reports failure by throwing, so its boolean result is not inspected.
RocksDatabase::RocksDatabase(const std::string& db_path, OpenMode open_mode) {
    static_cast<void>(open(db_path, open_mode));
}

// Closes the database (if any) when the object goes away.
RocksDatabase::~RocksDatabase() {
    close();
}
11,356!
67

68
// Move constructor: takes over the path, open mode, file system, env, DB
// pointer, column-family handles and the column-family options override of
// `other`. `other.db_` is nulled so its destructor will not close the
// database that now belongs to this object.
RocksDatabase::RocksDatabase(RocksDatabase&& other) noexcept
    : db_path_(std::move(other.db_path_)),
      open_mode_(other.open_mode_),
      file_system_(std::move(other.file_system_)),
      env_(std::move(other.env_)),
      db_(std::exchange(other.db_, nullptr)),
      column_families_(std::move(other.column_families_)) {
    // The override used to be left behind, so a later open() on the moved-to
    // object silently fell back to the default column-family options.
    // Assigned in the body (not the initializer list) so this does not depend
    // on where the member is declared in the header.
    cf_options_override_ = std::move(other.cf_options_override_);
}
×
75

76
// Move assignment: closes whatever this object currently has open, then takes
// over the path, open mode, file system, env, DB pointer, column-family
// handles and the column-family options override of `other`. Self-assignment
// is a no-op.
RocksDatabase& RocksDatabase::operator=(RocksDatabase&& other) noexcept {
    if (this == &other) {
        return *this;
    }

    close();
    db_path_ = std::move(other.db_path_);
    open_mode_ = other.open_mode_;
    file_system_ = std::move(other.file_system_);
    env_ = std::move(other.env_);
    db_ = std::exchange(other.db_, nullptr);
    column_families_ = std::move(other.column_families_);
    // Previously not transferred: this object kept its own stale override
    // while adopting a database that was opened with `other`'s override.
    cf_options_override_ = std::move(other.cf_options_override_);
    return *this;
}
88

89
// Returns a reference to cf::ALL, the list of column families that open()
// uses when creating a database or topping up an existing one.
const decltype(cf::ALL)& RocksDatabase::default_column_families() {
    const auto& all_families = cf::ALL;
    return all_families;
}
92

93
// Builds the database-wide options used by open().
//
// Missing databases and column families are created, concurrent memtable
// writes and pipelined writes are enabled, the open-file limit comes from
// Env::rocksdb_max_open_files(), and background jobs / subcompactions are
// capped at 8. Each memtable is 256 MiB with at most 4 of them.
::rocksdb::Options RocksDatabase::default_options() {
    ::rocksdb::Options db_opts;

    // Creation behaviour (open() switches these off for read-only opens).
    db_opts.create_if_missing = true;
    db_opts.create_missing_column_families = true;

    // Write path.
    db_opts.allow_concurrent_memtable_write = true;
    db_opts.enable_pipelined_write = true;

    // Resource limits.
    db_opts.max_open_files = Env::rocksdb_max_open_files();
    db_opts.max_background_jobs = 8;
    db_opts.max_subcompactions = 8;

    // Memtable sizing: 256 MiB per buffer, up to 4 buffers.
    db_opts.write_buffer_size = std::size_t{256} * 1024 * 1024;
    db_opts.max_write_buffer_number = 4;

    return db_opts;
}
5,914!
106

107
// Builds the per-column-family options that open() starts from.
//
// Tables are block-based with 32 KiB blocks, format version 5 and an index
// block restart interval of 16. Compression depends on the build flags:
//   - DFTRACER_UTILS_ENABLE_ZSTD: ZSTD everywhere, tuned with the
//     constants::rocksdb::ZSTD_* values;
//   - DFTRACER_UTILS_ENABLE_LZ4:  LZ4, with zlib for the bottommost level;
//   - otherwise:                  zlib everywhere.
::rocksdb::ColumnFamilyOptions RocksDatabase::default_column_family_options() {
    ::rocksdb::BlockBasedTableOptions table_opts;
    table_opts.block_size = 32 * 1024;
    table_opts.format_version = 5;
    table_opts.index_block_restart_interval = 16;

    ::rocksdb::ColumnFamilyOptions cf_opts;
    cf_opts.table_factory.reset(
        ::rocksdb::NewBlockBasedTableFactory(table_opts));

#ifdef DFTRACER_UTILS_ENABLE_ZSTD
    // The same ZSTD tuning is applied to the regular and bottommost settings.
    const auto tune_zstd = [](auto& zstd_opts) {
        zstd_opts.level = constants::rocksdb::ZSTD_COMPRESSION_LEVEL;
        zstd_opts.max_dict_bytes = constants::rocksdb::ZSTD_MAX_DICT_BYTES;
        zstd_opts.zstd_max_train_bytes =
            constants::rocksdb::ZSTD_MAX_TRAIN_BYTES;
        zstd_opts.enabled = true;
    };
    cf_opts.compression = ::rocksdb::kZSTD;
    tune_zstd(cf_opts.compression_opts);
    cf_opts.bottommost_compression = ::rocksdb::kZSTD;
    tune_zstd(cf_opts.bottommost_compression_opts);
#elif defined(DFTRACER_UTILS_ENABLE_LZ4)
    cf_opts.bottommost_compression = ::rocksdb::kZlibCompression;
    cf_opts.compression = ::rocksdb::kLZ4Compression;
#else
    cf_opts.bottommost_compression = ::rocksdb::kZlibCompression;
    cf_opts.compression = ::rocksdb::kZlibCompression;
#endif
    return cf_opts;
}
5,914!
141

142
bool RocksDatabase::open(const std::string& db_path, OpenMode open_mode) {
5,678✔
143
    close();
5,678✔
144
    db_path_ = db_path;
5,678✔
145
    open_mode_ = open_mode;
5,678✔
146

147
    std::error_code ec;
5,678✔
148
    if (open_mode_ == OpenMode::ReadWrite) {
5,678✔
149
        fs::create_directories(fs::path(db_path_), ec);
861!
150
    }
839✔
151

152
    auto db_options = default_options();
5,678✔
153
    if (open_mode_ == OpenMode::ReadOnly) {
5,678✔
154
        db_options.create_if_missing = false;
4,839✔
155
        db_options.create_missing_column_families = false;
4,839✔
156
    }
4,839✔
157
    file_system_ = make_dftracer_file_system();
5,678!
158
    env_ = make_dftracer_env(file_system_);
5,678!
159
    db_options.env = env_.get();
5,678✔
160
    auto cf_options = default_column_family_options();
5,678!
161

162
    std::vector<std::string> column_family_names;
5,678✔
163
    auto list_status = ::rocksdb::DB::ListColumnFamilies(db_options, db_path_,
5,678!
164
                                                         &column_family_names);
165
    if (!list_status.ok()) {
5,678!
166
        if (open_mode_ == OpenMode::ReadOnly) {
569✔
167
            throw DFTUtilsException(
44!
168
                ErrorCode::IO, "Failed to list RocksDB column families at '" +
22!
169
                                   db_path_ + "': " + list_status.ToString());
22!
170
        }
171
        column_family_names.reserve(default_column_families().size());
547!
172
        for (auto name : default_column_families()) {
14,769✔
173
            column_family_names.emplace_back(name);
14,222!
174
        }
175
    } else {
547✔
176
        if (open_mode_ == OpenMode::ReadWrite) {
5,109✔
177
            for (const auto& name : default_column_families()) {
7,884✔
178
                if (std::find(column_family_names.begin(),
15,184!
179
                              column_family_names.end(),
7,592✔
180
                              name) == column_family_names.end()) {
7,592✔
181
                    column_family_names.emplace_back(name);
×
UNCOV
182
                }
×
183
            }
184
        }
292✔
185
    }
186

187
    std::vector<::rocksdb::ColumnFamilyDescriptor> descriptors;
5,656✔
188
    descriptors.reserve(column_family_names.size());
5,656!
189
    for (const auto& name : column_family_names) {
152,712✔
190
        auto opts = cf_options;
147,056!
191
        if (cf_options_override_) {
147,056✔
192
            cf_options_override_(name, opts);
146,172!
193
        }
146,172✔
194
        descriptors.emplace_back(name, opts);
147,056!
195
    }
147,056✔
196

197
    std::vector<::rocksdb::ColumnFamilyHandle*> handles;
5,656✔
198
    ::rocksdb::Status status;
5,656!
199
    if (open_mode_ == OpenMode::ReadOnly) {
5,656✔
200
        status = ::rocksdb::DB::OpenForReadOnly(
4,817!
201
            db_options, db_path_, descriptors, &handles, &db_, false);
4,817✔
202
    } else {
4,817✔
203
        status = ::rocksdb::DB::Open(db_options, db_path_, descriptors,
1,678!
204
                                     &handles, &db_);
839✔
205
    }
206
    if (!status.ok()) {
5,656!
207
        cleanup_failed_open(db_, handles);
×
NEW
208
        throw DFTUtilsException(ErrorCode::IO, "Failed to open RocksDB at '" +
×
NEW
209
                                                   db_path_ +
×
NEW
210
                                                   "': " + status.ToString());
×
211
    }
212

213
    column_families_.clear();
5,656✔
214
    for (std::size_t i = 0; i < descriptors.size(); ++i) {
152,712✔
215
        column_families_.emplace(descriptors[i].name, handles[i]);
147,056!
216
    }
147,056✔
217

218
    return true;
219
}
5,700✔
220

221
void RocksDatabase::close() {
11,356✔
222
    if (db_ == nullptr) {
11,356✔
223
        column_families_.clear();
5,700✔
224
        return;
5,700✔
225
    }
226

227
    if (process_exiting_flag().load(std::memory_order_relaxed)) {
5,656!
228
        db_ = nullptr;
×
229
        column_families_.clear();
×
230
        env_.reset();
×
231
        file_system_.reset();
×
232
        db_path_.clear();
×
233
        return;
×
234
    }
235

236
    for (auto& entry : column_families_) {
152,712✔
237
        if (entry.second != nullptr) {
147,056!
238
            db_->DestroyColumnFamilyHandle(entry.second);
147,056✔
239
            entry.second = nullptr;
147,056✔
240
        }
147,056✔
241
    }
242
    column_families_.clear();
5,656✔
243

244
    auto* db = db_;
5,656✔
245
    db_ = nullptr;
5,656✔
246
    static_cast<void>(db->Close());
5,656✔
247
    delete db;
5,656!
248
    env_.reset();
5,656✔
249
    file_system_.reset();
5,656✔
250
    db_path_.clear();
5,656✔
251
}
11,356✔
252

253
bool RocksDatabase::is_open() const noexcept { return db_ != nullptr; }
18✔
254

255
bool RocksDatabase::is_read_only() const noexcept {
893✔
256
    return open_mode_ == OpenMode::ReadOnly;
893✔
257
}
258

259
const std::string& RocksDatabase::path() const noexcept { return db_path_; }
×
260

261
::rocksdb::DB* RocksDatabase::get() const noexcept { return db_; }
×
262

263
// Looks up the handle for `column_family`; an empty name selects cf::DEFAULT.
//
// @throws DFTUtilsException (ErrorCode::INVALID_ARGUMENT) if the name is not
//         in the map of opened column families or its handle is null.
::rocksdb::ColumnFamilyHandle* RocksDatabase::column_family_handle(
    std::string_view column_family) const {
    std::string resolved_name(column_family);
    if (resolved_name.empty()) {
        resolved_name = std::string(cf::DEFAULT);
    }

    const auto found = column_families_.find(resolved_name);
    const bool usable =
        found != column_families_.end() && found->second != nullptr;
    if (!usable) {
        throw DFTUtilsException(
            ErrorCode::INVALID_ARGUMENT,
            "Unknown RocksDB column family: " + resolved_name);
    }
    return found->second;
}
37,422✔
274

275
// Writes `key` -> `value` into `column_family` using the shared write options.
// Returns the RocksDB status; throws (via column_family_handle) if the column
// family is unknown.
::rocksdb::Status RocksDatabase::put(std::string_view key,
                                     std::string_view value,
                                     std::string_view column_family) {
    auto* const target_family = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return db_->Put(write_options(), target_family, key_slice, value_slice);
}
282

283
// Reads the value stored under `key` in `column_family` into `*value` using
// the shared read options. Returns the RocksDB status; throws (via
// column_family_handle) if the column family is unknown.
::rocksdb::Status RocksDatabase::get(std::string_view key, std::string* value,
                                     std::string_view column_family) const {
    auto* const source_family = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return db_->Get(read_options(), source_family, key_slice, value);
}
288

289
// Issues a RocksDB Merge of `value` onto `key` in `column_family` using the
// shared write options. Returns the RocksDB status; throws (via
// column_family_handle) if the column family is unknown.
::rocksdb::Status RocksDatabase::merge(std::string_view key,
                                       std::string_view value,
                                       std::string_view column_family) {
    auto* const target_family = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice operand_slice(value.data(), value.size());
    return db_->Merge(write_options(), target_family, key_slice,
                      operand_slice);
}
296

297
// Installs the callback that open() invokes once per column family, with the
// family name and a mutable copy of the default options, before the
// descriptor is built. It only affects subsequent open() calls.
void RocksDatabase::set_cf_options_override(CfOptionsOverride override) {
    // `override` is a by-value sink parameter, so move it into place.
    cf_options_override_ = std::move(override);
}
5,644✔
300

301
// Queues a Merge of `value` onto `key` in `column_family` on `batch`. Nothing
// reaches the database until the batch is committed. Throws (via
// column_family_handle) if the column family is unknown.
::rocksdb::Status RocksDatabase::merge(Batch& batch,
                                       std::string_view column_family,
                                       std::string_view key,
                                       std::string_view value) {
    auto* const target_family = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice operand_slice(value.data(), value.size());
    return batch.Merge(target_family, key_slice, operand_slice);
}
309

310
// Deletes `key` from `column_family` using the shared write options. Returns
// the RocksDB status; throws (via column_family_handle) if the column family
// is unknown.
::rocksdb::Status RocksDatabase::del(std::string_view key,
                                     std::string_view column_family) {
    auto* const target_family = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return db_->Delete(write_options(), target_family, key_slice);
}
315

316
// Forwards `begin_key` / `end_key` to RocksDB DeleteRange on `column_family`
// using the shared write options. Returns the RocksDB status; throws (via
// column_family_handle) if the column family is unknown.
::rocksdb::Status RocksDatabase::delete_range(std::string_view begin_key,
                                              std::string_view end_key,
                                              std::string_view column_family) {
    auto* const target_family = column_family_handle(column_family);
    const ::rocksdb::Slice range_begin(begin_key.data(), begin_key.size());
    const ::rocksdb::Slice range_end(end_key.data(), end_key.size());
    return db_->DeleteRange(write_options(), target_family, range_begin,
                            range_end);
}
324

325
// Queues `key` -> `value` for `column_family` on `batch`. Nothing reaches the
// database until the batch is committed. Throws (via column_family_handle) if
// the column family is unknown.
::rocksdb::Status RocksDatabase::put(Batch& batch,
                                     std::string_view column_family,
                                     std::string_view key,
                                     std::string_view value) {
    auto* const target_family = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return batch.Put(target_family, key_slice, value_slice);
}
333

334
// Queues a delete of `key` in `column_family` on `batch`. Nothing reaches the
// database until the batch is committed. Throws (via column_family_handle) if
// the column family is unknown.
::rocksdb::Status RocksDatabase::del(Batch& batch,
                                     std::string_view column_family,
                                     std::string_view key) {
    auto* const target_family = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return batch.Delete(target_family, key_slice);
}
340

341
// Returns a new, empty batch to be filled with the Batch& overloads of
// put / merge / del and then applied with commit_batch().
RocksDatabase::Batch RocksDatabase::begin_batch() const {
    return Batch{};
}
1,313✔
342

343
// Applies every operation queued on `batch` in one RocksDB Write call using
// the shared write options.
//
// @return The status from RocksDB, or an InvalidArgument status if no
//         database is open.
::rocksdb::Status RocksDatabase::commit_batch(Batch& batch) {
    // Unlike the other operations, this one never goes through
    // column_family_handle(), so nothing else stops a call on a closed
    // database from dereferencing a null db_.
    if (db_ == nullptr) {
        return ::rocksdb::Status::InvalidArgument(
            "RocksDB database is not open");
    }
    return db_->Write(write_options(), &batch);
}
346

347
std::unique_ptr<::rocksdb::Iterator> RocksDatabase::new_iterator(
4,754✔
348
    std::string_view column_family) const {
349
    return std::unique_ptr<::rocksdb::Iterator>(
4,754✔
350
        db_->NewIterator(read_options(), column_family_handle(column_family)));
4,754✔
351
}
352

353
// Runs CompactRange on `column_family` with both range bounds passed as
// nullptr and max_subcompactions set to 8. Returns the RocksDB status; throws
// (via column_family_handle) if the column family is unknown.
::rocksdb::Status RocksDatabase::compact(std::string_view column_family) {
    ::rocksdb::CompactRangeOptions compaction_opts;
    compaction_opts.max_subcompactions = 8;

    auto* const target_family = column_family_handle(column_family);
    return db_->CompactRange(compaction_opts, target_family, nullptr, nullptr);
}
359

360
// Ingests the files listed in `external_files` into `column_family`.
//
// An empty list is a no-op that returns OK without touching the database.
// Otherwise the files are ingested with move_files off, snapshot consistency,
// global sequence numbers and blocking flush allowed, and `ingest_behind`
// passed through unchanged. Throws (via column_family_handle) if the column
// family is unknown.
::rocksdb::Status RocksDatabase::ingest_external_files(
    std::string_view column_family,
    const std::vector<std::string>& external_files, bool ingest_behind) {
    const bool nothing_to_ingest = external_files.empty();
    if (nothing_to_ingest) {
        return ::rocksdb::Status::OK();
    }

    ::rocksdb::IngestExternalFileOptions ingest_opts;
    ingest_opts.ingest_behind = ingest_behind;
    ingest_opts.allow_blocking_flush = true;
    ingest_opts.allow_global_seqno = true;
    ingest_opts.snapshot_consistency = true;
    ingest_opts.move_files = false;

    auto* const target_family = column_family_handle(column_family);
    return db_->IngestExternalFile(target_family, external_files,
                                   ingest_opts);
}
235✔
375

376
}  // namespace dftracer::utils::rocksdb
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc