• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28286012595

27 Jun 2026 10:04AM UTC coverage: 51.056% (-1.3%) from 52.356%
28286012595

Pull #79

github

web-flow
Merge 6c6535a19 into 8eb383f39
Pull Request #79: Add Valgrind memory checking (C++, Python, MPI) and fix the bugs it found

32079 of 80165 branches covered (40.02%)

Branch coverage included in aggregate %.

129 of 149 new or added lines in 11 files covered. (86.58%)

5116 existing lines in 181 files now uncovered.

32739 of 46790 relevant lines covered (69.97%)

9929.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.0
/src/dftracer/utils/core/io/thread_pool_backend.cpp
1
#include <dftracer/utils/core/io/thread_pool_backend.h>
2
#include <dftracer/utils/core/pipeline/executor.h>
3
#include <fcntl.h>
4
#include <sys/socket.h>
5
#include <sys/stat.h>
6
#include <sys/uio.h>
7
#ifdef __linux__
8
#include <sys/sendfile.h>
9
#endif
10
#include <unistd.h>
11

12
#include <cerrno>
13

14
namespace dftracer::utils::io {
15

16
// Binds the backend to `executor` (kept by reference) and constructs the
// internal pool from `pool_size` and `batch_threshold`. The pool is not
// started here; that is done by start().
ThreadPoolBackend::ThreadPoolBackend(Executor& executor,
                                     std::size_t pool_size,
                                     unsigned batch_threshold)
    : executor_(executor),
      pool_(pool_size, batch_threshold) {
}
19

20
// Starts the backend by forwarding to the underlying pool's start().
void ThreadPoolBackend::start() {
    pool_.start();
}
21
// Stops the backend by forwarding to the underlying pool's stop().
void ThreadPoolBackend::stop() {
    pool_.stop();
}
22

23
// Heap-allocates an IoRequest describing one operation and returns an
// IoAwaitable whose submit_ctx_ points at it.
//
// Nothing is queued here: the request only records
// ThreadPoolBackend::submit_to_pool as its submit hook. The pointers
// (`buf`, `path`, `executor`, `pool`) are stored as given, not copied.
static IoAwaitable make_request(IoOp op, int fd, void* buf, std::size_t len,
                                off_t offset, const char* path, int flags,
                                mode_t mode, Executor* executor,
                                IoThreadPool* pool) {
    IoRequest* const request = new IoRequest{};

    // Routing: how the request is submitted and where it resumes.
    request->submit = &ThreadPoolBackend::submit_to_pool;
    request->executor = executor;
    request->pool = pool;

    // Operation and its operands.
    request->op = op;
    request->fd = fd;
    request->buf = buf;
    request->len = len;
    request->offset = offset;
    request->path = path;
    request->flags = flags;
    request->mode = mode;

    // The awaitable's final address is not known yet (it may still be moved
    // before it settles in the coroutine frame), so the back-pointer is left
    // null here and filled in by submit_to_pool with the stable address
    // supplied from await_suspend.
    request->awaitable = nullptr;

    IoAwaitable result;
    result.submit_ctx_ = request;
    return result;
}
48

49
// Builds the awaitable for a READ request on `fd` targeting `buf`/`len`.
// The offset, path, open-flags and mode slots are not used by this
// operation and are passed as zero/null.
IoAwaitable ThreadPoolBackend::submit_read(int fd, void* buf, std::size_t len) {
    return make_request(IoOp::READ, fd, buf, len, /*offset=*/0,
                        /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                        &executor_, &pool_);
}
53

54
// Builds the awaitable for a WRITE request of `len` bytes from `buf` to `fd`.
IoAwaitable ThreadPoolBackend::submit_write(int fd, const void* buf,
                                            std::size_t len) {
    // The request carries its buffer as a plain void*, so constness is
    // dropped here to fit that field.
    void* const payload = const_cast<void*>(buf);
    return make_request(IoOp::WRITE, fd, payload, len, /*offset=*/0,
                        /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                        &executor_, &pool_);
}
59

60
// Builds the awaitable for a PREAD request: up to `len` bytes from `fd`
// at `offset` into `buf`.
IoAwaitable ThreadPoolBackend::submit_pread(int fd, void* buf, std::size_t len,
                                            off_t offset) {
    return make_request(IoOp::PREAD, fd, buf, len, offset,
                        /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                        &executor_, &pool_);
}
65

66
void ThreadPoolBackend::submit_pread_callback(int fd, void* buf,
×
67
                                              std::size_t len, off_t offset,
68
                                              IoCompletionFn completion,
69
                                              void* context) {
70
    auto* req = new IoRequest{};
×
71
    req->op = IoOp::PREAD;
72
    req->fd = fd;
73
    req->buf = buf;
74
    req->len = len;
75
    req->offset = offset;
76
    req->completion = completion;
77
    req->completion_ctx = context;
78
    req->pool = &pool_;
79
    pool_.submit([req] { execute_request(req); });
×
80
}
81

82
// Builds the awaitable for a PWRITE request: `len` bytes from `buf` to `fd`
// at `offset`.
IoAwaitable ThreadPoolBackend::submit_pwrite(int fd, const void* buf,
                                             std::size_t len, off_t offset) {
    // The request stores its buffer as void*, hence the const_cast.
    void* const payload = const_cast<void*>(buf);
    return make_request(IoOp::PWRITE, fd, payload, len, offset,
                        /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                        &executor_, &pool_);
}
87

88
// Builds the awaitable for an OPEN request on `path` with `flags` and `mode`.
// No descriptor exists yet, so the request's fd slot is set to -1. The
// `path` pointer is stored as given rather than copied.
IoAwaitable ThreadPoolBackend::submit_open(const char* path, int flags,
                                           mode_t mode) {
    constexpr int kNoFd = -1;
    return make_request(IoOp::OPEN, kNoFd, /*buf=*/nullptr, /*len=*/0,
                        /*offset=*/0, path, flags, mode, &executor_, &pool_);
}
93

94
// Builds the awaitable for a CLOSE request on `fd`; no other operand is used.
IoAwaitable ThreadPoolBackend::submit_close(int fd) {
    return make_request(IoOp::CLOSE, fd, /*buf=*/nullptr, /*len=*/0,
                        /*offset=*/0, /*path=*/nullptr, /*flags=*/0,
                        /*mode=*/0, &executor_, &pool_);
}
98

99
// Builds the awaitable for an FSYNC request on `fd`; no other operand is used.
IoAwaitable ThreadPoolBackend::submit_fsync(int fd) {
    return make_request(IoOp::FSYNC, fd, /*buf=*/nullptr, /*len=*/0,
                        /*offset=*/0, /*path=*/nullptr, /*flags=*/0,
                        /*mode=*/0, &executor_, &pool_);
}
103

104
// Builds the awaitable for an FTRUNCATE request on `fd`.
IoAwaitable ThreadPoolBackend::submit_ftruncate(int fd, off_t length) {
    // The target length travels in the request's offset slot, which is
    // what the FTRUNCATE case of execute_request passes to ftruncate().
    return make_request(IoOp::FTRUNCATE, fd, /*buf=*/nullptr, /*len=*/0,
                        /*offset=*/length, /*path=*/nullptr, /*flags=*/0,
                        /*mode=*/0, &executor_, &pool_);
}
108

109
// Builds the awaitable for an FSTAT request on `fd`. The destination
// `struct stat` pointer is attached to the request after it is created.
IoAwaitable ThreadPoolBackend::submit_fstat(int fd, struct stat* buf) {
    IoAwaitable pending =
        make_request(IoOp::FSTAT, fd, /*buf=*/nullptr, /*len=*/0,
                     /*offset=*/0, /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                     &executor_, &pool_);
    // make_request stored the freshly allocated IoRequest in submit_ctx_.
    static_cast<IoRequest*>(pending.submit_ctx_)->stat_buf = buf;
    return pending;
}
116

117
// Builds the awaitable for an ACCEPT request on `listen_fd`. The peer
// address out-parameters are stored on the request as given.
IoAwaitable ThreadPoolBackend::submit_accept(int listen_fd,
                                             struct sockaddr* addr,
                                             socklen_t* addrlen) {
    IoAwaitable pending =
        make_request(IoOp::ACCEPT, listen_fd, /*buf=*/nullptr, /*len=*/0,
                     /*offset=*/0, /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                     &executor_, &pool_);
    // make_request stored the freshly allocated IoRequest in submit_ctx_.
    IoRequest& request = *static_cast<IoRequest*>(pending.submit_ctx_);
    request.addr = addr;
    request.addrlen = addrlen;
    return pending;
}
127

128
// Builds the awaitable for a RECV request on `fd` into `buf`/`len`.
IoAwaitable ThreadPoolBackend::submit_recv(int fd, void* buf, std::size_t len,
                                           int flags) {
    // `flags` are recv() message flags, so they go into msg_flags below
    // rather than the open-flags slot of make_request.
    IoAwaitable pending =
        make_request(IoOp::RECV, fd, buf, len, /*offset=*/0,
                     /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_,
                     &pool_);
    static_cast<IoRequest*>(pending.submit_ctx_)->msg_flags = flags;
    return pending;
}
136

137
// Builds the awaitable for a SEND request of `len` bytes from `buf` on `fd`.
IoAwaitable ThreadPoolBackend::submit_send(int fd, const void* buf,
                                           std::size_t len, int flags) {
    // The request stores its buffer as void*, hence the const_cast.
    void* const payload = const_cast<void*>(buf);
    // `flags` are send() message flags, so they go into msg_flags below
    // rather than the open-flags slot of make_request.
    IoAwaitable pending =
        make_request(IoOp::SEND, fd, payload, len, /*offset=*/0,
                     /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_,
                     &pool_);
    static_cast<IoRequest*>(pending.submit_ctx_)->msg_flags = flags;
    return pending;
}
146

147
// Builds the awaitable for a READV request on `fd`. The iovec array pointer
// and its element count are stored on the request as given.
IoAwaitable ThreadPoolBackend::submit_readv(int fd, const struct iovec* iov,
                                            int iovcnt) {
    IoAwaitable pending =
        make_request(IoOp::READV, fd, /*buf=*/nullptr, /*len=*/0,
                     /*offset=*/0, /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                     &executor_, &pool_);
    IoRequest& request = *static_cast<IoRequest*>(pending.submit_ctx_);
    request.iov = iov;
    request.iovcnt = iovcnt;
    return pending;
}
156

157
// Builds the awaitable for a WRITEV request on `fd`. The iovec array pointer
// and its element count are stored on the request as given.
IoAwaitable ThreadPoolBackend::submit_writev(int fd, const struct iovec* iov,
                                             int iovcnt) {
    IoAwaitable pending =
        make_request(IoOp::WRITEV, fd, /*buf=*/nullptr, /*len=*/0,
                     /*offset=*/0, /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                     &executor_, &pool_);
    IoRequest& request = *static_cast<IoRequest*>(pending.submit_ctx_);
    request.iov = iov;
    request.iovcnt = iovcnt;
    return pending;
}
166

167
// Builds the awaitable for a PREADV request on `fd` at `offset`. The iovec
// array pointer and its element count are stored on the request as given.
IoAwaitable ThreadPoolBackend::submit_preadv(int fd, const struct iovec* iov,
                                             int iovcnt, off_t offset) {
    IoAwaitable pending =
        make_request(IoOp::PREADV, fd, /*buf=*/nullptr, /*len=*/0, offset,
                     /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_,
                     &pool_);
    IoRequest& request = *static_cast<IoRequest*>(pending.submit_ctx_);
    request.iov = iov;
    request.iovcnt = iovcnt;
    return pending;
}
176

177
// Builds the awaitable for a PWRITEV request on `fd` at `offset`. The iovec
// array pointer and its element count are stored on the request as given.
IoAwaitable ThreadPoolBackend::submit_pwritev(int fd, const struct iovec* iov,
                                              int iovcnt, off_t offset) {
    IoAwaitable pending =
        make_request(IoOp::PWRITEV, fd, /*buf=*/nullptr, /*len=*/0, offset,
                     /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_,
                     &pool_);
    IoRequest& request = *static_cast<IoRequest*>(pending.submit_ctx_);
    request.iov = iov;
    request.iovcnt = iovcnt;
    return pending;
}
186

187
// Builds the awaitable for an LSEEK request on `fd` with `offset` and
// `whence`; `whence` is attached to the request after it is created.
IoAwaitable ThreadPoolBackend::submit_lseek(int fd, off_t offset, int whence) {
    IoAwaitable pending =
        make_request(IoOp::LSEEK, fd, /*buf=*/nullptr, /*len=*/0, offset,
                     /*path=*/nullptr, /*flags=*/0, /*mode=*/0, &executor_,
                     &pool_);
    static_cast<IoRequest*>(pending.submit_ctx_)->whence = whence;
    return pending;
}
194

195
// Builds the awaitable for a SENDFILE request copying `count` bytes from
// `in_fd`, starting at `offset`, to `out_fd`.
IoAwaitable ThreadPoolBackend::submit_sendfile(int out_fd, int in_fd,
                                               off_t offset,
                                               std::size_t count) {
    // Field mapping: the source descriptor occupies the request's fd slot
    // and the byte count its len slot; the destination descriptor is
    // attached separately as dest_fd.
    IoAwaitable pending =
        make_request(IoOp::SENDFILE, in_fd, /*buf=*/nullptr, /*len=*/count,
                     offset, /*path=*/nullptr, /*flags=*/0, /*mode=*/0,
                     &executor_, &pool_);
    static_cast<IoRequest*>(pending.submit_ctx_)->dest_fd = out_fd;
    return pending;
}
205

206
void ThreadPoolBackend::submit_to_pool(SubmitContext* ctx,
117✔
207
                                       IoAwaitable* awaitable) {
208
    auto* req = static_cast<IoRequest*>(ctx);
117✔
209
    // Update the awaitable pointer -- await_suspend passes the real,
210
    // stable address of the IoAwaitable in the coroutine frame.
211
    req->awaitable = awaitable;
117✔
212
    req->pool->submit([req] { execute_request(req); });
234!
213
}
117✔
214

215
// Performs the system call described by `req`, reports the outcome, and
// deletes `req`.
//
// Result convention: the raw return value of the call on success; on failure
// (a negative return) the value is replaced by -errno.
//
// Completion: when req->awaitable is set, the result is stored in its
// result_ and its handle_ is enqueued on req->executor. Otherwise, when
// req->completion is set, it is invoked with (req->completion_ctx, result).
//
// Ownership: `req` is always deleted before returning; callers must not
// touch it afterwards.
void ThreadPoolBackend::execute_request(IoRequest* req) {
    ssize_t result = 0;
    switch (req->op) {
        case IoOp::READ:
            result = ::read(req->fd, req->buf, req->len);
            break;
        case IoOp::WRITE:
            result = ::write(req->fd, req->buf, req->len);
            break;
        case IoOp::PREAD:
            result = ::pread(req->fd, req->buf, req->len, req->offset);
            break;
        case IoOp::PWRITE:
            result = ::pwrite(req->fd, req->buf, req->len, req->offset);
            break;
        case IoOp::OPEN:
            result = ::open(req->path, req->flags, req->mode);
            break;
        case IoOp::CLOSE:
            result = ::close(req->fd);
            break;
        case IoOp::FSYNC:
            result = ::fsync(req->fd);
            break;
        case IoOp::FTRUNCATE:
            // The new length is carried in the offset field.
            result = ::ftruncate(req->fd, req->offset);
            break;
        case IoOp::FSTAT:
            result = ::fstat(req->fd, req->stat_buf);
            break;
        case IoOp::ACCEPT:
#ifdef __linux__
            result = ::accept4(req->fd, req->addr, req->addrlen,
                               SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
            // No accept4() here: emulate SOCK_NONBLOCK | SOCK_CLOEXEC with
            // fcntl() on the accepted descriptor.
            result = ::accept(req->fd, req->addr, req->addrlen);
            if (result >= 0) {
                int fd = static_cast<int>(result);
                int fl = ::fcntl(fd, F_GETFL, 0);
                // F_GETFL reports failure as -1. OR-ing that into the
                // F_SETFL argument would ask for every status flag at once,
                // so only update the flags when the read succeeded.
                if (fl >= 0) {
                    ::fcntl(fd, F_SETFL, fl | O_NONBLOCK);
                }
                ::fcntl(fd, F_SETFD, FD_CLOEXEC);
            }
#endif
            break;
        case IoOp::RECV:
            result = ::recv(req->fd, req->buf, req->len, req->msg_flags);
            break;
        case IoOp::SEND:
            result = ::send(req->fd, req->buf, req->len, req->msg_flags);
            break;
        case IoOp::READV:
            result = ::readv(req->fd, req->iov, req->iovcnt);
            break;
        case IoOp::WRITEV:
            result = ::writev(req->fd, req->iov, req->iovcnt);
            break;
        case IoOp::PREADV:
            result = ::preadv(req->fd, req->iov, req->iovcnt, req->offset);
            break;
        case IoOp::PWRITEV:
            result = ::pwritev(req->fd, req->iov, req->iovcnt, req->offset);
            break;
        case IoOp::LSEEK:
            result = ::lseek(req->fd, req->offset, req->whence);
            break;
        case IoOp::SENDFILE: {
            // req->fd is the source, req->dest_fd the destination,
            // req->len the byte count.
#ifdef __linux__
            off_t off = req->offset;
            result = ::sendfile(req->dest_fd, req->fd, &off, req->len);
#elif defined(__APPLE__)
            off_t len = static_cast<off_t>(req->len);
            int ret = ::sendfile(req->fd, req->dest_fd, req->offset, &len,
                                 nullptr, 0);
            // On this platform the number of bytes actually sent comes back
            // in `len`, including when the call stops early. EAGAIN keeps
            // its existing treatment (report `len`). An EINTR that arrives
            // after some data went out is now reported as that partial
            // count too, so the caller does not lose track of bytes
            // already transferred.
            result = (ret == 0 || errno == EAGAIN ||
                      (errno == EINTR && len > 0))
                         ? len
                         : -1;
#else
            // Portable fallback: copy through a stack buffer with
            // pread()/write().
            char tmp[8192];
            result = 0;
            off_t off = req->offset;
            std::size_t remaining = req->len;
            while (remaining > 0) {
                std::size_t chunk =
                    remaining < sizeof(tmp) ? remaining : sizeof(tmp);
                ssize_t r = ::pread(req->fd, tmp, chunk, off);
                if (r <= 0) {
                    // Surface EOF/error only if nothing was copied yet;
                    // otherwise report the bytes already transferred.
                    if (result == 0) result = r;
                    break;
                }
                ssize_t w =
                    ::write(req->dest_fd, tmp, static_cast<std::size_t>(r));
                if (w < 0) {
                    if (result == 0) result = w;
                    break;
                }
                result += w;
                off += w;
                remaining -= static_cast<std::size_t>(w);
                // Short write: stop and report what was transferred.
                if (w < r) break;
            }
#endif
            break;
        }
    }
    // Translate the -1/errno convention into a negative errno value.
    if (result < 0) result = -errno;

    if (req->awaitable != nullptr) {
        req->awaitable->result_ = result;
        req->executor->enqueue(req->awaitable->handle_);
    } else if (req->completion != nullptr) {
        req->completion(req->completion_ctx, result);
    }
    delete req;
}
327

328
// Always reports zero completions and ignores the timeout: with the thread
// pool backend, completions fire via callbacks rather than being polled for.
std::size_t ThreadPoolBackend::poll(int /*timeout_ms*/) {
    constexpr std::size_t kNothingToReap = 0;
    return kNothingToReap;
}
331

332
// Forwards to the pool's flush() and returns its result converted to int.
int ThreadPoolBackend::flush() {
    return static_cast<int>(pool_.flush());
}
333

334
// Returns the identifier of this backend, "threadpool".
std::string ThreadPoolBackend::name() const {
    return std::string("threadpool");
}
335

336
}  // namespace dftracer::utils::io
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc