llnl / dftracer-utils / 24057299873

07 Apr 2026 12:01AM UTC coverage: 52.076% (+0.8%) from 51.228%
Build 24057299873 · push · github · committed by rayandrew
feat(rocksdb): migrate SQLite indexing to RocksDB

Replace SQLite-backed indexing and provenance storage with RocksDB-backed stores.

  Key changes:
  - add RocksDB async/database/db-manager/filesystem/key-codec layers
  - migrate index and provenance databases from SQLite to RocksDB
  - update index builder, trace reader, reorganize, view, stats, and comparator paths for RocksDB
  - harden transaction atomicity and rollback behavior with TransactionScope
  - add iterator status checking for prefix scans (see the sketch below)
  - harden gzip/tar indexer cache state and metadata handling
  - capture executor context in RocksDB awaitables
  - clean up failed RocksDB open paths and manager lifecycle behavior
  - vendor CPM 0.42.1 and update CI/build integration
  - refresh docs, Python bindings, and C++/Python test coverage for the new backend

  Validation:
  - full test suite passed
  - Ubuntu 22.04 Docker run passed
  - focused RocksDB/indexer regression tests passed
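
The "iterator status checking for prefix scans" item refers to a standard RocksDB pitfall: an iterator that stops being Valid() may have hit an I/O or corruption error rather than the end of the key range, so a scan is only trustworthy if status() is checked afterwards. Below is a minimal, self-contained sketch of a RocksDB-backed store with such a prefix scan; the key layout, the /tmp path, and the field names are illustrative only, not the actual dftracer-utils schema or its key-codec layer.

#include <iostream>
#include <memory>
#include <string>

#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>

int main() {
    rocksdb::Options options;
    options.create_if_missing = true;

    rocksdb::DB* db = nullptr;
    rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/index-demo", &db);
    if (!s.ok()) {
        std::cerr << "open failed: " << s.ToString() << "\n";
        return 1;
    }

    // Related records share a common key prefix so they can be range-scanned.
    db->Put(rocksdb::WriteOptions(), "idx/file-0001/offset", "0");
    db->Put(rocksdb::WriteOptions(), "idx/file-0001/length", "4096");
    db->Put(rocksdb::WriteOptions(), "idx/file-0002/offset", "4096");

    // Prefix scan: iterate keys starting with "idx/file-0001/".
    const rocksdb::Slice prefix("idx/file-0001/");
    std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
    for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix); it->Next()) {
        std::cout << it->key().ToString() << " = " << it->value().ToString() << "\n";
    }
    // The loop exits both at end-of-range and on error; only status() tells them apart.
    if (!it->status().ok()) {
        std::cerr << "scan failed: " << it->status().ToString() << "\n";
        delete db;
        return 1;
    }

    delete db;
    return 0;
}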

24097 of 59624 branches covered (40.41%)

Branch coverage is included in the aggregate percentage.

2516 of 3144 new or added lines in 75 files covered (80.03%)

72 existing lines in 15 files now uncovered.

20858 of 26701 relevant lines covered (78.12%)
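
For reference, the aggregate figure in the header is consistent with simply pooling line and branch hits; this derivation is an inference from the numbers on this page, not a statement of how Coveralls defines the metric:

    (20858 + 24097) covered / (26701 + 59624) relevant = 44955 / 86325 ≈ 52.08%

which matches the reported 52.076%.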

14113.43 hits per line

Source File: /src/dftracer/utils/core/io/kqueue_thread_pool_backend.cpp (55.24% covered)

#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || \
    defined(__NetBSD__) || defined(__DragonFly__)

#include <dftracer/utils/core/common/logging.h>
#include <dftracer/utils/core/io/kqueue_thread_pool_backend.h>
#include <dftracer/utils/core/io/thread_pool_backend.h>  // IoRequest, IoOp
#include <dftracer/utils/core/pipeline/executor.h>
#include <sys/event.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#include <cerrno>
#include <cstring>

namespace dftracer::utils::io {

KqueueThreadPoolBackend::KqueueThreadPoolBackend(Executor& executor,
                                                 std::size_t pool_size,
                                                 unsigned batch_threshold)
    : executor_(executor), pool_(pool_size, batch_threshold) {}

KqueueThreadPoolBackend::~KqueueThreadPoolBackend() {
    // Ensure cleanup even if stop() was not called.
    if (kqueue_fd_ >= 0) {
        ::close(kqueue_fd_);
        kqueue_fd_ = -1;
    }
}

void KqueueThreadPoolBackend::start() {
    pool_.start();

    kqueue_fd_ = ::kqueue();
    if (kqueue_fd_ < 0) {
        DFTRACER_UTILS_LOG_ERROR("kqueue() failed: %s", std::strerror(errno));
        return;
    }

    // Register a user event (EVFILT_USER) for shutdown signaling.
    struct kevent ev{};
    EV_SET(&ev, SHUTDOWN_IDENT, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
    if (::kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr) < 0) {
        DFTRACER_UTILS_LOG_ERROR("kevent(register EVFILT_USER) failed: %s",
                                 std::strerror(errno));
    }

    DFTRACER_UTILS_LOG_DEBUG("kqueue+threadpool backend started (kqueue_fd=%d)",
                             kqueue_fd_);

    completion_thread_.start([this] { kqueue_loop(); });
}

void KqueueThreadPoolBackend::stop() {
    // Signal the completion thread to exit.
    completion_thread_.signal_stop();

    // Wake kevent() by triggering the user event.
    if (kqueue_fd_ >= 0) {
        struct kevent ev{};
        EV_SET(&ev, SHUTDOWN_IDENT, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
        ::kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr);
    }

    completion_thread_.join();
    pool_.stop();

    if (kqueue_fd_ >= 0) {
        ::close(kqueue_fd_);
        kqueue_fd_ = -1;
    }
}

void KqueueThreadPoolBackend::kqueue_loop() {
    constexpr int MAX_EVENTS = 64;
    struct kevent events[MAX_EVENTS];

    // 100ms timeout for periodic running() check.
    struct timespec timeout{};
    timeout.tv_sec = 0;
    timeout.tv_nsec = 100 * 1000 * 1000;  // 100ms

    while (completion_thread_.running()) {
        int n = ::kevent(kqueue_fd_, nullptr, 0, events, MAX_EVENTS, &timeout);
        if (n < 0) {
            if (errno == EINTR) continue;
            break;
        }

        for (int i = 0; i < n; ++i) {
            if (events[i].filter == EVFILT_USER &&
                events[i].ident == SHUTDOWN_IDENT) {
                // Shutdown signal -- process any other events first.
                continue;
            }

            // Future: dispatch socket I/O events here.
            // For now, only the user event is registered.
        }
    }
}

// -- File I/O: identical to ThreadPoolBackend / EpollThreadPoolBackend --

static IoAwaitable make_kqueue_request(IoOp op, int fd, void* buf,
                                       std::size_t len, off_t offset,
                                       const char* path, int flags, mode_t mode,
                                       Executor* executor, IoThreadPool* pool) {
    auto* req = new IoRequest{};
    req->submit = &KqueueThreadPoolBackend::submit_to_pool;
    req->op = op;
    req->fd = fd;
    req->buf = buf;
    req->len = len;
    req->offset = offset;
    req->path = path;
    req->flags = flags;
    req->mode = mode;
    req->executor = executor;
    req->pool = pool;

    IoAwaitable awaitable;
    req->awaitable = nullptr;
    awaitable.submit_ctx_ = req;
    return awaitable;
}

IoAwaitable KqueueThreadPoolBackend::submit_read(int fd, void* buf,
                                                 std::size_t len) {
    return make_kqueue_request(IoOp::READ, fd, buf, len, 0, nullptr, 0, 0,
                               &executor_, &pool_);
}

IoAwaitable KqueueThreadPoolBackend::submit_write(int fd, const void* buf,
                                                  std::size_t len) {
    return make_kqueue_request(IoOp::WRITE, fd, const_cast<void*>(buf), len, 0,
                               nullptr, 0, 0, &executor_, &pool_);
}

IoAwaitable KqueueThreadPoolBackend::submit_pread(int fd, void* buf,
                                                  std::size_t len,
                                                  off_t offset) {
    return make_kqueue_request(IoOp::PREAD, fd, buf, len, offset, nullptr, 0, 0,
                               &executor_, &pool_);
}

void KqueueThreadPoolBackend::submit_pread_callback(int fd, void* buf,
                                                    std::size_t len,
                                                    off_t offset,
                                                    IoCompletionFn completion,
                                                    void* context) {
    auto* req = new IoRequest{};
    req->op = IoOp::PREAD;
    req->fd = fd;
    req->buf = buf;
    req->len = len;
    req->offset = offset;
    req->completion = completion;
    req->completion_ctx = context;
    req->pool = &pool_;
    pool_.submit([req] { execute_request(req); });
}

IoAwaitable KqueueThreadPoolBackend::submit_pwrite(int fd, const void* buf,
                                                   std::size_t len,
                                                   off_t offset) {
    return make_kqueue_request(IoOp::PWRITE, fd, const_cast<void*>(buf), len,
                               offset, nullptr, 0, 0, &executor_, &pool_);
}

IoAwaitable KqueueThreadPoolBackend::submit_open(const char* path, int flags,
                                                 mode_t mode) {
    return make_kqueue_request(IoOp::OPEN, -1, nullptr, 0, 0, path, flags, mode,
                               &executor_, &pool_);
}

IoAwaitable KqueueThreadPoolBackend::submit_close(int fd) {
    return make_kqueue_request(IoOp::CLOSE, fd, nullptr, 0, 0, nullptr, 0, 0,
                               &executor_, &pool_);
}

IoAwaitable KqueueThreadPoolBackend::submit_fsync(int fd) {
    return make_kqueue_request(IoOp::FSYNC, fd, nullptr, 0, 0, nullptr, 0, 0,
                               &executor_, &pool_);
}

IoAwaitable KqueueThreadPoolBackend::submit_ftruncate(int fd, off_t length) {
    return make_kqueue_request(IoOp::FTRUNCATE, fd, nullptr, 0, length, nullptr,
                               0, 0, &executor_, &pool_);
}

IoAwaitable KqueueThreadPoolBackend::submit_fstat(int fd, struct stat* buf) {
    auto req_awaitable = make_kqueue_request(IoOp::FSTAT, fd, nullptr, 0, 0,
                                             nullptr, 0, 0, &executor_, &pool_);
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
    req->stat_buf = buf;
    return req_awaitable;
}

IoAwaitable KqueueThreadPoolBackend::submit_accept(int listen_fd,
                                                   struct sockaddr* addr,
                                                   socklen_t* addrlen) {
    auto req_awaitable =
        make_kqueue_request(IoOp::ACCEPT, listen_fd, nullptr, 0, 0, nullptr, 0,
                            0, &executor_, &pool_);
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
    req->addr = addr;
    req->addrlen = addrlen;
    return req_awaitable;
}

IoAwaitable KqueueThreadPoolBackend::submit_recv(int fd, void* buf,
                                                 std::size_t len, int flags) {
    auto req_awaitable = make_kqueue_request(IoOp::RECV, fd, buf, len, 0,
                                             nullptr, 0, 0, &executor_, &pool_);
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
    req->msg_flags = flags;
    return req_awaitable;
}

IoAwaitable KqueueThreadPoolBackend::submit_send(int fd, const void* buf,
                                                 std::size_t len, int flags) {
    auto req_awaitable =
        make_kqueue_request(IoOp::SEND, fd, const_cast<void*>(buf), len, 0,
                            nullptr, 0, 0, &executor_, &pool_);
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
    req->msg_flags = flags;
    return req_awaitable;
}

IoAwaitable KqueueThreadPoolBackend::submit_readv(int fd,
                                                  const struct iovec* iov,
                                                  int iovcnt) {
    auto req_awaitable = make_kqueue_request(IoOp::READV, fd, nullptr, 0, 0,
                                             nullptr, 0, 0, &executor_, &pool_);
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
    req->iov = iov;
    req->iovcnt = iovcnt;
    return req_awaitable;
}

IoAwaitable KqueueThreadPoolBackend::submit_writev(int fd,
                                                   const struct iovec* iov,
                                                   int iovcnt) {
    auto req_awaitable = make_kqueue_request(IoOp::WRITEV, fd, nullptr, 0, 0,
                                             nullptr, 0, 0, &executor_, &pool_);
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
    req->iov = iov;
    req->iovcnt = iovcnt;
    return req_awaitable;
}

IoAwaitable KqueueThreadPoolBackend::submit_preadv(int fd,
                                                   const struct iovec* iov,
                                                   int iovcnt, off_t offset) {
    auto req_awaitable =
        make_kqueue_request(IoOp::PREADV, fd, nullptr, 0, offset, nullptr, 0, 0,
                            &executor_, &pool_);
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
    req->iov = iov;
    req->iovcnt = iovcnt;
    return req_awaitable;
}

IoAwaitable KqueueThreadPoolBackend::submit_pwritev(int fd,
                                                    const struct iovec* iov,
                                                    int iovcnt, off_t offset) {
    auto req_awaitable =
        make_kqueue_request(IoOp::PWRITEV, fd, nullptr, 0, offset, nullptr, 0,
                            0, &executor_, &pool_);
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
    req->iov = iov;
    req->iovcnt = iovcnt;
    return req_awaitable;
}

IoAwaitable KqueueThreadPoolBackend::submit_lseek(int fd, off_t offset,
                                                  int whence) {
    auto req_awaitable = make_kqueue_request(
        IoOp::LSEEK, fd, nullptr, 0, offset, nullptr, 0, 0, &executor_, &pool_);
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
    req->whence = whence;
    return req_awaitable;
}

IoAwaitable KqueueThreadPoolBackend::submit_sendfile(int out_fd, int in_fd,
                                                     off_t offset,
                                                     std::size_t count) {
    auto req_awaitable =
        make_kqueue_request(IoOp::SENDFILE, in_fd, nullptr, count, offset,
                            nullptr, 0, 0, &executor_, &pool_);
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
    req->dest_fd = out_fd;
    return req_awaitable;
}

void KqueueThreadPoolBackend::submit_to_pool(SubmitContext* ctx,
                                             IoAwaitable* awaitable) {
    auto* req = static_cast<IoRequest*>(ctx);
    req->awaitable = awaitable;
    req->pool->submit([req] { execute_request(req); });
}

void KqueueThreadPoolBackend::execute_request(IoRequest* req) {
    ssize_t result = 0;
    switch (req->op) {
        case IoOp::READ:
            result = ::read(req->fd, req->buf, req->len);
            break;
        case IoOp::WRITE:
            result = ::write(req->fd, req->buf, req->len);
            break;
        case IoOp::PREAD:
            result = ::pread(req->fd, req->buf, req->len, req->offset);
            break;
        case IoOp::PWRITE:
            result = ::pwrite(req->fd, req->buf, req->len, req->offset);
            break;
        case IoOp::OPEN:
            result = ::open(req->path, req->flags, req->mode);
            break;
        case IoOp::CLOSE:
            result = ::close(req->fd);
            break;
        case IoOp::FSYNC:
            result = ::fsync(req->fd);
            break;
        case IoOp::FTRUNCATE:
            result = ::ftruncate(req->fd, req->offset);
            break;
        case IoOp::FSTAT:
            result = ::fstat(req->fd, req->stat_buf);
            break;
        case IoOp::ACCEPT:
            result = ::accept(req->fd, req->addr, req->addrlen);
            if (result >= 0) {
                int fd = static_cast<int>(result);
                int fl = ::fcntl(fd, F_GETFL, 0);
                ::fcntl(fd, F_SETFL, fl | O_NONBLOCK);
                ::fcntl(fd, F_SETFD, FD_CLOEXEC);
            }
            break;
        case IoOp::RECV:
            result = ::recv(req->fd, req->buf, req->len, req->msg_flags);
            break;
        case IoOp::SEND:
            result = ::send(req->fd, req->buf, req->len, req->msg_flags);
            break;
        case IoOp::READV:
            result = ::readv(req->fd, req->iov, req->iovcnt);
            break;
        case IoOp::WRITEV:
            result = ::writev(req->fd, req->iov, req->iovcnt);
            break;
        case IoOp::PREADV:
            result = ::preadv(req->fd, req->iov, req->iovcnt, req->offset);
            break;
        case IoOp::PWRITEV:
            result = ::pwritev(req->fd, req->iov, req->iovcnt, req->offset);
            break;
        case IoOp::LSEEK:
            result = ::lseek(req->fd, req->offset, req->whence);
            break;
        case IoOp::SENDFILE: {
#ifdef __APPLE__
            off_t len = static_cast<off_t>(req->len);
            int ret = ::sendfile(req->fd, req->dest_fd, req->offset, &len,
                                 nullptr, 0);
            result = (ret == 0 || errno == EAGAIN) ? len : -1;
#else
            char tmp[8192];
            result = 0;
            off_t off = req->offset;
            std::size_t remaining = req->len;
            while (remaining > 0) {
                std::size_t chunk =
                    remaining < sizeof(tmp) ? remaining : sizeof(tmp);
                ssize_t r = ::pread(req->fd, tmp, chunk, off);
                if (r <= 0) {
                    if (result == 0) result = r;
                    break;
                }
                ssize_t w =
                    ::write(req->dest_fd, tmp, static_cast<std::size_t>(r));
                if (w < 0) {
                    if (result == 0) result = w;
                    break;
                }
                result += w;
                off += w;
                remaining -= static_cast<std::size_t>(w);
                if (w < r) break;
            }
#endif
            break;
        }
    }
    if (result < 0) result = -errno;

    if (req->awaitable != nullptr) {
        req->awaitable->result_ = result;
        req->executor->enqueue(req->awaitable->handle_);
    } else if (req->completion != nullptr) {
        req->completion(req->completion_ctx, result);
    }
    delete req;
}

std::size_t KqueueThreadPoolBackend::poll(int /*timeout_ms*/) {
    // Completions fire via thread pool callbacks -- nothing to poll.
    return 0;
}

int KqueueThreadPoolBackend::flush() { return static_cast<int>(pool_.flush()); }

}  // namespace dftracer::utils::io

#endif  // kqueue platforms
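
The start()/stop()/kqueue_loop() trio above uses EVFILT_USER purely as a wakeup channel: stop() fires NOTE_TRIGGER so the blocked kevent() call returns promptly instead of waiting out its 100 ms timeout. Below is a stripped-down, standalone sketch of that same pattern; the identifier value, flag, and thread body are illustrative and not the dftracer-utils types.

#include <sys/event.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <ctime>
#include <thread>

namespace {
constexpr uintptr_t kShutdownIdent = 1;  // arbitrary ident for the user event
}

int main() {
    int kq = ::kqueue();
    if (kq < 0) { std::perror("kqueue"); return 1; }

    // Register the user event once; EV_CLEAR resets it after each delivery.
    struct kevent ev{};
    EV_SET(&ev, kShutdownIdent, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
    ::kevent(kq, &ev, 1, nullptr, 0, nullptr);

    std::atomic<bool> running{true};
    std::thread loop([&] {
        while (running.load()) {
            struct kevent out{};
            struct timespec timeout{0, 100 * 1000 * 1000};  // 100 ms fallback poll
            int n = ::kevent(kq, nullptr, 0, &out, 1, &timeout);
            if (n > 0 && out.filter == EVFILT_USER && out.ident == kShutdownIdent) {
                std::puts("woken by user event");
            }
        }
    });

    // Let the loop block in kevent() before stopping it.
    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    // "stop()": flip the flag, then trigger the user event to unblock kevent().
    running.store(false);
    EV_SET(&ev, kShutdownIdent, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
    ::kevent(kq, &ev, 1, nullptr, 0, nullptr);

    loop.join();
    ::close(kq);
    return 0;
}

Because a triggered EVFILT_USER event stays pending until it is retrieved, the wakeup is not lost even if stop() runs before the loop reaches kevent(); the 100 ms timeout is only a safety net.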