• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 24057299873

07 Apr 2026 12:01AM UTC coverage: 52.076% (+0.8%) from 51.228%
24057299873

push

github

rayandrew
feat(rocksdb): migrate SQLite indexing to RocksDB

Replace SQLite-backed indexing and provenance storage with RocksDB-backed stores.

  Key changes:
  - add RocksDB async/database/db-manager/filesystem/key-codec layers
  - migrate index and provenance databases from SQLite to RocksDB
  - update index builder, trace reader, reorganize, view, stats, and comparator paths for
  RocksDB
  - harden transaction atomicity and rollback behavior with TransactionScope
  - add iterator status checking for prefix scans
  - harden gzip/tar indexer cache state and metadata handling
  - capture executor context in RocksDB awaitables
  - clean up failed RocksDB open paths and manager lifecycle behavior
  - vendor CPM 0.42.1 and update CI/build integration
  - refresh docs, Python bindings, and C++/Python test coverage for the new backend

  Validation:
  - full test suite passed
  - Ubuntu 22.04 Docker run passed
  - focused RocksDB/indexer regression tests passed.

24097 of 59624 branches covered (40.41%)

Branch coverage included in aggregate %.

2516 of 3144 new or added lines in 75 files covered. (80.03%)

72 existing lines in 15 files now uncovered.

20858 of 26701 relevant lines covered (78.12%)

14113.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.66
/src/dftracer/utils/core/rocksdb/filesystem.cpp
1
#include <dftracer/utils/core/common/object_pool.h>
2
#include <dftracer/utils/core/io/io_backend.h>
3
#include <dftracer/utils/core/io/io_thread_pool.h>
4
#include <dftracer/utils/core/pipeline/executor.h>
5
#include <dftracer/utils/core/rocksdb/filesystem.h>
6
#include <fcntl.h>
7
#include <rocksdb/env.h>
8
#include <rocksdb/file_system.h>
9
#include <rocksdb/io_status.h>
10
#include <rocksdb/slice.h>
11
#include <sys/stat.h>
12
#include <unistd.h>
13

14
#include <algorithm>
15
#include <cerrno>
16
#include <condition_variable>
17
#include <cstdint>
18
#include <cstring>
19
#include <mutex>
20
#include <string>
21
#include <string_view>
22

23
namespace dftracer::utils::rocksdb {
24

25
namespace {
26

27
/// Looks up the I/O backend of the executor returned by Executor::current().
///
/// @return Pointer to that executor's backend, or nullptr when there is no
///         current executor or it reports that it has no I/O backend.
io::IoBackend* current_io_backend() {
    auto* const active = Executor::current();
    const bool usable = active != nullptr && active->has_io_backend();
    return usable ? &active->io_backend() : nullptr;
}
102,367✔
34

35
class DfTracerFileSystem;
36

37
struct AsyncReadHandle {
38
    explicit AsyncReadHandle(DfTracerFileSystem* owner_) : owner(owner_) {}
5!
39

40
    static void* operator new(std::size_t size) {
2✔
41
        return ObjectPool::instance().allocate(size);
2✔
42
    }
43

44
    static void operator delete(void* ptr, std::size_t size) noexcept {
2✔
45
        ObjectPool::instance().deallocate(ptr, size);
2!
46
    }
2✔
47

48
    DfTracerFileSystem* owner;
49
    std::mutex mutex;
50
    std::condition_variable cv;
51
    bool finished = false;
1✔
52
    bool callback_delivered = false;
1✔
53
    bool aborted = false;
1✔
54
    bool running = false;
1✔
55
    std::string path;
56
    std::uint64_t offset = 0;
1✔
57
    std::size_t len = 0;
1✔
58
    char* scratch = nullptr;
1✔
59
    ::rocksdb::Slice result;
60
    ::rocksdb::IOStatus status;
61
    std::function<void(::rocksdb::FSReadRequest&, void*)> callback;
62
    void* callback_arg = nullptr;
1✔
63
};
64

65
/// Builds an IOError status describing a failed system call.
///
/// @param op    Short name of the failed operation (e.g. "open", "pread").
/// @param path  File the operation was applied to; becomes the first message
///              part of the returned status.
/// @return IOStatus::IOError whose second message part is
///         "<op>: <strerror(errno)>".
///
/// Must be called immediately after the failing call, while errno still
/// holds its error code.
::rocksdb::IOStatus io_error(std::string_view op, std::string_view path) {
    // Snapshot errno before building any strings: the std::string
    // constructions below may allocate, and POSIX leaves errno unspecified
    // after a successful library call.
    const int saved_errno = errno;

    std::string detail(op);
    detail += ": ";
    detail += std::strerror(saved_errno);
    return ::rocksdb::IOStatus::IOError(std::string(path), detail);
}
69

70
ssize_t pread_sync(int fd, void* buf, std::size_t len, off_t offset) {
131,666✔
71
    if (auto* backend = current_io_backend(); backend != nullptr) {
131,666✔
72
        return backend->submit_read_sync(fd, buf, len, offset);
24,734✔
73
    }
74
    return ::pread(fd, buf, len, offset);
106,952✔
75
}
67,778✔
76

77
ssize_t pwrite_sync(int fd, const void* buf, std::size_t len, off_t offset) {
38,596✔
78
    if (auto* backend = current_io_backend(); backend != nullptr) {
38,596✔
79
        return backend->submit_write_sync(fd, buf, len, offset);
13,003✔
80
    }
81
    return ::pwrite(fd, buf, len, offset);
25,605✔
82
}
19,312✔
83

84
int fsync_sync(int fd) {
18,069✔
85
    if (auto* backend = current_io_backend(); backend != nullptr) {
18,069✔
86
        return backend->submit_fsync_sync(fd);
5,455✔
87
    }
88
    return ::fsync(fd);
12,619✔
89
}
9,038✔
90

91
int ftruncate_sync(int fd, off_t length) {
285✔
92
    if (auto* backend = current_io_backend(); backend != nullptr) {
285✔
93
        return backend->submit_ftruncate_sync(fd, length);
154✔
94
    }
95
    return ::ftruncate(fd, length);
132✔
96
}
143✔
97

98
int fstat_sync(int fd, struct stat* st) {
12,191✔
99
    if (auto* backend = current_io_backend(); backend != nullptr) {
12,191✔
100
        return backend->submit_fstat_sync(fd, st);
3,460✔
101
    }
102
    return ::fstat(fd, st);
8,744✔
103
}
6,102✔
104

105
class DfTracerSequentialFile final : public ::rocksdb::FSSequentialFile {
106
   public:
107
    DfTracerSequentialFile(std::string path, int fd)
152,506✔
108
        : path_(std::move(path)), fd_(fd) {}
152,506✔
109

110
    ~DfTracerSequentialFile() override {
188,747✔
111
        if (fd_ >= 0) {
75,004✔
112
            ::close(fd_);
75,015!
113
        }
38,744✔
114
    }
188,772✔
115

116
    ::rocksdb::IOStatus Read(std::size_t n, const ::rocksdb::IOOptions&,
117,610✔
117
                             ::rocksdb::Slice* result, char* scratch,
118
                             ::rocksdb::IODebugContext*) override {
119
        std::lock_guard<std::mutex> lock(mutex_);
117,610!
120
        const ssize_t bytes =
60,753✔
121
            pread_sync(fd_, scratch, n, static_cast<off_t>(offset_));
117,635✔
122
        if (bytes < 0) {
117,628!
NEW
123
            return io_error("read", path_);
×
124
        }
125
        offset_ += static_cast<std::uint64_t>(bytes);
117,628✔
126
        *result = ::rocksdb::Slice(scratch, static_cast<std::size_t>(bytes));
117,628!
127
        return ::rocksdb::IOStatus::OK();
117,631✔
128
    }
117,635✔
129

NEW
130
    ::rocksdb::IOStatus Skip(std::uint64_t n) override {
×
NEW
131
        std::lock_guard<std::mutex> lock(mutex_);
×
NEW
132
        offset_ += n;
×
NEW
133
        return ::rocksdb::IOStatus::OK();
×
NEW
134
    }
×
135

NEW
136
    ::rocksdb::IOStatus InvalidateCache(std::size_t, std::size_t) override {
×
NEW
137
        return ::rocksdb::IOStatus::OK();
×
138
    }
139

140
   private:
141
    std::string path_;
142
    int fd_;
143
    std::uint64_t offset_ = 0;
38,745✔
144
    std::mutex mutex_;
145
};
146

147
class DfTracerRandomAccessFile final : public ::rocksdb::FSRandomAccessFile {
148
   public:
149
    DfTracerRandomAccessFile(DfTracerFileSystem* owner, std::string path,
6,874✔
150
                             int fd)
151
        : owner_(owner), path_(std::move(path)), fd_(fd) {}
6,874✔
152

153
    ~DfTracerRandomAccessFile() override {
11,442✔
154
        if (fd_ >= 0) {
4,576✔
155
            ::close(fd_);
4,578!
156
        }
2,292✔
157
    }
11,450✔
158

159
    ::rocksdb::IOStatus Read(std::uint64_t offset, std::size_t n,
14,053✔
160
                             const ::rocksdb::IOOptions&,
161
                             ::rocksdb::Slice* result, char* scratch,
162
                             ::rocksdb::IODebugContext*) const override {
163
        const ssize_t bytes =
7,030✔
164
            pread_sync(fd_, scratch, n, static_cast<off_t>(offset));
14,053✔
165
        if (bytes < 0) {
14,059✔
NEW
166
            return io_error("pread", path_);
×
167
        }
168
        *result = ::rocksdb::Slice(scratch, static_cast<std::size_t>(bytes));
14,059✔
169
        return ::rocksdb::IOStatus::OK();
14,058✔
170
    }
7,030✔
171

172
    ::rocksdb::IOStatus Prefetch(std::uint64_t, std::size_t,
3,003✔
173
                                 const ::rocksdb::IOOptions&,
174
                                 ::rocksdb::IODebugContext*) override {
175
        return ::rocksdb::IOStatus::OK();
3,003✔
176
    }
177

178
    ::rocksdb::IOStatus ReadAsync(
179
        ::rocksdb::FSReadRequest& req, const ::rocksdb::IOOptions& opts,
180
        std::function<void(::rocksdb::FSReadRequest&, void*)> cb, void* cb_arg,
181
        void** io_handle, ::rocksdb::IOHandleDeleter* del_fn,
182
        ::rocksdb::IODebugContext* dbg) override;
183

NEW
184
    ::rocksdb::IOStatus InvalidateCache(std::size_t, std::size_t) override {
×
NEW
185
        return ::rocksdb::IOStatus::OK();
×
186
    }
187

188
    ::rocksdb::IOStatus GetFileSize(std::uint64_t* result) override {
3,274✔
189
        struct stat st{};
3,274✔
190
        if (fstat_sync(fd_, &st) != 0) {
3,274!
NEW
191
            return io_error("fstat", path_);
×
192
        }
193
        *result = static_cast<std::uint64_t>(st.st_size);
3,275✔
194
        return ::rocksdb::IOStatus::OK();
3,275!
195
    }
1,638✔
196

197
   private:
198
    DfTracerFileSystem* owner_;
199
    std::string path_;
200
    int fd_;
201
};
202

203
class DfTracerWritableFile final : public ::rocksdb::FSWritableFile {
204
   public:
205
    using ::rocksdb::FSWritableFile::Append;
206
    using ::rocksdb::FSWritableFile::PositionedAppend;
207

208
    DfTracerWritableFile(std::string path, int fd,
17,857✔
209
                         const ::rocksdb::FileOptions& options)
210
        : ::rocksdb::FSWritableFile(options), path_(std::move(path)), fd_(fd) {
13,392✔
211
        struct stat st{};
8,921✔
212
        if (fstat_sync(fd_, &st) == 0) {
8,921!
213
            size_ = static_cast<std::uint64_t>(st.st_size);
8,930✔
214
        }
4,465✔
215
    }
13,389✔
216

217
    ~DfTracerWritableFile() override {
22,309✔
218
        if (fd_ >= 0) {
8,922✔
219
            static_cast<void>(close_fd());
3,202!
220
        }
1,602✔
221
    }
22,311✔
222

223
    ::rocksdb::IOStatus Append(const ::rocksdb::Slice& data,
38,598✔
224
                               const ::rocksdb::IOOptions&,
225
                               ::rocksdb::IODebugContext*) override {
226
        std::lock_guard<std::mutex> lock(mutex_);
38,598!
227
        return write_at(data, size_);
57,926!
228
    }
38,616✔
229

NEW
230
    ::rocksdb::IOStatus PositionedAppend(const ::rocksdb::Slice& data,
×
231
                                         std::uint64_t offset,
232
                                         const ::rocksdb::IOOptions&,
233
                                         ::rocksdb::IODebugContext*) override {
NEW
234
        std::lock_guard<std::mutex> lock(mutex_);
×
NEW
235
        return write_at(data, offset);
×
NEW
236
    }
×
237

238
    ::rocksdb::IOStatus Truncate(std::uint64_t size,
284✔
239
                                 const ::rocksdb::IOOptions&,
240
                                 ::rocksdb::IODebugContext*) override {
241
        std::lock_guard<std::mutex> lock(mutex_);
284!
242
        if (ftruncate_sync(fd_, static_cast<off_t>(size)) != 0) {
286!
NEW
243
            return io_error("ftruncate", path_);
×
244
        }
245
        size_ = size;
284✔
246
        return ::rocksdb::IOStatus::OK();
284!
247
    }
286✔
248

249
    ::rocksdb::IOStatus Close(const ::rocksdb::IOOptions&,
5,722✔
250
                              ::rocksdb::IODebugContext*) override {
251
        std::lock_guard<std::mutex> lock(mutex_);
5,722!
252
        return close_fd();
8,588!
253
    }
5,723✔
254

255
    ::rocksdb::IOStatus Flush(const ::rocksdb::IOOptions&,
57,255✔
256
                              ::rocksdb::IODebugContext*) override {
257
        return ::rocksdb::IOStatus::OK();
57,255✔
258
    }
259

260
    ::rocksdb::IOStatus Sync(const ::rocksdb::IOOptions&,
18,068✔
261
                             ::rocksdb::IODebugContext*) override {
262
        std::lock_guard<std::mutex> lock(mutex_);
18,068!
263
        if (fd_ < 0) {
18,076!
NEW
264
            return ::rocksdb::IOStatus::OK();
×
265
        }
266
        if (fsync_sync(fd_) != 0) {
18,076!
NEW
267
            return io_error("fsync", path_);
×
268
        }
269
        return ::rocksdb::IOStatus::OK();
18,076!
270
    }
18,075✔
271

NEW
272
    bool IsSyncThreadSafe() const override { return true; }
×
273

274
    std::uint64_t GetFileSize(const ::rocksdb::IOOptions&,
35,404✔
275
                              ::rocksdb::IODebugContext*) override {
276
        std::lock_guard<std::mutex> lock(mutex_);
35,404!
277
        return size_;
35,411✔
278
    }
35,413✔
279

NEW
280
    ::rocksdb::IOStatus InvalidateCache(std::size_t, std::size_t) override {
×
NEW
281
        return ::rocksdb::IOStatus::OK();
×
282
    }
283

NEW
284
    ::rocksdb::IOStatus RangeSync(std::uint64_t, std::uint64_t,
×
285
                                  const ::rocksdb::IOOptions& options,
286
                                  ::rocksdb::IODebugContext* dbg) override {
NEW
287
        return Sync(options, dbg);
×
288
    }
289

290
   private:
291
    ::rocksdb::IOStatus write_at(const ::rocksdb::Slice& data,
38,602✔
292
                                 std::uint64_t offset) {
293
        const ssize_t bytes = pwrite_sync(fd_, data.data(), data.size(),
57,915✔
294
                                          static_cast<off_t>(offset));
19,313✔
295
        if (bytes < 0 || static_cast<std::size_t>(bytes) != data.size()) {
38,614!
NEW
296
            return io_error("pwrite", path_);
×
297
        }
298
        size_ = std::max(size_, offset + static_cast<std::uint64_t>(bytes));
38,614✔
299
        return ::rocksdb::IOStatus::OK();
38,614✔
300
    }
19,313✔
301

302
    ::rocksdb::IOStatus close_fd() {
8,924✔
303
        if (fd_ < 0) {
8,924✔
NEW
304
            return ::rocksdb::IOStatus::OK();
×
305
        }
306
        if (::close(fd_) != 0) {
8,924!
NEW
307
            return io_error("close", path_);
×
308
        }
309
        fd_ = -1;
8,929✔
310
        return ::rocksdb::IOStatus::OK();
8,929✔
311
    }
4,465✔
312

313
    std::string path_;
314
    int fd_;
315
    std::uint64_t size_ = 0;
4,465✔
316
    mutable std::mutex mutex_;
317
};
318

319
class LocalFileSystemWrapper : public ::rocksdb::FileSystem {
320
   public:
321
    explicit LocalFileSystemWrapper(
11,339✔
322
        const std::shared_ptr<::rocksdb::FileSystem>& target)
323
        : target_(target) {}
11,339✔
324

325
    ::rocksdb::FileSystem* target() const { return target_.get(); }
326

NEW
327
    ::rocksdb::IOStatus NewSequentialFile(
×
328
        const std::string& f, const ::rocksdb::FileOptions& file_opts,
329
        std::unique_ptr<::rocksdb::FSSequentialFile>* r,
330
        ::rocksdb::IODebugContext* dbg) override {
NEW
331
        return target_->NewSequentialFile(f, file_opts, r, dbg);
×
332
    }
333

NEW
334
    ::rocksdb::IOStatus NewRandomAccessFile(
×
335
        const std::string& f, const ::rocksdb::FileOptions& file_opts,
336
        std::unique_ptr<::rocksdb::FSRandomAccessFile>* r,
337
        ::rocksdb::IODebugContext* dbg) override {
NEW
338
        return target_->NewRandomAccessFile(f, file_opts, r, dbg);
×
339
    }
340

NEW
341
    ::rocksdb::IOStatus NewWritableFile(
×
342
        const std::string& f, const ::rocksdb::FileOptions& file_opts,
343
        std::unique_ptr<::rocksdb::FSWritableFile>* r,
344
        ::rocksdb::IODebugContext* dbg) override {
NEW
345
        return target_->NewWritableFile(f, file_opts, r, dbg);
×
346
    }
347

NEW
348
    ::rocksdb::IOStatus ReopenWritableFile(
×
349
        const std::string& fname, const ::rocksdb::FileOptions& file_opts,
350
        std::unique_ptr<::rocksdb::FSWritableFile>* result,
351
        ::rocksdb::IODebugContext* dbg) override {
NEW
352
        return target_->ReopenWritableFile(fname, file_opts, result, dbg);
×
353
    }
354

NEW
355
    ::rocksdb::IOStatus ReuseWritableFile(
×
356
        const std::string& fname, const std::string& old_fname,
357
        const ::rocksdb::FileOptions& file_opts,
358
        std::unique_ptr<::rocksdb::FSWritableFile>* r,
359
        ::rocksdb::IODebugContext* dbg) override {
NEW
360
        return target_->ReuseWritableFile(fname, old_fname, file_opts, r, dbg);
×
361
    }
362

NEW
363
    ::rocksdb::IOStatus NewRandomRWFile(
×
364
        const std::string& fname, const ::rocksdb::FileOptions& file_opts,
365
        std::unique_ptr<::rocksdb::FSRandomRWFile>* result,
366
        ::rocksdb::IODebugContext* dbg) override {
NEW
367
        return target_->NewRandomRWFile(fname, file_opts, result, dbg);
×
368
    }
369

NEW
370
    ::rocksdb::IOStatus NewMemoryMappedFileBuffer(
×
371
        const std::string& fname,
372
        std::unique_ptr<::rocksdb::MemoryMappedFileBuffer>* result) override {
NEW
373
        return target_->NewMemoryMappedFileBuffer(fname, result);
×
374
    }
375

376
    ::rocksdb::IOStatus NewDirectory(
15,486✔
377
        const std::string& name, const ::rocksdb::IOOptions& io_opts,
378
        std::unique_ptr<::rocksdb::FSDirectory>* result,
379
        ::rocksdb::IODebugContext* dbg) override {
380
        return target_->NewDirectory(name, io_opts, result, dbg);
15,486✔
381
    }
382

383
    ::rocksdb::IOStatus FileExists(const std::string& f,
13,857✔
384
                                   const ::rocksdb::IOOptions& io_opts,
385
                                   ::rocksdb::IODebugContext* dbg) override {
386
        return target_->FileExists(f, io_opts, dbg);
13,857✔
387
    }
388

389
    ::rocksdb::IOStatus GetChildren(const std::string& dir,
18,842✔
390
                                    const ::rocksdb::IOOptions& io_opts,
391
                                    std::vector<std::string>* r,
392
                                    ::rocksdb::IODebugContext* dbg) override {
393
        return target_->GetChildren(dir, io_opts, r, dbg);
18,842✔
394
    }
395

NEW
396
    ::rocksdb::IOStatus GetChildrenFileAttributes(
×
397
        const std::string& dir, const ::rocksdb::IOOptions& options,
398
        std::vector<::rocksdb::FileAttributes>* result,
399
        ::rocksdb::IODebugContext* dbg) override {
NEW
400
        return target_->GetChildrenFileAttributes(dir, options, result, dbg);
×
401
    }
402

403
    ::rocksdb::IOStatus DeleteFile(const std::string& f,
1,658✔
404
                                   const ::rocksdb::IOOptions& options,
405
                                   ::rocksdb::IODebugContext* dbg) override {
406
        return target_->DeleteFile(f, options, dbg);
1,658✔
407
    }
408

NEW
409
    ::rocksdb::IOStatus Truncate(const std::string& fname, size_t size,
×
410
                                 const ::rocksdb::IOOptions& options,
411
                                 ::rocksdb::IODebugContext* dbg) override {
NEW
412
        return target_->Truncate(fname, size, options, dbg);
×
413
    }
414

NEW
415
    ::rocksdb::IOStatus CreateDir(const std::string& d,
×
416
                                  const ::rocksdb::IOOptions& options,
417
                                  ::rocksdb::IODebugContext* dbg) override {
NEW
418
        return target_->CreateDir(d, options, dbg);
×
419
    }
420

421
    ::rocksdb::IOStatus CreateDirIfMissing(
17,177✔
422
        const std::string& d, const ::rocksdb::IOOptions& options,
423
        ::rocksdb::IODebugContext* dbg) override {
424
        return target_->CreateDirIfMissing(d, options, dbg);
17,177✔
425
    }
426

NEW
427
    ::rocksdb::IOStatus DeleteDir(const std::string& d,
×
428
                                  const ::rocksdb::IOOptions& options,
429
                                  ::rocksdb::IODebugContext* dbg) override {
NEW
430
        return target_->DeleteDir(d, options, dbg);
×
431
    }
432

433
    ::rocksdb::IOStatus GetFileSize(const std::string& f,
33,663✔
434
                                    const ::rocksdb::IOOptions& options,
435
                                    uint64_t* s,
436
                                    ::rocksdb::IODebugContext* dbg) override {
437
        return target_->GetFileSize(f, options, s, dbg);
33,663✔
438
    }
439

NEW
440
    ::rocksdb::IOStatus GetFileModificationTime(
×
441
        const std::string& fname, const ::rocksdb::IOOptions& options,
442
        uint64_t* file_mtime, ::rocksdb::IODebugContext* dbg) override {
NEW
443
        return target_->GetFileModificationTime(fname, options, file_mtime,
×
NEW
444
                                                dbg);
×
445
    }
446

447
    ::rocksdb::IOStatus GetAbsolutePath(
1,307✔
448
        const std::string& db_path, const ::rocksdb::IOOptions& options,
449
        std::string* output_path, ::rocksdb::IODebugContext* dbg) override {
450
        return target_->GetAbsolutePath(db_path, options, output_path, dbg);
1,307✔
451
    }
452

453
    ::rocksdb::IOStatus RenameFile(const std::string& s, const std::string& t,
4,791✔
454
                                   const ::rocksdb::IOOptions& options,
455
                                   ::rocksdb::IODebugContext* dbg) override {
456
        return target_->RenameFile(s, t, options, dbg);
4,791✔
457
    }
458

NEW
459
    ::rocksdb::IOStatus LinkFile(const std::string& s, const std::string& t,
×
460
                                 const ::rocksdb::IOOptions& options,
461
                                 ::rocksdb::IODebugContext* dbg) override {
NEW
462
        return target_->LinkFile(s, t, options, dbg);
×
463
    }
464

NEW
465
    ::rocksdb::IOStatus NumFileLinks(const std::string& fname,
×
466
                                     const ::rocksdb::IOOptions& options,
467
                                     uint64_t* count,
468
                                     ::rocksdb::IODebugContext* dbg) override {
NEW
469
        return target_->NumFileLinks(fname, options, count, dbg);
×
470
    }
471

NEW
472
    ::rocksdb::IOStatus AreFilesSame(const std::string& first,
×
473
                                     const std::string& second,
474
                                     const ::rocksdb::IOOptions& options,
475
                                     bool* res,
476
                                     ::rocksdb::IODebugContext* dbg) override {
NEW
477
        return target_->AreFilesSame(first, second, options, res, dbg);
×
478
    }
479

480
    ::rocksdb::IOStatus LockFile(const std::string& f,
1,307✔
481
                                 const ::rocksdb::IOOptions& options,
482
                                 ::rocksdb::FileLock** l,
483
                                 ::rocksdb::IODebugContext* dbg) override {
484
        return target_->LockFile(f, options, l, dbg);
1,307✔
485
    }
486

487
    ::rocksdb::IOStatus UnlockFile(::rocksdb::FileLock* l,
1,303✔
488
                                   const ::rocksdb::IOOptions& options,
489
                                   ::rocksdb::IODebugContext* dbg) override {
490
        return target_->UnlockFile(l, options, dbg);
1,303✔
491
    }
492

NEW
493
    ::rocksdb::IOStatus GetTestDirectory(
×
494
        const ::rocksdb::IOOptions& options, std::string* path,
495
        ::rocksdb::IODebugContext* dbg) override {
NEW
496
        return target_->GetTestDirectory(options, path, dbg);
×
497
    }
498

499
    ::rocksdb::IOStatus NewLogger(const std::string& fname,
1,307✔
500
                                  const ::rocksdb::IOOptions& options,
501
                                  std::shared_ptr<::rocksdb::Logger>* result,
502
                                  ::rocksdb::IODebugContext* dbg) override {
503
        return target_->NewLogger(fname, options, result, dbg);
1,307✔
504
    }
505

NEW
506
    void SanitizeFileOptions(::rocksdb::FileOptions* opts) const override {
×
NEW
507
        target_->SanitizeFileOptions(opts);
×
NEW
508
    }
×
509

510
    ::rocksdb::FileOptions OptimizeForLogRead(
10,269✔
511
        const ::rocksdb::FileOptions& file_options) const override {
512
        return target_->OptimizeForLogRead(file_options);
10,269✔
513
    }
514

515
    ::rocksdb::FileOptions OptimizeForManifestRead(
11,291✔
516
        const ::rocksdb::FileOptions& file_options) const override {
517
        return target_->OptimizeForManifestRead(file_options);
11,291✔
518
    }
519

520
    ::rocksdb::FileOptions OptimizeForLogWrite(
1,541✔
521
        const ::rocksdb::FileOptions& file_options,
522
        const ::rocksdb::DBOptions& db_options) const override {
523
        return target_->OptimizeForLogWrite(file_options, db_options);
1,541✔
524
    }
525

526
    ::rocksdb::FileOptions OptimizeForManifestWrite(
12,939✔
527
        const ::rocksdb::FileOptions& file_options) const override {
528
        return target_->OptimizeForManifestWrite(file_options);
12,939✔
529
    }
530

531
    ::rocksdb::FileOptions OptimizeForCompactionTableWrite(
11,288✔
532
        const ::rocksdb::FileOptions& file_options,
533
        const ::rocksdb::ImmutableDBOptions& immutable_opts) const override {
534
        return target_->OptimizeForCompactionTableWrite(file_options,
17,108✔
535
                                                        immutable_opts);
11,288✔
536
    }
537

NEW
538
    ::rocksdb::FileOptions OptimizeForCompactionTableRead(
×
539
        const ::rocksdb::FileOptions& file_options,
540
        const ::rocksdb::ImmutableDBOptions& db_options) const override {
NEW
541
        return target_->OptimizeForCompactionTableRead(file_options,
×
NEW
542
                                                       db_options);
×
543
    }
544

NEW
545
    ::rocksdb::FileOptions OptimizeForBlobFileRead(
×
546
        const ::rocksdb::FileOptions& file_options,
547
        const ::rocksdb::ImmutableDBOptions& db_options) const override {
NEW
548
        return target_->OptimizeForBlobFileRead(file_options, db_options);
×
549
    }
550

NEW
551
    ::rocksdb::IOStatus GetFreeSpace(const std::string& path,
×
552
                                     const ::rocksdb::IOOptions& options,
553
                                     uint64_t* diskfree,
554
                                     ::rocksdb::IODebugContext* dbg) override {
NEW
555
        return target_->GetFreeSpace(path, options, diskfree, dbg);
×
556
    }
557

NEW
558
    ::rocksdb::IOStatus IsDirectory(const std::string& path,
×
559
                                    const ::rocksdb::IOOptions& options,
560
                                    bool* is_dir,
561
                                    ::rocksdb::IODebugContext* dbg) override {
NEW
562
        return target_->IsDirectory(path, options, is_dir, dbg);
×
563
    }
564

NEW
565
    const ::rocksdb::Customizable* Inner() const override {
×
NEW
566
        return target_.get();
×
567
    }
568

NEW
569
    ::rocksdb::Status PrepareOptions(
×
570
        const ::rocksdb::ConfigOptions& options) override {
NEW
571
        return target_->PrepareOptions(options);
×
572
    }
573

NEW
574
    std::string SerializeOptions(const ::rocksdb::ConfigOptions& config_options,
×
575
                                 const std::string& header) const override {
NEW
576
        return ::rocksdb::FileSystem::SerializeOptions(config_options, header);
×
577
    }
578

NEW
579
    ::rocksdb::IOStatus Poll(std::vector<void*>& io_handles,
×
580
                             size_t min_completions) override {
NEW
581
        return target_->Poll(io_handles, min_completions);
×
582
    }
583

NEW
584
    ::rocksdb::IOStatus AbortIO(std::vector<void*>& io_handles) override {
×
NEW
585
        return target_->AbortIO(io_handles);
×
586
    }
587

NEW
588
    void DiscardCacheForDirectory(const std::string& path) override {
×
NEW
589
        target_->DiscardCacheForDirectory(path);
×
NEW
590
    }
×
591

NEW
592
    void SupportedOps(int64_t& supported_ops) override {
×
NEW
593
        target_->SupportedOps(supported_ops);
×
NEW
594
    }
×
595

596
   protected:
597
    std::shared_ptr<::rocksdb::FileSystem> target_;
598
};
599

600
class DfTracerFileSystem final : public LocalFileSystemWrapper {
601
   public:
602
    explicit DfTracerFileSystem(
17,182✔
603
        const std::shared_ptr<::rocksdb::FileSystem>& target)
604
        : LocalFileSystemWrapper(target), fallback_pool_(4) {
17,182!
605
        fallback_pool_.start();
11,334!
606
    }
17,184✔
607

608
    ~DfTracerFileSystem() override { fallback_pool_.stop(); }
17,174!
609

610
    static const char* kClassName() { return "DfTracerFileSystem"; }
11,292✔
611

612
    const char* Name() const override { return kClassName(); }
11,292✔
613

NEW
614
    bool IsInstanceOf(const std::string& name) const override {
×
NEW
615
        return name == kClassName() ||
×
NEW
616
               LocalFileSystemWrapper::IsInstanceOf(name);
×
617
    }
618

619
    void SupportedOps(int64_t& supported_ops) override {
429,215✔
620
        supported_ops = 0;
429,215✔
621
        supported_ops |= (1 << ::rocksdb::FSSupportedOps::kAsyncIO);
429,215✔
622
        supported_ops |= (1 << ::rocksdb::FSSupportedOps::kFSPrefetch);
429,215✔
623
    }
429,215✔
624

625
    ::rocksdb::IOStatus NewSequentialFile(
76,020✔
626
        const std::string& fname, const ::rocksdb::FileOptions&,
627
        std::unique_ptr<::rocksdb::FSSequentialFile>* result,
628
        ::rocksdb::IODebugContext*) override {
629
        int fd = ::open(fname.c_str(), O_RDONLY | O_CLOEXEC);
76,020✔
630
        if (fd < 0) {
76,032✔
631
            return io_error("open", fname);
1,018!
632
        }
633
        result->reset(new DfTracerSequentialFile(fname, fd));
75,014!
634
        return ::rocksdb::IOStatus::OK();
75,022✔
635
    }
39,251✔
636

637
    ::rocksdb::IOStatus NewRandomAccessFile(
4,630✔
638
        const std::string& fname, const ::rocksdb::FileOptions&,
639
        std::unique_ptr<::rocksdb::FSRandomAccessFile>* result,
640
        ::rocksdb::IODebugContext*) override {
641
        int fd = ::open(fname.c_str(), O_RDONLY | O_CLOEXEC);
4,630✔
642
        if (fd < 0) {
4,631✔
643
            return io_error("open", fname);
48!
644
        }
645
        result->reset(new DfTracerRandomAccessFile(this, fname, fd));
4,583!
646
        return ::rocksdb::IOStatus::OK();
4,583✔
647
    }
2,316✔
648

649
    ::rocksdb::IOStatus NewWritableFile(
8,649✔
650
        const std::string& fname, const ::rocksdb::FileOptions& file_opts,
651
        std::unique_ptr<::rocksdb::FSWritableFile>* result,
652
        ::rocksdb::IODebugContext*) override {
653
        int fd =
4,331✔
654
            ::open(fname.c_str(), O_CREAT | O_TRUNC | O_RDWR | O_CLOEXEC, 0644);
8,649✔
655
        if (fd < 0) {
8,657✔
656
            return io_error("open", fname);
18!
657
        }
658
        result->reset(new DfTracerWritableFile(fname, fd, file_opts));
8,639!
659
        return ::rocksdb::IOStatus::OK();
8,644✔
660
    }
4,331✔
661

662
    ::rocksdb::IOStatus ReopenWritableFile(
284✔
663
        const std::string& fname, const ::rocksdb::FileOptions& file_opts,
664
        std::unique_ptr<::rocksdb::FSWritableFile>* result,
665
        ::rocksdb::IODebugContext*) override {
666
        int fd = ::open(fname.c_str(), O_CREAT | O_RDWR | O_CLOEXEC, 0644);
284✔
667
        if (fd < 0) {
285✔
NEW
668
            return io_error("open", fname);
×
669
        }
670
        result->reset(new DfTracerWritableFile(fname, fd, file_opts));
285!
671
        return ::rocksdb::IOStatus::OK();
286✔
672
    }
143✔
673

NEW
674
    ::rocksdb::IOStatus ReuseWritableFile(
×
675
        const std::string& fname, const std::string& old_fname,
676
        const ::rocksdb::FileOptions& file_opts,
677
        std::unique_ptr<::rocksdb::FSWritableFile>* result,
678
        ::rocksdb::IODebugContext*) override {
NEW
679
        ::unlink(fname.c_str());
×
NEW
680
        if (::rename(old_fname.c_str(), fname.c_str()) != 0) {
×
NEW
681
            return io_error("rename", old_fname);
×
682
        }
NEW
683
        return ReopenWritableFile(fname, file_opts, result, nullptr);
×
684
    }
685

686
    ::rocksdb::IOStatus Poll(std::vector<void*>& io_handles,
2✔
687
                             size_t min_completions) override {
688
        const size_t target = std::min(min_completions, io_handles.size());
2✔
689
        if (target == 0) {
2✔
NEW
690
            return ::rocksdb::IOStatus::OK();
×
691
        }
692
        std::unique_lock<std::mutex> lock(completions_mutex_);
2!
693
        completions_cv_.wait(lock, [&] {
4!
694
            size_t completed = 0;
4✔
695
            for (void* io_handle : io_handles) {
8✔
696
                auto* handle = static_cast<AsyncReadHandle*>(io_handle);
4✔
697
                std::lock_guard<std::mutex> handle_lock(handle->mutex);
4!
698
                if (handle->finished && !handle->callback_delivered) {
4✔
699
                    ++completed;
2✔
700
                }
1✔
701
            }
4✔
702
            return completed >= target;
4✔
703
        });
704
        lock.unlock();
2!
705

706
        for (void* io_handle : io_handles) {
4✔
707
            auto* handle = static_cast<AsyncReadHandle*>(io_handle);
2✔
708
            std::unique_lock<std::mutex> handle_lock(handle->mutex);
2!
709
            if (!handle->finished || handle->callback_delivered ||
2!
710
                handle->aborted) {
2!
NEW
711
                continue;
×
712
            }
713
            handle->callback_delivered = true;
2✔
714
            auto callback = handle->callback;
2✔
715
            auto callback_arg = handle->callback_arg;
2✔
716
            ::rocksdb::FSReadRequest req;
2✔
717
            req.offset = handle->offset;
2✔
718
            req.len = handle->len;
2✔
719
            req.scratch = handle->scratch;
2✔
720
            req.result = handle->result;
2✔
721
            req.status = handle->status;
2!
722
            handle_lock.unlock();
2!
723
            callback(req, callback_arg);
2!
724
        }
2!
725
        return ::rocksdb::IOStatus::OK();
2!
726
    }
2✔
727

NEW
728
    ::rocksdb::IOStatus AbortIO(std::vector<void*>& io_handles) override {
×
NEW
729
        for (void* io_handle : io_handles) {
×
NEW
730
            auto* handle = static_cast<AsyncReadHandle*>(io_handle);
×
NEW
731
            std::lock_guard<std::mutex> lock(handle->mutex);
×
NEW
732
            handle->aborted = true;
×
NEW
733
        }
×
734

NEW
735
        for (void* io_handle : io_handles) {
×
NEW
736
            auto* handle = static_cast<AsyncReadHandle*>(io_handle);
×
NEW
737
            std::unique_lock<std::mutex> lock(handle->mutex);
×
NEW
738
            handle->cv.wait(lock, [&] { return handle->finished; });
×
NEW
739
            handle->callback_delivered = true;
×
NEW
740
        }
×
741

NEW
742
        return ::rocksdb::IOStatus::OK();
×
743
    }
744

745
    void submit_async_read(AsyncReadHandle* handle, int fd, std::string path,
2✔
746
                           ::rocksdb::IODebugContext* dbg) {
747
        {
748
            std::lock_guard<std::mutex> lock(handle->mutex);
2!
749
            handle->running = true;
2✔
750
            handle->path = path;
2!
751
        }
2✔
752
        if (auto* backend = current_io_backend(); backend != nullptr) {
2!
NEW
753
            backend->submit_pread_callback(fd, handle->scratch, handle->len,
×
NEW
754
                                           static_cast<off_t>(handle->offset),
×
755
                                           &DfTracerFileSystem::on_pread_done,
756
                                           handle);
NEW
757
            return;
×
758
        }
759

760
        fallback_pool_.submit([this, handle, fd, path = std::move(path), dbg] {
3!
761
            ::rocksdb::Slice result;
2✔
762
            auto status = read_async_impl(fd, path, handle->offset, handle->len,
2✔
763
                                          &result, handle->scratch, dbg);
2!
764
            complete_async_read(handle, status, result);
2!
765
        });
2✔
766
    }
1✔
767

768
    static ::rocksdb::IOStatus read_async_impl(
2✔
769
        int fd, std::string_view path, std::uint64_t offset, std::size_t n,
770
        ::rocksdb::Slice* result, char* scratch, ::rocksdb::IODebugContext*) {
771
        const ssize_t bytes =
1✔
772
            ::pread(fd, scratch, n, static_cast<off_t>(offset));
2✔
773
        if (bytes < 0) {
2✔
NEW
774
            return io_error("pread", path);
×
775
        }
776
        *result = ::rocksdb::Slice(scratch, static_cast<std::size_t>(bytes));
2✔
777
        return ::rocksdb::IOStatus::OK();
2✔
778
    }
1✔
779

780
    static void delete_async_read_handle(void* io_handle) {
2✔
781
        delete static_cast<AsyncReadHandle*>(io_handle);
2✔
782
    }
2✔
783

784
   private:
785
    void complete_async_read(AsyncReadHandle* handle,
2✔
786
                             const ::rocksdb::IOStatus& status,
787
                             const ::rocksdb::Slice& result) {
788
        {
789
            std::lock_guard<std::mutex> lock(handle->mutex);
2!
790
            handle->result = result;
2✔
791
            handle->status = status;
2!
792
            handle->running = false;
2✔
793
            handle->finished = true;
2✔
794
        }
2✔
795
        handle->cv.notify_all();
2✔
796

797
        std::lock_guard<std::mutex> lock(completions_mutex_);
2!
798
        completions_cv_.notify_all();
2✔
799
    }
2✔
800

NEW
801
    static void on_pread_done(void* context, ssize_t result) noexcept {
×
NEW
802
        auto* handle = static_cast<AsyncReadHandle*>(context);
×
NEW
803
        ::rocksdb::IOStatus status = ::rocksdb::IOStatus::OK();
×
NEW
804
        ::rocksdb::Slice slice;
×
NEW
805
        if (result < 0) {
×
NEW
806
            errno = static_cast<int>(-result);
×
NEW
807
            status = io_error("pread", handle->path);
×
808
        } else {
NEW
809
            slice = ::rocksdb::Slice(handle->scratch,
×
810
                                     static_cast<std::size_t>(result));
811
        }
NEW
812
        handle->owner->complete_async_read(handle, status, slice);
×
NEW
813
    }
×
814

815
    io::IoThreadPool fallback_pool_;
816
    std::mutex completions_mutex_;
817
    std::condition_variable completions_cv_;
818
};
819

820
/// Allocates an AsyncReadHandle for `req`, publishes it (and its deleter)
/// to the caller, and hands the read to the owning file system.
///
/// Always returns OK; the outcome of the read itself is reported through
/// `cb` when the caller polls.
::rocksdb::IOStatus DfTracerRandomAccessFile::ReadAsync(
    ::rocksdb::FSReadRequest& req, const ::rocksdb::IOOptions&,
    std::function<void(::rocksdb::FSReadRequest&, void*)> cb, void* cb_arg,
    void** io_handle, ::rocksdb::IOHandleDeleter* del_fn,
    ::rocksdb::IODebugContext* dbg) {
    AsyncReadHandle* const pending = new AsyncReadHandle(owner_);

    // Completion hook supplied by the caller.
    pending->callback_arg = cb_arg;
    pending->callback = std::move(cb);

    // Copy of the request geometry; the read runs after this call returns.
    pending->scratch = req.scratch;
    pending->len = req.len;
    pending->offset = req.offset;

    // Publish the handle and its deleter before the read is submitted.
    *del_fn = &DfTracerFileSystem::delete_async_read_handle;
    *io_handle = static_cast<void*>(pending);

    owner_->submit_async_read(pending, fd_, path_, dbg);
    return ::rocksdb::IOStatus::OK();
}
836

837
}  // namespace
838

839
std::shared_ptr<::rocksdb::FileSystem> make_dftracer_file_system() {
11,338✔
840
    return std::make_shared<DfTracerFileSystem>(
16,837!
841
        ::rocksdb::FileSystem::Default());
22,328✔
842
}
843

844
std::unique_ptr<::rocksdb::Env> make_dftracer_env(
11,339✔
845
    const std::shared_ptr<::rocksdb::FileSystem>& file_system) {
846
    return ::rocksdb::NewCompositeEnv(file_system);
11,339✔
847
}
848

849
}  // namespace dftracer::utils::rocksdb
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc