• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26195612357

20 May 2026 11:19PM UTC coverage: 49.859% (-2.3%) from 52.2%
26195612357

push

github

hariharan-devarajan
feat(aggregator): improve system metrics scanning and persistence error handling

16041 of 43831 branches covered (36.6%)

Branch coverage included in aggregate %.

6 of 17 new or added lines in 2 files covered. (35.29%)

1072 existing lines in 104 files now uncovered.

21423 of 31309 relevant lines covered (68.42%)

13054.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.21
/src/dftracer/utils/core/io/thread_pool_backend.cpp
1
#include <dftracer/utils/core/io/thread_pool_backend.h>
2
#include <dftracer/utils/core/pipeline/executor.h>
3
#include <fcntl.h>
4
#include <sys/socket.h>
5
#include <sys/stat.h>
6
#include <sys/uio.h>
7
#ifdef __linux__
8
#include <sys/sendfile.h>
9
#endif
10
#include <unistd.h>
11

12
#include <cerrno>
13

14
namespace dftracer::utils::io {
15

16
// Constructs the backend: binds the executor reference and constructs the
// I/O pool member from pool_size and batch_threshold. Nothing is started
// here; start() does that.
ThreadPoolBackend::ThreadPoolBackend(Executor& executor, std::size_t pool_size,
10✔
17
                                     unsigned batch_threshold)
10✔
18
    : executor_(executor), pool_(pool_size, batch_threshold) {}
10!
19

20
void ThreadPoolBackend::start() { pool_.start(); }
10✔
21
void ThreadPoolBackend::stop() { pool_.stop(); }
10✔
22

23
// Builds the request record shared by the awaitable submit_* entry points.
//
// Heap-allocates an IoRequest, copies the operation parameters into it, sets
// its submit hook to ThreadPoolBackend::submit_to_pool, and returns an
// IoAwaitable whose submit_ctx_ points at the request. Nothing is queued
// here. Pointer arguments (buf, path) are stored as-is, not copied.
// Op-specific fields that are not parameters of this helper are filled in by
// the caller through submit_ctx_.
static IoAwaitable make_request(IoOp op, int fd, void* buf, std::size_t len,
105✔
24
                                off_t offset, const char* path, int flags,
25
                                mode_t mode, Executor* executor,
26
                                IoThreadPool* pool) {
27
    auto* req = new IoRequest{};
105✔
28
    req->submit = &ThreadPoolBackend::submit_to_pool;
105✔
29
    req->op = op;
105✔
30
    req->fd = fd;
105✔
31
    req->buf = buf;
105✔
32
    req->len = len;
105✔
33
    req->offset = offset;
105✔
34
    req->path = path;
105✔
35
    req->flags = flags;
105✔
36
    req->mode = mode;
105✔
37
    req->executor = executor;
105✔
38
    req->pool = pool;
105✔
39

40
    IoAwaitable awaitable;
105✔
41
    // req->awaitable will be updated in submit_to_pool with the
42
    // stable address from await_suspend (the IoAwaitable may move
43
    // before the coroutine frame is allocated).
44
    req->awaitable = nullptr;
105✔
45
    awaitable.submit_ctx_ = req;
105✔
46
    return awaitable;
105✔
47
}
48

49
// Prepares an awaitable READ request for fd into buf (up to len bytes).
// Offset, path, flags and mode are unused and passed as 0/nullptr.
IoAwaitable ThreadPoolBackend::submit_read(int fd, void* buf, std::size_t len) {
×
50
    return make_request(IoOp::READ, fd, buf, len, 0, nullptr, 0, 0, &executor_,
×
51
                        &pool_);
×
52
}
53

54
// Prepares an awaitable WRITE request of len bytes from buf to fd.
// The const_cast only adapts buf to the request's shared void* buffer
// parameter; the WRITE path hands it to ::write, which takes a const pointer.
IoAwaitable ThreadPoolBackend::submit_write(int fd, const void* buf,
1✔
55
                                            std::size_t len) {
56
    return make_request(IoOp::WRITE, fd, const_cast<void*>(buf), len, 0,
1✔
57
                        nullptr, 0, 0, &executor_, &pool_);
1✔
58
}
59

60
// Prepares an awaitable PREAD request: up to len bytes from fd at the given
// file offset into buf.
IoAwaitable ThreadPoolBackend::submit_pread(int fd, void* buf, std::size_t len,
27✔
61
                                            off_t offset) {
62
    return make_request(IoOp::PREAD, fd, buf, len, offset, nullptr, 0, 0,
27✔
63
                        &executor_, &pool_);
27✔
64
}
65

66
// Callback-style PREAD: unlike the awaitable submit_* functions, this one
// queues the request on the pool immediately and reports the outcome by
// calling completion(context, result) from execute_request.
//
// The request's executor and awaitable fields are not assigned here, so
// execute_request is expected to take its completion-callback branch.
// NOTE(review): that relies on IoRequest{} leaving `awaitable` null — confirm
// against the IoRequest definition.
void ThreadPoolBackend::submit_pread_callback(int fd, void* buf,
×
67
                                              std::size_t len, off_t offset,
68
                                              IoCompletionFn completion,
69
                                              void* context) {
70
    auto* req = new IoRequest{};
×
UNCOV
71
    req->op = IoOp::PREAD;
×
UNCOV
72
    req->fd = fd;
×
UNCOV
73
    req->buf = buf;
×
UNCOV
74
    req->len = len;
×
UNCOV
75
    req->offset = offset;
×
UNCOV
76
    req->completion = completion;
×
UNCOV
77
    req->completion_ctx = context;
×
UNCOV
78
    req->pool = &pool_;
×
79
    pool_.submit([req] { execute_request(req); });
×
UNCOV
80
}
×
81

82
// Prepares an awaitable PWRITE request: len bytes from buf to fd at the
// given file offset. The const_cast only adapts buf to the request's shared
// void* buffer parameter.
IoAwaitable ThreadPoolBackend::submit_pwrite(int fd, const void* buf,
×
83
                                             std::size_t len, off_t offset) {
84
    return make_request(IoOp::PWRITE, fd, const_cast<void*>(buf), len, offset,
×
85
                        nullptr, 0, 0, &executor_, &pool_);
×
86
}
87

88
// Prepares an awaitable OPEN request for path with the given flags and mode.
// The fd slot is set to -1 because no descriptor exists yet. The path is
// passed on by pointer, not copied.
// NOTE(review): the string presumably has to stay valid until the request
// has executed — confirm with callers.
IoAwaitable ThreadPoolBackend::submit_open(const char* path, int flags,
1✔
89
                                           mode_t mode) {
90
    return make_request(IoOp::OPEN, -1, nullptr, 0, 0, path, flags, mode,
1✔
91
                        &executor_, &pool_);
1✔
92
}
93

94
// Prepares an awaitable CLOSE request for fd; all other fields are unused.
IoAwaitable ThreadPoolBackend::submit_close(int fd) {
19✔
95
    return make_request(IoOp::CLOSE, fd, nullptr, 0, 0, nullptr, 0, 0,
19✔
96
                        &executor_, &pool_);
19✔
97
}
98

99
// Prepares an awaitable FSYNC request for fd; all other fields are unused.
IoAwaitable ThreadPoolBackend::submit_fsync(int fd) {
×
100
    return make_request(IoOp::FSYNC, fd, nullptr, 0, 0, nullptr, 0, 0,
×
101
                        &executor_, &pool_);
×
102
}
103

104
// Prepares an awaitable FTRUNCATE request for fd. The target length travels
// in the request's offset slot, which is what the FTRUNCATE case in
// execute_request passes to ::ftruncate.
IoAwaitable ThreadPoolBackend::submit_ftruncate(int fd, off_t length) {
×
105
    return make_request(IoOp::FTRUNCATE, fd, nullptr, 0, length, nullptr, 0, 0,
×
106
                        &executor_, &pool_);
×
107
}
108

109
// Prepares an awaitable FSTAT request for fd. The output struct pointer is
// not a make_request parameter, so it is attached to the request afterwards
// through the awaitable's submit_ctx_.
IoAwaitable ThreadPoolBackend::submit_fstat(int fd, struct stat* buf) {
×
110
    auto req_awaitable = make_request(IoOp::FSTAT, fd, nullptr, 0, 0, nullptr,
×
111
                                      0, 0, &executor_, &pool_);
×
112
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
113
    req->stat_buf = buf;
×
114
    return req_awaitable;
×
115
}
116

117
// Prepares an awaitable ACCEPT request on listen_fd. The peer address
// out-parameters (addr, addrlen) are attached to the request afterwards
// through the awaitable's submit_ctx_.
IoAwaitable ThreadPoolBackend::submit_accept(int listen_fd,
19✔
118
                                             struct sockaddr* addr,
119
                                             socklen_t* addrlen) {
120
    auto req_awaitable = make_request(IoOp::ACCEPT, listen_fd, nullptr, 0, 0,
19✔
121
                                      nullptr, 0, 0, &executor_, &pool_);
19✔
122
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
19✔
123
    req->addr = addr;
19✔
124
    req->addrlen = addrlen;
19✔
125
    return req_awaitable;
19✔
126
}
127

128
// Prepares an awaitable RECV request on fd into buf (up to len bytes).
// The recv flags go into msg_flags; the request's generic `flags` slot is
// passed as 0 here.
IoAwaitable ThreadPoolBackend::submit_recv(int fd, void* buf, std::size_t len,
18✔
129
                                           int flags) {
130
    auto req_awaitable = make_request(IoOp::RECV, fd, buf, len, 0, nullptr, 0,
18✔
131
                                      0, &executor_, &pool_);
18✔
132
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
18✔
133
    req->msg_flags = flags;
18✔
134
    return req_awaitable;
18✔
135
}
136

137
// Prepares an awaitable SEND request of len bytes from buf on fd.
// The send flags go into msg_flags; the request's generic `flags` slot is
// passed as 0 here. The const_cast only adapts buf to the request's shared
// void* buffer parameter.
IoAwaitable ThreadPoolBackend::submit_send(int fd, const void* buf,
18✔
138
                                           std::size_t len, int flags) {
139
    auto req_awaitable =
140
        make_request(IoOp::SEND, fd, const_cast<void*>(buf), len, 0, nullptr, 0,
18✔
141
                     0, &executor_, &pool_);
18✔
142
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
18✔
143
    req->msg_flags = flags;
18✔
144
    return req_awaitable;
18✔
145
}
146

147
// Prepares an awaitable READV (scatter read) request on fd. The iovec array
// pointer and its element count are attached to the request afterwards; the
// array is referenced by pointer, not copied.
IoAwaitable ThreadPoolBackend::submit_readv(int fd, const struct iovec* iov,
×
148
                                            int iovcnt) {
149
    auto req_awaitable = make_request(IoOp::READV, fd, nullptr, 0, 0, nullptr,
×
150
                                      0, 0, &executor_, &pool_);
×
151
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
152
    req->iov = iov;
×
153
    req->iovcnt = iovcnt;
×
154
    return req_awaitable;
×
155
}
156

157
// Prepares an awaitable WRITEV (gather write) request on fd. The iovec array
// pointer and its element count are attached to the request afterwards; the
// array is referenced by pointer, not copied.
IoAwaitable ThreadPoolBackend::submit_writev(int fd, const struct iovec* iov,
2✔
158
                                             int iovcnt) {
159
    auto req_awaitable = make_request(IoOp::WRITEV, fd, nullptr, 0, 0, nullptr,
2✔
160
                                      0, 0, &executor_, &pool_);
2✔
161
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
2✔
162
    req->iov = iov;
2✔
163
    req->iovcnt = iovcnt;
2✔
164
    return req_awaitable;
2✔
165
}
166

167
// Prepares an awaitable PREADV request: scatter read from fd starting at the
// given file offset. The iovec array pointer and count are attached to the
// request afterwards; the array is referenced by pointer, not copied.
IoAwaitable ThreadPoolBackend::submit_preadv(int fd, const struct iovec* iov,
×
168
                                             int iovcnt, off_t offset) {
169
    auto req_awaitable = make_request(IoOp::PREADV, fd, nullptr, 0, offset,
×
170
                                      nullptr, 0, 0, &executor_, &pool_);
×
171
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
172
    req->iov = iov;
×
173
    req->iovcnt = iovcnt;
×
174
    return req_awaitable;
×
175
}
176

177
// Prepares an awaitable PWRITEV request: gather write to fd starting at the
// given file offset. The iovec array pointer and count are attached to the
// request afterwards; the array is referenced by pointer, not copied.
IoAwaitable ThreadPoolBackend::submit_pwritev(int fd, const struct iovec* iov,
×
178
                                              int iovcnt, off_t offset) {
179
    auto req_awaitable = make_request(IoOp::PWRITEV, fd, nullptr, 0, offset,
×
180
                                      nullptr, 0, 0, &executor_, &pool_);
×
181
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
182
    req->iov = iov;
×
183
    req->iovcnt = iovcnt;
×
184
    return req_awaitable;
×
185
}
186

187
// Prepares an awaitable LSEEK request for fd with the given offset; whence
// is attached to the request afterwards through the awaitable's submit_ctx_.
IoAwaitable ThreadPoolBackend::submit_lseek(int fd, off_t offset, int whence) {
×
188
    auto req_awaitable = make_request(IoOp::LSEEK, fd, nullptr, 0, offset,
×
189
                                      nullptr, 0, 0, &executor_, &pool_);
×
190
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
191
    req->whence = whence;
×
192
    return req_awaitable;
×
193
}
194

195
// Prepares an awaitable SENDFILE request copying count bytes from in_fd,
// starting at offset, to out_fd. Field mapping: the request's fd slot holds
// the source (in_fd), len holds count, and dest_fd holds the destination
// (out_fd).
IoAwaitable ThreadPoolBackend::submit_sendfile(int out_fd, int in_fd,
×
196
                                               off_t offset,
197
                                               std::size_t count) {
198
    auto req_awaitable =
199
        make_request(IoOp::SENDFILE, in_fd, nullptr, count, offset, nullptr, 0,
×
200
                     0, &executor_, &pool_);
×
201
    auto* req = static_cast<IoRequest*>(req_awaitable.submit_ctx_);
×
202
    req->dest_fd = out_fd;
×
203
    return req_awaitable;
×
204
}
205

206
// Submit hook installed on every request by make_request. ctx is the
// IoRequest created there; awaitable is the IoAwaitable that will receive
// the result. Records the awaitable on the request and queues the request on
// its pool, where execute_request runs it.
void ThreadPoolBackend::submit_to_pool(SubmitContext* ctx,
105✔
207
                                       IoAwaitable* awaitable) {
208
    auto* req = static_cast<IoRequest*>(ctx);
105✔
209
    // Update the awaitable pointer -- await_suspend passes the real,
210
    // stable address of the IoAwaitable in the coroutine frame.
211
    req->awaitable = awaitable;
105✔
212
    req->pool->submit([req] { execute_request(req); });
209!
213
}
105✔
214

215
// Runs one queued request to completion and releases it.
//
// Steps:
//   1. Dispatch on req->op and issue the matching system call with the
//      fields the submit_* function filled in.
//   2. Convert a negative return value into -errno.
//   3. Deliver the result: if the request has an awaitable, store the result
//      in it and enqueue its coroutine handle on the request's executor;
//      otherwise, if a completion callback is set, call it with
//      (completion_ctx, result).
//   4. Delete the request.
//
// Platform notes:
//   - ACCEPT: Linux uses accept4 with SOCK_NONBLOCK | SOCK_CLOEXEC. Other
//     platforms call accept and then set O_NONBLOCK and FD_CLOEXEC via
//     fcntl; the fcntl return values are not checked.
//   - SENDFILE: Linux and Apple use their native sendfile (note the
//     different argument order). Any other platform falls back to a
//     pread/write loop through an 8192-byte stack buffer that stops on
//     error, EOF, or a short write, and reports the bytes copied so far if
//     any were.
//
// NOTE(review): the switch has no default case, so an op value without a
// case leaves result at 0 — confirm every IoOp enumerator is handled.
// NOTE(review): result is ssize_t while ::lseek returns off_t — confirm the
// two have the same width on all supported targets.
void ThreadPoolBackend::execute_request(IoRequest* req) {
105✔
216
    ssize_t result = 0;
105✔
217
    switch (req->op) {
105!
UNCOV
218
        case IoOp::READ:
×
219
            result = ::read(req->fd, req->buf, req->len);
×
220
            break;
×
221
        case IoOp::WRITE:
1✔
222
            result = ::write(req->fd, req->buf, req->len);
1✔
223
            break;
1✔
224
        case IoOp::PREAD:
27✔
225
            result = ::pread(req->fd, req->buf, req->len, req->offset);
27✔
226
            break;
27✔
UNCOV
227
        case IoOp::PWRITE:
×
228
            result = ::pwrite(req->fd, req->buf, req->len, req->offset);
×
229
            break;
×
230
        case IoOp::OPEN:
1✔
231
            result = ::open(req->path, req->flags, req->mode);
1✔
232
            break;
1✔
233
        case IoOp::CLOSE:
19✔
234
            result = ::close(req->fd);
19✔
235
            break;
19✔
UNCOV
236
        case IoOp::FSYNC:
×
237
            result = ::fsync(req->fd);
×
238
            break;
×
UNCOV
239
        case IoOp::FTRUNCATE:
×
240
            result = ::ftruncate(req->fd, req->offset);
×
241
            break;
×
UNCOV
242
        case IoOp::FSTAT:
×
243
            result = ::fstat(req->fd, req->stat_buf);
×
244
            break;
×
245
        case IoOp::ACCEPT:
19✔
246
#ifdef __linux__
247
            result = ::accept4(req->fd, req->addr, req->addrlen,
19✔
248
                               SOCK_NONBLOCK | SOCK_CLOEXEC);
249
#else
250
            result = ::accept(req->fd, req->addr, req->addrlen);
251
            if (result >= 0) {
252
                int fd = static_cast<int>(result);
253
                int fl = ::fcntl(fd, F_GETFL, 0);
254
                ::fcntl(fd, F_SETFL, fl | O_NONBLOCK);
255
                ::fcntl(fd, F_SETFD, FD_CLOEXEC);
256
            }
257
#endif
258
            break;
19✔
259
        case IoOp::RECV:
18✔
260
            result = ::recv(req->fd, req->buf, req->len, req->msg_flags);
18✔
261
            break;
18✔
262
        case IoOp::SEND:
18✔
263
            result = ::send(req->fd, req->buf, req->len, req->msg_flags);
18✔
264
            break;
18✔
UNCOV
265
        case IoOp::READV:
×
266
            result = ::readv(req->fd, req->iov, req->iovcnt);
×
267
            break;
×
268
        case IoOp::WRITEV:
2✔
269
            result = ::writev(req->fd, req->iov, req->iovcnt);
2✔
270
            break;
2✔
UNCOV
271
        case IoOp::PREADV:
×
272
            result = ::preadv(req->fd, req->iov, req->iovcnt, req->offset);
×
273
            break;
×
UNCOV
274
        case IoOp::PWRITEV:
×
275
            result = ::pwritev(req->fd, req->iov, req->iovcnt, req->offset);
×
276
            break;
×
UNCOV
277
        case IoOp::LSEEK:
×
278
            result = ::lseek(req->fd, req->offset, req->whence);
×
279
            break;
×
UNCOV
280
        case IoOp::SENDFILE: {
×
281
#ifdef __linux__
UNCOV
282
            off_t off = req->offset;
×
UNCOV
283
            result = ::sendfile(req->dest_fd, req->fd, &off, req->len);
×
284
#elif defined(__APPLE__)
285
            off_t len = static_cast<off_t>(req->len);
286
            int ret = ::sendfile(req->fd, req->dest_fd, req->offset, &len,
287
                                 nullptr, 0);
288
            result = (ret == 0 || errno == EAGAIN) ? len : -1;
289
#else
290
            char tmp[8192];
291
            result = 0;
292
            off_t off = req->offset;
293
            std::size_t remaining = req->len;
294
            while (remaining > 0) {
295
                std::size_t chunk =
296
                    remaining < sizeof(tmp) ? remaining : sizeof(tmp);
297
                ssize_t r = ::pread(req->fd, tmp, chunk, off);
298
                if (r <= 0) {
299
                    if (result == 0) result = r;
300
                    break;
301
                }
302
                ssize_t w =
303
                    ::write(req->dest_fd, tmp, static_cast<std::size_t>(r));
304
                if (w < 0) {
305
                    if (result == 0) result = w;
306
                    break;
307
                }
308
                result += w;
309
                off += w;
310
                remaining -= static_cast<std::size_t>(w);
311
                if (w < r) break;
312
            }
313
#endif
314
            break;
×
315
        }
316
    }
317
    // A failed call is reported to the waiter as a negated errno value.
    if (result < 0) result = -errno;
105✔
318

319
    if (req->awaitable != nullptr) {
105!
320
        req->awaitable->result_ = result;
105✔
321
        req->executor->enqueue(req->awaitable->handle_);
105✔
UNCOV
322
    } else if (req->completion != nullptr) {
×
323
        req->completion(req->completion_ctx, result);
×
324
    }
325
    // The request was allocated with new by the submit path; release it here.
    delete req;
105!
326
}
105✔
327

328
// Always returns 0 and ignores the timeout argument: this backend delivers
// results from execute_request, so there is nothing to poll for.
std::size_t ThreadPoolBackend::poll(int /*timeout_ms*/) {
446✔
329
    return 0;  // Thread pool backend: completions fire via callbacks
446✔
330
}
331

332
// Flushes the underlying I/O pool and returns the pool's flush result
// narrowed to int.
int ThreadPoolBackend::flush() {
    const auto flushed = pool_.flush();
    return static_cast<int>(flushed);
}
446✔
333

334
// Returns the fixed identifier string of this backend.
std::string ThreadPoolBackend::name() const {
    return std::string("threadpool");
}
1!
335

336
}  // namespace dftracer::utils::io
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc