• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 24057299873

07 Apr 2026 12:01AM UTC coverage: 52.076% (+0.8%) from 51.228%
24057299873

push

github

rayandrew
feat(rocksdb): migrate SQLite indexing to RocksDB

Replace SQLite-backed indexing and provenance storage with RocksDB-backed stores.

  Key changes:
  - add RocksDB async/database/db-manager/filesystem/key-codec layers
  - migrate index and provenance databases from SQLite to RocksDB
  - update index builder, trace reader, reorganize, view, stats, and comparator paths for
  RocksDB
  - harden transaction atomicity and rollback behavior with TransactionScope
  - add iterator status checking for prefix scans
  - harden gzip/tar indexer cache state and metadata handling
  - capture executor context in RocksDB awaitables
  - clean up failed RocksDB open paths and manager lifecycle behavior
  - vendor CPM 0.42.1 and update CI/build integration
  - refresh docs, Python bindings, and C++/Python test coverage for the new backend

  Validation:
  - full test suite passed
  - Ubuntu 22.04 Docker run passed
  - focused RocksDB/indexer regression tests passed

24097 of 59624 branches covered (40.41%)

Branch coverage included in aggregate %.

2516 of 3144 new or added lines in 75 files covered. (80.03%)

72 existing lines in 15 files now uncovered.

20858 of 26701 relevant lines covered (78.12%)

14113.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.27
/src/dftracer/utils/core/io/thread_pool_backend.cpp
1
#include <dftracer/utils/core/io/thread_pool_backend.h>
2
#include <dftracer/utils/core/pipeline/executor.h>
3
#include <fcntl.h>
4
#include <sys/socket.h>
5
#include <sys/stat.h>
6
#include <sys/uio.h>
7
#ifdef __linux__
8
#include <sys/sendfile.h>
9
#endif
10
#include <unistd.h>
11

12
#include <cerrno>
13

14
namespace dftracer::utils::io {
15

16
ThreadPoolBackend::ThreadPoolBackend(Executor& executor, std::size_t pool_size,
12✔
17
                                     unsigned batch_threshold)
4✔
18
    : executor_(executor), pool_(pool_size, batch_threshold) {}
12!
19

20
void ThreadPoolBackend::start() { pool_.start(); }
8✔
21
void ThreadPoolBackend::stop() { pool_.stop(); }
8✔
22

23
static IoAwaitable make_request(IoOp op, int fd, void* buf, std::size_t len,
207✔
24
                                off_t offset, const char* path, int flags,
25
                                mode_t mode, Executor* executor,
26
                                IoThreadPool* pool) {
27
    auto* req = new IoRequest{};
207✔
28
    req->submit = &ThreadPoolBackend::submit_to_pool;
102✔
29
    req->op = op;
102✔
30
    req->fd = fd;
102✔
31
    req->buf = buf;
102✔
32
    req->len = len;
102✔
33
    req->offset = offset;
102✔
34
    req->path = path;
102✔
35
    req->flags = flags;
102✔
36
    req->mode = mode;
102✔
37
    req->executor = executor;
102✔
38
    req->pool = pool;
102✔
39

40
    IoAwaitable awaitable;
102✔
41
    // req->awaitable will be updated in submit_to_pool with the
42
    // stable address from await_suspend (the IoAwaitable may move
43
    // before the coroutine frame is allocated).
44
    req->awaitable = nullptr;
102✔
45
    awaitable.submit_ctx_ = req;
102✔
46
    return awaitable;
102✔
47
}
48

49
IoAwaitable ThreadPoolBackend::submit_read(int fd, void* buf, std::size_t len) {
×
50
    return make_request(IoOp::READ, fd, buf, len, 0, nullptr, 0, 0, &executor_,
×
51
                        &pool_);
×
52
}
53

54
IoAwaitable ThreadPoolBackend::submit_write(int fd, const void* buf,
2✔
55
                                            std::size_t len) {
56
    return make_request(IoOp::WRITE, fd, const_cast<void*>(buf), len, 0,
3✔
57
                        nullptr, 0, 0, &executor_, &pool_);
2✔
58
}
59

60
IoAwaitable ThreadPoolBackend::submit_pread(int fd, void* buf, std::size_t len,
32✔
61
                                            off_t offset) {
62
    return make_request(IoOp::PREAD, fd, buf, len, offset, nullptr, 0, 0,
48✔
63
                        &executor_, &pool_);
32✔
64
}
65

NEW
66
void ThreadPoolBackend::submit_pread_callback(int fd, void* buf,
×
67
                                              std::size_t len, off_t offset,
68
                                              IoCompletionFn completion,
69
                                              void* context) {
NEW
70
    auto* req = new IoRequest{};
×
71
    req->op = IoOp::PREAD;
72
    req->fd = fd;
73
    req->buf = buf;
74
    req->len = len;
75
    req->offset = offset;
76
    req->completion = completion;
77
    req->completion_ctx = context;
78
    req->pool = &pool_;
NEW
79
    pool_.submit([req] { execute_request(req); });
×
80
}
81

UNCOV
82
IoAwaitable ThreadPoolBackend::submit_pwrite(int fd, const void* buf,
×
83
                                             std::size_t len, off_t offset) {
84
    return make_request(IoOp::PWRITE, fd, const_cast<void*>(buf), len, offset,
×
85
                        nullptr, 0, 0, &executor_, &pool_);
×
86
}
87

88
IoAwaitable ThreadPoolBackend::submit_open(const char* path, int flags,
18✔
89
                                           mode_t mode) {
90
    return make_request(IoOp::OPEN, -1, nullptr, 0, 0, path, flags, mode,
27✔
91
                        &executor_, &pool_);
18✔
92
}
93

94
IoAwaitable ThreadPoolBackend::submit_close(int fd) {
39✔
95
    return make_request(IoOp::CLOSE, fd, nullptr, 0, 0, nullptr, 0, 0,
59✔
96
                        &executor_, &pool_);
39✔
97
}
98

99
IoAwaitable ThreadPoolBackend::submit_fsync(int fd) {
×
100
    return make_request(IoOp::FSYNC, fd, nullptr, 0, 0, nullptr, 0, 0,
×
101
                        &executor_, &pool_);
×
102
}
103

104
IoAwaitable ThreadPoolBackend::submit_ftruncate(int fd, off_t length) {
×
105
    return make_request(IoOp::FTRUNCATE, fd, nullptr, 0, length, nullptr, 0, 0,
×
106
                        &executor_, &pool_);
×
107
}
108

109
IoAwaitable ThreadPoolBackend::submit_fstat(int fd, struct stat* buf) {
×
110
    auto req_awaitable = make_request(IoOp::FSTAT, fd, nullptr, 0, 0, nullptr,
×
111
                                      0, 0, &executor_, &pool_);
×
112
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
113
    req->stat_buf = buf;
×
114
    return req_awaitable;
×
115
}
116

117
IoAwaitable ThreadPoolBackend::submit_accept(int listen_fd,
39✔
118
                                             struct sockaddr* addr,
119
                                             socklen_t* addrlen) {
120
    auto req_awaitable = make_request(IoOp::ACCEPT, listen_fd, nullptr, 0, 0,
59✔
121
                                      nullptr, 0, 0, &executor_, &pool_);
39✔
122
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
39✔
123
    req->addr = addr;
39✔
124
    req->addrlen = addrlen;
39✔
125
    return req_awaitable;
39✔
126
}
127

128
IoAwaitable ThreadPoolBackend::submit_recv(int fd, void* buf, std::size_t len,
37✔
129
                                           int flags) {
130
    auto req_awaitable = make_request(IoOp::RECV, fd, buf, len, 0, nullptr, 0,
56✔
131
                                      0, &executor_, &pool_);
37✔
132
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
37✔
133
    req->msg_flags = flags;
37✔
134
    return req_awaitable;
37✔
135
}
136

137
IoAwaitable ThreadPoolBackend::submit_send(int fd, const void* buf,
36✔
138
                                           std::size_t len, int flags) {
139
    auto req_awaitable =
140
        make_request(IoOp::SEND, fd, const_cast<void*>(buf), len, 0, nullptr, 0,
54✔
141
                     0, &executor_, &pool_);
36✔
142
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
36✔
143
    req->msg_flags = flags;
36✔
144
    return req_awaitable;
36✔
145
}
146

147
IoAwaitable ThreadPoolBackend::submit_readv(int fd, const struct iovec* iov,
×
148
                                            int iovcnt) {
149
    auto req_awaitable = make_request(IoOp::READV, fd, nullptr, 0, 0, nullptr,
×
150
                                      0, 0, &executor_, &pool_);
×
151
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
152
    req->iov = iov;
×
153
    req->iovcnt = iovcnt;
×
154
    return req_awaitable;
×
155
}
156

157
IoAwaitable ThreadPoolBackend::submit_writev(int fd, const struct iovec* iov,
4✔
158
                                             int iovcnt) {
159
    auto req_awaitable = make_request(IoOp::WRITEV, fd, nullptr, 0, 0, nullptr,
6✔
160
                                      0, 0, &executor_, &pool_);
4✔
161
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
4✔
162
    req->iov = iov;
4✔
163
    req->iovcnt = iovcnt;
4✔
164
    return req_awaitable;
4✔
165
}
166

167
IoAwaitable ThreadPoolBackend::submit_preadv(int fd, const struct iovec* iov,
×
168
                                             int iovcnt, off_t offset) {
169
    auto req_awaitable = make_request(IoOp::PREADV, fd, nullptr, 0, offset,
×
170
                                      nullptr, 0, 0, &executor_, &pool_);
×
171
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
172
    req->iov = iov;
×
173
    req->iovcnt = iovcnt;
×
174
    return req_awaitable;
×
175
}
176

177
IoAwaitable ThreadPoolBackend::submit_pwritev(int fd, const struct iovec* iov,
×
178
                                              int iovcnt, off_t offset) {
179
    auto req_awaitable = make_request(IoOp::PWRITEV, fd, nullptr, 0, offset,
×
180
                                      nullptr, 0, 0, &executor_, &pool_);
×
181
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
182
    req->iov = iov;
×
183
    req->iovcnt = iovcnt;
×
184
    return req_awaitable;
×
185
}
186

187
IoAwaitable ThreadPoolBackend::submit_lseek(int fd, off_t offset, int whence) {
×
188
    auto req_awaitable = make_request(IoOp::LSEEK, fd, nullptr, 0, offset,
×
189
                                      nullptr, 0, 0, &executor_, &pool_);
×
190
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
191
    req->whence = whence;
×
192
    return req_awaitable;
×
193
}
194

195
IoAwaitable ThreadPoolBackend::submit_sendfile(int out_fd, int in_fd,
×
196
                                               off_t offset,
197
                                               std::size_t count) {
198
    auto req_awaitable =
199
        make_request(IoOp::SENDFILE, in_fd, nullptr, count, offset, nullptr, 0,
×
200
                     0, &executor_, &pool_);
×
201
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
202
    req->dest_fd = out_fd;
×
203
    return req_awaitable;
×
204
}
205

206
void ThreadPoolBackend::submit_to_pool(SubmitContext* ctx,
207✔
207
                                       IoAwaitable* awaitable) {
208
    auto* req = static_cast<IoRequest*>(ctx);
207✔
209
    // Update the awaitable pointer -- await_suspend passes the real,
210
    // stable address of the IoAwaitable in the coroutine frame.
211
    req->awaitable = awaitable;
207✔
212
    req->pool->submit([req] { execute_request(req); });
414!
213
}
206✔
214

215
void ThreadPoolBackend::execute_request(IoRequest* req) {
207✔
216
    ssize_t result = 0;
207✔
217
    switch (req->op) {
207!
218
        case IoOp::READ:
219
            result = ::read(req->fd, req->buf, req->len);
×
220
            break;
×
221
        case IoOp::WRITE:
1✔
222
            result = ::write(req->fd, req->buf, req->len);
2✔
223
            break;
2✔
224
        case IoOp::PREAD:
16✔
225
            result = ::pread(req->fd, req->buf, req->len, req->offset);
32✔
226
            break;
32✔
227
        case IoOp::PWRITE:
228
            result = ::pwrite(req->fd, req->buf, req->len, req->offset);
×
229
            break;
×
230
        case IoOp::OPEN:
9✔
231
            result = ::open(req->path, req->flags, req->mode);
18✔
232
            break;
18✔
233
        case IoOp::CLOSE:
19✔
234
            result = ::close(req->fd);
39✔
235
            break;
39✔
236
        case IoOp::FSYNC:
237
            result = ::fsync(req->fd);
×
238
            break;
×
239
        case IoOp::FTRUNCATE:
240
            result = ::ftruncate(req->fd, req->offset);
×
241
            break;
×
242
        case IoOp::FSTAT:
243
            result = ::fstat(req->fd, req->stat_buf);
×
244
            break;
×
245
        case IoOp::ACCEPT:
19✔
246
#ifdef __linux__
247
            result = ::accept4(req->fd, req->addr, req->addrlen,
19✔
248
                               SOCK_NONBLOCK | SOCK_CLOEXEC);
249
#else
250
            result = ::accept(req->fd, req->addr, req->addrlen);
20✔
251
            if (result >= 0) {
20✔
252
                int fd = static_cast<int>(result);
19✔
253
                int fl = ::fcntl(fd, F_GETFL, 0);
19✔
254
                ::fcntl(fd, F_SETFL, fl | O_NONBLOCK);
19✔
255
                ::fcntl(fd, F_SETFD, FD_CLOEXEC);
19✔
256
            }
19✔
257
#endif
258
            break;
39✔
259
        case IoOp::RECV:
18✔
260
            result = ::recv(req->fd, req->buf, req->len, req->msg_flags);
37✔
261
            break;
37✔
262
        case IoOp::SEND:
18✔
263
            result = ::send(req->fd, req->buf, req->len, req->msg_flags);
36✔
264
            break;
36✔
265
        case IoOp::READV:
266
            result = ::readv(req->fd, req->iov, req->iovcnt);
×
267
            break;
×
268
        case IoOp::WRITEV:
2✔
269
            result = ::writev(req->fd, req->iov, req->iovcnt);
4✔
270
            break;
4✔
271
        case IoOp::PREADV:
272
            result = ::preadv(req->fd, req->iov, req->iovcnt, req->offset);
×
273
            break;
×
274
        case IoOp::PWRITEV:
275
            result = ::pwritev(req->fd, req->iov, req->iovcnt, req->offset);
×
276
            break;
×
277
        case IoOp::LSEEK:
278
            result = ::lseek(req->fd, req->offset, req->whence);
×
279
            break;
×
280
        case IoOp::SENDFILE: {
281
#ifdef __linux__
282
            off_t off = req->offset;
283
            result = ::sendfile(req->dest_fd, req->fd, &off, req->len);
284
#elif defined(__APPLE__)
285
            off_t len = static_cast<off_t>(req->len);
286
            int ret = ::sendfile(req->fd, req->dest_fd, req->offset, &len,
287
                                 nullptr, 0);
288
            result = (ret == 0 || errno == EAGAIN) ? len : -1;
×
289
#else
290
            char tmp[8192];
291
            result = 0;
292
            off_t off = req->offset;
293
            std::size_t remaining = req->len;
294
            while (remaining > 0) {
295
                std::size_t chunk =
296
                    remaining < sizeof(tmp) ? remaining : sizeof(tmp);
297
                ssize_t r = ::pread(req->fd, tmp, chunk, off);
298
                if (r <= 0) {
299
                    if (result == 0) result = r;
300
                    break;
301
                }
302
                ssize_t w =
303
                    ::write(req->dest_fd, tmp, static_cast<std::size_t>(r));
304
                if (w < 0) {
305
                    if (result == 0) result = w;
306
                    break;
307
                }
308
                result += w;
309
                off += w;
310
                remaining -= static_cast<std::size_t>(w);
311
                if (w < r) break;
312
            }
313
#endif
314
            break;
×
315
        }
316
    }
317
    if (result < 0) result = -errno;
207✔
318

319
    if (req->awaitable != nullptr) {
207!
320
        req->awaitable->result_ = result;
207✔
321
        req->executor->enqueue(req->awaitable->handle_);
207✔
322
    } else if (req->completion != nullptr) {
105!
NEW
323
        req->completion(req->completion_ctx, result);
×
324
    }
325
    delete req;
207✔
326
}
207✔
327

328
std::size_t ThreadPoolBackend::poll(int /*timeout_ms*/) {
254✔
329
    return 0;  // Thread pool backend: completions fire via callbacks
254✔
330
}
331

332
int ThreadPoolBackend::flush() { return static_cast<int>(pool_.flush()); }
254✔
333

334
std::string ThreadPoolBackend::name() const { return "threadpool"; }
2!
335

336
}  // namespace dftracer::utils::io
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc