• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28496595030

01 Jul 2026 05:50AM UTC coverage: 50.727% (-1.6%) from 52.278%
28496595030

Pull #83

github

web-flow
Merge 8f1ff4df5 into 2efed6649
Pull Request #83: refactor and improve code QoL

31872 of 80367 branches covered (39.66%)

Branch coverage included in aggregate %.

770 of 1591 new or added lines in 85 files covered. (48.4%)

5070 existing lines in 182 files now uncovered.

32742 of 47009 relevant lines covered (69.65%)

9887.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.27
/src/dftracer/utils/core/rocksdb/database.cpp
1
#include <dftracer/utils/core/common/constants.h>
2
#include <dftracer/utils/core/common/error.h>
3
#include <dftracer/utils/core/common/filesystem.h>
4
#include <dftracer/utils/core/env.h>
5
#include <dftracer/utils/core/rocksdb/database.h>
6
#include <dftracer/utils/core/rocksdb/filesystem.h>
7
#include <rocksdb/slice.h>
8
#include <rocksdb/table.h>
9

10
#include <algorithm>
11
#include <atomic>
12
#include <cstdlib>
13
#include <stdexcept>
14
#include <utility>
15

16
namespace dftracer::utils::rocksdb {
17

18
namespace {
19

20
// Accessor for the single flag that records whether the process has been
// marked as exiting. The flag is a function-local static and starts out false.
std::atomic<bool>& process_exiting_flag() {
    static std::atomic<bool> exiting{false};
    return exiting;
}
24

25
// Default-constructed read options, built once and shared by every read
// issued from this file.
const ::rocksdb::ReadOptions& read_options() {
    static const ::rocksdb::ReadOptions shared_read_options;
    return shared_read_options;
}
×
29

30
// Write options shared by every write issued from this file. They are built
// once, with the write-ahead log disabled (disableWAL = true).
const ::rocksdb::WriteOptions& write_options() {
    static const ::rocksdb::WriteOptions no_wal_options = [] {
        ::rocksdb::WriteOptions configured;
        configured.disableWAL = true;
        return configured;
    }();
    return no_wal_options;
}
×
38

39
void cleanup_failed_open(::rocksdb::DB*& db,
×
40
                         std::vector<::rocksdb::ColumnFamilyHandle*>& handles) {
41
    if (db != nullptr) {
×
42
        for (auto* handle : handles) {
×
43
            if (handle != nullptr) {
×
44
                db->DestroyColumnFamilyHandle(handle);
×
UNCOV
45
            }
×
46
        }
47
        static_cast<void>(db->Close());
×
48
        delete db;
×
49
        db = nullptr;
×
UNCOV
50
    }
×
51
    handles.clear();
×
52
}
×
53

54
}  // namespace
55

56
void mark_process_exiting_for_rocksdb() {
3✔
57
    process_exiting_flag().store(true, std::memory_order_relaxed);
3✔
58
}
3✔
59

60
// Creates a database object without opening anything; call open() to attach
// it to a path.
RocksDatabase::RocksDatabase() = default;
11,540✔
61

62
// Constructs the object and immediately opens `db_path` in `open_mode`.
// open() reports failure by throwing and otherwise returns true, so its
// return value carries no extra information here.
RocksDatabase::RocksDatabase(const std::string& db_path, OpenMode open_mode) {
    static_cast<void>(open(db_path, open_mode));
}
3✔
65

66
// Closes the database (if one is open) when the object goes away.
RocksDatabase::~RocksDatabase() {
    close();
}
11,546!
67

68
// Move constructor: takes over `other`'s path, mode, file system, env,
// database pointer, column-family handles and column-family options override.
//
// `other` is left with a null database pointer and no handles, so its
// destructor (which calls close()) will not touch the transferred database.
RocksDatabase::RocksDatabase(RocksDatabase&& other) noexcept
    : db_path_(std::move(other.db_path_)),
      open_mode_(other.open_mode_),
      file_system_(std::move(other.file_system_)),
      env_(std::move(other.env_)),
      db_(std::exchange(other.db_, nullptr)),
      column_families_(std::move(other.column_families_)) {
    // The override is consulted by open(); without transferring it a later
    // re-open of the moved-to object would silently use unmodified options.
    // Assigned in the body so the member declaration order does not matter.
    cf_options_override_ = std::move(other.cf_options_override_);
    // A moved-from container is only "valid but unspecified"; make sure the
    // source can never hand out handles that now belong to this object.
    other.column_families_.clear();
}
×
75

76
// Move assignment: closes whatever this object currently has open, then takes
// over `other`'s path, mode, file system, env, database pointer,
// column-family handles and column-family options override.
//
// Self-assignment is a no-op. `other` is left with a null database pointer
// and no handles.
RocksDatabase& RocksDatabase::operator=(RocksDatabase&& other) noexcept {
    if (this == &other) {
        return *this;
    }

    close();
    db_path_ = std::move(other.db_path_);
    open_mode_ = other.open_mode_;
    file_system_ = std::move(other.file_system_);
    env_ = std::move(other.env_);
    db_ = std::exchange(other.db_, nullptr);
    column_families_ = std::move(other.column_families_);
    // A moved-from container is only "valid but unspecified"; make sure the
    // source can never hand out handles that now belong to this object.
    other.column_families_.clear();
    // The override is consulted by open(); carry it along so a later re-open
    // of this object applies the same per-column-family options.
    cf_options_override_ = std::move(other.cf_options_override_);
    return *this;
}
88

89
// Returns the collection of column-family names (cf::ALL) that open() falls
// back to, or adds to, when preparing a database.
const decltype(cf::ALL)& RocksDatabase::default_column_families() {
    const auto& all_families = cf::ALL;
    return all_families;
}
92

93
// Builds the database-wide options used as the starting point by open().
// open() turns the two create_* flags back off for read-only databases.
::rocksdb::Options RocksDatabase::default_options() {
    ::rocksdb::Options db_opts;

    // Create the database and any missing column families on demand.
    db_opts.create_if_missing = true;
    db_opts.create_missing_column_families = true;

    // Write-path behaviour.
    db_opts.allow_concurrent_memtable_write = true;
    db_opts.enable_pipelined_write = true;
    db_opts.write_buffer_size = 256 * 1024 * 1024;
    db_opts.max_write_buffer_number = 4;

    // Resource limits; the open-file limit comes from Env.
    db_opts.max_open_files = Env::rocksdb_max_open_files();
    db_opts.max_background_jobs = 8;
    db_opts.max_subcompactions = 8;

    return db_opts;
}
6,009!
106

107
// Builds the per-column-family options used as the starting point by open():
// a block-based table factory plus a compression setup chosen at compile time
// (ZSTD if enabled, else LZ4 if enabled, else zlib).
::rocksdb::ColumnFamilyOptions RocksDatabase::default_column_family_options() {
    ::rocksdb::BlockBasedTableOptions table_opts;
    table_opts.block_size = 32 * 1024;
    table_opts.format_version = 5;
    table_opts.index_block_restart_interval = 16;

    ::rocksdb::ColumnFamilyOptions cf_opts;
    cf_opts.table_factory.reset(
        ::rocksdb::NewBlockBasedTableFactory(table_opts));

#ifdef DFTRACER_UTILS_ENABLE_ZSTD
    // Both the regular and the bottommost level use the same ZSTD settings.
    const auto apply_zstd_settings = [](auto& compression_opts) {
        compression_opts.level = constants::rocksdb::ZSTD_COMPRESSION_LEVEL;
        compression_opts.max_dict_bytes =
            constants::rocksdb::ZSTD_MAX_DICT_BYTES;
        compression_opts.zstd_max_train_bytes =
            constants::rocksdb::ZSTD_MAX_TRAIN_BYTES;
        compression_opts.enabled = true;
    };
    cf_opts.compression = ::rocksdb::kZSTD;
    apply_zstd_settings(cf_opts.compression_opts);
    cf_opts.bottommost_compression = ::rocksdb::kZSTD;
    apply_zstd_settings(cf_opts.bottommost_compression_opts);
#elif defined(DFTRACER_UTILS_ENABLE_LZ4)
    cf_opts.compression = ::rocksdb::kLZ4Compression;
    cf_opts.bottommost_compression = ::rocksdb::kZlibCompression;
#else
    cf_opts.compression = ::rocksdb::kZlibCompression;
    cf_opts.bottommost_compression = ::rocksdb::kZlibCompression;
#endif

    return cf_opts;
}
6,009!
141

142
bool RocksDatabase::open(const std::string& db_path, OpenMode open_mode) {
5,773✔
143
    close();
5,773✔
144
    db_path_ = db_path;
5,773✔
145
    open_mode_ = open_mode;
5,773✔
146

147
    std::error_code ec;
5,773✔
148
    if (open_mode_ == OpenMode::ReadWrite) {
5,773✔
149
        fs::create_directories(fs::path(db_path_), ec);
860!
150
    }
838✔
151

152
    auto db_options = default_options();
5,773✔
153
    if (open_mode_ == OpenMode::ReadOnly) {
5,773✔
154
        db_options.create_if_missing = false;
4,935✔
155
        db_options.create_missing_column_families = false;
4,935✔
156
    }
4,935✔
157
    file_system_ = make_dftracer_file_system();
5,773!
158
    env_ = make_dftracer_env(file_system_);
5,773!
159
    db_options.env = env_.get();
5,773✔
160
    auto cf_options = default_column_family_options();
5,773!
161

162
    std::vector<std::string> column_family_names;
5,773✔
163
    auto list_status = ::rocksdb::DB::ListColumnFamilies(db_options, db_path_,
5,773!
164
                                                         &column_family_names);
165
    if (!list_status.ok()) {
5,773!
166
        if (open_mode_ == OpenMode::ReadOnly) {
569✔
167
            throw DFTUtilsException(
44!
168
                ErrorCode::IO, "Failed to list RocksDB column families at '" +
22!
169
                                   db_path_ + "': " + list_status.ToString());
22!
170
        }
171
        column_family_names.reserve(default_column_families().size());
547!
172
        for (auto name : default_column_families()) {
14,769✔
173
            column_family_names.emplace_back(name);
14,222!
174
        }
175
    } else {
547✔
176
        if (open_mode_ == OpenMode::ReadWrite) {
5,204✔
177
            for (const auto& name : default_column_families()) {
7,857✔
178
                if (std::find(column_family_names.begin(),
15,132!
179
                              column_family_names.end(),
7,566✔
180
                              name) == column_family_names.end()) {
7,566✔
181
                    column_family_names.emplace_back(name);
×
UNCOV
182
                }
×
183
            }
184
        }
291✔
185
    }
186

187
    std::vector<::rocksdb::ColumnFamilyDescriptor> descriptors;
5,751✔
188
    descriptors.reserve(column_family_names.size());
5,751!
189
    for (const auto& name : column_family_names) {
155,277✔
190
        auto opts = cf_options;
149,526!
191
        if (cf_options_override_) {
149,526✔
192
            cf_options_override_(name, opts);
148,641✔
193
        }
148,642✔
194
        descriptors.emplace_back(name, opts);
149,527✔
195
    }
149,528✔
196

197
    std::vector<::rocksdb::ColumnFamilyHandle*> handles;
5,751✔
198
    ::rocksdb::Status status;
5,751!
199
    if (open_mode_ == OpenMode::ReadOnly) {
5,751✔
200
        status = ::rocksdb::DB::OpenForReadOnly(
4,913!
201
            db_options, db_path_, descriptors, &handles, &db_, false);
4,913✔
202
    } else {
4,913✔
203
        status = ::rocksdb::DB::Open(db_options, db_path_, descriptors,
1,676!
204
                                     &handles, &db_);
838✔
205
    }
206
    if (!status.ok()) {
5,751!
207
        cleanup_failed_open(db_, handles);
×
NEW
208
        throw DFTUtilsException(ErrorCode::IO, "Failed to open RocksDB at '" +
×
NEW
209
                                                   db_path_ +
×
NEW
210
                                                   "': " + status.ToString());
×
211
    }
212

213
    column_families_.clear();
5,751✔
214
    for (std::size_t i = 0; i < descriptors.size(); ++i) {
155,277✔
215
        column_families_.emplace(descriptors[i].name, handles[i]);
149,526!
216
    }
149,526✔
217

218
    return true;
219
}
5,797✔
220

221
void RocksDatabase::close() {
11,546✔
222
    if (db_ == nullptr) {
11,546✔
223
        column_families_.clear();
5,795✔
224
        return;
5,795✔
225
    }
226

227
    if (process_exiting_flag().load(std::memory_order_relaxed)) {
5,751!
228
        db_ = nullptr;
×
229
        column_families_.clear();
×
230
        env_.reset();
×
231
        file_system_.reset();
×
232
        db_path_.clear();
×
233
        return;
×
234
    }
235

236
    for (auto& entry : column_families_) {
155,277✔
237
        if (entry.second != nullptr) {
149,526!
238
            db_->DestroyColumnFamilyHandle(entry.second);
149,526✔
239
            entry.second = nullptr;
149,526✔
240
        }
149,526✔
241
    }
242
    column_families_.clear();
5,751✔
243

244
    auto* db = db_;
5,751✔
245
    db_ = nullptr;
5,751✔
246
    static_cast<void>(db->Close());
5,751✔
247
    delete db;
5,751!
248
    env_.reset();
5,751✔
249
    file_system_.reset();
5,751✔
250
    db_path_.clear();
5,751✔
251
}
11,546✔
252

253
// True while this object holds a database pointer.
bool RocksDatabase::is_open() const noexcept {
    return !(db_ == nullptr);
}
18✔
254

255
bool RocksDatabase::is_read_only() const noexcept {
805✔
256
    return open_mode_ == OpenMode::ReadOnly;
805✔
257
}
258

259
const std::string& RocksDatabase::path() const noexcept { return db_path_; }
×
260

261
// Raw, non-owning access to the underlying database; nullptr when nothing is
// open.
::rocksdb::DB* RocksDatabase::get() const noexcept {
    return this->db_;
}
×
262

263
// Looks up the handle for `column_family`; an empty name selects cf::DEFAULT.
//
// Throws DFTUtilsException(ErrorCode::INVALID_ARGUMENT) when the name is not
// registered or its handle is null. That includes any lookup made while no
// database is open, because the handle map is empty then.
::rocksdb::ColumnFamilyHandle* RocksDatabase::column_family_handle(
    std::string_view column_family) const {
    std::string lookup_name;
    if (column_family.empty()) {
        lookup_name = std::string(cf::DEFAULT);
    } else {
        lookup_name = std::string(column_family);
    }

    const auto found = column_families_.find(lookup_name);
    const bool usable =
        found != column_families_.end() && found->second != nullptr;
    if (!usable) {
        throw DFTUtilsException(ErrorCode::INVALID_ARGUMENT,
                                "Unknown RocksDB column family: " + lookup_name);
    }
    return found->second;
}
37,520✔
274

275
// Writes `key` -> `value` into `column_family` using the shared write options.
// Returns the RocksDB status; throws if the column family is unknown.
::rocksdb::Status RocksDatabase::put(std::string_view key,
                                     std::string_view value,
                                     std::string_view column_family) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return db_->Put(write_options(), handle, key_slice, value_slice);
}
282

283
// Reads the value stored under `key` in `column_family` into `*value` using
// the shared read options. Returns the RocksDB status; throws if the column
// family is unknown.
::rocksdb::Status RocksDatabase::get(std::string_view key, std::string* value,
                                     std::string_view column_family) const {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return db_->Get(read_options(), handle, key_slice, value);
}
288

289
// Issues a merge of `value` into `key` in `column_family` using the shared
// write options. Returns the RocksDB status; throws if the column family is
// unknown.
::rocksdb::Status RocksDatabase::merge(std::string_view key,
                                       std::string_view value,
                                       std::string_view column_family) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return db_->Merge(write_options(), handle, key_slice, value_slice);
}
296

297
// Installs the callback that open() invokes for each column family, with the
// family name and a mutable copy of the default options, before building its
// descriptor. It only affects open() calls made after this one.
void RocksDatabase::set_cf_options_override(CfOptionsOverride override) {
    // Taken by value, so it can be moved straight into the member.
    cf_options_override_ = std::move(override);
}
5,739✔
300

301
// Queues a merge of `value` into `key` for `column_family` on `batch`.
// Nothing is written to the database here. Throws if the column family is
// unknown.
::rocksdb::Status RocksDatabase::merge(Batch& batch,
                                       std::string_view column_family,
                                       std::string_view key,
                                       std::string_view value) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return batch.Merge(handle, key_slice, value_slice);
}
309

310
// Deletes `key` from `column_family` using the shared write options.
// Returns the RocksDB status; throws if the column family is unknown.
::rocksdb::Status RocksDatabase::del(std::string_view key,
                                     std::string_view column_family) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return db_->Delete(write_options(), handle, key_slice);
}
315

316
// Forwards a range delete bounded by `begin_key` and `end_key` in
// `column_family` to RocksDB's DeleteRange, using the shared write options.
// Returns the RocksDB status; throws if the column family is unknown.
::rocksdb::Status RocksDatabase::delete_range(std::string_view begin_key,
                                              std::string_view end_key,
                                              std::string_view column_family) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice range_begin(begin_key.data(), begin_key.size());
    const ::rocksdb::Slice range_end(end_key.data(), end_key.size());
    return db_->DeleteRange(write_options(), handle, range_begin, range_end);
}
324

325
// Queues `key` -> `value` for `column_family` on `batch`. Nothing is written
// to the database here. Throws if the column family is unknown.
::rocksdb::Status RocksDatabase::put(Batch& batch,
                                     std::string_view column_family,
                                     std::string_view key,
                                     std::string_view value) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    const ::rocksdb::Slice value_slice(value.data(), value.size());
    return batch.Put(handle, key_slice, value_slice);
}
333

334
// Queues a delete of `key` in `column_family` on `batch`. Nothing is written
// to the database here. Throws if the column family is unknown.
::rocksdb::Status RocksDatabase::del(Batch& batch,
                                     std::string_view column_family,
                                     std::string_view key) {
    auto* const handle = column_family_handle(column_family);
    const ::rocksdb::Slice key_slice(key.data(), key.size());
    return batch.Delete(handle, key_slice);
}
340

341
// Returns a fresh, empty batch to be filled with the batch overloads of
// put/merge/del and then passed to commit_batch().
RocksDatabase::Batch RocksDatabase::begin_batch() const {
    return Batch{};
}
1,314✔
342

343
// Writes every operation queued on `batch` to the database using the shared
// write options.
//
// Returns the RocksDB write status, or an InvalidArgument status when no
// database is open. Unlike the other operations, this one never passes
// through column_family_handle(), so without the explicit check a call on a
// closed database would dereference a null pointer.
::rocksdb::Status RocksDatabase::commit_batch(Batch& batch) {
    if (db_ == nullptr) {
        return ::rocksdb::Status::InvalidArgument(
            "RocksDB database is not open");
    }
    return db_->Write(write_options(), &batch);
}
346

347
std::unique_ptr<::rocksdb::Iterator> RocksDatabase::new_iterator(
4,773✔
348
    std::string_view column_family) const {
349
    return std::unique_ptr<::rocksdb::Iterator>(
4,773✔
350
        db_->NewIterator(read_options(), column_family_handle(column_family)));
4,773✔
351
}
352

353
// Runs a manual compaction of `column_family`. Both range bounds are passed
// as nullptr, and max_subcompactions is set to 8.
// Returns the RocksDB status; throws if the column family is unknown.
::rocksdb::Status RocksDatabase::compact(std::string_view column_family) {
    ::rocksdb::CompactRangeOptions compact_opts;
    compact_opts.max_subcompactions = 8;

    auto* const handle = column_family_handle(column_family);
    return db_->CompactRange(compact_opts, handle, nullptr, nullptr);
}
359

360
// Ingests `external_files` into `column_family`.
//
// An empty file list is a no-op that returns OK without touching the
// database. `ingest_behind` is forwarded to the option of the same name; the
// other ingest options are fixed below.
// Returns the RocksDB status; throws if the column family is unknown.
::rocksdb::Status RocksDatabase::ingest_external_files(
    std::string_view column_family,
    const std::vector<std::string>& external_files, bool ingest_behind) {
    if (external_files.empty()) {
        return ::rocksdb::Status::OK();
    }

    ::rocksdb::IngestExternalFileOptions ingest_opts;
    ingest_opts.ingest_behind = ingest_behind;
    ingest_opts.move_files = false;
    ingest_opts.snapshot_consistency = true;
    ingest_opts.allow_global_seqno = true;
    ingest_opts.allow_blocking_flush = true;

    auto* const handle = column_family_handle(column_family);
    return db_->IngestExternalFile(handle, external_files, ingest_opts);
}
235✔
375

376
}  // namespace dftracer::utils::rocksdb
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc