• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26195612357

20 May 2026 11:19PM UTC coverage: 49.859% (-2.3%) from 52.2%
26195612357

push

github

hariharan-devarajan
feat(aggregator): improve system metrics scanning and persistence error handling

16041 of 43831 branches covered (36.6%)

Branch coverage included in aggregate %.

6 of 17 new or added lines in 2 files covered. (35.29%)

1072 existing lines in 104 files now uncovered.

21423 of 31309 relevant lines covered (68.42%)

13054.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.88
/src/dftracer/utils/core/rocksdb/database.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/env.h>
3
#include <dftracer/utils/core/rocksdb/database.h>
4
#include <dftracer/utils/core/rocksdb/filesystem.h>
5
#include <rocksdb/slice.h>
6
#include <rocksdb/table.h>
7

8
#include <algorithm>
9
#include <atomic>
10
#include <cstdlib>
11
#include <stdexcept>
12
#include <utility>
13

14
namespace dftracer::utils::rocksdb {
15

16
namespace {
17

18
std::atomic<bool>& process_exiting_flag() {
5,501✔
19
    static std::atomic<bool> flag{false};
20
    return flag;
5,501✔
21
}
22

23
const ::rocksdb::ReadOptions& read_options() {
10,984✔
24
    static const ::rocksdb::ReadOptions options;
10,984✔
25
    return options;
10,974✔
26
}
27

28
const ::rocksdb::WriteOptions& write_options() {
2,005✔
29
    static const auto options = [] {
95✔
30
        ::rocksdb::WriteOptions wo;
95✔
31
        wo.disableWAL = true;
95✔
32
        return wo;
95✔
33
    }();
2,005!
34
    return options;
2,005✔
35
}
36

37
void cleanup_failed_open(::rocksdb::DB*& db,
×
38
                         std::vector<::rocksdb::ColumnFamilyHandle*>& handles) {
39
    if (db != nullptr) {
×
40
        for (auto* handle : handles) {
×
41
            if (handle != nullptr) {
×
42
                db->DestroyColumnFamilyHandle(handle);
×
43
            }
44
        }
45
        static_cast<void>(db->Close());
×
46
        delete db;
×
47
        db = nullptr;
×
48
    }
49
    handles.clear();
×
50
}
×
51

52
}  // namespace
53

54
void mark_process_exiting_for_rocksdb() {
3✔
55
    process_exiting_flag().store(true, std::memory_order_relaxed);
3✔
56
}
3✔
57

58
RocksDatabase::RocksDatabase() = default;
5,516✔
59

60
RocksDatabase::RocksDatabase(const std::string& db_path, OpenMode open_mode) {
3✔
61
    open(db_path, open_mode);
3!
62
}
3✔
63

64
RocksDatabase::~RocksDatabase() { close(); }
5,520✔
65

66
RocksDatabase::RocksDatabase(RocksDatabase&& other) noexcept
×
67
    : db_path_(std::move(other.db_path_)),
×
68
      open_mode_(other.open_mode_),
×
69
      file_system_(std::move(other.file_system_)),
×
70
      env_(std::move(other.env_)),
×
71
      db_(std::exchange(other.db_, nullptr)),
×
72
      column_families_(std::move(other.column_families_)) {}
×
73

74
RocksDatabase& RocksDatabase::operator=(RocksDatabase&& other) noexcept {
×
75
    if (this != &other) {
×
76
        close();
×
77
        db_path_ = std::move(other.db_path_);
×
78
        open_mode_ = other.open_mode_;
×
79
        file_system_ = std::move(other.file_system_);
×
80
        env_ = std::move(other.env_);
×
81
        db_ = std::exchange(other.db_, nullptr);
×
82
        column_families_ = std::move(other.column_families_);
×
83
    }
84
    return *this;
×
85
}
86

87
const decltype(cf::ALL)& RocksDatabase::default_column_families() {
1,377✔
88
    return cf::ALL;
1,377✔
89
}
90

91
::rocksdb::Options RocksDatabase::default_options() {
5,756✔
92
    ::rocksdb::Options options;
5,756✔
93
    options.create_if_missing = true;
5,756✔
94
    options.create_missing_column_families = true;
5,756✔
95
    options.allow_concurrent_memtable_write = true;
5,756✔
96
    options.enable_pipelined_write = true;
5,756✔
97
    options.max_open_files = Env::rocksdb_max_open_files();
5,756!
98
    options.max_background_jobs = 8;
5,756✔
99
    options.max_subcompactions = 8;
5,756✔
100
    options.write_buffer_size = 256 * 1024 * 1024;
5,756✔
101
    options.max_write_buffer_number = 4;
5,756✔
102
    return options;
5,756✔
UNCOV
103
}
×
104

105
::rocksdb::ColumnFamilyOptions RocksDatabase::default_column_family_options() {
5,756✔
106
    ::rocksdb::ColumnFamilyOptions options;
5,756!
107

108
    ::rocksdb::BlockBasedTableOptions bbt;
5,756✔
109
    bbt.block_size = 32 * 1024;
5,755✔
110
    bbt.format_version = 5;
5,755✔
111
    bbt.index_block_restart_interval = 16;
5,755✔
112
    options.table_factory.reset(::rocksdb::NewBlockBasedTableFactory(bbt));
5,755!
113

114
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
115
    options.compression = ::rocksdb::kZSTD;
116
    options.compression_opts.level = 9;
117
    options.compression_opts.max_dict_bytes = 262144;
118
    options.compression_opts.zstd_max_train_bytes = 1048576;
119
    options.compression_opts.enabled = true;
120
    options.bottommost_compression = ::rocksdb::kZSTD;
121
    options.bottommost_compression_opts.level = 9;
122
    options.bottommost_compression_opts.max_dict_bytes = 262144;
123
    options.bottommost_compression_opts.zstd_max_train_bytes = 1048576;
124
    options.bottommost_compression_opts.enabled = true;
125
#elif defined(DFTRACER_UTILS_ENABLE_LZ4)
126
    options.compression = ::rocksdb::kLZ4Compression;
127
    options.bottommost_compression = ::rocksdb::kZlibCompression;
128
#else
129
    options.compression = ::rocksdb::kZlibCompression;
5,756✔
130
    options.bottommost_compression = ::rocksdb::kZlibCompression;
5,756✔
131
#endif
132
    return options;
11,510✔
133
}
5,756✔
134

135
bool RocksDatabase::open(const std::string& db_path, OpenMode open_mode) {
5,519✔
136
    close();
5,519!
137
    db_path_ = db_path;
5,520!
138
    open_mode_ = open_mode;
5,518✔
139

140
    std::error_code ec;
5,518✔
141
    if (open_mode_ == OpenMode::ReadWrite) {
5,520✔
142
        fs::create_directories(fs::path(db_path_), ec);
834!
143
    }
144

145
    auto db_options = default_options();
5,520!
146
    if (open_mode_ == OpenMode::ReadOnly) {
5,520✔
147
        db_options.create_if_missing = false;
4,686✔
148
        db_options.create_missing_column_families = false;
4,686✔
149
    }
150
    file_system_ = make_dftracer_file_system();
5,520!
151
    env_ = make_dftracer_env(file_system_);
5,519!
152
    db_options.env = env_.get();
5,519✔
153
    auto cf_options = default_column_family_options();
5,519!
154

155
    std::vector<std::string> column_family_names;
5,518✔
156
    auto list_status = ::rocksdb::DB::ListColumnFamilies(db_options, db_path_,
5,518✔
157
                                                         &column_family_names);
5,518!
158
    if (!list_status.ok()) {
5,520!
159
        if (open_mode_ == OpenMode::ReadOnly) {
566✔
160
            throw std::runtime_error(
22!
161
                "Failed to list RocksDB column families at '" + db_path_ +
44!
162
                "': " + list_status.ToString());
88!
163
        }
164
        column_family_names.reserve(default_column_families().size());
544!
165
        for (auto name : default_column_families()) {
14,597!
166
            column_family_names.emplace_back(name);
14,058!
167
        }
168
    } else {
169
        if (open_mode_ == OpenMode::ReadWrite) {
4,954✔
170
            for (const auto& name : default_column_families()) {
7,824!
171
                if (std::find(column_family_names.begin(),
7,533!
172
                              column_family_names.end(),
173
                              name) == column_family_names.end()) {
15,066!
174
                    column_family_names.emplace_back(name);
×
175
                }
176
            }
177
        }
178
    }
179

180
    std::vector<::rocksdb::ColumnFamilyDescriptor> descriptors;
5,494✔
181
    descriptors.reserve(column_family_names.size());
5,498!
182
    for (const auto& name : column_family_names) {
148,389✔
183
        auto opts = cf_options;
142,893!
184
        if (cf_options_override_) {
142,904✔
185
            cf_options_override_(name, opts);
142,019!
186
        }
187
        descriptors.emplace_back(name, opts);
142,909!
188
    }
142,892✔
189

190
    std::vector<::rocksdb::ColumnFamilyHandle*> handles;
5,497✔
191
    ::rocksdb::Status status;
5,497✔
192
    if (open_mode_ == OpenMode::ReadOnly) {
5,498✔
193
        status = ::rocksdb::DB::OpenForReadOnly(
4,664✔
194
            db_options, db_path_, descriptors, &handles, &db_, false);
4,664!
195
    } else {
196
        status = ::rocksdb::DB::Open(db_options, db_path_, descriptors,
1,668!
197
                                     &handles, &db_);
834✔
198
    }
199
    if (!status.ok()) {
5,497!
200
        cleanup_failed_open(db_, handles);
×
201
        throw std::runtime_error("Failed to open RocksDB at '" + db_path_ +
×
202
                                 "': " + status.ToString());
×
203
    }
204

205
    column_families_.clear();
5,494✔
206
    for (std::size_t i = 0; i < descriptors.size(); ++i) {
148,434✔
207
        column_families_.emplace(descriptors[i].name, handles[i]);
142,936!
208
    }
209

210
    return true;
5,498✔
211
}
5,587✔
212

213
void RocksDatabase::close() {
11,040✔
214
    if (db_ == nullptr) {
11,040✔
215
        column_families_.clear();
5,542✔
216
        return;
5,542✔
217
    }
218

219
    if (process_exiting_flag().load(std::memory_order_relaxed)) {
5,498!
220
        db_ = nullptr;
×
221
        column_families_.clear();
×
222
        env_.reset();
×
223
        file_system_.reset();
×
224
        db_path_.clear();
×
225
        return;
×
226
    }
227

228
    for (auto& entry : column_families_) {
148,446✔
229
        if (entry.second != nullptr) {
142,948!
230
            db_->DestroyColumnFamilyHandle(entry.second);
142,948!
231
            entry.second = nullptr;
142,948✔
232
        }
233
    }
234
    column_families_.clear();
5,498✔
235

236
    auto* db = db_;
5,498✔
237
    db_ = nullptr;
5,498✔
238
    static_cast<void>(db->Close());
5,498✔
239
    delete db;
5,498!
240
    env_.reset();
5,498✔
241
    file_system_.reset();
5,498✔
242
    db_path_.clear();
5,498✔
243
}
244

245
bool RocksDatabase::is_open() const noexcept { return db_ != nullptr; }
17✔
246

247
bool RocksDatabase::is_read_only() const noexcept {
1,015✔
248
    return open_mode_ == OpenMode::ReadOnly;
1,015✔
249
}
250

251
const std::string& RocksDatabase::path() const noexcept { return db_path_; }
×
252

253
::rocksdb::DB* RocksDatabase::get() const noexcept { return db_; }
×
254

255
::rocksdb::ColumnFamilyHandle* RocksDatabase::column_family_handle(
36,601✔
256
    std::string_view column_family) const {
257
    const auto name = column_family.empty() ? std::string(cf::DEFAULT)
36,601✔
258
                                            : std::string(column_family);
73,208!
259
    const auto it = column_families_.find(name);
36,608!
260
    if (it == column_families_.end() || it->second == nullptr) {
36,615!
261
        throw std::invalid_argument("Unknown RocksDB column family: " + name);
×
262
    }
263
    return it->second;
73,168✔
264
}
36,566✔
265

266
::rocksdb::Status RocksDatabase::put(std::string_view key,
712✔
267
                                     std::string_view value,
268
                                     std::string_view column_family) {
269
    return db_->Put(write_options(), column_family_handle(column_family),
712✔
270
                    ::rocksdb::Slice(key.data(), key.size()),
712✔
271
                    ::rocksdb::Slice(value.data(), value.size()));
2,136!
272
}
273

274
::rocksdb::Status RocksDatabase::get(std::string_view key, std::string* value,
6,269✔
275
                                     std::string_view column_family) const {
276
    return db_->Get(read_options(), column_family_handle(column_family),
6,269✔
277
                    ::rocksdb::Slice(key.data(), key.size()), value);
12,530!
278
}
279

280
::rocksdb::Status RocksDatabase::merge(std::string_view key,
×
281
                                       std::string_view value,
282
                                       std::string_view column_family) {
283
    return db_->Merge(write_options(), column_family_handle(column_family),
×
284
                      ::rocksdb::Slice(key.data(), key.size()),
×
285
                      ::rocksdb::Slice(value.data(), value.size()));
×
286
}
287

288
void RocksDatabase::set_cf_options_override(CfOptionsOverride override) {
5,481✔
289
    cf_options_override_ = std::move(override);
5,481✔
290
}
5,485✔
291

292
::rocksdb::Status RocksDatabase::merge(Batch& batch,
758✔
293
                                       std::string_view column_family,
294
                                       std::string_view key,
295
                                       std::string_view value) {
296
    return batch.Merge(column_family_handle(column_family),
758✔
297
                       ::rocksdb::Slice(key.data(), key.size()),
760✔
298
                       ::rocksdb::Slice(value.data(), value.size()));
2,282!
299
}
300

301
::rocksdb::Status RocksDatabase::del(std::string_view key,
×
302
                                     std::string_view column_family) {
303
    return db_->Delete(write_options(), column_family_handle(column_family),
×
304
                       ::rocksdb::Slice(key.data(), key.size()));
×
305
}
306

307
::rocksdb::Status RocksDatabase::delete_range(std::string_view begin_key,
×
308
                                              std::string_view end_key,
309
                                              std::string_view column_family) {
310
    return db_->DeleteRange(
×
311
        write_options(), column_family_handle(column_family),
312
        ::rocksdb::Slice(begin_key.data(), begin_key.size()),
×
313
        ::rocksdb::Slice(end_key.data(), end_key.size()));
×
314
}
315

316
::rocksdb::Status RocksDatabase::put(Batch& batch,
23,864✔
317
                                     std::string_view column_family,
318
                                     std::string_view key,
319
                                     std::string_view value) {
320
    return batch.Put(column_family_handle(column_family),
23,864✔
321
                     ::rocksdb::Slice(key.data(), key.size()),
23,845✔
322
                     ::rocksdb::Slice(value.data(), value.size()));
71,615!
323
}
324

325
::rocksdb::Status RocksDatabase::del(Batch& batch,
60✔
326
                                     std::string_view column_family,
327
                                     std::string_view key) {
328
    return batch.Delete(column_family_handle(column_family),
60✔
329
                        ::rocksdb::Slice(key.data(), key.size()));
120!
330
}
331

332
RocksDatabase::Batch RocksDatabase::begin_batch() const { return Batch(); }
1,294✔
333

334
::rocksdb::Status RocksDatabase::commit_batch(Batch& batch) {
1,294✔
335
    return db_->Write(write_options(), &batch);
1,294✔
336
}
337

338
std::unique_ptr<::rocksdb::Iterator> RocksDatabase::new_iterator(
4,719✔
339
    std::string_view column_family) const {
340
    return std::unique_ptr<::rocksdb::Iterator>(
341
        db_->NewIterator(read_options(), column_family_handle(column_family)));
4,719✔
342
}
343

344
::rocksdb::Status RocksDatabase::compact(std::string_view column_family) {
18✔
345
    ::rocksdb::CompactRangeOptions opts;
18✔
346
    opts.max_subcompactions = 8;
18✔
347
    return db_->CompactRange(opts, column_family_handle(column_family), nullptr,
18✔
348
                             nullptr);
36!
349
}
350

351
::rocksdb::Status RocksDatabase::ingest_external_files(
235✔
352
    std::string_view column_family,
353
    const std::vector<std::string>& external_files, bool ingest_behind) {
354
    if (external_files.empty()) {
235✔
355
        return ::rocksdb::Status::OK();
13✔
356
    }
357
    ::rocksdb::IngestExternalFileOptions opts;
222✔
358
    opts.move_files = false;
222✔
359
    opts.snapshot_consistency = true;
222✔
360
    opts.allow_global_seqno = true;
222✔
361
    opts.allow_blocking_flush = true;
222✔
362
    opts.ingest_behind = ingest_behind;
222✔
363
    return db_->IngestExternalFile(column_family_handle(column_family),
222✔
364
                                   external_files, opts);
222!
365
}
366

367
}  // namespace dftracer::utils::rocksdb
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc