• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27171677342

08 Jun 2026 10:43PM UTC coverage: 51.99% (+0.05%) from 51.937%
27171677342

Pull #77

github

web-flow
Merge 3a1432eec into 8045f0be3
Pull Request #77: chore: bump version to 0.0.10

36972 of 92663 branches covered (39.9%)

Branch coverage included in aggregate %.

33405 of 42703 relevant lines covered (78.23%)

20411.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.15
/src/dftracer/utils/core/rocksdb/database.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/env.h>
3
#include <dftracer/utils/core/rocksdb/database.h>
4
#include <dftracer/utils/core/rocksdb/filesystem.h>
5
#include <rocksdb/slice.h>
6
#include <rocksdb/table.h>
7

8
#include <algorithm>
9
#include <atomic>
10
#include <cstdlib>
11
#include <stdexcept>
12
#include <utility>
13

14
namespace dftracer::utils::rocksdb {
15

16
namespace {
17

18
// Returns the single process-wide "exiting" flag. It starts out false, is set
// by mark_process_exiting_for_rocksdb() and is read by RocksDatabase::close().
std::atomic<bool>& process_exiting_flag() {
    static std::atomic<bool> exiting{false};
    return exiting;
}
22

23
// Returns one shared, default-constructed ReadOptions instance that the read
// paths in this file (get, new_iterator) pass to RocksDB.
const ::rocksdb::ReadOptions& read_options() {
    static const ::rocksdb::ReadOptions shared_defaults{};
    return shared_defaults;
}
27

28
// Returns one shared WriteOptions instance whose disableWAL field is true.
// It is built once, on first use, and passed to every write in this file.
const ::rocksdb::WriteOptions& write_options() {
    const auto build_options = [] {
        ::rocksdb::WriteOptions no_wal;
        no_wal.disableWAL = true;
        return no_wal;
    };
    static const ::rocksdb::WriteOptions shared = build_options();
    return shared;
}
36

37
void cleanup_failed_open(::rocksdb::DB*& db,
×
38
                         std::vector<::rocksdb::ColumnFamilyHandle*>& handles) {
39
    if (db != nullptr) {
×
40
        for (auto* handle : handles) {
×
41
            if (handle != nullptr) {
×
42
                db->DestroyColumnFamilyHandle(handle);
×
43
            }
44
        }
45
        static_cast<void>(db->Close());
×
46
        delete db;
×
47
        db = nullptr;
×
48
    }
49
    handles.clear();
×
50
}
×
51

52
}  // namespace
53

54
void mark_process_exiting_for_rocksdb() {
6✔
55
    process_exiting_flag().store(true, std::memory_order_relaxed);
6✔
56
}
6✔
57

58
// Creates a closed database object; call open() to attach it to a store.
RocksDatabase::RocksDatabase() = default;

// Creates the object and immediately opens db_path in open_mode. Any
// exception thrown by open() propagates out of the constructor.
RocksDatabase::RocksDatabase(const std::string& db_path, OpenMode open_mode) {
    open(db_path, open_mode);
}

// Closes the database, if one is open, when the object is destroyed.
RocksDatabase::~RocksDatabase() {
    close();
}
17,094!
65

66
// Move constructor: takes over the other object's path, open mode, file
// system, env, database pointer, column-family handles and the column-family
// options override callback. other.db_ is set to nullptr, so the close() call
// in other's destructor only clears its (already emptied) handle map.
RocksDatabase::RocksDatabase(RocksDatabase&& other) noexcept
    : db_path_(std::move(other.db_path_)),
      open_mode_(other.open_mode_),
      file_system_(std::move(other.file_system_)),
      env_(std::move(other.env_)),
      db_(std::exchange(other.db_, nullptr)),
      column_families_(std::move(other.column_families_)) {
    // The override callback used to be dropped on move, so a later open() on
    // the moved-to object silently ignored it. It is assigned in the body so
    // the fix does not depend on the member declaration order in the header.
    cf_options_override_ = std::move(other.cf_options_override_);
    // Make sure the moved-from object holds no handles it no longer owns.
    other.column_families_.clear();
}

// Move assignment: closes whatever this object currently holds, then takes
// over the same state as the move constructor. Self-assignment is a no-op.
RocksDatabase& RocksDatabase::operator=(RocksDatabase&& other) noexcept {
    if (this == &other) {
        return *this;
    }

    close();
    db_path_ = std::move(other.db_path_);
    open_mode_ = other.open_mode_;
    file_system_ = std::move(other.file_system_);
    env_ = std::move(other.env_);
    db_ = std::exchange(other.db_, nullptr);
    column_families_ = std::move(other.column_families_);
    // Carry the override callback across as well (previously lost on move).
    cf_options_override_ = std::move(other.cf_options_override_);
    // Make sure the moved-from object holds no handles it no longer owns.
    other.column_families_.clear();
    return *this;
}
86

87
// Returns a reference to cf::ALL, the list of column family names that
// open() uses as its default set.
const decltype(cf::ALL)& RocksDatabase::default_column_families() {
    const auto& all_names = cf::ALL;
    return all_names;
}
90

91
// Builds the database-wide options that open() starts from: create-if-missing
// for both the database and its column families, concurrent and pipelined
// writes, 8 background jobs and subcompactions, and up to 4 write buffers of
// 256 MiB each. The open-file limit comes from Env::rocksdb_max_open_files().
::rocksdb::Options RocksDatabase::default_options() {
    constexpr int kParallelism = 8;

    ::rocksdb::Options db_opts;

    // Creation behaviour (open() switches these off for read-only opens).
    db_opts.create_if_missing = true;
    db_opts.create_missing_column_families = true;

    // Write path.
    db_opts.allow_concurrent_memtable_write = true;
    db_opts.enable_pipelined_write = true;
    db_opts.write_buffer_size = 256 * 1024 * 1024;
    db_opts.max_write_buffer_number = 4;

    // Resource limits and background work.
    db_opts.max_open_files = Env::rocksdb_max_open_files();
    db_opts.max_background_jobs = kParallelism;
    db_opts.max_subcompactions = kParallelism;

    return db_opts;
}
6,029!
104

105
// Builds the per-column-family options that open() starts from.
//
// The table factory is block-based with 32 KiB blocks, format version 5 and
// an index block restart interval of 16. Compression depends on the build:
//   - DFTRACER_UTILS_ENABLE_ZSTD: ZSTD level 9 with dictionary settings for
//     both the regular and the bottommost setting;
//   - DFTRACER_UTILS_ENABLE_LZ4: LZ4, with zlib for the bottommost setting;
//   - otherwise: zlib for both.
::rocksdb::ColumnFamilyOptions RocksDatabase::default_column_family_options() {
    ::rocksdb::BlockBasedTableOptions table_options;
    table_options.block_size = 32 * 1024;
    table_options.format_version = 5;
    table_options.index_block_restart_interval = 16;

    ::rocksdb::ColumnFamilyOptions cf_opts;
    cf_opts.table_factory.reset(
        ::rocksdb::NewBlockBasedTableFactory(table_options));

#ifdef DFTRACER_UTILS_ENABLE_ZSTD
    // Both compression_opts members receive the same ZSTD tuning.
    const auto apply_zstd_tuning = [](auto& compression_opts) {
        compression_opts.level = 9;
        compression_opts.max_dict_bytes = 262144;
        compression_opts.zstd_max_train_bytes = 1048576;
        compression_opts.enabled = true;
    };
    cf_opts.compression = ::rocksdb::kZSTD;
    apply_zstd_tuning(cf_opts.compression_opts);
    cf_opts.bottommost_compression = ::rocksdb::kZSTD;
    apply_zstd_tuning(cf_opts.bottommost_compression_opts);
#elif defined(DFTRACER_UTILS_ENABLE_LZ4)
    cf_opts.compression = ::rocksdb::kLZ4Compression;
    cf_opts.bottommost_compression = ::rocksdb::kZlibCompression;
#else
    cf_opts.compression = ::rocksdb::kZlibCompression;
    cf_opts.bottommost_compression = ::rocksdb::kZlibCompression;
#endif
    return cf_opts;
}
11,772!
134

135
bool RocksDatabase::open(const std::string& db_path, OpenMode open_mode) {
11,300✔
136
    close();
11,300!
137
    db_path_ = db_path;
11,300!
138
    open_mode_ = open_mode;
11,301✔
139

140
    std::error_code ec;
11,301✔
141
    if (open_mode_ == OpenMode::ReadWrite) {
11,299✔
142
        fs::create_directories(fs::path(db_path_), ec);
1,697!
143
    }
838✔
144

145
    auto db_options = default_options();
11,300!
146
    if (open_mode_ == OpenMode::ReadOnly) {
11,301✔
147
        db_options.create_if_missing = false;
9,625✔
148
        db_options.create_missing_column_families = false;
9,625✔
149
    }
4,955✔
150
    file_system_ = make_dftracer_file_system();
11,301!
151
    env_ = make_dftracer_env(file_system_);
11,301!
152
    db_options.env = env_.get();
11,299✔
153
    auto cf_options = default_column_family_options();
11,301!
154

155
    std::vector<std::string> column_family_names;
11,299✔
156
    auto list_status = ::rocksdb::DB::ListColumnFamilies(db_options, db_path_,
11,299!
157
                                                         &column_family_names);
5,506!
158
    if (!list_status.ok()) {
11,300!
159
        if (open_mode_ == OpenMode::ReadOnly) {
1,138✔
160
            throw std::runtime_error(
66!
161
                "Failed to list RocksDB column families at '" + db_path_ +
66!
162
                "': " + list_status.ToString());
110!
163
        }
164
        column_family_names.reserve(default_column_families().size());
1,094!
165
        for (auto name : default_column_families()) {
29,499✔
166
            column_family_names.emplace_back(name);
28,405!
167
        }
168
    } else {
547✔
169
        if (open_mode_ == OpenMode::ReadWrite) {
10,163✔
170
            for (const auto& name : default_column_families()) {
15,713✔
171
                if (std::find(column_family_names.begin(),
22,697!
172
                              column_family_names.end(),
7,566✔
173
                              name) == column_family_names.end()) {
22,696!
174
                    column_family_names.emplace_back(name);
×
175
                }
176
            }
177
        }
291✔
178
    }
179

180
    std::vector<::rocksdb::ColumnFamilyDescriptor> descriptors;
11,258✔
181
    descriptors.reserve(column_family_names.size());
11,255!
182
    for (const auto& name : column_family_names) {
303,929✔
183
        auto opts = cf_options;
292,674!
184
        if (cf_options_override_) {
292,670✔
185
            cf_options_override_(name, opts);
290,902!
186
        }
149,162✔
187
        descriptors.emplace_back(name, opts);
292,667!
188
    }
292,670✔
189

190
    std::vector<::rocksdb::ColumnFamilyHandle*> handles;
11,253✔
191
    ::rocksdb::Status status;
11,256!
192
    if (open_mode_ == OpenMode::ReadOnly) {
11,257✔
193
        status = ::rocksdb::DB::OpenForReadOnly(
9,581!
194
            db_options, db_path_, descriptors, &handles, &db_, false);
9,581!
195
    } else {
4,933✔
196
        status = ::rocksdb::DB::Open(db_options, db_path_, descriptors,
3,352!
197
                                     &handles, &db_);
1,675✔
198
    }
199
    if (!status.ok()) {
11,256!
200
        cleanup_failed_open(db_, handles);
×
201
        throw std::runtime_error("Failed to open RocksDB at '" + db_path_ +
×
202
                                 "': " + status.ToString());
×
203
    }
204

205
    column_families_.clear();
11,256✔
206
    for (std::size_t i = 0; i < descriptors.size(); ++i) {
303,934✔
207
        column_families_.emplace(descriptors[i].name, handles[i]);
292,676!
208
    }
150,046✔
209

210
    return true;
5,486✔
211
}
11,389✔
212

213
void RocksDatabase::close() {
22,602✔
214
    if (db_ == nullptr) {
22,602✔
215
        column_families_.clear();
11,345✔
216
        return;
11,344✔
217
    }
218

219
    if (process_exiting_flag().load(std::memory_order_relaxed)) {
11,257!
220
        db_ = nullptr;
×
221
        column_families_.clear();
×
222
        env_.reset();
×
223
        file_system_.reset();
×
224
        db_path_.clear();
×
225
        return;
×
226
    }
227

228
    for (auto& entry : column_families_) {
303,937✔
229
        if (entry.second != nullptr) {
292,677✔
230
            db_->DestroyColumnFamilyHandle(entry.second);
292,679!
231
            entry.second = nullptr;
292,682✔
232
        }
150,046✔
233
    }
234
    column_families_.clear();
11,257✔
235

236
    auto* db = db_;
11,257✔
237
    db_ = nullptr;
11,257✔
238
    static_cast<void>(db->Close());
11,257✔
239
    delete db;
11,257✔
240
    env_.reset();
11,257✔
241
    file_system_.reset();
11,257✔
242
    db_path_.clear();
11,257✔
243
}
11,586✔
244

245
// True while this object holds a database pointer.
bool RocksDatabase::is_open() const noexcept {
    const bool has_database = (db_ != nullptr);
    return has_database;
}
36✔
246

247
bool RocksDatabase::is_read_only() const noexcept {
1,827✔
248
    return open_mode_ == OpenMode::ReadOnly;
1,827✔
249
}
250

251
// Returns a reference to the stored database path. close() clears this
// string after closing an open database.
const std::string& RocksDatabase::path() const noexcept {
    return db_path_;
}
×
252

253
// Returns the raw database pointer, or nullptr when no database is open.
// The pointer is not transferred: close() still closes and deletes it.
::rocksdb::DB* RocksDatabase::get() const noexcept {
    return db_;
}
×
254

255
// Looks up the handle for column_family; an empty name selects cf::DEFAULT.
// Throws std::invalid_argument when the name is not in the handle map or is
// mapped to a null handle.
::rocksdb::ColumnFamilyHandle* RocksDatabase::column_family_handle(
    std::string_view column_family) const {
    std::string lookup_key;
    if (column_family.empty()) {
        lookup_key = std::string(cf::DEFAULT);
    } else {
        lookup_key = std::string(column_family);
    }

    const auto found = column_families_.find(lookup_key);
    const bool known =
        (found != column_families_.end()) && (found->second != nullptr);
    if (!known) {
        throw std::invalid_argument("Unknown RocksDB column family: " +
                                    lookup_key);
    }
    return found->second;
}
75,140✔
265

266
// Writes key -> value into column_family using the shared write options and
// returns RocksDB's status. Throws std::invalid_argument (from
// column_family_handle) for an unknown column family.
::rocksdb::Status RocksDatabase::put(std::string_view key,
                                     std::string_view value,
                                     std::string_view column_family) {
    auto* const cf_handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return db_->Put(write_options(), cf_handle, key_slice, value_slice);
}
273

274
// Reads key from column_family into *value using the shared read options and
// returns RocksDB's status. Throws std::invalid_argument (from
// column_family_handle) for an unknown column family.
::rocksdb::Status RocksDatabase::get(std::string_view key, std::string* value,
                                     std::string_view column_family) const {
    auto* const cf_handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return db_->Get(read_options(), cf_handle, key_slice, value);
}
279

280
// Issues a Merge of value into key in column_family using the shared write
// options and returns RocksDB's status. Throws std::invalid_argument (from
// column_family_handle) for an unknown column family.
::rocksdb::Status RocksDatabase::merge(std::string_view key,
                                       std::string_view value,
                                       std::string_view column_family) {
    auto* const cf_handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return db_->Merge(write_options(), cf_handle, key_slice, value_slice);
}
287

288
// Stores the callback that open() invokes, when set, once per column family
// with the column family name and a mutable copy of the default options.
// The argument is taken by value and moved into place.
void RocksDatabase::set_cf_options_override(CfOptionsOverride override) {
    cf_options_override_ = std::move(override);
}
11,233✔
291

292
// Records a Merge of value into key for column_family in batch; nothing is
// sent to the database here (see commit_batch). Throws std::invalid_argument
// (from column_family_handle) for an unknown column family.
::rocksdb::Status RocksDatabase::merge(Batch& batch,
                                       std::string_view column_family,
                                       std::string_view key,
                                       std::string_view value) {
    auto* const cf_handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return batch.Merge(cf_handle, key_slice, value_slice);
}
300

301
// Deletes key from column_family using the shared write options and returns
// RocksDB's status. Throws std::invalid_argument (from column_family_handle)
// for an unknown column family.
::rocksdb::Status RocksDatabase::del(std::string_view key,
                                     std::string_view column_family) {
    auto* const cf_handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return db_->Delete(write_options(), cf_handle, key_slice);
}
306

307
// Forwards begin_key and end_key to DB::DeleteRange for column_family using
// the shared write options and returns RocksDB's status. Throws
// std::invalid_argument (from column_family_handle) for an unknown column
// family.
::rocksdb::Status RocksDatabase::delete_range(std::string_view begin_key,
                                              std::string_view end_key,
                                              std::string_view column_family) {
    auto* const cf_handle = column_family_handle(column_family);
    const ::rocksdb::Slice range_begin(begin_key.data(), begin_key.size());
    const ::rocksdb::Slice range_end(end_key.data(), end_key.size());
    return db_->DeleteRange(write_options(), cf_handle, range_begin,
                            range_end);
}
315

316
// Records a Put of key -> value for column_family in batch; nothing is sent
// to the database here (see commit_batch). Throws std::invalid_argument
// (from column_family_handle) for an unknown column family.
::rocksdb::Status RocksDatabase::put(Batch& batch,
                                     std::string_view column_family,
                                     std::string_view key,
                                     std::string_view value) {
    auto* const cf_handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return batch.Put(cf_handle, key_slice, value_slice);
}
324

325
// Records a Delete of key for column_family in batch; nothing is sent to the
// database here (see commit_batch). Throws std::invalid_argument (from
// column_family_handle) for an unknown column family.
::rocksdb::Status RocksDatabase::del(Batch& batch,
                                     std::string_view column_family,
                                     std::string_view key) {
    auto* const cf_handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return batch.Delete(cf_handle, key_slice);
}
331

332
// Returns a new, default-constructed batch for the batch overloads of
// put/merge/del; pass it to commit_batch() to apply it.
RocksDatabase::Batch RocksDatabase::begin_batch() const {
    return Batch{};
}
2,625✔
333

334
// Writes batch to the database with the shared write options and returns
// RocksDB's status.
::rocksdb::Status RocksDatabase::commit_batch(Batch& batch) {
    const ::rocksdb::WriteOptions& options = write_options();
    return db_->Write(options, &batch);
}
337

338
std::unique_ptr<::rocksdb::Iterator> RocksDatabase::new_iterator(
9,525✔
339
    std::string_view column_family) const {
340
    return std::unique_ptr<::rocksdb::Iterator>(
4,763✔
341
        db_->NewIterator(read_options(), column_family_handle(column_family)));
9,525✔
342
}
343

344
// Calls DB::CompactRange on column_family with max_subcompactions set to 8
// and nullptr for both range bounds, and returns RocksDB's status. Throws
// std::invalid_argument (from column_family_handle) for an unknown column
// family.
::rocksdb::Status RocksDatabase::compact(std::string_view column_family) {
    ::rocksdb::CompactRangeOptions compact_options;
    compact_options.max_subcompactions = 8;

    auto* const cf_handle = column_family_handle(column_family);
    return db_->CompactRange(compact_options, cf_handle, nullptr, nullptr);
}
350

351
// Ingests external_files into column_family and returns RocksDB's status.
//
// An empty file list returns Status::OK() without touching the database.
// Otherwise the ingest options are: move_files off, snapshot_consistency,
// allow_global_seqno and allow_blocking_flush on, and ingest_behind as
// passed by the caller. Throws std::invalid_argument (from
// column_family_handle) for an unknown column family.
::rocksdb::Status RocksDatabase::ingest_external_files(
    std::string_view column_family,
    const std::vector<std::string>& external_files, bool ingest_behind) {
    if (external_files.empty()) {
        return ::rocksdb::Status::OK();
    }

    ::rocksdb::IngestExternalFileOptions ingest_options;
    ingest_options.move_files = false;
    ingest_options.snapshot_consistency = true;
    ingest_options.allow_global_seqno = true;
    ingest_options.allow_blocking_flush = true;
    ingest_options.ingest_behind = ingest_behind;

    auto* const cf_handle = column_family_handle(column_family);
    return db_->IngestExternalFile(cf_handle, external_files, ingest_options);
}
235✔
366

367
}  // namespace dftracer::utils::rocksdb
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc