• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23730905027

30 Mar 2026 06:22AM UTC coverage: 51.451% (+0.4%) from 51.022%
23730905027

push

github

rayandrew
chore(docs)!: regenerate C++ API reference pages from Doxygen XML

- Add generate_api_index.py script for automated API doc generation
- Rename core_common.rst to core_infrastructure.rst
- Update all API reference pages with current class/function signatures
- Add doxygen group annotations to public headers

BREAKING CHANGE: API reference page structure reorganized

23019 of 57787 branches covered (39.83%)

Branch coverage included in aggregate %.

20057 of 25936 relevant lines covered (77.33%)

13268.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.78
/src/dftracer/utils/core/io/kqueue_thread_pool_backend.cpp
1
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || \
    defined(__NetBSD__) || defined(__DragonFly__)

#include <dftracer/utils/core/common/logging.h>
#include <dftracer/utils/core/io/kqueue_thread_pool_backend.h>
#include <dftracer/utils/core/io/thread_pool_backend.h>  // IoRequest, IoOp
#include <dftracer/utils/core/pipeline/executor.h>
#include <fcntl.h>  // open(), fcntl(), O_NONBLOCK, FD_CLOEXEC (was transitive)
#include <sys/event.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#include <cerrno>
#include <cstring>
18

19
namespace dftracer::utils::io {
20

21
// Construct the backend: stores the executor used to resume awaiting
// coroutines and sizes the worker pool that runs the blocking syscalls.
// The kqueue descriptor is NOT created here -- see start().
KqueueThreadPoolBackend::KqueueThreadPoolBackend(Executor& executor,
                                                 std::size_t pool_size,
                                                 unsigned batch_threshold)
    : executor_(executor), pool_(pool_size, batch_threshold) {}
25

26
/// Destructor: last-chance release of the kqueue descriptor in case the
/// owner never called stop().
KqueueThreadPoolBackend::~KqueueThreadPoolBackend() {
    if (kqueue_fd_ < 0) {
        return;  // already closed by stop(), or start() never succeeded
    }
    ::close(kqueue_fd_);
    kqueue_fd_ = -1;
}
33

34
// Bring the backend up: start the worker pool, create the kqueue, register
// the shutdown user-event, then launch the completion thread.
void KqueueThreadPoolBackend::start() {
    pool_.start();

    kqueue_fd_ = ::kqueue();
    if (kqueue_fd_ < 0) {
        // NOTE(review): on this path the worker pool stays started while the
        // completion thread never launches; stop() guards on kqueue_fd_ but
        // still calls completion_thread_.join() -- confirm joining an
        // unstarted CompletionThread is a safe no-op.
        DFTRACER_UTILS_LOG_ERROR("kqueue() failed: %s", std::strerror(errno));
        return;
    }

    // Register a user event (EVFILT_USER) for shutdown signaling.
    // EV_CLEAR resets the event state once kevent() has reported it.
    struct kevent ev{};
    EV_SET(&ev, SHUTDOWN_IDENT, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
    if (::kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr) < 0) {
        // Non-fatal: kqueue_loop() re-checks running() every 100ms anyway,
        // so losing the wakeup event only makes shutdown slower.
        DFTRACER_UTILS_LOG_ERROR("kevent(register EVFILT_USER) failed: %s",
                                 std::strerror(errno));
    }

    DFTRACER_UTILS_LOG_DEBUG("kqueue+threadpool backend started (kqueue_fd=%d)",
                             kqueue_fd_);

    // Completion thread runs the kevent() loop until stop() is called.
    completion_thread_.start([this] { kqueue_loop(); });
}
56

57
// Shut the backend down.  Ordering matters:
//   1. flip the running flag so kqueue_loop() exits at its next check,
//   2. trigger the EVFILT_USER event so a blocked kevent() wakes at once,
//   3. join the completion thread, then stop the worker pool,
//   4. finally close the kqueue descriptor (only after the join, so the
//      loop thread can never see a stale/closed fd).
void KqueueThreadPoolBackend::stop() {
    // Signal the completion thread to exit.
    completion_thread_.signal_stop();

    // Wake kevent() by triggering the user event.
    if (kqueue_fd_ >= 0) {
        struct kevent ev{};
        EV_SET(&ev, SHUTDOWN_IDENT, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
        // Best-effort: if this kevent() fails, the loop still exits within
        // ~100ms via its poll timeout, so the error is deliberately ignored.
        ::kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr);
    }

    completion_thread_.join();
    pool_.stop();

    if (kqueue_fd_ >= 0) {
        ::close(kqueue_fd_);
        kqueue_fd_ = -1;
    }
}
76

77
// Completion-thread body: blocks in kevent() with a short timeout so the
// running() flag is re-checked periodically, then drains whatever events
// arrived.  Currently only the EVFILT_USER shutdown event is registered.
void KqueueThreadPoolBackend::kqueue_loop() {
    constexpr int kMaxEvents = 64;
    struct kevent batch[kMaxEvents];

    // Wake at least every 100ms so a stop request is observed even if the
    // user-event trigger was lost.
    struct timespec wait_interval{};
    wait_interval.tv_sec = 0;
    wait_interval.tv_nsec = 100 * 1000 * 1000;  // 100ms

    while (completion_thread_.running()) {
        int ready = ::kevent(kqueue_fd_, nullptr, 0, batch, kMaxEvents,
                             &wait_interval);
        if (ready < 0) {
            if (errno != EINTR) break;  // unrecoverable kevent() failure
            continue;                   // interrupted by a signal -- retry
        }

        for (int idx = 0; idx < ready; ++idx) {
            const bool is_shutdown = batch[idx].filter == EVFILT_USER &&
                                     batch[idx].ident == SHUTDOWN_IDENT;
            if (is_shutdown) {
                // Shutdown notification: keep draining the remaining events;
                // the while-condition above ends the loop.
                continue;
            }

            // Future: dispatch socket I/O events here.
            // For now, only the user event is registered.
        }
    }
}
105

106
// -- File I/O: identical to ThreadPoolBackend / EpollThreadPoolBackend --
107

108
// Allocate and populate an IoRequest, then wrap it in an IoAwaitable for
// the caller to await.  The request is heap-allocated here and released by
// execute_request() once the syscall has run.
static IoAwaitable make_kqueue_request(IoOp op, int fd, void* buf,
                                       std::size_t len, off_t offset,
                                       const char* path, int flags, mode_t mode,
                                       Executor* executor, IoThreadPool* pool) {
    auto* request = new IoRequest{};

    // Routing: how the request reaches the pool and how it resumes.
    request->submit = &KqueueThreadPoolBackend::submit_to_pool;
    request->executor = executor;
    request->pool = pool;

    // Operation description.
    request->op = op;
    request->fd = fd;
    request->buf = buf;
    request->len = len;
    request->offset = offset;
    request->path = path;
    request->flags = flags;
    request->mode = mode;

    // Back-pointer is filled in later by submit_to_pool().
    request->awaitable = nullptr;

    IoAwaitable result;
    result.submit_ctx_ = request;
    return result;
}
130

131
/// Queue a blocking read(2) of up to `len` bytes from `fd` into `buf`;
/// the awaitable completes with the byte count, or -errno on failure.
IoAwaitable KqueueThreadPoolBackend::submit_read(int fd, void* buf,
                                                 std::size_t len) {
    return make_kqueue_request(IoOp::READ, fd, buf, len, /*offset=*/0,
                               /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                               &executor_, &pool_);
}
136

137
/// Queue a blocking write(2) of `len` bytes from `buf` to `fd`.
IoAwaitable KqueueThreadPoolBackend::submit_write(int fd, const void* buf,
                                                  std::size_t len) {
    // IoRequest carries a single non-const buffer pointer for both
    // directions; the worker never writes through it for WRITE ops.
    void* data = const_cast<void*>(buf);
    return make_kqueue_request(IoOp::WRITE, fd, data, len, /*offset=*/0,
                               /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                               &executor_, &pool_);
}
142

143
/// Queue a positional read (pread(2)) at `offset`; the fd position is
/// left untouched.
IoAwaitable KqueueThreadPoolBackend::submit_pread(int fd, void* buf,
                                                  std::size_t len,
                                                  off_t offset) {
    return make_kqueue_request(IoOp::PREAD, fd, buf, len, offset,
                               /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                               &executor_, &pool_);
}
149

150
/// Queue a positional write (pwrite(2)) at `offset`.
IoAwaitable KqueueThreadPoolBackend::submit_pwrite(int fd, const void* buf,
                                                   std::size_t len,
                                                   off_t offset) {
    void* data = const_cast<void*>(buf);  // see submit_write
    return make_kqueue_request(IoOp::PWRITE, fd, data, len, offset,
                               /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                               &executor_, &pool_);
}
156

157
/// Queue an open(2) of `path` with the given `flags` and `mode`; the
/// awaitable completes with the new fd, or -errno.
IoAwaitable KqueueThreadPoolBackend::submit_open(const char* path, int flags,
                                                 mode_t mode) {
    return make_kqueue_request(IoOp::OPEN, /*fd=*/-1, /*buf=*/nullptr,
                               /*len=*/0, /*offset=*/0, path, flags, mode,
                               &executor_, &pool_);
}
162

163
/// Queue a close(2) of `fd`.
IoAwaitable KqueueThreadPoolBackend::submit_close(int fd) {
    return make_kqueue_request(IoOp::CLOSE, fd, /*buf=*/nullptr, /*len=*/0,
                               /*offset=*/0, /*path=*/nullptr, /*flags=*/0,
                               /*mode=*/0, &executor_, &pool_);
}
167

168
/// Queue an fsync(2) of `fd`.
IoAwaitable KqueueThreadPoolBackend::submit_fsync(int fd) {
    return make_kqueue_request(IoOp::FSYNC, fd, /*buf=*/nullptr, /*len=*/0,
                               /*offset=*/0, /*path=*/nullptr, /*flags=*/0,
                               /*mode=*/0, &executor_, &pool_);
}
172

173
/// Queue an ftruncate(2); the new length travels in the request's offset
/// field (the worker passes req->offset to ::ftruncate).
IoAwaitable KqueueThreadPoolBackend::submit_ftruncate(int fd, off_t length) {
    return make_kqueue_request(IoOp::FTRUNCATE, fd, /*buf=*/nullptr,
                               /*len=*/0, length, /*path=*/nullptr,
                               /*flags=*/0, /*mode=*/0, &executor_, &pool_);
}
177

178
/// Queue an fstat(2); results land in caller-owned `buf`, which must stay
/// alive until the awaitable completes.
IoAwaitable KqueueThreadPoolBackend::submit_fstat(int fd, struct stat* buf) {
    auto awaitable = make_kqueue_request(IoOp::FSTAT, fd, nullptr, 0, 0,
                                         nullptr, 0, 0, &executor_, &pool_);
    static_cast<IoRequest*>(awaitable.submit_ctx_)->stat_buf = buf;
    return awaitable;
}
185

186
/// Queue an accept(2) on `listen_fd`; `addr`/`addrlen` (both optional,
/// caller-owned) are filled by the worker.  The accepted fd is also set
/// non-blocking and close-on-exec before completion.
IoAwaitable KqueueThreadPoolBackend::submit_accept(int listen_fd,
                                                   struct sockaddr* addr,
                                                   socklen_t* addrlen) {
    auto awaitable = make_kqueue_request(IoOp::ACCEPT, listen_fd, nullptr, 0,
                                         0, nullptr, 0, 0, &executor_, &pool_);
    auto* request = static_cast<IoRequest*>(awaitable.submit_ctx_);
    request->addr = addr;
    request->addrlen = addrlen;
    return awaitable;
}
197

198
/// Queue a recv(2) with message flags (e.g. MSG_PEEK).
IoAwaitable KqueueThreadPoolBackend::submit_recv(int fd, void* buf,
                                                 std::size_t len, int flags) {
    auto awaitable = make_kqueue_request(IoOp::RECV, fd, buf, len, 0,
                                         nullptr, 0, 0, &executor_, &pool_);
    static_cast<IoRequest*>(awaitable.submit_ctx_)->msg_flags = flags;
    return awaitable;
}
206

207
/// Queue a send(2) with message flags.
IoAwaitable KqueueThreadPoolBackend::submit_send(int fd, const void* buf,
                                                 std::size_t len, int flags) {
    void* data = const_cast<void*>(buf);  // see submit_write
    auto awaitable = make_kqueue_request(IoOp::SEND, fd, data, len, 0,
                                         nullptr, 0, 0, &executor_, &pool_);
    static_cast<IoRequest*>(awaitable.submit_ctx_)->msg_flags = flags;
    return awaitable;
}
216

217
/// Queue a scatter read (readv(2)); the iovec array is caller-owned and
/// must outlive the awaitable.
IoAwaitable KqueueThreadPoolBackend::submit_readv(int fd,
                                                  const struct iovec* iov,
                                                  int iovcnt) {
    auto awaitable = make_kqueue_request(IoOp::READV, fd, nullptr, 0, 0,
                                         nullptr, 0, 0, &executor_, &pool_);
    auto* request = static_cast<IoRequest*>(awaitable.submit_ctx_);
    request->iov = iov;
    request->iovcnt = iovcnt;
    return awaitable;
}
227

228
/// Queue a gather write (writev(2)); the iovec array is caller-owned and
/// must outlive the awaitable.
IoAwaitable KqueueThreadPoolBackend::submit_writev(int fd,
                                                   const struct iovec* iov,
                                                   int iovcnt) {
    auto awaitable = make_kqueue_request(IoOp::WRITEV, fd, nullptr, 0, 0,
                                         nullptr, 0, 0, &executor_, &pool_);
    auto* request = static_cast<IoRequest*>(awaitable.submit_ctx_);
    request->iov = iov;
    request->iovcnt = iovcnt;
    return awaitable;
}
238

239
/// Queue a positional scatter read (preadv(2)) at `offset`.
IoAwaitable KqueueThreadPoolBackend::submit_preadv(int fd,
                                                   const struct iovec* iov,
                                                   int iovcnt, off_t offset) {
    auto awaitable = make_kqueue_request(IoOp::PREADV, fd, nullptr, 0, offset,
                                         nullptr, 0, 0, &executor_, &pool_);
    auto* request = static_cast<IoRequest*>(awaitable.submit_ctx_);
    request->iov = iov;
    request->iovcnt = iovcnt;
    return awaitable;
}
250

251
/// Queue a positional gather write (pwritev(2)) at `offset`.
IoAwaitable KqueueThreadPoolBackend::submit_pwritev(int fd,
                                                    const struct iovec* iov,
                                                    int iovcnt, off_t offset) {
    auto awaitable = make_kqueue_request(IoOp::PWRITEV, fd, nullptr, 0,
                                         offset, nullptr, 0, 0, &executor_,
                                         &pool_);
    auto* request = static_cast<IoRequest*>(awaitable.submit_ctx_);
    request->iov = iov;
    request->iovcnt = iovcnt;
    return awaitable;
}
262

263
/// Queue an lseek(2); completes with the resulting file position.
IoAwaitable KqueueThreadPoolBackend::submit_lseek(int fd, off_t offset,
                                                  int whence) {
    auto awaitable = make_kqueue_request(IoOp::LSEEK, fd, nullptr, 0, offset,
                                         nullptr, 0, 0, &executor_, &pool_);
    static_cast<IoRequest*>(awaitable.submit_ctx_)->whence = whence;
    return awaitable;
}
271

272
/// Queue a file-to-fd copy of `count` bytes starting at `offset`.  The
/// source fd rides in the request's fd field; the destination in dest_fd.
IoAwaitable KqueueThreadPoolBackend::submit_sendfile(int out_fd, int in_fd,
                                                     off_t offset,
                                                     std::size_t count) {
    auto awaitable = make_kqueue_request(IoOp::SENDFILE, in_fd, nullptr,
                                         count, offset, nullptr, 0, 0,
                                         &executor_, &pool_);
    static_cast<IoRequest*>(awaitable.submit_ctx_)->dest_fd = out_fd;
    return awaitable;
}
282

283
void KqueueThreadPoolBackend::submit_to_pool(SubmitContext* ctx,
1,093✔
284
                                             IoAwaitable* awaitable) {
285
    auto* req = static_cast<IoRequest*>(ctx);
1,093✔
286
    req->awaitable = awaitable;
1,093✔
287
    req->pool->submit([req] { execute_request(req); });
2,187!
288
}
1,093✔
289

290
// Worker-thread entry point: performs the blocking syscall described by
// `req`, encodes failure as -errno, publishes the result on the awaitable,
// schedules the suspended coroutine on the executor, and frees the request.
//
// Ownership: `req` was allocated by make_kqueue_request() and is deleted
// here; callers must not touch it after submission.
// NOTE(review): the switch has no default case, so an unrecognized IoOp
// completes silently with result 0 -- confirm that is intended.
void KqueueThreadPoolBackend::execute_request(IoRequest* req) {
    ssize_t result = 0;
    switch (req->op) {
        case IoOp::READ:
            result = ::read(req->fd, req->buf, req->len);
            break;
        case IoOp::WRITE:
            result = ::write(req->fd, req->buf, req->len);
            break;
        case IoOp::PREAD:
            result = ::pread(req->fd, req->buf, req->len, req->offset);
            break;
        case IoOp::PWRITE:
            result = ::pwrite(req->fd, req->buf, req->len, req->offset);
            break;
        case IoOp::OPEN:
            result = ::open(req->path, req->flags, req->mode);
            break;
        case IoOp::CLOSE:
            result = ::close(req->fd);
            break;
        case IoOp::FSYNC:
            result = ::fsync(req->fd);
            break;
        case IoOp::FTRUNCATE:
            // The new length is carried in the offset field.
            result = ::ftruncate(req->fd, req->offset);
            break;
        case IoOp::FSTAT:
            result = ::fstat(req->fd, req->stat_buf);
            break;
        case IoOp::ACCEPT:
            result = ::accept(req->fd, req->addr, req->addrlen);
            if (result >= 0) {
                // Make the accepted socket non-blocking and close-on-exec.
                // (errno from these fcntl calls is irrelevant: result >= 0
                // means the -errno mapping below is skipped.)
                int fd = static_cast<int>(result);
                int fl = ::fcntl(fd, F_GETFL, 0);
                ::fcntl(fd, F_SETFL, fl | O_NONBLOCK);
                ::fcntl(fd, F_SETFD, FD_CLOEXEC);
            }
            break;
        case IoOp::RECV:
            result = ::recv(req->fd, req->buf, req->len, req->msg_flags);
            break;
        case IoOp::SEND:
            result = ::send(req->fd, req->buf, req->len, req->msg_flags);
            break;
        case IoOp::READV:
            result = ::readv(req->fd, req->iov, req->iovcnt);
            break;
        case IoOp::WRITEV:
            result = ::writev(req->fd, req->iov, req->iovcnt);
            break;
        case IoOp::PREADV:
            result = ::preadv(req->fd, req->iov, req->iovcnt, req->offset);
            break;
        case IoOp::PWRITEV:
            result = ::pwritev(req->fd, req->iov, req->iovcnt, req->offset);
            break;
        case IoOp::LSEEK:
            result = ::lseek(req->fd, req->offset, req->whence);
            break;
        case IoOp::SENDFILE: {
#ifdef __APPLE__
            // Darwin sendfile(2): `len` is in-out -- on return it holds the
            // number of bytes actually sent, including on EAGAIN.
            off_t len = static_cast<off_t>(req->len);
            int ret = ::sendfile(req->fd, req->dest_fd, req->offset, &len,
                                 nullptr, 0);
            result = (ret == 0 || errno == EAGAIN) ? len : -1;
#else
            // Other BSDs: emulate with a pread/write copy loop; `result`
            // accumulates bytes copied, or carries the first failure.
            char tmp[8192];
            result = 0;
            off_t off = req->offset;
            std::size_t remaining = req->len;
            while (remaining > 0) {
                std::size_t chunk =
                    remaining < sizeof(tmp) ? remaining : sizeof(tmp);
                ssize_t r = ::pread(req->fd, tmp, chunk, off);
                if (r <= 0) {
                    // EOF (0) or error (-1): only report it if nothing was
                    // copied yet; otherwise return the partial byte count.
                    if (result == 0) result = r;
                    break;
                }
                ssize_t w =
                    ::write(req->dest_fd, tmp, static_cast<std::size_t>(r));
                if (w < 0) {
                    if (result == 0) result = w;
                    break;
                }
                result += w;
                off += w;
                remaining -= static_cast<std::size_t>(w);
                // Short write: stop rather than retry the unwritten tail.
                if (w < r) break;
            }
#endif
            break;
        }
    }
    // Encode failure as -errno (errno is still the failing call's value:
    // nothing between the syscall and this line touches it on error paths).
    if (result < 0) result = -errno;

    // Publish the result, resume the awaiting coroutine via the executor,
    // and release the request allocated in make_kqueue_request().
    req->awaitable->result_ = result;
    req->executor->enqueue(req->awaitable->handle_);
    delete req;
}
390

391
/// Nothing to reap here: completions are pushed from pool worker threads
/// (execute_request enqueues directly on the executor).
std::size_t KqueueThreadPoolBackend::poll(int /*timeout_ms*/) {
    return 0U;
}
395

396
/// Flush the worker pool's pending batch; the pool's return value is
/// narrowed to int per the backend interface.
int KqueueThreadPoolBackend::flush() {
    auto flushed = pool_.flush();
    return static_cast<int>(flushed);
}
397

398
}  // namespace dftracer::utils::io
399

400
#endif  // kqueue platforms
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc