• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26195612357

20 May 2026 11:19PM UTC coverage: 49.859% (-2.3%) from 52.2%
26195612357

push

github

hariharan-devarajan
feat(aggregator): improve system metrics scanning and persistence error handling

16041 of 43831 branches covered (36.6%)

Branch coverage included in aggregate %.

6 of 17 new or added lines in 2 files covered. (35.29%)

1072 existing lines in 104 files now uncovered.

21423 of 31309 relevant lines covered (68.42%)

13054.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.56
/src/dftracer/utils/utilities/common/arrow/ipc_reader.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
3

4
#include <dftracer/utils/utilities/common/arrow/ipc_reader.h>
5
#include <nanoarrow/nanoarrow.h>
6
#include <nanoarrow/nanoarrow_ipc.h>
7

8
#include <cstring>
9
#include <new>
10

11
// Platform-specific includes for mmap
12
#ifdef _WIN32
13
#include <windows.h>
14
#else
15
#include <fcntl.h>
16
#include <sys/mman.h>
17
#include <sys/stat.h>
18
#include <unistd.h>
19
#endif
20

21
namespace dftracer::utils::utilities::common::arrow {
22

23
// ---------------------------------------------------------------------------
24
// Helpers
25
// ---------------------------------------------------------------------------
26

27
/// Recover the typed nanoarrow decoder pointer from the type-erased handle
/// that IpcReader stores in `decoder_`. A null input yields a null result.
static ArrowIpcDecoder* as_decoder(void* p) noexcept {
    auto* const typed = static_cast<ArrowIpcDecoder*>(p);
    return typed;
}
30

31
// ---------------------------------------------------------------------------
32
// Lifecycle
33
// ---------------------------------------------------------------------------
34

35
/// Destructor: hands all cleanup (decoder, schema, mapping, descriptor) to
/// close(), which is a no-op for resources that are not currently held.
IpcReader::~IpcReader() {
    close();
}
29✔
36

37
/// Move constructor: takes over every resource held by `other`.
///
/// Raw handles (mapping pointer, size, descriptor, decoder) are copied and the
/// containers are moved; `other` is then returned to the empty state through
/// reset_state() so that its destructor does not release anything now owned
/// by this object.
IpcReader::IpcReader(IpcReader&& other) noexcept
    :  // Memory mapping and the descriptor backing it.
      mapped_data_(other.mapped_data_),
      mapped_size_(other.mapped_size_),
      fd_(other.fd_),
      // Type-erased nanoarrow decoder.
      decoder_(other.decoder_),
      // Cached schema and per-batch block table.
      shared_schema_(std::move(other.shared_schema_)),
      blocks_(std::move(other.blocks_)),
      // Counters.
      num_batches_(other.num_batches_),
      total_rows_(other.total_rows_) {
    // Clear the source without releasing: ownership now lives here.
    other.reset_state();
}
1✔
48

49
/// Move assignment: releases whatever this reader currently holds, then takes
/// over every resource of `other` and leaves `other` in the empty state.
/// Self-assignment is a no-op.
IpcReader& IpcReader::operator=(IpcReader&& other) noexcept {
    if (this == &other) {
        return *this;
    }

    // Drop our own decoder / mapping / descriptor before overwriting them.
    close();

    // Raw handles are copied; reset_state() on the source below prevents a
    // double release.
    mapped_data_ = other.mapped_data_;
    mapped_size_ = other.mapped_size_;
    fd_ = other.fd_;
    decoder_ = other.decoder_;

    shared_schema_ = std::move(other.shared_schema_);
    blocks_ = std::move(other.blocks_);

    num_batches_ = other.num_batches_;
    total_rows_ = other.total_rows_;

    other.reset_state();
    return *this;
}
64

65
void IpcReader::reset_state() noexcept {
39✔
66
    mapped_data_ = nullptr;
39✔
67
    mapped_size_ = 0;
39✔
68
    fd_ = -1;
39✔
69
    decoder_ = nullptr;
39✔
70
    shared_schema_.reset();
39✔
71
    blocks_.clear();
39✔
72
    num_batches_ = 0;
39✔
73
    total_rows_ = 0;
39✔
74
}
39✔
75

76
void IpcReader::close() {
38✔
77
    if (decoder_) {
38✔
78
        ArrowIpcDecoderReset(as_decoder(decoder_));
25✔
79
        delete as_decoder(decoder_);
25!
80
        decoder_ = nullptr;
25✔
81
    }
82

83
    shared_schema_.reset();
38✔
84

85
#ifdef _WIN32
86
    if (mapped_data_) {
87
        UnmapViewOfFile(mapped_data_);
88
    }
89
    if (fd_ != -1) {
90
        CloseHandle(reinterpret_cast<HANDLE>(fd_));
91
    }
92
#else
93
    if (mapped_data_ && mapped_data_ != MAP_FAILED) {
38!
94
        munmap(mapped_data_, mapped_size_);
26✔
95
    }
96
    if (fd_ != -1) {
38✔
97
        ::close(fd_);
26✔
98
    }
99
#endif
100

101
    reset_state();
38✔
102
}
38✔
103

104
// ---------------------------------------------------------------------------
105
// open / read_footer
106
// ---------------------------------------------------------------------------
107

108
int IpcReader::open(const std::string& path) {
28✔
109
    if (is_open()) return -1;
28!
110

111
#ifdef _WIN32
112
    // Windows memory mapping
113
    HANDLE file =
114
        CreateFileA(path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr,
115
                    OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, nullptr);
116
    if (file == INVALID_HANDLE_VALUE) return -1;
117

118
    LARGE_INTEGER size;
119
    if (!GetFileSizeEx(file, &size)) {
120
        CloseHandle(file);
121
        return -1;
122
    }
123
    mapped_size_ = static_cast<std::size_t>(size.QuadPart);
124

125
    HANDLE mapping =
126
        CreateFileMappingA(file, nullptr, PAGE_READONLY, 0, 0, nullptr);
127
    if (!mapping) {
128
        CloseHandle(file);
129
        return -1;
130
    }
131

132
    mapped_data_ = MapViewOfFile(mapping, FILE_MAP_READ, 0, 0, 0);
133
    CloseHandle(mapping);
134
    if (!mapped_data_) {
135
        CloseHandle(file);
136
        return -1;
137
    }
138
    fd_ = reinterpret_cast<int>(file);
139
#else
140
    // POSIX memory mapping
141
    fd_ = ::open(path.c_str(), O_RDONLY);
28!
142
    if (fd_ < 0) return -1;
28✔
143

144
    struct stat st;
145
    if (fstat(fd_, &st) < 0) {
26!
146
        ::close(fd_);
×
147
        fd_ = -1;
×
148
        return -1;
×
149
    }
150
    mapped_size_ = static_cast<std::size_t>(st.st_size);
26✔
151

152
    // Minimum Arrow IPC file: magic(8) + footer_size(4) + magic(6) = 18 bytes
153
    if (mapped_size_ < 18) {
26!
154
        ::close(fd_);
×
155
        fd_ = -1;
×
156
        return -1;
×
157
    }
158

159
    mapped_data_ = mmap(nullptr, mapped_size_, PROT_READ, MAP_PRIVATE, fd_, 0);
26✔
160
    if (mapped_data_ == MAP_FAILED) {
26!
161
        ::close(fd_);
×
162
        fd_ = -1;
×
163
        mapped_data_ = nullptr;
×
164
        return -1;
×
165
    }
166

167
    // Advise kernel we'll read sequentially
168
    madvise(mapped_data_, mapped_size_, MADV_SEQUENTIAL);
26✔
169
#endif
170

171
    int rc = read_footer();
26!
172
    if (rc != NANOARROW_OK) {
26✔
173
        close();
1!
174
        return rc;
1✔
175
    }
176

177
    return NANOARROW_OK;
25✔
178
}
179

180
int IpcReader::read_footer() {
26✔
181
    auto* data = static_cast<const uint8_t*>(mapped_data_);
26✔
182

183
    // Arrow IPC file format footer:
184
    // ... | footer | footer_size (4 bytes) | "ARROW1" (6 bytes)
185

186
    // Validate magic at end
187
    if (std::memcmp(data + mapped_size_ - 6, "ARROW1", 6) != 0) {
26✔
188
        return -1;
1✔
189
    }
190

191
    // Read footer size (4 bytes before magic)
192
    std::int32_t footer_size;
193
    std::memcpy(&footer_size, data + mapped_size_ - 10, sizeof(footer_size));
25✔
194

195
    // Calculate footer bounds
196
    std::int64_t footer_total_size =
25✔
197
        footer_size + 10;     // footer + size(4) + magic(6)
25✔
198
    std::int64_t footer_offset = mapped_size_ - footer_total_size;
25✔
199
    if (footer_offset < 8) {  // Must be after file magic
25!
200
        return -1;
×
201
    }
202

203
    // Initialize decoder
204
    auto* decoder = new (std::nothrow) ArrowIpcDecoder;
25✔
205
    if (!decoder) return -1;
25!
206
    std::memset(decoder, 0, sizeof(ArrowIpcDecoder));
25✔
207

208
    int rc = ArrowIpcDecoderInit(decoder);
25!
209
    if (rc != NANOARROW_OK) {
25!
210
        delete decoder;
×
211
        return rc;
×
212
    }
213
    decoder_ = decoder;
25✔
214

215
    // Decode footer directly from mmap'd memory (zero-copy)
216
    ArrowBufferView footer_view;
217
    footer_view.data.as_uint8 = data + footer_offset;
25✔
218
    footer_view.size_bytes = footer_total_size;
25✔
219

220
    ArrowError error;
221
    rc = ArrowIpcDecoderVerifyFooter(decoder, footer_view, &error);
25!
222
    if (rc != NANOARROW_OK) {
25!
223
        return rc;
×
224
    }
225

226
    rc = ArrowIpcDecoderDecodeFooter(decoder, footer_view, &error);
25!
227
    if (rc != NANOARROW_OK) {
25!
228
        return rc;
×
229
    }
230

231
    // Footer is now available at decoder->footer
232
    ArrowIpcFooter* footer = decoder->footer;
25✔
233

234
    // Copy block info - decoder state may be modified by subsequent operations
235
    num_batches_ =
25✔
236
        footer->record_batch_blocks.size_bytes / sizeof(ArrowIpcFileBlock);
25✔
237
    blocks_.resize(num_batches_);
25!
238
    auto* src_blocks = reinterpret_cast<const ArrowIpcFileBlock*>(
25✔
239
        footer->record_batch_blocks.data);
240
    for (std::size_t i = 0; i < num_batches_; ++i) {
70✔
241
        blocks_[i].offset = src_blocks[i].offset;
45✔
242
        blocks_[i].metadata_length = src_blocks[i].metadata_length;
45✔
243
        blocks_[i].body_length = src_blocks[i].body_length;
45✔
244
    }
245

246
    // Create shared schema - deep copy once, share for all batches
247
    auto* schema = new (std::nothrow) ArrowSchema;
25✔
248
    if (!schema) return -1;
25!
249
    std::memset(schema, 0, sizeof(ArrowSchema));
25✔
250
    rc = ArrowSchemaDeepCopy(&footer->schema, schema);
25!
251
    if (rc != NANOARROW_OK) {
25!
252
        delete schema;
×
253
        return rc;
×
254
    }
255

256
    // Wrap in shared_ptr with custom deleter
257
    shared_schema_ = std::shared_ptr<void>(schema, [](void* p) {
50!
258
        auto* s = static_cast<ArrowSchema*>(p);
25✔
259
        if (s->release) s->release(s);
25!
260
        delete s;
25!
261
    });
25✔
262

263
    // Set decoder's expected schema
264
    rc = ArrowIpcDecoderSetSchema(decoder, &footer->schema, &error);
25!
265
    if (rc != NANOARROW_OK) {
25!
266
        return rc;
×
267
    }
268

269
    return NANOARROW_OK;
25✔
270
}
271

272
// ---------------------------------------------------------------------------
273
// read_batch
274
// ---------------------------------------------------------------------------
275

276
/// Decode the record batch at position `index` of the open file.
///
/// @param index  Zero-based batch index; must be less than the number of
///               batches recorded in the footer.
/// @return An ArrowExportResult built from a fresh deep copy of the cached
///         schema and the decoded array. A default-constructed
///         ArrowExportResult is returned when the reader is not open, the
///         index is out of range, the block's recorded extent does not lie
///         inside the mapped file, or any nanoarrow call fails.
ArrowExportResult IpcReader::read_batch(std::size_t index) {
    if (!is_open() || index >= num_batches_) {
        return ArrowExportResult();
    }

    auto* decoder = as_decoder(decoder_);
    const auto* data = static_cast<const uint8_t*>(mapped_data_);
    const auto& block = blocks_[index];

    // The block table comes from the file itself, so check that
    // [offset, offset + metadata_length + body_length) stays inside the
    // mapping before forming any pointer from it.
    if (block.offset < 0 || block.metadata_length < 0 ||
        block.body_length < 0) {
        return ArrowExportResult();
    }
    const auto file_size = static_cast<std::uint64_t>(mapped_size_);
    const auto block_offset = static_cast<std::uint64_t>(block.offset);
    const auto metadata_size =
        static_cast<std::uint64_t>(block.metadata_length);
    const auto body_size = static_cast<std::uint64_t>(block.body_length);
    // Written as subtractions so none of the comparisons can overflow.
    if (block_offset > file_size || metadata_size > file_size - block_offset ||
        body_size > file_size - block_offset - metadata_size) {
        return ArrowExportResult();
    }

    // Zero-copy: point directly into mmap'd memory for header
    ArrowBufferView header_view;
    header_view.data.as_uint8 = data + block.offset;
    header_view.size_bytes = block.metadata_length;

    ArrowError error;
    int rc = ArrowIpcDecoderDecodeHeader(decoder, header_view, &error);
    if (rc != NANOARROW_OK) {
        return ArrowExportResult();
    }

    // Zero-copy: point directly into mmap'd memory for body, which follows
    // the metadata immediately.
    ArrowBufferView body_view;
    body_view.data.as_uint8 = data + block.offset + block.metadata_length;
    body_view.size_bytes = block.body_length;

    // Decode array (-1 selects the whole record batch rather than one column)
    nanoarrow::UniqueArray array;
    rc = ArrowIpcDecoderDecodeArray(decoder, body_view, -1, array.get(),
                                    NANOARROW_VALIDATION_LEVEL_FULL, &error);
    if (rc != NANOARROW_OK) {
        return ArrowExportResult();
    }

    // Each result owns its schema, so hand out a deep copy of the schema
    // cached by read_footer() instead of the cached object itself.
    auto* schema_ptr = static_cast<ArrowSchema*>(shared_schema_.get());
    nanoarrow::UniqueSchema schema;
    rc = ArrowSchemaDeepCopy(schema_ptr, schema.get());
    if (rc != NANOARROW_OK) {
        return ArrowExportResult();
    }

    return ArrowExportResult(std::move(schema), std::move(array));
}
44✔
320

321
// ---------------------------------------------------------------------------
322
// read_all / for_each_batch
323
// ---------------------------------------------------------------------------
324

325
std::vector<ArrowExportResult> IpcReader::read_all() {
18✔
326
    std::vector<ArrowExportResult> results;
18✔
327
    results.reserve(num_batches_);
18!
328

329
    for (std::size_t i = 0; i < num_batches_; ++i) {
47✔
330
        auto batch = read_batch(i);
29!
331
        if (batch.valid()) {
29!
332
            results.push_back(std::move(batch));
29!
333
        }
334
    }
29✔
335

336
    return results;
18✔
UNCOV
337
}
×
338

339
/// Decode the batches one at a time, in file order, passing each to
/// `callback`.
///
/// @param callback  Invoked with each decoded batch; a non-zero return value
///                  stops the iteration.
/// @return 0 when every batch was decoded and accepted, -1 as soon as a batch
///         fails to decode, or the first non-zero value returned by
///         `callback`.
int IpcReader::for_each_batch(std::function<int(ArrowExportResult&)> callback) {
    for (std::size_t index = 0; index < num_batches_; ++index) {
        ArrowExportResult current = read_batch(index);
        if (!current.valid()) {
            return -1;
        }

        if (const int status = callback(current); status != 0) {
            return status;
        }
    }
    return 0;
}
352

353
}  // namespace dftracer::utils::utilities::common::arrow
354

355
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc