• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23529483807

25 Mar 2026 07:17AM UTC coverage: 48.515% (-1.6%) from 50.098%
23529483807

Pull #57

github

web-flow
Merge 5b1e117ad into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18829 of 49412 branches covered (38.11%)

Branch coverage included in aggregate %.

1584 of 1933 new or added lines in 14 files covered. (81.95%)

3552 existing lines in 135 files now uncovered.

18474 of 27477 relevant lines covered (67.23%)

241072.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.56
/src/dftracer/utils/core/io/thread_pool_backend.cpp
1
#include "thread_pool_backend.h"
2

3
#include <dftracer/utils/core/pipeline/executor.h>
4
#include <fcntl.h>
5
#include <sys/socket.h>
6
#include <sys/stat.h>
7
#include <sys/uio.h>
8
#ifdef __linux__
9
#include <sys/sendfile.h>
10
#endif
11
#include <unistd.h>
12

13
#include <cerrno>
14

15
namespace dftracer::utils::io {
16

17
ThreadPoolBackend::ThreadPoolBackend(Executor& executor, std::size_t pool_size,
8✔
18
                                     unsigned batch_threshold)
19
    : executor_(executor), pool_(pool_size, batch_threshold) {}
8!
20

21
void ThreadPoolBackend::start() { pool_.start(); }
4✔
22
void ThreadPoolBackend::stop() { pool_.stop(); }
4✔
23

24
static IoAwaitable make_request(IoOp op, int fd, void* buf, std::size_t len,
106✔
25
                                off_t offset, const char* path, int flags,
26
                                mode_t mode, Executor* executor,
27
                                IoThreadPool* pool) {
28
    auto* req = new IoRequest{};
106✔
29
    req->submit = &ThreadPoolBackend::submit_to_pool;
30
    req->op = op;
31
    req->fd = fd;
32
    req->buf = buf;
33
    req->len = len;
34
    req->offset = offset;
35
    req->path = path;
36
    req->flags = flags;
37
    req->mode = mode;
38
    req->executor = executor;
39
    req->pool = pool;
40

41
    IoAwaitable awaitable;
42
    // req->awaitable will be updated in submit_to_pool with the
43
    // stable address from await_suspend (the IoAwaitable may move
44
    // before the coroutine frame is allocated).
45
    req->awaitable = nullptr;
46
    awaitable.submit_ctx_ = req;
47
    return awaitable;
48
}
49

50
IoAwaitable ThreadPoolBackend::submit_read(int fd, void* buf, std::size_t len) {
×
51
    return make_request(IoOp::READ, fd, buf, len, 0, nullptr, 0, 0, &executor_,
×
52
                        &pool_);
×
53
}
54

55
IoAwaitable ThreadPoolBackend::submit_write(int fd, const void* buf,
1✔
56
                                            std::size_t len) {
57
    return make_request(IoOp::WRITE, fd, const_cast<void*>(buf), len, 0,
2✔
58
                        nullptr, 0, 0, &executor_, &pool_);
1✔
59
}
60

61
IoAwaitable ThreadPoolBackend::submit_pread(int fd, void* buf, std::size_t len,
16✔
62
                                            off_t offset) {
63
    return make_request(IoOp::PREAD, fd, buf, len, offset, nullptr, 0, 0,
32✔
64
                        &executor_, &pool_);
16✔
65
}
66

67
IoAwaitable ThreadPoolBackend::submit_pwrite(int fd, const void* buf,
×
68
                                             std::size_t len, off_t offset) {
69
    return make_request(IoOp::PWRITE, fd, const_cast<void*>(buf), len, offset,
×
70
                        nullptr, 0, 0, &executor_, &pool_);
×
71
}
72

73
IoAwaitable ThreadPoolBackend::submit_open(const char* path, int flags,
9✔
74
                                           mode_t mode) {
75
    return make_request(IoOp::OPEN, -1, nullptr, 0, 0, path, flags, mode,
18✔
76
                        &executor_, &pool_);
9✔
77
}
78

79
IoAwaitable ThreadPoolBackend::submit_close(int fd) {
26✔
80
    return make_request(IoOp::CLOSE, fd, nullptr, 0, 0, nullptr, 0, 0,
52✔
81
                        &executor_, &pool_);
26✔
82
}
83

84
IoAwaitable ThreadPoolBackend::submit_fsync(int fd) {
×
85
    return make_request(IoOp::FSYNC, fd, nullptr, 0, 0, nullptr, 0, 0,
×
86
                        &executor_, &pool_);
×
87
}
88

89
IoAwaitable ThreadPoolBackend::submit_ftruncate(int fd, off_t length) {
×
90
    return make_request(IoOp::FTRUNCATE, fd, nullptr, 0, length, nullptr, 0, 0,
×
91
                        &executor_, &pool_);
×
92
}
93

94
IoAwaitable ThreadPoolBackend::submit_fstat(int fd, struct stat* buf) {
×
95
    auto req_awaitable = make_request(IoOp::FSTAT, fd, nullptr, 0, 0, nullptr,
×
96
                                      0, 0, &executor_, &pool_);
×
97
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
98
    req->stat_buf = buf;
×
99
    return req_awaitable;
×
100
}
101

102
IoAwaitable ThreadPoolBackend::submit_accept(int listen_fd,
20✔
103
                                             struct sockaddr* addr,
104
                                             socklen_t* addrlen) {
105
    auto req_awaitable = make_request(IoOp::ACCEPT, listen_fd, nullptr, 0, 0,
40✔
106
                                      nullptr, 0, 0, &executor_, &pool_);
20✔
107
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
20✔
108
    req->addr = addr;
20✔
109
    req->addrlen = addrlen;
20✔
110
    return req_awaitable;
20✔
111
}
112

113
IoAwaitable ThreadPoolBackend::submit_recv(int fd, void* buf, std::size_t len,
18✔
114
                                           int flags) {
115
    auto req_awaitable = make_request(IoOp::RECV, fd, buf, len, 0, nullptr, 0,
36✔
116
                                      0, &executor_, &pool_);
18✔
117
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
18✔
118
    req->msg_flags = flags;
18✔
119
    return req_awaitable;
18✔
120
}
121

122
IoAwaitable ThreadPoolBackend::submit_send(int fd, const void* buf,
16✔
123
                                           std::size_t len, int flags) {
124
    auto req_awaitable =
125
        make_request(IoOp::SEND, fd, const_cast<void*>(buf), len, 0, nullptr, 0,
32✔
126
                     0, &executor_, &pool_);
16✔
127
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
16✔
128
    req->msg_flags = flags;
16✔
129
    return req_awaitable;
16✔
130
}
131

132
IoAwaitable ThreadPoolBackend::submit_readv(int fd, const struct iovec* iov,
×
133
                                            int iovcnt) {
134
    auto req_awaitable = make_request(IoOp::READV, fd, nullptr, 0, 0, nullptr,
×
135
                                      0, 0, &executor_, &pool_);
×
136
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
137
    req->iov = iov;
×
138
    req->iovcnt = iovcnt;
×
139
    return req_awaitable;
×
140
}
141

142
IoAwaitable ThreadPoolBackend::submit_writev(int fd, const struct iovec* iov,
×
143
                                             int iovcnt) {
144
    auto req_awaitable = make_request(IoOp::WRITEV, fd, nullptr, 0, 0, nullptr,
×
145
                                      0, 0, &executor_, &pool_);
×
146
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
147
    req->iov = iov;
×
148
    req->iovcnt = iovcnt;
×
149
    return req_awaitable;
×
150
}
151

152
IoAwaitable ThreadPoolBackend::submit_preadv(int fd, const struct iovec* iov,
×
153
                                             int iovcnt, off_t offset) {
154
    auto req_awaitable = make_request(IoOp::PREADV, fd, nullptr, 0, offset,
×
155
                                      nullptr, 0, 0, &executor_, &pool_);
×
156
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
157
    req->iov = iov;
×
158
    req->iovcnt = iovcnt;
×
159
    return req_awaitable;
×
160
}
161

162
IoAwaitable ThreadPoolBackend::submit_pwritev(int fd, const struct iovec* iov,
×
163
                                              int iovcnt, off_t offset) {
164
    auto req_awaitable = make_request(IoOp::PWRITEV, fd, nullptr, 0, offset,
×
165
                                      nullptr, 0, 0, &executor_, &pool_);
×
166
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
167
    req->iov = iov;
×
168
    req->iovcnt = iovcnt;
×
169
    return req_awaitable;
×
170
}
171

172
IoAwaitable ThreadPoolBackend::submit_lseek(int fd, off_t offset, int whence) {
×
173
    auto req_awaitable = make_request(IoOp::LSEEK, fd, nullptr, 0, offset,
×
174
                                      nullptr, 0, 0, &executor_, &pool_);
×
175
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
176
    req->whence = whence;
×
177
    return req_awaitable;
×
178
}
179

180
IoAwaitable ThreadPoolBackend::submit_sendfile(int out_fd, int in_fd,
×
181
                                               off_t offset,
182
                                               std::size_t count) {
183
    auto req_awaitable =
184
        make_request(IoOp::SENDFILE, in_fd, nullptr, count, offset, nullptr, 0,
×
185
                     0, &executor_, &pool_);
×
186
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
187
    req->dest_fd = out_fd;
×
188
    return req_awaitable;
×
189
}
190

191
void ThreadPoolBackend::submit_to_pool(SubmitContext* ctx,
106✔
192
                                       IoAwaitable* awaitable) {
193
    auto* req = static_cast<IoRequest*>(ctx);
106✔
194
    // Update the awaitable pointer -- await_suspend passes the real,
195
    // stable address of the IoAwaitable in the coroutine frame.
196
    req->awaitable = awaitable;
106✔
197
    req->pool->submit([req] { execute_request(req); });
212!
198
}
106✔
199

200
void ThreadPoolBackend::execute_request(IoRequest* req) {
106✔
201
    ssize_t result = 0;
106✔
202
    switch (req->op) {
106!
203
        case IoOp::READ:
204
            result = ::read(req->fd, req->buf, req->len);
×
205
            break;
×
206
        case IoOp::WRITE:
207
            result = ::write(req->fd, req->buf, req->len);
1✔
208
            break;
1✔
209
        case IoOp::PREAD:
210
            result = ::pread(req->fd, req->buf, req->len, req->offset);
16✔
211
            break;
16✔
212
        case IoOp::PWRITE:
213
            result = ::pwrite(req->fd, req->buf, req->len, req->offset);
×
214
            break;
×
215
        case IoOp::OPEN:
216
            result = ::open(req->path, req->flags, req->mode);
9✔
217
            break;
9✔
218
        case IoOp::CLOSE:
219
            result = ::close(req->fd);
26✔
220
            break;
26✔
221
        case IoOp::FSYNC:
222
            result = ::fsync(req->fd);
×
223
            break;
×
224
        case IoOp::FTRUNCATE:
225
            result = ::ftruncate(req->fd, req->offset);
×
226
            break;
×
227
        case IoOp::FSTAT:
228
            result = ::fstat(req->fd, req->stat_buf);
×
229
            break;
×
230
        case IoOp::ACCEPT:
231
#ifdef __linux__
232
            result = ::accept4(req->fd, req->addr, req->addrlen,
233
                               SOCK_NONBLOCK | SOCK_CLOEXEC);
234
#else
235
            result = ::accept(req->fd, req->addr, req->addrlen);
20✔
236
            if (result >= 0) {
20✔
237
                int fd = static_cast<int>(result);
19✔
238
                int fl = ::fcntl(fd, F_GETFL, 0);
19✔
239
                ::fcntl(fd, F_SETFL, fl | O_NONBLOCK);
19✔
240
                ::fcntl(fd, F_SETFD, FD_CLOEXEC);
19✔
241
            }
19✔
242
#endif
243
            break;
20✔
244
        case IoOp::RECV:
245
            result = ::recv(req->fd, req->buf, req->len, req->msg_flags);
18✔
246
            break;
18✔
247
        case IoOp::SEND:
248
            result = ::send(req->fd, req->buf, req->len, req->msg_flags);
16✔
249
            break;
16✔
250
        case IoOp::READV:
251
            result = ::readv(req->fd, req->iov, req->iovcnt);
×
252
            break;
×
253
        case IoOp::WRITEV:
254
            result = ::writev(req->fd, req->iov, req->iovcnt);
×
255
            break;
×
256
        case IoOp::PREADV:
257
            result = ::preadv(req->fd, req->iov, req->iovcnt, req->offset);
×
258
            break;
×
259
        case IoOp::PWRITEV:
260
            result = ::pwritev(req->fd, req->iov, req->iovcnt, req->offset);
×
261
            break;
×
262
        case IoOp::LSEEK:
263
            result = ::lseek(req->fd, req->offset, req->whence);
×
264
            break;
×
265
        case IoOp::SENDFILE: {
266
#ifdef __linux__
267
            off_t off = req->offset;
268
            result = ::sendfile(req->dest_fd, req->fd, &off, req->len);
269
#elif defined(__APPLE__)
UNCOV
270
            off_t len = static_cast<off_t>(req->len);
×
UNCOV
271
            int ret = ::sendfile(req->fd, req->dest_fd, req->offset, &len,
×
272
                                 nullptr, 0);
UNCOV
273
            result = (ret == 0 || errno == EAGAIN) ? len : -1;
×
274
#else
275
            char tmp[8192];
276
            result = 0;
277
            off_t off = req->offset;
278
            std::size_t remaining = req->len;
279
            while (remaining > 0) {
280
                std::size_t chunk =
281
                    remaining < sizeof(tmp) ? remaining : sizeof(tmp);
282
                ssize_t r = ::pread(req->fd, tmp, chunk, off);
283
                if (r <= 0) {
284
                    if (result == 0) result = r;
285
                    break;
286
                }
287
                ssize_t w =
288
                    ::write(req->dest_fd, tmp, static_cast<std::size_t>(r));
289
                if (w < 0) {
290
                    if (result == 0) result = w;
291
                    break;
292
                }
293
                result += w;
294
                off += w;
295
                remaining -= static_cast<std::size_t>(w);
296
                if (w < r) break;
297
            }
298
#endif
299
            break;
×
300
        }
301
    }
302
    if (result < 0) result = -errno;
106✔
303

304
    req->awaitable->result_ = result;
106✔
305
    req->executor->enqueue(req->awaitable->handle_);
106✔
306
    delete req;
106!
307
}
106✔
308

309
std::size_t ThreadPoolBackend::poll(int /*timeout_ms*/) {
223✔
310
    return 0;  // Thread pool backend: completions fire via callbacks
223✔
311
}
312

313
int ThreadPoolBackend::flush() { return static_cast<int>(pool_.flush()); }
223✔
314

315
std::string ThreadPoolBackend::name() const { return "threadpool"; }
1✔
316

317
}  // namespace dftracer::utils::io
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc