• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28423703495

30 Jun 2026 05:59AM UTC coverage: 51.998% (-0.3%) from 52.278%
28423703495

Pull #83

github

web-flow
Merge fb542a938 into 2efed6649
Pull Request #83: refactor and improve code QoL

37282 of 93303 branches covered (39.96%)

Branch coverage included in aggregate %.

801 of 1525 new or added lines in 78 files covered. (52.52%)

98 existing lines in 37 files now uncovered.

33674 of 43157 relevant lines covered (78.03%)

20306.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.07
/src/dftracer/utils/core/io/kqueue_thread_pool_backend.cpp
1
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || \
2
    defined(__NetBSD__) || defined(__DragonFly__)
3

4
#include <dftracer/utils/core/common/logging.h>
5
#include <dftracer/utils/core/io/io_sendfile.h>
6
#include <dftracer/utils/core/io/kqueue_thread_pool_backend.h>
7
#include <dftracer/utils/core/io/thread_pool_backend.h>  // IoRequest, IoOp
8
#include <dftracer/utils/core/pipeline/executor.h>
9
#include <sys/event.h>
10
#include <sys/socket.h>
11
#include <sys/stat.h>
12
#include <sys/time.h>
13
#include <sys/types.h>
14
#include <sys/uio.h>
15
#include <unistd.h>
16

17
#include <cerrno>
18
#include <cstring>
19

20
namespace dftracer::utils::io {
21

22
KqueueThreadPoolBackend::KqueueThreadPoolBackend(Executor& executor,
1,054✔
23
                                                 std::size_t pool_size,
24
                                                 unsigned batch_threshold)
25
    : executor_(executor), pool_(pool_size, batch_threshold) {}
1,054!
26

27
KqueueThreadPoolBackend::~KqueueThreadPoolBackend() {
1,581✔
28
    // Ensure cleanup even if stop() was not called.
29
    if (kqueue_fd_ >= 0) {
527!
30
        ::close(kqueue_fd_);
×
31
        kqueue_fd_ = -1;
×
32
    }
×
33
}
1,581✔
34

35
void KqueueThreadPoolBackend::start() {
527✔
36
    pool_.start();
527✔
37

38
    kqueue_fd_ = ::kqueue();
527✔
39
    if (kqueue_fd_ < 0) {
527!
40
        DFTRACER_UTILS_LOG_ERROR("kqueue() failed: %s", std::strerror(errno));
×
41
        return;
×
42
    }
43

44
    // Register a user event (EVFILT_USER) for shutdown signaling.
45
    struct kevent ev{};
527✔
46
    EV_SET(&ev, SHUTDOWN_IDENT, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
527✔
47
    if (::kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr) < 0) {
527!
48
        DFTRACER_UTILS_LOG_ERROR("kevent(register EVFILT_USER) failed: %s",
×
49
                                 std::strerror(errno));
50
    }
×
51

52
    DFTRACER_UTILS_LOG_DEBUG("kqueue+threadpool backend started (kqueue_fd=%d)",
527!
53
                             kqueue_fd_);
54

55
    completion_thread_.start([this] { kqueue_loop(); });
1,054!
56
}
527✔
57

58
void KqueueThreadPoolBackend::stop() {
527✔
59
    // Signal the completion thread to exit.
60
    completion_thread_.signal_stop();
527✔
61

62
    // Wake kevent() by triggering the user event.
63
    if (kqueue_fd_ >= 0) {
527!
64
        struct kevent ev{};
527✔
65
        EV_SET(&ev, SHUTDOWN_IDENT, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
527✔
66
        ::kevent(kqueue_fd_, &ev, 1, nullptr, 0, nullptr);
527✔
67
    }
527✔
68

69
    completion_thread_.join();
527✔
70
    pool_.stop();
527✔
71

72
    if (kqueue_fd_ >= 0) {
527!
73
        ::close(kqueue_fd_);
527✔
74
        kqueue_fd_ = -1;
527✔
75
    }
527✔
76
}
527✔
77

78
void KqueueThreadPoolBackend::kqueue_loop() {
527✔
79
    constexpr int MAX_EVENTS = 64;
527✔
80
    struct kevent events[MAX_EVENTS];
81

82
    // 100ms timeout for periodic running() check.
83
    struct timespec timeout{};
527✔
84
    timeout.tv_sec = 0;
527✔
85
    timeout.tv_nsec = 100 * 1000 * 1000;  // 100ms
527✔
86

87
    while (completion_thread_.running()) {
1,735✔
88
        int n = ::kevent(kqueue_fd_, nullptr, 0, events, MAX_EVENTS, &timeout);
1,208✔
89
        if (n < 0) {
1,208!
90
            if (errno == EINTR) continue;
×
91
            break;
×
92
        }
93

94
        for (int i = 0; i < n; ++i) {
1,735✔
95
            if (events[i].filter == EVFILT_USER &&
527!
96
                events[i].ident == SHUTDOWN_IDENT) {
527✔
97
                // Shutdown signal -- process any other events first.
98
                continue;
527✔
99
            }
100

101
            // Future: dispatch socket I/O events here.
102
            // For now, only the user event is registered.
103
        }
×
104
    }
105
}
527✔
106

107
// File I/O reuses ThreadPoolBackend's shared request plumbing
108
// (make_request / submit_to_pool / execute_request): the syscall dispatch is
109
// identical and platform-aware via #ifdef.
110

111
IoAwaitable KqueueThreadPoolBackend::submit_read(int fd, void* buf,
626✔
112
                                                 std::size_t len) {
113
    return ThreadPoolBackend::make_request(IoOp::READ, fd, buf, len, 0, nullptr,
1,252✔
114
                                           0, 0, &executor_, &pool_);
626✔
115
}
116

117
IoAwaitable KqueueThreadPoolBackend::submit_write(int fd, const void* buf,
648✔
118
                                                  std::size_t len) {
119
    return ThreadPoolBackend::make_request(IoOp::WRITE, fd,
1,296✔
120
                                           const_cast<void*>(buf), len, 0,
648✔
121
                                           nullptr, 0, 0, &executor_, &pool_);
648✔
122
}
123

124
IoAwaitable KqueueThreadPoolBackend::submit_pread(int fd, void* buf,
1,158✔
125
                                                  std::size_t len,
126
                                                  off_t offset) {
127
    return ThreadPoolBackend::make_request(IoOp::PREAD, fd, buf, len, offset,
2,316✔
128
                                           nullptr, 0, 0, &executor_, &pool_);
1,158✔
129
}
130

131
void KqueueThreadPoolBackend::submit_pread_callback(int fd, void* buf,
×
132
                                                    std::size_t len,
133
                                                    off_t offset,
134
                                                    IoCompletionFn completion,
135
                                                    void* context) {
136
    auto* req = new IoRequest{};
×
137
    req->op = IoOp::PREAD;
×
138
    req->fd = fd;
×
139
    req->buf = buf;
×
140
    req->len = len;
×
141
    req->offset = offset;
×
142
    req->completion = completion;
×
143
    req->completion_ctx = context;
×
144
    req->pool = &pool_;
×
NEW
145
    pool_.submit([req] { ThreadPoolBackend::execute_request(req); });
×
146
}
×
147

148
IoAwaitable KqueueThreadPoolBackend::submit_pwrite(int fd, const void* buf,
176✔
149
                                                   std::size_t len,
150
                                                   off_t offset) {
151
    return ThreadPoolBackend::make_request(IoOp::PWRITE, fd,
352✔
152
                                           const_cast<void*>(buf), len, offset,
176✔
153
                                           nullptr, 0, 0, &executor_, &pool_);
176✔
154
}
155

156
IoAwaitable KqueueThreadPoolBackend::submit_open(const char* path, int flags,
906✔
157
                                                 mode_t mode) {
158
    return ThreadPoolBackend::make_request(IoOp::OPEN, -1, nullptr, 0, 0, path,
1,812✔
159
                                           flags, mode, &executor_, &pool_);
906✔
160
}
161

162
IoAwaitable KqueueThreadPoolBackend::submit_close(int fd) {
715✔
163
    return ThreadPoolBackend::make_request(IoOp::CLOSE, fd, nullptr, 0, 0,
1,430✔
164
                                           nullptr, 0, 0, &executor_, &pool_);
715✔
165
}
166

167
IoAwaitable KqueueThreadPoolBackend::submit_fsync(int fd) {
56✔
168
    return ThreadPoolBackend::make_request(IoOp::FSYNC, fd, nullptr, 0, 0,
112✔
169
                                           nullptr, 0, 0, &executor_, &pool_);
56✔
170
}
171

172
IoAwaitable KqueueThreadPoolBackend::submit_ftruncate(int fd, off_t length) {
×
NEW
173
    return ThreadPoolBackend::make_request(IoOp::FTRUNCATE, fd, nullptr, 0,
×
NEW
174
                                           length, nullptr, 0, 0, &executor_,
×
NEW
175
                                           &pool_);
×
176
}
177

178
IoAwaitable KqueueThreadPoolBackend::submit_fstat(int fd, struct stat* buf) {
×
NEW
179
    auto req_awaitable = ThreadPoolBackend::make_request(
×
NEW
180
        IoOp::FSTAT, fd, nullptr, 0, 0, nullptr, 0, 0, &executor_, &pool_);
×
181
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
182
    req->stat_buf = buf;
×
183
    return req_awaitable;
×
184
}
185

186
IoAwaitable KqueueThreadPoolBackend::submit_accept(int listen_fd,
×
187
                                                   struct sockaddr* addr,
188
                                                   socklen_t* addrlen) {
189
    auto req_awaitable =
NEW
190
        ThreadPoolBackend::make_request(IoOp::ACCEPT, listen_fd, nullptr, 0, 0,
×
NEW
191
                                        nullptr, 0, 0, &executor_, &pool_);
×
192
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
193
    req->addr = addr;
×
194
    req->addrlen = addrlen;
×
195
    return req_awaitable;
×
196
}
197

198
IoAwaitable KqueueThreadPoolBackend::submit_recv(int fd, void* buf,
×
199
                                                 std::size_t len, int flags) {
NEW
200
    auto req_awaitable = ThreadPoolBackend::make_request(
×
NEW
201
        IoOp::RECV, fd, buf, len, 0, nullptr, 0, 0, &executor_, &pool_);
×
202
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
203
    req->msg_flags = flags;
×
204
    return req_awaitable;
×
205
}
206

207
IoAwaitable KqueueThreadPoolBackend::submit_send(int fd, const void* buf,
×
208
                                                 std::size_t len, int flags) {
NEW
209
    auto req_awaitable = ThreadPoolBackend::make_request(
×
NEW
210
        IoOp::SEND, fd, const_cast<void*>(buf), len, 0, nullptr, 0, 0,
×
NEW
211
        &executor_, &pool_);
×
212
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
213
    req->msg_flags = flags;
×
214
    return req_awaitable;
×
215
}
216

217
IoAwaitable KqueueThreadPoolBackend::submit_readv(int fd,
×
218
                                                  const struct iovec* iov,
219
                                                  int iovcnt) {
NEW
220
    auto req_awaitable = ThreadPoolBackend::make_request(
×
NEW
221
        IoOp::READV, fd, nullptr, 0, 0, nullptr, 0, 0, &executor_, &pool_);
×
222
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
223
    req->iov = iov;
×
224
    req->iovcnt = iovcnt;
×
225
    return req_awaitable;
×
226
}
227

228
IoAwaitable KqueueThreadPoolBackend::submit_writev(int fd,
1✔
229
                                                   const struct iovec* iov,
230
                                                   int iovcnt) {
231
    auto req_awaitable = ThreadPoolBackend::make_request(
1✔
232
        IoOp::WRITEV, fd, nullptr, 0, 0, nullptr, 0, 0, &executor_, &pool_);
1✔
233
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
1✔
234
    req->iov = iov;
1✔
235
    req->iovcnt = iovcnt;
1✔
236
    return req_awaitable;
1✔
237
}
238

239
IoAwaitable KqueueThreadPoolBackend::submit_preadv(int fd,
1✔
240
                                                   const struct iovec* iov,
241
                                                   int iovcnt, off_t offset) {
242
    auto req_awaitable =
243
        ThreadPoolBackend::make_request(IoOp::PREADV, fd, nullptr, 0, offset,
2✔
244
                                        nullptr, 0, 0, &executor_, &pool_);
1✔
245
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
1✔
246
    req->iov = iov;
1✔
247
    req->iovcnt = iovcnt;
1✔
248
    return req_awaitable;
1✔
249
}
250

251
IoAwaitable KqueueThreadPoolBackend::submit_pwritev(int fd,
71✔
252
                                                    const struct iovec* iov,
253
                                                    int iovcnt, off_t offset) {
254
    auto req_awaitable =
255
        ThreadPoolBackend::make_request(IoOp::PWRITEV, fd, nullptr, 0, offset,
142✔
256
                                        nullptr, 0, 0, &executor_, &pool_);
71✔
257
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
71✔
258
    req->iov = iov;
71✔
259
    req->iovcnt = iovcnt;
71✔
260
    return req_awaitable;
71✔
261
}
262

263
IoAwaitable KqueueThreadPoolBackend::submit_lseek(int fd, off_t offset,
1✔
264
                                                  int whence) {
265
    auto req_awaitable = ThreadPoolBackend::make_request(
1✔
266
        IoOp::LSEEK, fd, nullptr, 0, offset, nullptr, 0, 0, &executor_, &pool_);
1✔
267
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
1✔
268
    req->whence = whence;
1✔
269
    return req_awaitable;
1✔
270
}
271

272
IoAwaitable KqueueThreadPoolBackend::submit_sendfile(int out_fd, int in_fd,
1✔
273
                                                     off_t offset,
274
                                                     std::size_t count) {
275
    auto req_awaitable = ThreadPoolBackend::make_request(
1✔
276
        IoOp::SENDFILE, in_fd, nullptr, count, offset, nullptr, 0, 0,
1✔
277
        &executor_, &pool_);
1✔
278
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
1✔
279
    req->dest_fd = out_fd;
1✔
280
    return req_awaitable;
1✔
281
}
282

283
std::size_t KqueueThreadPoolBackend::poll(int /*timeout_ms*/) {
20,296✔
284
    // Completions fire via thread pool callbacks -- nothing to poll.
285
    return 0;
20,296✔
286
}
287

288
int KqueueThreadPoolBackend::flush() { return static_cast<int>(pool_.flush()); }
20,261✔
289

290
}  // namespace dftracer::utils::io
291

292
#endif  // kqueue platforms
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc