• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28496595030

01 Jul 2026 05:50AM UTC coverage: 50.727% (-1.6%) from 52.278%
28496595030

Pull #83

github

web-flow
Merge 8f1ff4df5 into 2efed6649
Pull Request #83: refactor and improve code QoL

31872 of 80367 branches covered (39.66%)

Branch coverage included in aggregate %.

770 of 1591 new or added lines in 85 files covered. (48.4%)

5070 existing lines in 182 files now uncovered.

32742 of 47009 relevant lines covered (69.65%)

9887.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.94
/src/dftracer/utils/core/io/thread_pool_backend.cpp
1
#include <dftracer/utils/core/io/io_sendfile.h>
2
#include <dftracer/utils/core/io/thread_pool_backend.h>
3
#include <dftracer/utils/core/pipeline/executor.h>
4
#include <fcntl.h>
5
#include <sys/socket.h>
6
#include <sys/stat.h>
7
#include <sys/uio.h>
8
#include <unistd.h>
9

10
#include <cerrno>
11

12
namespace dftracer::utils::io {
13

14
// Bind the backend to `executor` and construct the I/O pool from
// `pool_size` and `batch_threshold`. The pool is only constructed here;
// start() is what calls pool_.start().
ThreadPoolBackend::ThreadPoolBackend(Executor& executor, std::size_t pool_size,
                                     unsigned batch_threshold)
    : executor_(executor),
      pool_(pool_size, batch_threshold) {
}
20!
17

18
// Start the backend by starting its I/O pool.
void ThreadPoolBackend::start() {
    pool_.start();
}
10✔
19
// Stop the backend by stopping its I/O pool.
void ThreadPoolBackend::stop() {
    pool_.stop();
}
10✔
20

21
// Build a heap-allocated IoRequest describing `op` and return it wrapped in
// an IoAwaitable (via the awaitable's submit_ctx_).
//
// Nothing is queued here: the request's `submit` hook is set to
// submit_to_pool, which is what posts the request to `pool`. The request is
// released by execute_request once the operation has run.
//
// Operands that do not apply to `op` are stored exactly as given; callers in
// this file pass 0 / nullptr / -1 for them. Pointer operands (`buf`, `path`)
// are stored, not copied.
IoAwaitable ThreadPoolBackend::make_request(IoOp op, int fd, void* buf,
                                            std::size_t len, off_t offset,
                                            const char* path, int flags,
                                            mode_t mode, Executor* executor,
                                            IoThreadPool* pool) {
    auto* request = new IoRequest{};

    // Routing: how the request is submitted and where completion is sent.
    request->submit = &ThreadPoolBackend::submit_to_pool;
    request->executor = executor;
    request->pool = pool;
    // The final address of the IoAwaitable is not known yet (the object
    // returned below may still be moved before it settles in the coroutine
    // frame), so leave this empty; submit_to_pool fills it in with the
    // stable address it receives from await_suspend.
    request->awaitable = nullptr;

    // The operation and its operands.
    request->op = op;
    request->fd = fd;
    request->buf = buf;
    request->len = len;
    request->offset = offset;
    request->path = path;
    request->flags = flags;
    request->mode = mode;

    IoAwaitable pending;
    pending.submit_ctx_ = request;
    return pending;
}
47

48
// Prepare a READ request on `fd` targeting `buf` / `len`.
// execute_request services it with ::read(fd, buf, len); `buf` is stored by
// pointer, so it has to stay valid until then.
IoAwaitable ThreadPoolBackend::submit_read(int fd, void* buf, std::size_t len) {
    return make_request(IoOp::READ, fd, buf, len, /*offset=*/0,
                        /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_,
                        &pool_);
}
52

53
// Prepare a WRITE request on `fd` sourcing `buf` / `len`.
// execute_request services it with ::write(fd, buf, len).
IoAwaitable ThreadPoolBackend::submit_write(int fd, const void* buf,
                                            std::size_t len) {
    // The request stores one non-const buffer pointer for every operation,
    // hence the cast.
    void* const payload = const_cast<void*>(buf);
    return make_request(IoOp::WRITE, fd, payload, len, /*offset=*/0,
                        /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_,
                        &pool_);
}
58

59
// Prepare a PREAD request: `len` bytes into `buf` from `fd` at `offset`.
// execute_request services it with ::pread(fd, buf, len, offset).
IoAwaitable ThreadPoolBackend::submit_pread(int fd, void* buf, std::size_t len,
                                            off_t offset) {
    return make_request(IoOp::PREAD, fd, buf, len, offset, /*path=*/nullptr,
                        /*flags=*/0, /*mode=*/0, &executor_, &pool_);
}
64

65
void ThreadPoolBackend::submit_pread_callback(int fd, void* buf,
×
66
                                              std::size_t len, off_t offset,
67
                                              IoCompletionFn completion,
68
                                              void* context) {
69
    auto* req = new IoRequest{};
×
70
    req->op = IoOp::PREAD;
71
    req->fd = fd;
72
    req->buf = buf;
73
    req->len = len;
74
    req->offset = offset;
75
    req->completion = completion;
76
    req->completion_ctx = context;
77
    req->pool = &pool_;
78
    pool_.submit([req] { execute_request(req); });
×
79
}
80

81
// Prepare a PWRITE request: `len` bytes from `buf` to `fd` at `offset`.
// execute_request services it with ::pwrite(fd, buf, len, offset).
IoAwaitable ThreadPoolBackend::submit_pwrite(int fd, const void* buf,
                                             std::size_t len, off_t offset) {
    // The request stores one non-const buffer pointer for every operation,
    // hence the cast.
    void* const payload = const_cast<void*>(buf);
    return make_request(IoOp::PWRITE, fd, payload, len, offset,
                        /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_,
                        &pool_);
}
86

87
// Prepare an OPEN request for `path` with `flags` and `mode`.
// execute_request services it with ::open(path, flags, mode). `path` is
// stored by pointer, not copied, so it has to stay valid until then.
IoAwaitable ThreadPoolBackend::submit_open(const char* path, int flags,
                                           mode_t mode) {
    return make_request(IoOp::OPEN, /*fd=*/-1, /*buf=*/nullptr, /*len=*/0,
                        /*offset=*/0, path, flags, mode, &executor_, &pool_);
}
92

93
// Prepare a CLOSE request for `fd`.
// execute_request services it with ::close(fd).
IoAwaitable ThreadPoolBackend::submit_close(int fd) {
    return make_request(IoOp::CLOSE, fd, /*buf=*/nullptr, /*len=*/0,
                        /*offset=*/0, /*path=*/nullptr, /*flags=*/0,
                        /*mode=*/0, &executor_, &pool_);
}
97

98
// Prepare an FSYNC request for `fd`.
// execute_request services it with ::fsync(fd).
IoAwaitable ThreadPoolBackend::submit_fsync(int fd) {
    return make_request(IoOp::FSYNC, fd, /*buf=*/nullptr, /*len=*/0,
                        /*offset=*/0, /*path=*/nullptr, /*flags=*/0,
                        /*mode=*/0, &executor_, &pool_);
}
102

103
// Prepare an FTRUNCATE request setting `fd` to `length`.
// The length travels in the request's `offset` field; execute_request
// services it with ::ftruncate(fd, offset).
IoAwaitable ThreadPoolBackend::submit_ftruncate(int fd, off_t length) {
    return make_request(IoOp::FTRUNCATE, fd, /*buf=*/nullptr, /*len=*/0,
                        /*offset=*/length, /*path=*/nullptr, /*flags=*/0,
                        /*mode=*/0, &executor_, &pool_);
}
107

108
// Prepare an FSTAT request for `fd`, with the result written into `buf`.
// execute_request services it with ::fstat(fd, stat_buf); `buf` is stored by
// pointer, so it has to stay valid until then.
IoAwaitable ThreadPoolBackend::submit_fstat(int fd, struct stat* buf) {
    IoAwaitable pending = make_request(
        IoOp::FSTAT, fd, /*buf=*/nullptr, /*len=*/0, /*offset=*/0,
        /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_, &pool_);

    // make_request has no slot for the stat buffer; patch it onto the
    // request it just created.
    auto* request = static_cast<IoRequest*>(pending.submit_ctx_);
    request->stat_buf = buf;
    return pending;
}
115

116
// Prepare an ACCEPT request on `listen_fd`.
// `addr` / `addrlen` are stored by pointer and handed to the accept call in
// execute_request, so they have to stay valid until then.
IoAwaitable ThreadPoolBackend::submit_accept(int listen_fd,
                                             struct sockaddr* addr,
                                             socklen_t* addrlen) {
    IoAwaitable pending = make_request(
        IoOp::ACCEPT, listen_fd, /*buf=*/nullptr, /*len=*/0, /*offset=*/0,
        /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_, &pool_);

    // Socket-address operands are not part of make_request's parameters;
    // attach them to the request it just created.
    auto* request = static_cast<IoRequest*>(pending.submit_ctx_);
    request->addr = addr;
    request->addrlen = addrlen;
    return pending;
}
126

127
// Prepare a RECV request on `fd` into `buf` / `len` with message `flags`.
// execute_request services it with ::recv(fd, buf, len, msg_flags).
IoAwaitable ThreadPoolBackend::submit_recv(int fd, void* buf, std::size_t len,
                                           int flags) {
    IoAwaitable pending =
        make_request(IoOp::RECV, fd, buf, len, /*offset=*/0, /*path=*/nullptr,
                     /*flags=*/0, /*mode=*/0, &executor_, &pool_);

    // The message flags live in their own field (`msg_flags`), separate
    // from the `flags` field that make_request fills in.
    auto* request = static_cast<IoRequest*>(pending.submit_ctx_);
    request->msg_flags = flags;
    return pending;
}
135

136
// Prepare a SEND request on `fd` from `buf` / `len` with message `flags`.
// execute_request services it with ::send(fd, buf, len, msg_flags).
IoAwaitable ThreadPoolBackend::submit_send(int fd, const void* buf,
                                           std::size_t len, int flags) {
    // The request stores one non-const buffer pointer for every operation,
    // hence the cast.
    void* const payload = const_cast<void*>(buf);
    IoAwaitable pending = make_request(
        IoOp::SEND, fd, payload, len, /*offset=*/0, /*path=*/nullptr,
        /*flags=*/0, /*mode=*/0, &executor_, &pool_);

    // The message flags live in their own field (`msg_flags`), separate
    // from the `flags` field that make_request fills in.
    auto* request = static_cast<IoRequest*>(pending.submit_ctx_);
    request->msg_flags = flags;
    return pending;
}
145

146
// Prepare a READV request on `fd` using the `iovcnt` entries at `iov`.
// execute_request services it with ::readv(fd, iov, iovcnt); the iovec
// array is stored by pointer, so it has to stay valid until then.
IoAwaitable ThreadPoolBackend::submit_readv(int fd, const struct iovec* iov,
                                            int iovcnt) {
    IoAwaitable pending = make_request(
        IoOp::READV, fd, /*buf=*/nullptr, /*len=*/0, /*offset=*/0,
        /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_, &pool_);

    // Vector operands are attached after the fact.
    auto* request = static_cast<IoRequest*>(pending.submit_ctx_);
    request->iov = iov;
    request->iovcnt = iovcnt;
    return pending;
}
155

156
// Prepare a WRITEV request on `fd` using the `iovcnt` entries at `iov`.
// execute_request services it with ::writev(fd, iov, iovcnt); the iovec
// array is stored by pointer, so it has to stay valid until then.
IoAwaitable ThreadPoolBackend::submit_writev(int fd, const struct iovec* iov,
                                             int iovcnt) {
    IoAwaitable pending = make_request(
        IoOp::WRITEV, fd, /*buf=*/nullptr, /*len=*/0, /*offset=*/0,
        /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_, &pool_);

    // Vector operands are attached after the fact.
    auto* request = static_cast<IoRequest*>(pending.submit_ctx_);
    request->iov = iov;
    request->iovcnt = iovcnt;
    return pending;
}
165

166
// Prepare a PREADV request on `fd` at `offset` using the `iovcnt` entries at
// `iov`. execute_request services it with ::preadv(fd, iov, iovcnt, offset);
// the iovec array is stored by pointer, so it has to stay valid until then.
IoAwaitable ThreadPoolBackend::submit_preadv(int fd, const struct iovec* iov,
                                             int iovcnt, off_t offset) {
    IoAwaitable pending = make_request(
        IoOp::PREADV, fd, /*buf=*/nullptr, /*len=*/0, offset,
        /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_, &pool_);

    // Vector operands are attached after the fact.
    auto* request = static_cast<IoRequest*>(pending.submit_ctx_);
    request->iov = iov;
    request->iovcnt = iovcnt;
    return pending;
}
175

176
// Prepare a PWRITEV request on `fd` at `offset` using the `iovcnt` entries
// at `iov`. execute_request services it with
// ::pwritev(fd, iov, iovcnt, offset); the iovec array is stored by pointer,
// so it has to stay valid until then.
IoAwaitable ThreadPoolBackend::submit_pwritev(int fd, const struct iovec* iov,
                                              int iovcnt, off_t offset) {
    IoAwaitable pending = make_request(
        IoOp::PWRITEV, fd, /*buf=*/nullptr, /*len=*/0, offset,
        /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_, &pool_);

    // Vector operands are attached after the fact.
    auto* request = static_cast<IoRequest*>(pending.submit_ctx_);
    request->iov = iov;
    request->iovcnt = iovcnt;
    return pending;
}
185

186
// Prepare an LSEEK request on `fd` with `offset` and `whence`.
// execute_request services it with ::lseek(fd, offset, whence).
IoAwaitable ThreadPoolBackend::submit_lseek(int fd, off_t offset, int whence) {
    IoAwaitable pending = make_request(
        IoOp::LSEEK, fd, /*buf=*/nullptr, /*len=*/0, offset,
        /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_, &pool_);

    // `whence` is not one of make_request's parameters; attach it here.
    auto* request = static_cast<IoRequest*>(pending.submit_ctx_);
    request->whence = whence;
    return pending;
}
193

194
// Prepare a SENDFILE request copying `count` bytes from `in_fd`, starting at
// `offset`, to `out_fd`.
//
// Field mapping on the request: `in_fd` -> fd, `count` -> len,
// `out_fd` -> dest_fd. execute_request services it with
// platform_sendfile(dest_fd, fd, offset, len).
IoAwaitable ThreadPoolBackend::submit_sendfile(int out_fd, int in_fd,
                                               off_t offset,
                                               std::size_t count) {
    IoAwaitable pending = make_request(
        IoOp::SENDFILE, in_fd, /*buf=*/nullptr, count, offset,
        /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_, &pool_);

    // The destination descriptor has its own field on the request.
    auto* request = static_cast<IoRequest*>(pending.submit_ctx_);
    request->dest_fd = out_fd;
    return pending;
}
204

205
void ThreadPoolBackend::submit_to_pool(SubmitContext* ctx,
4,475✔
206
                                       IoAwaitable* awaitable) {
207
    auto* req = static_cast<IoRequest*>(ctx);
4,475✔
208
    // Update the awaitable pointer -- await_suspend passes the real,
209
    // stable address of the IoAwaitable in the coroutine frame.
210
    req->awaitable = awaitable;
4,475✔
211
    req->pool->submit([req] { execute_request(req); });
8,951!
212
}
4,475✔
213

214
// Run one queued request, deliver its result, and free the request.
//
// The system call selected by `req->op` is issued with the operands stored
// in the request. A negative return value is replaced by `-errno`; any other
// value is passed through unchanged.
//
// Delivery:
//  - if `req->awaitable` is set, the result is stored in its `result_` and
//    its `handle_` is enqueued on `req->executor`;
//  - otherwise, if `req->completion` is set, it is called with
//    `req->completion_ctx` and the result.
//
// `req` is deleted before returning and must not be used afterwards.
void ThreadPoolBackend::execute_request(IoRequest* req) {
    ssize_t result = 0;
    switch (req->op) {
        case IoOp::READ:
            result = ::read(req->fd, req->buf, req->len);
            break;
        case IoOp::WRITE:
            result = ::write(req->fd, req->buf, req->len);
            break;
        case IoOp::PREAD:
            result = ::pread(req->fd, req->buf, req->len, req->offset);
            break;
        case IoOp::PWRITE:
            result = ::pwrite(req->fd, req->buf, req->len, req->offset);
            break;
        case IoOp::OPEN:
            result = ::open(req->path, req->flags, req->mode);
            break;
        case IoOp::CLOSE:
            result = ::close(req->fd);
            break;
        case IoOp::FSYNC:
            result = ::fsync(req->fd);
            break;
        case IoOp::FTRUNCATE:
            // The target length is carried in the `offset` field.
            result = ::ftruncate(req->fd, req->offset);
            break;
        case IoOp::FSTAT:
            result = ::fstat(req->fd, req->stat_buf);
            break;
        case IoOp::ACCEPT:
#ifdef __linux__
            result = ::accept4(req->fd, req->addr, req->addrlen,
                               SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
            result = ::accept(req->fd, req->addr, req->addrlen);
            if (result >= 0) {
                // No accept4() on this platform: apply O_NONBLOCK and
                // FD_CLOEXEC by hand, best effort.
                const int fd = static_cast<int>(result);
                const int fl = ::fcntl(fd, F_GETFL, 0);
                // F_GETFL reports failure as -1; OR-ing that with
                // O_NONBLOCK would hand F_SETFL an all-ones flag word, so
                // only update the flags when they were actually read.
                if (fl != -1) {
                    ::fcntl(fd, F_SETFL, fl | O_NONBLOCK);
                }
                ::fcntl(fd, F_SETFD, FD_CLOEXEC);
            }
#endif
            break;
        case IoOp::RECV:
            result = ::recv(req->fd, req->buf, req->len, req->msg_flags);
            break;
        case IoOp::SEND:
            result = ::send(req->fd, req->buf, req->len, req->msg_flags);
            break;
        case IoOp::READV:
            result = ::readv(req->fd, req->iov, req->iovcnt);
            break;
        case IoOp::WRITEV:
            result = ::writev(req->fd, req->iov, req->iovcnt);
            break;
        case IoOp::PREADV:
            result = ::preadv(req->fd, req->iov, req->iovcnt, req->offset);
            break;
        case IoOp::PWRITEV:
            result = ::pwritev(req->fd, req->iov, req->iovcnt, req->offset);
            break;
        case IoOp::LSEEK:
            result = ::lseek(req->fd, req->offset, req->whence);
            break;
        case IoOp::SENDFILE:
            // `fd` is the source descriptor, `dest_fd` the destination.
            result =
                platform_sendfile(req->dest_fd, req->fd, req->offset, req->len);
            break;
    }
    // Fold failure into the result as a negated errno so a single value
    // carries both outcomes.
    if (result < 0) result = -errno;

    if (req->awaitable != nullptr) {
        // Coroutine path: publish the result, then schedule the resume.
        req->awaitable->result_ = result;
        req->executor->enqueue(req->awaitable->handle_);
    } else if (req->completion != nullptr) {
        // Callback path (see submit_pread_callback).
        req->completion(req->completion_ctx, result);
    }
    delete req;
}
4,476✔
294

295
// Always reports zero: with the thread pool backend, completions fire via
// callbacks, so there is nothing to reap here. The timeout is ignored.
std::size_t ThreadPoolBackend::poll(int /*timeout_ms*/) {
    constexpr std::size_t kNothingToReap = 0;
    return kNothingToReap;
}
298

299
// Flush the pool and return whatever pool_.flush() reports, converted to int.
int ThreadPoolBackend::flush() {
    const auto flushed = pool_.flush();
    return static_cast<int>(flushed);
}
483✔
300

301
// Identifier string for this backend.
std::string ThreadPoolBackend::name() const {
    return std::string("threadpool");
}
1✔
302

303
}  // namespace dftracer::utils::io
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc