• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23934877458

03 Apr 2026 05:15AM UTC coverage: 51.182% (-0.3%) from 51.498%
23934877458

Pull #63

github

web-flow
Merge 946c3e0ba into 773a62661
Pull Request #63: feat(aggregator): support profile/system counter aggregation and custom metric Arrow output

23378 of 58949 branches covered (39.66%)

Branch coverage included in aggregate %.

328 of 568 new or added lines in 7 files covered. (57.75%)

73 existing lines in 6 files now uncovered.

20276 of 26343 relevant lines covered (76.97%)

13072.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.48
/src/dftracer/utils/python/trace_reader.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/coro/task.h>
4
#include <dftracer/utils/core/utils/string.h>
5
#include <dftracer/utils/python/arrow_helpers.h>
6
#include <dftracer/utils/python/runtime.h>
7
#include <dftracer/utils/python/trace_reader.h>
8
#include <dftracer/utils/python/trace_reader_iterator.h>
9
#include <dftracer/utils/utilities/reader/trace_reader.h>
10

11
#include <cinttypes>
12
#include <cstddef>
13
#include <cstdio>
14
#include <cstring>
15
#include <exception>
16
#include <memory>
17
#include <string>
18
#include <vector>
19

20
#ifdef DFTRACER_UTILS_ENABLE_ARROW
21
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
22
#include <yyjson.h>
23
#endif
24

25
namespace {
26

27
using dftracer::utils::Runtime;
28
using dftracer::utils::coro::CoroTask;
29
using dftracer::utils::utilities::reader::ReadConfig;
30
using dftracer::utils::utilities::reader::TraceReader;
31
using dftracer::utils::utilities::reader::TraceReaderConfig;
32

33
// Producer coroutine for line mode.
//
// Pulls lines from TraceReader::read_lines(rc) and pushes a copy of each
// line's `content` onto `state->queue`, blocking on `cv_producer` while the
// queue holds `max_queue_size` items. On normal completion or cancellation a
// std::nullopt sentinel is pushed and `done` is set. If anything throws, the
// exception is stored in `state->error` before the sentinel is pushed.
CoroTask<void> produce_lines(std::shared_ptr<IteratorState> state,
                             TraceReaderConfig cfg, ReadConfig rc) {
    auto *shared = state.get();
    try {
        TraceReader reader(std::move(cfg));
        auto line_gen = reader.read_lines(rc);
        for (;;) {
            auto next = co_await line_gen.next();
            if (!next) break;
            if (shared->cancelled.load(std::memory_order_acquire)) break;

            // Copy the payload before taking the lock to keep the critical
            // section short.
            std::string payload(next->content);
            {
                std::unique_lock<std::mutex> guard(shared->mtx);
                // Back-pressure: sleep until there is room or the consumer
                // has cancelled.
                while (!(shared->queue.size() < shared->max_queue_size ||
                         shared->cancelled.load(std::memory_order_acquire))) {
                    shared->cv_producer.wait(guard);
                }
                if (shared->cancelled.load(std::memory_order_acquire)) break;
                shared->queue.push(std::move(payload));
            }
            shared->cv_consumer.notify_one();
        }
    } catch (...) {
        // Hand the failure to the consumer together with the end sentinel.
        std::lock_guard<std::mutex> guard(shared->mtx);
        shared->error = std::current_exception();
        shared->queue.push(std::nullopt);
        shared->done.store(true, std::memory_order_release);
        shared->cv_consumer.notify_one();
        co_return;
    }

    // Normal end of stream (or cancellation): publish the sentinel.
    {
        std::lock_guard<std::mutex> guard(shared->mtx);
        shared->queue.push(std::nullopt);
        shared->done.store(true, std::memory_order_release);
    }
    shared->cv_consumer.notify_one();
}
7,058!
68

69
// Producer coroutine for raw mode.
//
// Pulls chunks from TraceReader::read_raw(rc) and pushes a std::string copy
// of each chunk (built from its data()/size()) onto `state->queue`, blocking
// on `cv_producer` while the queue holds `max_queue_size` items. On normal
// completion or cancellation a std::nullopt sentinel is pushed and `done` is
// set. If anything throws, the exception is stored in `state->error` before
// the sentinel is pushed.
CoroTask<void> produce_raw(std::shared_ptr<IteratorState> state,
                           TraceReaderConfig cfg, ReadConfig rc) {
    auto *shared = state.get();
    try {
        TraceReader reader(std::move(cfg));
        auto chunk_gen = reader.read_raw(rc);
        for (;;) {
            auto chunk = co_await chunk_gen.next();
            if (!chunk) break;
            if (shared->cancelled.load(std::memory_order_acquire)) break;

            // Copy the bytes before taking the lock to keep the critical
            // section short.
            std::string payload(chunk->data(), chunk->size());
            {
                std::unique_lock<std::mutex> guard(shared->mtx);
                // Back-pressure: sleep until there is room or the consumer
                // has cancelled.
                while (!(shared->queue.size() < shared->max_queue_size ||
                         shared->cancelled.load(std::memory_order_acquire))) {
                    shared->cv_producer.wait(guard);
                }
                if (shared->cancelled.load(std::memory_order_acquire)) break;
                shared->queue.push(std::move(payload));
            }
            shared->cv_consumer.notify_one();
        }
    } catch (...) {
        // Hand the failure to the consumer together with the end sentinel.
        std::lock_guard<std::mutex> guard(shared->mtx);
        shared->error = std::current_exception();
        shared->queue.push(std::nullopt);
        shared->done.store(true, std::memory_order_release);
        shared->cv_consumer.notify_one();
        co_return;
    }

    // Normal end of stream (or cancellation): publish the sentinel.
    {
        std::lock_guard<std::mutex> guard(shared->mtx);
        shared->queue.push(std::nullopt);
        shared->done.store(true, std::memory_order_release);
    }
    shared->cv_consumer.notify_one();
}
1,057!
104

105
#ifdef DFTRACER_UTILS_ENABLE_ARROW
106

107
using dftracer::utils::utilities::common::arrow::ColumnType;
108
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
109

110
// Bump arena for string_views that must survive until builder.finish().
//
// Bytes are copied into the last block of `blocks`; `pos` is the write
// offset inside that block. When a copy does not fit, a fresh block of
// max(BLOCK_SIZE, len) bytes is appended (existing blocks are never resized,
// so previously returned views keep pointing at valid storage until
// clear()).
struct StringArena {
    static constexpr std::size_t BLOCK_SIZE = 64 * 1024;
    std::vector<std::vector<char>> blocks;
    std::size_t pos = 0;

    StringArena() { blocks.emplace_back(BLOCK_SIZE); }

    // Copies `len` bytes from `data` into the arena and returns a view of the
    // stored copy.
    std::string_view push(const char *data, std::size_t len) {
        const bool fits = !(pos + len > blocks.back().size());
        if (!fits) {
            const std::size_t capacity = std::max(BLOCK_SIZE, len);
            blocks.emplace_back(capacity);
            pos = 0;
        }
        char *const slot = blocks.back().data() + pos;
        std::memcpy(slot, data, len);
        pos += len;
        return std::string_view(slot, len);
    }

    // Drops every block except the first and rewinds the write offset. Views
    // handed out earlier must not be used afterwards.
    void clear() {
        if (blocks.size() > 1) {
            blocks.erase(blocks.begin() + 1, blocks.end());
        }
        pos = 0;
    }
};
134

135
// --- Row type constants (must match Python TYPE_* constants) ---
// Stored in the "type" output column. Values are spelled out explicitly
// because they are shared with the Python side.
enum RowType : int8_t {
    ROW_EVENT = 0,          // ph is neither "M" nor "C"
    ROW_FILE_HASH = 1,      // ph == "M", name == "FH"
    ROW_HOST_HASH = 2,      // ph == "M", name == "HH"
    ROW_STRING_HASH = 3,    // ph == "M", name == "SH"
    ROW_METADATA = 4,       // ph == "M", any other name
    ROW_PROC_METADATA = 5,  // ph == "M", name == "PR"
    ROW_PROFILE = 6,        // ph == "C", cat is not "sys"
    ROW_SYSTEM = 7,         // ph == "C", cat is "sys" (case-insensitive)
};
146

147
// --- IO category constants (must match Python IOCategory values) ---
// Stored in the "io_cat" output column. Values are spelled out explicitly
// because they are shared with the Python side; note that 0 is not used.
enum IOCat : int8_t {
    IO_READ = 1,      // read-family calls
    IO_WRITE = 2,     // write-family calls
    IO_METADATA = 3,  // open/close/stat/seek and similar calls
    IO_PCTL = 4,      // process control calls
    IO_IPC = 5,       // message queue / semaphore / shared memory calls
    IO_OTHER = 6,     // anything not recognised
    IO_SYNC = 7,      // sync-family calls
};
157

NEW
158
static int8_t get_io_cat(std::string_view func) {
×
159
    // READ
NEW
160
    if (func == "fread" || func == "pread" || func == "preadv" ||
×
NEW
161
        func == "read" || func == "readv")
×
NEW
162
        return IO_READ;
×
163
    // WRITE
NEW
164
    if (func == "fwrite" || func == "pwrite" || func == "pwritev" ||
×
NEW
165
        func == "write" || func == "writev")
×
NEW
166
        return IO_WRITE;
×
167
    // SYNC
NEW
168
    if (func == "fsync" || func == "fdatasync" || func == "msync" ||
×
NEW
169
        func == "sync")
×
NEW
170
        return IO_SYNC;
×
171
    // PCTL
NEW
172
    if (func == "exec" || func == "exit" || func == "fork" || func == "kill" ||
×
NEW
173
        func == "pipe" || func == "wait")
×
NEW
174
        return IO_PCTL;
×
175
    // IPC
NEW
176
    if (func == "msgctl" || func == "msgget" || func == "msgrcv" ||
×
NEW
177
        func == "msgsnd" || func == "semctl" || func == "semget" ||
×
NEW
178
        func == "semop" || func == "shmat" || func == "shmctl" ||
×
NEW
179
        func == "shmdt" || func == "shmget")
×
NEW
180
        return IO_IPC;
×
181
    // METADATA
NEW
182
    if (func == "__fxstat" || func == "__fxstat64" || func == "__lxstat" ||
×
NEW
183
        func == "__lxstat64" || func == "__xstat" || func == "__xstat64" ||
×
NEW
184
        func == "access" || func == "close" || func == "closedir" ||
×
NEW
185
        func == "fclose" || func == "fcntl" || func == "fopen" ||
×
NEW
186
        func == "fopen64" || func == "fseek" || func == "fstat" ||
×
NEW
187
        func == "fstatat" || func == "ftell" || func == "ftruncate" ||
×
NEW
188
        func == "link" || func == "lseek" || func == "lseek64" ||
×
NEW
189
        func == "mkdir" || func == "open" || func == "open64" ||
×
NEW
190
        func == "opendir" || func == "readdir" || func == "readlink" ||
×
NEW
191
        func == "remove" || func == "rename" || func == "rmdir" ||
×
NEW
192
        func == "seek" || func == "stat" || func == "unlink")
×
NEW
193
        return IO_METADATA;
×
NEW
194
    return IO_OTHER;
×
195
}
196

NEW
197
// Returns true when `a`, after lowercasing each byte with std::tolower,
// equals the NUL-terminated string `b`. Only `a` is folded: `b` is compared
// as given, so callers are expected to pass it already in lowercase.
static bool str_iequal(std::string_view a, const char *b) {
    const std::string_view expected(b);
    if (expected.size() != a.size()) return false;

    std::size_t k = 0;
    while (k < expected.size()) {
        const int folded = std::tolower(static_cast<unsigned char>(a[k]));
        if (folded != static_cast<unsigned char>(expected[k])) return false;
        ++k;
    }
    return true;
}
207

NEW
208
// Returns true when `needle` occurs somewhere in `s` after lowercasing each
// byte of `s` with std::tolower. `needle` itself is compared as given, so
// callers are expected to pass it already in lowercase. An empty needle
// matches any `s`, including the empty string.
static bool str_contains_lower(std::string_view s, const char *needle) {
    const std::string_view pattern(needle);
    if (pattern.size() > s.size()) return false;

    const std::size_t last_start = s.size() - pattern.size();
    for (std::size_t start = 0; start <= last_start; ++start) {
        // Count how many leading pattern bytes match at this offset.
        std::size_t matched = 0;
        while (matched < pattern.size() &&
               std::tolower(static_cast<unsigned char>(s[start + matched])) ==
                   static_cast<unsigned char>(pattern[matched])) {
            ++matched;
        }
        if (matched == pattern.size()) return true;
    }
    return false;
}
224

225
// Normalize a raw JSON row (already parsed into yyjson) into the semantic
226
// output schema.  Appends one row to `builder` with the full set of output
227
// columns.  Returns false if the row should be skipped (no valid name).
NEW
228
static bool normalize_row(RecordBatchBuilder &builder, StringArena &arena,
×
229
                          yyjson_val *root) {
230
    // --- Extract top-level fields ---
NEW
231
    yyjson_val *v_ph = yyjson_obj_get(root, "ph");
×
NEW
232
    yyjson_val *v_name = yyjson_obj_get(root, "name");
×
NEW
233
    yyjson_val *v_cat = yyjson_obj_get(root, "cat");
×
NEW
234
    yyjson_val *v_pid = yyjson_obj_get(root, "pid");
×
NEW
235
    yyjson_val *v_tid = yyjson_obj_get(root, "tid");
×
NEW
236
    yyjson_val *v_ts = yyjson_obj_get(root, "ts");
×
NEW
237
    yyjson_val *v_dur = yyjson_obj_get(root, "dur");
×
NEW
238
    yyjson_val *v_args = yyjson_obj_get(root, "args");
×
239

240
    std::string_view ph =
NEW
241
        v_ph && yyjson_is_str(v_ph)
×
NEW
242
            ? std::string_view(yyjson_get_str(v_ph), yyjson_get_len(v_ph))
×
NEW
243
            : std::string_view();
×
244
    std::string_view name_sv =
NEW
245
        v_name && yyjson_is_str(v_name)
×
NEW
246
            ? std::string_view(yyjson_get_str(v_name), yyjson_get_len(v_name))
×
NEW
247
            : std::string_view();
×
248
    std::string_view cat_sv =
NEW
249
        v_cat && yyjson_is_str(v_cat)
×
NEW
250
            ? std::string_view(yyjson_get_str(v_cat), yyjson_get_len(v_cat))
×
NEW
251
            : std::string_view();
×
252

253
    // Helper to get args fields
NEW
254
    auto args_str = [&](const char *key) -> std::string_view {
×
NEW
255
        if (!v_args) return {};
×
NEW
256
        yyjson_val *v = yyjson_obj_get(v_args, key);
×
NEW
257
        if (!v) return {};
×
NEW
258
        if (yyjson_is_str(v)) return {yyjson_get_str(v), yyjson_get_len(v)};
×
NEW
259
        return {};
×
NEW
260
    };
×
NEW
261
    auto args_int = [&](const char *key) -> std::pair<bool, int64_t> {
×
NEW
262
        if (!v_args) return {false, 0};
×
NEW
263
        yyjson_val *v = yyjson_obj_get(v_args, key);
×
NEW
264
        if (!v) return {false, 0};
×
NEW
265
        if (yyjson_is_int(v)) return {true, yyjson_get_sint(v)};
×
NEW
266
        if (yyjson_is_uint(v))
×
NEW
267
            return {true, static_cast<int64_t>(yyjson_get_uint(v))};
×
NEW
268
        if (yyjson_is_real(v))
×
NEW
269
            return {true, static_cast<int64_t>(yyjson_get_real(v))};
×
NEW
270
        return {false, 0};
×
NEW
271
    };
×
NEW
272
    auto args_float = [&](const char *key) -> std::pair<bool, double> {
×
NEW
273
        if (!v_args) return {false, 0.0};
×
NEW
274
        yyjson_val *v = yyjson_obj_get(v_args, key);
×
NEW
275
        if (!v) return {false, 0.0};
×
NEW
276
        if (yyjson_is_real(v)) return {true, yyjson_get_real(v)};
×
NEW
277
        if (yyjson_is_int(v))
×
NEW
278
            return {true, static_cast<double>(yyjson_get_sint(v))};
×
NEW
279
        if (yyjson_is_uint(v))
×
NEW
280
            return {true, static_cast<double>(yyjson_get_uint(v))};
×
NEW
281
        return {false, 0.0};
×
NEW
282
    };
×
283

284
    // --- Type classification ---
NEW
285
    bool is_M = (ph == "M");
×
NEW
286
    bool is_C = (ph == "C");
×
NEW
287
    bool is_event = !is_M && !is_C;
×
288

NEW
289
    int8_t row_type = ROW_EVENT;
×
NEW
290
    if (is_M) {
×
NEW
291
        if (name_sv == "FH")
×
NEW
292
            row_type = ROW_FILE_HASH;
×
NEW
293
        else if (name_sv == "HH")
×
NEW
294
            row_type = ROW_HOST_HASH;
×
NEW
295
        else if (name_sv == "SH")
×
NEW
296
            row_type = ROW_STRING_HASH;
×
NEW
297
        else if (name_sv == "PR")
×
NEW
298
            row_type = ROW_PROC_METADATA;
×
299
        else
NEW
300
            row_type = ROW_METADATA;
×
NEW
301
    } else if (is_C) {
×
NEW
302
        row_type = str_iequal(cat_sv, "sys") ? ROW_SYSTEM : ROW_PROFILE;
×
303
    }
NEW
304
    bool is_hash = (row_type >= ROW_FILE_HASH && row_type <= ROW_STRING_HASH) ||
×
305
                   row_type == ROW_PROC_METADATA;
NEW
306
    bool is_profile = (row_type == ROW_PROFILE);
×
NEW
307
    bool is_sys = (row_type == ROW_SYSTEM);
×
308

309
    // Name: metadata rows use args.name if available
NEW
310
    std::string_view out_name = name_sv;
×
NEW
311
    if (is_M) {
×
NEW
312
        auto an = args_str("name");
×
NEW
313
        if (!an.empty()) out_name = an;
×
314
    }
NEW
315
    if (out_name.empty()) return false;  // skip rows without name
×
316

317
    // --- Declare all output columns (lazy — add_or_get_column handles
318
    // first-time creation) --- We use a fixed schema so column indices are
319
    // stable across rows. The builder backfills nulls for columns not touched
320
    // via end_row().
321

NEW
322
    auto ci_type = builder.add_or_get_column("type", ColumnType::INT64);
×
NEW
323
    auto ci_cat = builder.add_or_get_column("cat", ColumnType::STRING);
×
NEW
324
    auto ci_name = builder.add_or_get_column("name", ColumnType::STRING);
×
NEW
325
    auto ci_pid = builder.add_or_get_column("pid", ColumnType::INT64);
×
NEW
326
    auto ci_tid = builder.add_or_get_column("tid", ColumnType::INT64);
×
NEW
327
    auto ci_hash = builder.add_or_get_column("hash", ColumnType::STRING);
×
NEW
328
    auto ci_value = builder.add_or_get_column("value", ColumnType::STRING);
×
329
    auto ci_host_hash =
NEW
330
        builder.add_or_get_column("host_hash", ColumnType::STRING);
×
331
    auto ci_file_hash =
NEW
332
        builder.add_or_get_column("file_hash", ColumnType::STRING);
×
NEW
333
    auto ci_epoch = builder.add_or_get_column("epoch", ColumnType::INT64);
×
NEW
334
    auto ci_step = builder.add_or_get_column("step", ColumnType::INT64);
×
NEW
335
    auto ci_ts = builder.add_or_get_column("ts", ColumnType::INT64);
×
NEW
336
    auto ci_dur = builder.add_or_get_column("dur", ColumnType::INT64);
×
NEW
337
    auto ci_te = builder.add_or_get_column("te", ColumnType::INT64);
×
NEW
338
    auto ci_trange = builder.add_or_get_column("trange", ColumnType::INT64);
×
NEW
339
    auto ci_io_cat = builder.add_or_get_column("io_cat", ColumnType::INT64);
×
NEW
340
    auto ci_size = builder.add_or_get_column("size", ColumnType::INT64);
×
NEW
341
    auto ci_offset = builder.add_or_get_column("offset", ColumnType::INT64);
×
NEW
342
    auto ci_image_id = builder.add_or_get_column("image_id", ColumnType::INT64);
×
343

344
    // --- Populate core columns ---
NEW
345
    builder.append_int64(ci_type, row_type);
×
346

347
    // cat (lowercased) — write into arena
NEW
348
    if (!cat_sv.empty()) {
×
349
        char lbuf[256];
NEW
350
        std::size_t clen = std::min(cat_sv.size(), sizeof(lbuf));
×
NEW
351
        for (std::size_t i = 0; i < clen; ++i)
×
NEW
352
            lbuf[i] = static_cast<char>(
×
NEW
353
                std::tolower(static_cast<unsigned char>(cat_sv[i])));
×
NEW
354
        builder.append_string(ci_cat, arena.push(lbuf, clen));
×
355
    } else {
NEW
356
        builder.append_null(ci_cat);
×
357
    }
358

NEW
359
    builder.append_string(ci_name, out_name);
×
360

NEW
361
    if (v_pid && (yyjson_is_int(v_pid) || yyjson_is_uint(v_pid)))
×
NEW
362
        builder.append_int64(ci_pid, yyjson_get_sint(v_pid));
×
363
    // else: null via end_row backfill
364

NEW
365
    if (v_tid && (yyjson_is_int(v_tid) || yyjson_is_uint(v_tid)))
×
NEW
366
        builder.append_int64(ci_tid, yyjson_get_sint(v_tid));
×
367

368
    // hash / value
NEW
369
    auto a_value = args_str("value");
×
NEW
370
    if (is_hash && !a_value.empty()) builder.append_string(ci_hash, a_value);
×
NEW
371
    if (row_type == ROW_METADATA && !a_value.empty())
×
NEW
372
        builder.append_string(ci_value, a_value);
×
373

374
    // host_hash / file_hash
NEW
375
    auto a_hhash = args_str("hhash");
×
NEW
376
    if (!a_hhash.empty()) builder.append_string(ci_host_hash, a_hhash);
×
NEW
377
    auto a_fhash = args_str("fhash");
×
NEW
378
    if (!a_fhash.empty()) builder.append_string(ci_file_hash, a_fhash);
×
379

380
    // epoch / step
NEW
381
    auto [has_epoch, epoch_v] = args_int("epoch");
×
NEW
382
    if (has_epoch && epoch_v >= 0) builder.append_int64(ci_epoch, epoch_v);
×
NEW
383
    auto [has_step, step_v] = args_int("step");
×
NEW
384
    if (has_step && step_v >= 0) builder.append_int64(ci_step, step_v);
×
385

386
    // --- Temporal ---
NEW
387
    bool has_ts = (is_event || is_C) && v_ts &&
×
NEW
388
                  (yyjson_is_int(v_ts) || yyjson_is_uint(v_ts));
×
NEW
389
    bool has_dur = v_dur && (yyjson_is_int(v_dur) || yyjson_is_uint(v_dur));
×
NEW
390
    int64_t ts_val = 0, dur_val = 0;
×
NEW
391
    if (has_ts) {
×
NEW
392
        ts_val = yyjson_get_sint(v_ts);
×
NEW
393
        builder.append_int64(ci_ts, ts_val);
×
394
    }
NEW
395
    if (is_event && has_ts && has_dur) {
×
NEW
396
        dur_val = yyjson_get_sint(v_dur);
×
NEW
397
        builder.append_int64(ci_dur, dur_val);
×
NEW
398
        builder.append_int64(ci_te, ts_val + dur_val);
×
399
    }
400

401
    // --- IO columns (events only) ---
NEW
402
    if (is_event) {
×
403
        bool is_posix_stdio =
NEW
404
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
NEW
405
        int8_t io_cat = IO_OTHER;
×
406

407
        // size priority: size_sum > POSIX ret > image_size
NEW
408
        auto [has_ss, ss_val] = args_int("size_sum");
×
NEW
409
        if (has_ss) {
×
NEW
410
            builder.append_int64(ci_size, ss_val);
×
NEW
411
            if (is_posix_stdio) io_cat = get_io_cat(name_sv);
×
NEW
412
        } else if (is_posix_stdio) {
×
NEW
413
            io_cat = get_io_cat(name_sv);
×
NEW
414
            auto [has_ret, ret_val] = args_int("ret");
×
NEW
415
            if (has_ret && ret_val > 0 &&
×
NEW
416
                (io_cat == IO_READ || io_cat == IO_WRITE))
×
NEW
417
                builder.append_int64(ci_size, ret_val);
×
NEW
418
            auto [has_ofs, ofs_val] = args_int("offset");
×
NEW
419
            if (has_ofs && ofs_val >= 0)
×
NEW
420
                builder.append_int64(ci_offset, ofs_val);
×
421
        } else {
NEW
422
            auto [has_img, img_val] = args_int("image_idx");
×
NEW
423
            if (has_img && img_val > 0)
×
NEW
424
                builder.append_int64(ci_image_id, img_val);
×
NEW
425
            auto [has_ims, ims_val] = args_int("image_size");
×
NEW
426
            if (has_ims && ims_val > 0 && !str_contains_lower(name_sv, "open"))
×
NEW
427
                builder.append_int64(ci_size, ims_val);
×
428
        }
NEW
429
        builder.append_int64(ci_io_cat, io_cat);
×
430
    }
431

432
    // --- Profile columns ---
NEW
433
    if (is_profile) {
×
434
        bool is_posix_stdio =
NEW
435
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
NEW
436
        int8_t io_cat = is_posix_stdio ? get_io_cat(name_sv) : IO_OTHER;
×
NEW
437
        builder.append_int64(ci_io_cat, io_cat);
×
438

439
        static const char *profile_keys[] = {
440
            "count",      "count_max",  "count_min",  "count_sum",
441
            "dft_cnt",    "dur",        "dur_max",    "dur_min",
442
            "dur_sum",    "epoch",      "flags",      "offset",
443
            "offset_max", "offset_min", "offset_sum", "ret",
444
            "ret_max",    "ret_min",    "ret_sum",    "whence",
445
            "whence_max", "whence_min", "whence_sum", nullptr};
NEW
446
        for (const char **pk = profile_keys; *pk; ++pk) {
×
NEW
447
            auto [has_v, val] = args_int(*pk);
×
NEW
448
            if (has_v) {
×
NEW
449
                auto idx = builder.add_or_get_column(*pk, ColumnType::INT64);
×
NEW
450
                builder.append_int64(idx, val);
×
451
            }
452
        }
453
    }
454

455
    // --- System columns ---
NEW
456
    if (is_sys) {
×
457
        static const char *sys_keys[] = {
458
            "user_pct", "system_pct",  "iowait_pct",   "idle_pct",
459
            "irq_pct",  "softirq_pct", "MemAvailable", "MemFree",
460
            "Cached",   "Dirty",       "Active",       nullptr};
NEW
461
        for (const char **sk = sys_keys; *sk; ++sk) {
×
NEW
462
            auto [has_v, val] = args_float(*sk);
×
NEW
463
            if (has_v) {
×
NEW
464
                auto idx = builder.add_or_get_column(*sk, ColumnType::DOUBLE);
×
NEW
465
                builder.append_double(idx, val);
×
466
            }
467
        }
468
    }
469

NEW
470
    builder.end_row();
×
NEW
471
    return true;
×
472
}
473

474
// Flatten a yyjson object into "prefix.key" columns using native types.
475
// On type mismatch (same key, different type across rows), appends null.
476
static void flatten_object_into(RecordBatchBuilder &builder, StringArena &arena,
×
477
                                std::string_view prefix, yyjson_val *obj) {
478
    char key_buf[512];
479

480
    yyjson_obj_iter sub_iter;
481
    yyjson_obj_iter_init(obj, &sub_iter);
482
    yyjson_val *sub_key;
483
    while ((sub_key = yyjson_obj_iter_next(&sub_iter))) {
×
484
        yyjson_val *sub_val = yyjson_obj_iter_get_val(sub_key);
×
485
        const char *sk_str = yyjson_get_str(sub_key);
×
486
        std::size_t sk_len = yyjson_get_len(sub_key);
487

488
        std::size_t needed = prefix.size() + 1 + sk_len;
489
        if (needed >= sizeof(key_buf)) continue;
×
490
        std::memcpy(key_buf, prefix.data(), prefix.size());
491
        key_buf[prefix.size()] = '.';
492
        std::memcpy(key_buf + prefix.size() + 1, sk_str, sk_len);
493
        std::string_view full_key(key_buf, needed);
494

495
        if (yyjson_is_int(sub_val)) {
×
496
            auto idx = builder.add_or_get_column(full_key, ColumnType::INT64);
×
497
            if (builder.column_type(idx) == ColumnType::INT64)
×
498
                builder.append_int64(idx, yyjson_get_sint(sub_val));
×
499
            else
500
                builder.append_null(idx);
×
501
        } else if (yyjson_is_uint(sub_val)) {
×
502
            auto idx = builder.add_or_get_column(full_key, ColumnType::UINT64);
×
503
            if (builder.column_type(idx) == ColumnType::UINT64)
×
504
                builder.append_uint64(idx, yyjson_get_uint(sub_val));
×
505
            else
506
                builder.append_null(idx);
×
507
        } else if (yyjson_is_real(sub_val)) {
×
508
            auto idx = builder.add_or_get_column(full_key, ColumnType::DOUBLE);
×
509
            if (builder.column_type(idx) == ColumnType::DOUBLE)
×
510
                builder.append_double(idx, yyjson_get_real(sub_val));
×
511
            else
512
                builder.append_null(idx);
×
513
        } else if (yyjson_is_bool(sub_val)) {
×
514
            auto idx = builder.add_or_get_column(full_key, ColumnType::BOOL);
×
515
            if (builder.column_type(idx) == ColumnType::BOOL)
×
516
                builder.append_bool(idx, yyjson_get_bool(sub_val));
×
517
            else
518
                builder.append_null(idx);
×
519
        } else if (yyjson_is_str(sub_val)) {
×
520
            auto idx = builder.add_or_get_column(full_key, ColumnType::STRING);
×
521
            if (builder.column_type(idx) == ColumnType::STRING)
×
522
                builder.append_string(
×
523
                    idx, std::string_view(yyjson_get_str(sub_val),
524
                                          yyjson_get_len(sub_val)));
525
            else
526
                builder.append_null(idx);
×
527
        } else if (yyjson_is_null(sub_val)) {
×
528
            auto existing = builder.find_column(full_key);
×
529
            if (existing) builder.append_null(*existing);
×
530
        } else {
531
            // nested object/array: serialize
532
            std::size_t json_len;
533
            char *json_str = yyjson_val_write(sub_val, 0, &json_len);
534
            auto idx = builder.add_or_get_column(full_key, ColumnType::STRING);
×
535
            if (json_str) {
×
536
                builder.append_string(idx, arena.push(json_str, json_len));
×
537
                free(json_str);
538
            } else {
539
                builder.append_null(idx);
×
540
            }
541
        }
542
    }
543
}
544

545
CoroTask<void> produce_arrow_batches(std::shared_ptr<ArrowIteratorState> state,
1,800!
546
                                     TraceReaderConfig cfg, ReadConfig rc,
547
                                     std::size_t batch_size,
548
                                     bool flatten_objects = false,
549
                                     bool normalize = false) {
14!
550
    auto *sp = state.get();
14✔
551
    try {
552
        TraceReader reader(std::move(cfg));
14!
553
        auto gen = reader.read_lines(rc);
14!
554
        RecordBatchBuilder builder;
14✔
555
        builder.reserve(batch_size);
14!
556

557
        std::vector<yyjson_doc *> held_docs;
14✔
558
        StringArena arena;
14!
559
        held_docs.reserve(batch_size);
14!
560

561
        while (auto opt = co_await gen.next()) {
1,758!
562
            if (sp->cancelled.load(std::memory_order_acquire)) break;
422!
563

564
            const char *trimmed;
422✔
565
            std::size_t trimmed_length;
422✔
566
            if (!dftracer::utils::json_trim_and_validate(
422!
567
                    opt->content.data(), opt->content.size(), trimmed,
422✔
568
                    trimmed_length)) {
569
                continue;
26✔
570
            }
571

572
            yyjson_doc *doc = yyjson_read(trimmed, trimmed_length, 0);
396!
573
            if (!doc) continue;
396!
574

575
            yyjson_val *root = yyjson_doc_get_root(doc);
396!
576
            if (!root || !yyjson_is_obj(root)) {
396!
577
                yyjson_doc_free(doc);
×
578
                continue;
579
            }
580

581
            if (normalize) {
396!
582
                // Produce the semantic output schema directly.
583
                // normalize_row calls end_row() internally.
584
                if (!normalize_row(builder, arena, root)) {
×
585
                    yyjson_doc_free(doc);
×
586
                    continue;
587
                }
588
                held_docs.push_back(doc);
×
589
            } else {
590
                yyjson_obj_iter iter;
396✔
591
                yyjson_obj_iter_init(root, &iter);
396!
592
                yyjson_val *key;
396✔
593
                while ((key = yyjson_obj_iter_next(&iter))) {
3,564!
594
                    yyjson_val *val = yyjson_obj_iter_get_val(key);
3,168!
595
                    const char *key_str = yyjson_get_str(key);
3,168!
596
                    std::size_t key_len = yyjson_get_len(key);
3,168!
597
                    std::string_view key_sv(key_str, key_len);
3,168✔
598

599
                    if (yyjson_is_int(val)) {
3,168!
600
                        std::size_t idx = builder.add_or_get_column(
3,168!
601
                            key_sv, ColumnType::INT64);
1,584✔
602
                        builder.append_int64(idx, yyjson_get_sint(val));
1,584!
603
                    } else if (yyjson_is_uint(val)) {
3,168!
604
                        std::size_t idx = builder.add_or_get_column(
×
605
                            key_sv, ColumnType::UINT64);
606
                        builder.append_uint64(idx, yyjson_get_uint(val));
×
607
                    } else if (yyjson_is_real(val)) {
1,584!
608
                        std::size_t idx = builder.add_or_get_column(
×
609
                            key_sv, ColumnType::DOUBLE);
610
                        builder.append_double(idx, yyjson_get_real(val));
×
611
                    } else if (yyjson_is_bool(val)) {
1,584!
612
                        std::size_t idx =
613
                            builder.add_or_get_column(key_sv, ColumnType::BOOL);
×
614
                        builder.append_bool(idx, yyjson_get_bool(val));
×
615
                    } else if (yyjson_is_str(val)) {
1,584!
616
                        std::size_t idx = builder.add_or_get_column(
2,376!
617
                            key_sv, ColumnType::STRING);
1,188✔
618
                        builder.append_string(
2,376!
619
                            idx, std::string_view(yyjson_get_str(val),
3,564!
620
                                                  yyjson_get_len(val)));
1,188!
621
                    } else if (yyjson_is_null(val)) {
1,584!
622
                        auto existing = builder.find_column(key_sv);
×
623
                        if (existing) builder.append_null(*existing);
×
624
                    } else {
625
                        std::size_t json_len;
396✔
626
                        char *json_str = yyjson_val_write(val, 0, &json_len);
396!
627
                        std::size_t idx = builder.add_or_get_column(
792!
628
                            key_sv, ColumnType::STRING);
396✔
629
                        if (json_str) {
396!
630
                            builder.append_string(
396!
631
                                idx, arena.push(json_str, json_len));
792!
632
                            free(json_str);
396!
633
                        } else {
396✔
634
                            builder.append_null(idx);
×
635
                        }
636
                    }
396✔
637
                }
3,168✔
638
                builder.end_row();
396!
639
                held_docs.push_back(doc);
396!
640
            }  // end else (raw path)
396✔
641

642
            if (builder.num_rows() >= batch_size) {
396✔
643
                auto result = builder.finish();
7!
644
                for (auto *d : held_docs) yyjson_doc_free(d);
177!
645
                held_docs.clear();
7✔
646
                arena.clear();
7!
647

648
                {
649
                    std::unique_lock<std::mutex> lock(sp->mtx);
7!
650
                    sp->cv_producer.wait(lock, [sp] {
28!
651
                        return sp->queue.size() < sp->max_queue_size ||
14!
652
                               sp->cancelled.load(std::memory_order_acquire);
7✔
653
                    });
654
                    if (sp->cancelled.load(std::memory_order_acquire)) break;
7!
655
                    sp->queue.push(std::move(result));
7!
656
                }
7!
657
                sp->cv_consumer.notify_one();
7✔
658
                builder.reset(false);
7!
659
                builder.reserve(batch_size);
7!
660
            }
7!
661
        }
436✔
662

663
        if (builder.num_rows() > 0) {
14!
664
            auto result = builder.finish();
14!
665
            for (auto *d : held_docs) yyjson_doc_free(d);
240!
666
            held_docs.clear();
14✔
667
            arena.clear();
14!
668
            {
669
                std::lock_guard<std::mutex> lock(sp->mtx);
14!
670
                sp->queue.push(std::move(result));
14!
671
            }
14✔
672
            sp->cv_consumer.notify_one();
14✔
673
        } else {
14✔
674
            for (auto *d : held_docs) yyjson_doc_free(d);
×
675
        }
676
    } catch (...) {
886✔
677
        std::lock_guard<std::mutex> lock(sp->mtx);
×
678
        sp->error = std::current_exception();
679
        sp->queue.push(std::nullopt);
×
680
        sp->done.store(true, std::memory_order_release);
681
        sp->cv_consumer.notify_one();
682
        co_return;
683
    }
×
684
    {
685
        std::lock_guard<std::mutex> lock(sp->mtx);
14!
686
        sp->queue.push(std::nullopt);
14!
687
        sp->done.store(true, std::memory_order_release);
14✔
688
    }
14✔
689
    sp->cv_consumer.notify_one();
14✔
690
}
2,658!
691

692
#endif  // DFTRACER_UTILS_ENABLE_ARROW
693

694
/**
 * Build a TraceReaderConfig from the settings stored on a TraceReader
 * Python object.
 *
 * @param self  Reader whose file_path, index_dir, checkpoint_size,
 *              auto_build_index and index_threshold fields are copied.
 * @return A config populated from those fields.
 * @throws An exception derived from std::exception when the stored path
 *         objects are missing or cannot be converted to UTF-8. Callers in
 *         this file translate std::exception into a Python RuntimeError.
 */
TraceReaderConfig build_config(TraceReaderObject *self) {
    // Function-local exception type so the failure is catchable as
    // std::exception without pulling additional headers into the file.
    struct ConfigError : std::exception {
        const char *msg;
        explicit ConfigError(const char *m) : msg(m) {}
        const char *what() const noexcept override { return msg; }
    };

    // tp_new leaves both members NULL until tp_init succeeds; passing NULL
    // to PyUnicode_AsUTF8 would dereference it.
    if (!self->file_path || !self->index_dir) {
        throw ConfigError("TraceReader is not initialized");
    }

    // Constructing a std::string from a null pointer is undefined
    // behaviour, so every conversion result is checked before use. The
    // pending Python error is cleared because the callers raise their own.
    const char *path = PyUnicode_AsUTF8(self->file_path);
    if (!path) {
        PyErr_Clear();
        throw ConfigError("file_path could not be converted to UTF-8");
    }
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (!idx) {
        PyErr_Clear();
        throw ConfigError("index_dir could not be converted to UTF-8");
    }

    TraceReaderConfig cfg;
    cfg.file_path = path;
    cfg.index_dir = idx;
    cfg.checkpoint_size = self->checkpoint_size;
    cfg.auto_build_index = self->auto_build_index != 0;
    cfg.index_threshold = self->index_threshold;
    return cfg;
}
109!
704

705
/**
 * Pick the Runtime to submit work to: the one bound to this reader when
 * runtime_obj is set, otherwise whatever get_default_runtime() returns.
 */
static Runtime *get_runtime(TraceReaderObject *self) {
    auto *bound = self->runtime_obj;
    if (!bound) {
        return get_default_runtime();
    }
    auto *wrapper = (RuntimeObject *)bound;
    return wrapper->runtime.get();
}
90✔
711

712
/**
 * Allocate a TraceReaderIteratorObject that owns `state` and iterates in
 * the given mode.
 *
 * The shared_ptr members live in storage obtained from tp_alloc, so they
 * are constructed in place with placement new. Returns NULL when tp_alloc
 * fails.
 */
static TraceReaderIteratorObject *make_iterator(
    std::shared_ptr<IteratorState> state, IteratorMode mode) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) {
        return NULL;
    }

    TraceReaderIteratorObject *iter_obj = (TraceReaderIteratorObject *)raw;
    new (&iter_obj->state) std::shared_ptr<IteratorState>(std::move(state));
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // Line/raw/JSON iterators carry an empty Arrow state.
    new (&iter_obj->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif
    iter_obj->mode = mode;
    return iter_obj;
}
76✔
725

726
#ifdef DFTRACER_UTILS_ENABLE_ARROW
727
/**
 * Allocate a TraceReaderIteratorObject in ARROW mode that owns `state`.
 *
 * The line-oriented state member is constructed empty; only arrow_state
 * is populated. Returns NULL when tp_alloc fails.
 */
static TraceReaderIteratorObject *make_arrow_iterator(
    std::shared_ptr<ArrowIteratorState> state) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) {
        return NULL;
    }

    TraceReaderIteratorObject *iter_obj = (TraceReaderIteratorObject *)raw;
    new (&iter_obj->state) std::shared_ptr<IteratorState>();
    new (&iter_obj->arrow_state)
        std::shared_ptr<ArrowIteratorState>(std::move(state));
    iter_obj->mode = IteratorMode::ARROW;
    return iter_obj;
}
14✔
739
#endif
740

741
}  // namespace
742

743
using dftracer::utils::python::wrap_arrow_table;
744

745
/**
 * tp_dealloc: drop the references held by the reader (any of which may
 * still be NULL) and hand the memory back through the type's tp_free.
 */
static void TraceReader_dealloc(TraceReaderObject *self) {
    PyObject *as_object = (PyObject *)self;

    Py_XDECREF(self->file_path);
    Py_XDECREF(self->index_dir);
    Py_XDECREF(self->runtime_obj);

    Py_TYPE(self)->tp_free(as_object);
}
204✔
751

752
/**
 * tp_new: allocate a TraceReader and give every field a defined default
 * so that deallocation is safe even when tp_init never completes.
 *
 * `args` and `kwds` are not inspected here; tp_init parses them.
 */
static PyObject *TraceReader_new(PyTypeObject *type, PyObject *args,
                                 PyObject *kwds) {
    PyObject *raw = type->tp_alloc(type, 0);
    if (raw == NULL) {
        return NULL;
    }

    TraceReaderObject *reader = (TraceReaderObject *)raw;
    reader->file_path = NULL;
    reader->index_dir = NULL;
    reader->runtime_obj = NULL;
    reader->has_index = 0;
    reader->auto_build_index = 0;
    reader->checkpoint_size = 32 * 1024 * 1024;
    reader->index_threshold =
        dftracer::utils::constants::indexer::DEFAULT_INDEX_SIZE_THRESHOLD;
    return raw;
}
767

768
static int TraceReader_init(TraceReaderObject *self, PyObject *args,
204✔
769
                            PyObject *kwds) {
770
    static const char *kwlist[] = {"file_path",
771
                                   "index_dir",
772
                                   "checkpoint_size",
773
                                   "auto_build_index",
774
                                   "index_threshold",
775
                                   "runtime",
776
                                   NULL};
777

778
    const char *file_path;
779
    const char *index_dir = "";
204✔
780
    std::size_t checkpoint_size = 32 * 1024 * 1024;
204✔
781
    int auto_build_index = 0;
204✔
782
    std::size_t index_threshold =
204✔
783
        dftracer::utils::constants::indexer::DEFAULT_INDEX_SIZE_THRESHOLD;
784
    PyObject *runtime_arg = NULL;
204✔
785

786
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|snpnO", (char **)kwlist,
204!
787
                                     &file_path, &index_dir, &checkpoint_size,
788
                                     &auto_build_index, &index_threshold,
789
                                     &runtime_arg)) {
UNCOV
790
        return -1;
×
791
    }
792

793
    if (runtime_arg && runtime_arg != Py_None) {
204✔
794
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
14!
795
            // Direct C++ Runtime object
UNCOV
796
            Py_INCREF(runtime_arg);
×
797
            self->runtime_obj = runtime_arg;
×
798
        } else {
799
            // Python wrapper, extract _native attribute
800
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
14!
801
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
14!
802
                self->runtime_obj = native;  // already incref'd by GetAttr
14✔
803
            } else {
7✔
804
                Py_XDECREF(native);
×
UNCOV
805
                PyErr_SetString(PyExc_TypeError,
×
806
                                "runtime must be a Runtime instance or None");
UNCOV
807
                return -1;
×
808
            }
809
        }
810
    }
7✔
811

812
    self->file_path = PyUnicode_FromString(file_path);
204!
813
    if (!self->file_path) return -1;
204✔
814

815
    self->index_dir = PyUnicode_FromString(index_dir);
204!
816
    if (!self->index_dir) {
204✔
UNCOV
817
        Py_DECREF(self->file_path);
×
UNCOV
818
        self->file_path = NULL;
×
UNCOV
819
        return -1;
×
820
    }
821

822
    self->checkpoint_size = checkpoint_size;
204✔
823
    self->auto_build_index = auto_build_index;
204✔
824
    self->index_threshold = index_threshold;
204✔
825

826
    try {
827
        TraceReaderConfig cfg;
204✔
828
        cfg.file_path = file_path;
204!
829
        cfg.index_dir = index_dir;
204!
830
        cfg.checkpoint_size = checkpoint_size;
204✔
831
        cfg.auto_build_index = auto_build_index != 0;
204✔
832
        cfg.index_threshold = index_threshold;
204✔
833
        TraceReader probe(std::move(cfg));
204!
834
        self->has_index = probe.has_index() ? 1 : 0;
204!
835
    } catch (const std::exception &e) {
204!
UNCOV
836
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
UNCOV
837
        Py_DECREF(self->file_path);
×
UNCOV
838
        Py_DECREF(self->index_dir);
×
UNCOV
839
        self->file_path = NULL;
×
UNCOV
840
        self->index_dir = NULL;
×
UNCOV
841
        return -1;
×
UNCOV
842
    }
×
843

844
    return 0;
204✔
845
}
102✔
846

847
/**
 * iter_lines(start_line, end_line, start_byte, end_byte, buffer_size, query)
 *
 * Validates the range arguments, submits a produce_lines task to the
 * reader's runtime and returns an iterator in LINES mode over its output.
 *
 * @return New iterator object, or NULL with ValueError (bad arguments) or
 *         RuntimeError (config/submit failure) set.
 */
static PyObject *TraceReader_iter_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "query",
                                   NULL};
    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnz", (char **)kwlist, &start_line, &end_line,
        &start_byte, &end_byte, &buffer_size, &query_str);
    if (!parsed) {
        return NULL;
    }

    const bool ranges_ok =
        start_line >= 0 && end_line >= 0 && start_byte >= 0 && end_byte >= 0;
    if (!ranges_ok || buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    // Negative values were rejected above, so the casts cannot wrap.
    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str != NULL) {
        rc.query = query_str;
    }

    auto state = std::make_shared<IteratorState>();
    Runtime *runtime = get_runtime(self);
    try {
        runtime->submit(produce_lines(state, cfg, rc), "iter_lines");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    return (PyObject *)make_iterator(state, IteratorMode::LINES);
}
115✔
900

901
/**
 * iter_raw(start_line, end_line, start_byte, end_byte, buffer_size,
 *          line_aligned, multi_line, query)
 *
 * Validates the range arguments, submits a produce_raw task to the
 * reader's runtime and returns an iterator in RAW mode over its output.
 *
 * @return New iterator object, or NULL with ValueError (bad arguments) or
 *         RuntimeError (config/submit failure) set.
 */
static PyObject *TraceReader_iter_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "line_aligned",
                                   "multi_line", "query",       NULL};
    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    int line_aligned = 1;
    int multi_line = 1;
    const char *query_str = NULL;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnppz", (char **)kwlist, &start_line, &end_line,
        &start_byte, &end_byte, &buffer_size, &line_aligned, &multi_line,
        &query_str);
    if (!parsed) {
        return NULL;
    }

    const bool ranges_ok =
        start_line >= 0 && end_line >= 0 && start_byte >= 0 && end_byte >= 0;
    if (!ranges_ok || buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    // Negative values were rejected above, so the casts cannot wrap.
    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    rc.line_aligned = line_aligned != 0;
    rc.multi_line = multi_line != 0;
    if (query_str != NULL) {
        rc.query = query_str;
    }

    auto state = std::make_shared<IteratorState>();
    Runtime *runtime = get_runtime(self);
    try {
        runtime->submit(produce_raw(state, cfg, rc), "iter_raw");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    return (PyObject *)make_iterator(state, IteratorMode::RAW);
}
21✔
959

960
/**
 * read_lines(...): eager counterpart of iter_lines. Forwards args/kwds
 * unchanged and drains the resulting iterator into a list.
 *
 * @return New list, or NULL with an exception set.
 */
static PyObject *TraceReader_read_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *line_iter = TraceReader_iter_lines(self, args, kwds);
    if (line_iter == NULL) {
        return NULL;
    }
    PyObject *collected = PySequence_List(line_iter);
    Py_DECREF(line_iter);
    return collected;
}
45✔
968

969
/**
 * read_raw(...): eager counterpart of iter_raw. Forwards args/kwds
 * unchanged and drains the resulting iterator into a list.
 *
 * @return New list, or NULL with an exception set.
 */
static PyObject *TraceReader_read_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    PyObject *chunk_iter = TraceReader_iter_raw(self, args, kwds);
    if (chunk_iter == NULL) {
        return NULL;
    }
    PyObject *collected = PySequence_List(chunk_iter);
    Py_DECREF(chunk_iter);
    return collected;
}
4✔
977

978
/**
 * iter_lines_json(start_line, end_line, start_byte, end_byte, buffer_size,
 *                 query)
 *
 * Same producer as iter_lines (produce_lines), but the returned iterator
 * is created in JSON mode.
 *
 * @return New iterator object, or NULL with ValueError (bad arguments) or
 *         RuntimeError (config/submit failure) set.
 */
static PyObject *TraceReader_iter_lines_json(TraceReaderObject *self,
                                             PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "query",
                                   NULL};
    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnz", (char **)kwlist, &start_line, &end_line,
        &start_byte, &end_byte, &buffer_size, &query_str);
    if (!parsed) {
        return NULL;
    }

    const bool ranges_ok =
        start_line >= 0 && end_line >= 0 && start_byte >= 0 && end_byte >= 0;
    if (!ranges_ok || buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    // Negative values were rejected above, so the casts cannot wrap.
    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str != NULL) {
        rc.query = query_str;
    }

    auto state = std::make_shared<IteratorState>();
    Runtime *runtime = get_runtime(self);
    try {
        runtime->submit(produce_lines(state, cfg, rc), "iter_lines_json");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    return (PyObject *)make_iterator(state, IteratorMode::JSON);
}
20✔
1031

1032
/**
 * read_lines_json(...): eager counterpart of iter_lines_json. Forwards
 * args/kwds unchanged and drains the resulting iterator into a list.
 *
 * @return New list, or NULL with an exception set.
 */
static PyObject *TraceReader_read_lines_json(TraceReaderObject *self,
                                             PyObject *args, PyObject *kwds) {
    PyObject *json_iter = TraceReader_iter_lines_json(self, args, kwds);
    if (json_iter == NULL) {
        return NULL;
    }
    PyObject *collected = PySequence_List(json_iter);
    Py_DECREF(json_iter);
    return collected;
}
7✔
1040

1041
#ifdef DFTRACER_UTILS_ENABLE_ARROW
1042

1043
/**
 * iter_arrow(batch_size, start_line, end_line, start_byte, end_byte,
 *            buffer_size, query, flatten_objects, normalize)
 *
 * Validates the arguments, submits a produce_arrow_batches task to the
 * reader's runtime and returns an ARROW-mode iterator over its output.
 *
 * @return New iterator object, or NULL with ValueError (bad arguments) or
 *         RuntimeError (config/submit failure) set.
 */
static PyObject *TraceReader_iter_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {
        "batch_size", "start_line",  "end_line", "start_byte",
        "end_byte",   "buffer_size", "query",    "flatten_objects",
        "normalize",  NULL};
    Py_ssize_t batch_size = 10000;
    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    int flatten_objects = 0;
    int normalize = 0;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnnzpp", (char **)kwlist, &batch_size, &start_line,
        &end_line, &start_byte, &end_byte, &buffer_size, &query_str,
        &flatten_objects, &normalize);
    if (!parsed) {
        return NULL;
    }

    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return NULL;
    }

    const bool ranges_ok =
        start_line >= 0 && end_line >= 0 && start_byte >= 0 && end_byte >= 0;
    if (!ranges_ok || buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    // Negative values were rejected above, so the casts cannot wrap.
    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str != NULL) {
        rc.query = query_str;
    }

    auto state = std::make_shared<ArrowIteratorState>();
    Runtime *runtime = get_runtime(self);
    const std::size_t rows_per_batch = static_cast<std::size_t>(batch_size);
    const bool flatten = flatten_objects != 0;
    const bool normalized = normalize != 0;
    try {
        runtime->submit(produce_arrow_batches(state, cfg, rc, rows_per_batch,
                                              flatten, normalized),
                        "iter_arrow");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    return (PyObject *)make_arrow_iterator(std::move(state));
}
28✔
1108

1109
/**
 * read_arrow(...): eager counterpart of iter_arrow. Drains the batch
 * iterator into a list and passes that list to wrap_arrow_table().
 *
 * @return Result of wrap_arrow_table(), or NULL with an exception set.
 */
static PyObject *TraceReader_read_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *batch_iter = TraceReader_iter_arrow(self, args, kwds);
    if (batch_iter == NULL) {
        return NULL;
    }

    PyObject *batches = PySequence_List(batch_iter);
    Py_DECREF(batch_iter);
    if (batches == NULL) {
        return NULL;
    }

    // NOTE(review): the list is not released here, so wrap_arrow_table
    // presumably takes ownership of it - confirm against its definition.
    return wrap_arrow_table(batches);
}
5✔
1119

1120
#endif  // DFTRACER_UTILS_ENABLE_ARROW
1121

1122
/** __enter__: return the reader itself as a new reference. */
static PyObject *TraceReader_enter(TraceReaderObject *self,
                                   PyObject *Py_UNUSED(ignored)) {
    PyObject *me = (PyObject *)self;
    Py_INCREF(me);
    return me;
}
1127

1128
/**
 * __exit__: nothing to release; returns None, so an exception raised in
 * the with-block is not suppressed.
 */
static PyObject *TraceReader_exit(TraceReaderObject *self, PyObject *args) {
    (void)self;
    (void)args;
    Py_INCREF(Py_None);
    return Py_None;
}
1131

1132
/**
 * Getter for `file_path`.
 *
 * Returns the stored path string as a new reference, or None when the
 * member is still NULL (tp_new leaves it NULL until __init__ succeeds,
 * e.g. when a subclass skips the base __init__).
 */
static PyObject *TraceReader_get_file_path(TraceReaderObject *self,
                                           void *closure) {
    (void)closure;
    if (!self->file_path) {
        // Py_INCREF on NULL would crash the interpreter.
        Py_RETURN_NONE;
    }
    Py_INCREF(self->file_path);
    return self->file_path;
}
1137

1138
/**
 * Getter for `index_dir`.
 *
 * Returns the stored directory string as a new reference, or None when the
 * member is still NULL (tp_new leaves it NULL until __init__ succeeds,
 * e.g. when a subclass skips the base __init__).
 */
static PyObject *TraceReader_get_index_dir(TraceReaderObject *self,
                                           void *closure) {
    (void)closure;
    if (!self->index_dir) {
        // Py_INCREF on NULL would crash the interpreter.
        Py_RETURN_NONE;
    }
    Py_INCREF(self->index_dir);
    return self->index_dir;
}
1143

1144
/** Getter for `has_index`: the flag recorded on the object, as a bool. */
static PyObject *TraceReader_get_has_index(TraceReaderObject *self,
                                           void *closure) {
    (void)closure;
    if (self->has_index) {
        Py_RETURN_TRUE;
    }
    Py_RETURN_FALSE;
}
1148

1149
/**
 * Getter for `num_lines`.
 *
 * Fast path: ask a freshly constructed TraceReader for its line count and
 * return it when it is non-zero. Otherwise (count of 0, or the reader
 * threw) fall back to iterating every line and counting them.
 *
 * The fallback counts through the iterator instead of materialising a list
 * of all lines, so only one line object is alive at a time.
 *
 * @return New int, or NULL with an exception set.
 */
static PyObject *TraceReader_get_num_lines_prop(TraceReaderObject *self,
                                                void *closure) {
    (void)closure;
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        std::size_t n = reader.get_num_lines();
        if (n > 0) return PyLong_FromSize_t(n);
    } catch (...) {
        // Deliberate best-effort: any failure here just means we count
        // the slow way below, which reports its own errors.
    }

    PyObject *empty_args = PyTuple_New(0);
    if (!empty_args) return NULL;
    PyObject *source = TraceReader_iter_lines(self, empty_args, NULL);
    Py_DECREF(empty_args);
    if (!source) return NULL;

    PyObject *iter = PyObject_GetIter(source);
    Py_DECREF(source);
    if (!iter) return NULL;

    Py_ssize_t count = 0;
    PyObject *line = NULL;
    while ((line = PyIter_Next(iter)) != NULL) {
        Py_DECREF(line);
        ++count;
    }
    Py_DECREF(iter);

    // PyIter_Next returns NULL both at exhaustion and on error.
    if (PyErr_Occurred()) return NULL;
    return PyLong_FromSsize_t(count);
}
4✔
1167

1168
/**
 * get_max_bytes(): open a fresh TraceReader from the stored settings and
 * return its get_max_bytes() value as a Python int.
 *
 * @return New int, or NULL with RuntimeError set when the reader throws.
 */
static PyObject *TraceReader_get_max_bytes(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    std::size_t max_bytes = 0;
    try {
        TraceReader reader(build_config(self));
        max_bytes = reader.get_max_bytes();
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }
    return PyLong_FromSize_t(max_bytes);
}
13✔
1179

1180
/**
 * get_num_lines(): open a fresh TraceReader from the stored settings and
 * return its get_num_lines() value as a Python int (no counting fallback,
 * unlike the num_lines property).
 *
 * @return New int, or NULL with RuntimeError set when the reader throws.
 */
static PyObject *TraceReader_get_num_lines(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    std::size_t line_count = 0;
    try {
        TraceReader reader(build_config(self));
        line_count = reader.get_num_lines();
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }
    return PyLong_FromSize_t(line_count);
}
2✔
1191

1192
// Method table for TraceReader. Each docstring lists every keyword the
// corresponding C function parses; the read_* variants forward their
// arguments unchanged to the matching iter_* function.
static PyMethodDef TraceReader_methods[] = {
    {"iter_lines", (PyCFunction)TraceReader_iter_lines,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over decoded lines.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed along with\n"
     "        the read configuration (default None).\n"},
    {"iter_raw", (PyCFunction)TraceReader_iter_raw,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over raw byte chunks.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"
     "    query (str or None): Optional query string passed along with\n"
     "        the read configuration (default None).\n"},
    {"read_lines", (PyCFunction)TraceReader_read_lines,
     METH_VARARGS | METH_KEYWORDS,
     "Read all lines and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed along with\n"
     "        the read configuration (default None).\n"},
    {"read_raw", (PyCFunction)TraceReader_read_raw,
     METH_VARARGS | METH_KEYWORDS,
     "Read all raw chunks and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"
     "    query (str or None): Optional query string passed along with\n"
     "        the read configuration (default None).\n"},
    {"iter_lines_json", (PyCFunction)TraceReader_iter_lines_json,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over parsed JSON objects.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed along with\n"
     "        the read configuration (default None).\n"},
    {"read_lines_json", (PyCFunction)TraceReader_read_lines_json,
     METH_VARARGS | METH_KEYWORDS,
     "Read all lines as parsed JSON objects.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed along with\n"
     "        the read configuration (default None).\n"},
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    {"iter_arrow", (PyCFunction)TraceReader_iter_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over Arrow record batches.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch (must be > 0).\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed along with\n"
     "        the read configuration (default None).\n"
     "    flatten_objects (bool): Option forwarded to the Arrow batch\n"
     "        producer (default False).\n"
     "    normalize (bool): Option forwarded to the Arrow batch\n"
     "        producer (default False).\n"},
    {"read_arrow", (PyCFunction)TraceReader_read_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "Read all events as a materialized ArrowTable.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch (must be > 0).\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed along with\n"
     "        the read configuration (default None).\n"
     "    flatten_objects (bool): Option forwarded to the Arrow batch\n"
     "        producer (default False).\n"
     "    normalize (bool): Option forwarded to the Arrow batch\n"
     "        producer (default False).\n"},
#endif
    {"get_max_bytes", (PyCFunction)TraceReader_get_max_bytes, METH_NOARGS,
     "Get the maximum byte position (0 if unknown for compressed\n"
     "files without index)."},
    {"get_num_lines", (PyCFunction)TraceReader_get_num_lines, METH_NOARGS,
     "Get the total number of lines (0 if unknown for files without\n"
     "index)."},
    {"__enter__", (PyCFunction)TraceReader_enter, METH_NOARGS,
     "Enter the runtime context for the with statement."},
    {"__exit__", (PyCFunction)TraceReader_exit, METH_VARARGS,
     "Exit the runtime context for the with statement."},
    {NULL}};
1292

1293
// Read-only properties exposed on TraceReader. Every entry has a getter
// and a NULL setter; the table ends with a {NULL} sentinel.
// Note that `num_lines` may fall back to reading the whole file, whereas
// the get_num_lines() method only reports what the reader already knows.
static PyGetSetDef TraceReader_getsetters[] = {
    {"file_path", (getter)TraceReader_get_file_path, NULL,
     "Path to the trace file", NULL},
    {"index_dir", (getter)TraceReader_get_index_dir, NULL,
     "Directory for index files", NULL},
    {"has_index", (getter)TraceReader_get_has_index, NULL,
     "True if a checkpoint index was found", NULL},
    {"num_lines", (getter)TraceReader_get_num_lines_prop, NULL,
     "Total line count (reads all lines if needed)", NULL},
    {NULL}};
1303

1304
// Static type object for dftracer_utils_ext.TraceReader. Slots are filled
// positionally, so the order below must match PyTypeObject's layout; the
// trailing comment on each line names the slot being set.
// The type is subclassable (Py_TPFLAGS_BASETYPE) and does not define
// tp_traverse/tp_clear.
// NOTE(review): the signature line in tp_doc shows
// `index_threshold: int = 1048576`, while the Args text says
// "default 8 MB". The actual default is
// constants::indexer::DEFAULT_INDEX_SIZE_THRESHOLD, whose value is not
// visible here - confirm and make the two statements agree.
PyTypeObject TraceReaderType = {
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext.TraceReader",
    sizeof(TraceReaderObject),                /* tp_basicsize */
    0,                                        /* tp_itemsize */
    (destructor)TraceReader_dealloc,          /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    0,                                        /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    "TraceReader(file_path: str, index_dir: str = '',\n"
    "            checkpoint_size: int = 33554432,\n"
    "            auto_build_index: bool = False,\n"
    "            index_threshold: int = 1048576,\n"
    "            runtime: Runtime | None = None)\n"
    "--\n"
    "\n"
    "Smart trace file reader that auto-selects sequential or indexed\n"
    "reading based on whether an ``.idx`` sidecar exists.\n"
    "\n"
    "Args:\n"
    "    file_path (str): Path to the trace file (.pfw.gz or plain "
    "text).\n"
    "    index_dir (str): Directory to search for ``.idx`` sidecar "
    "files.\n"
    "        Empty string (default) searches next to the trace file.\n"
    "    checkpoint_size (int): Checkpoint interval in bytes for index\n"
    "        building (default 32 MB).\n"
    "    auto_build_index (bool): If True, automatically build an "
    "index\n"
    "        when none exists and the file exceeds *index_threshold*.\n"
    "    index_threshold (int): Minimum file size in bytes before\n"
    "        auto-indexing is triggered (default 8 MB).\n"
    "    runtime (Runtime or None): Runtime instance for thread pool "
    "control.\n"
    "        If None, uses the default global Runtime.\n"
    "\n"
    "Raises:\n"
    "    RuntimeError: If *file_path* does not exist or cannot be "
    "opened.\n",                /* tp_doc */
    0,                          /* tp_traverse */
    0,                          /* tp_clear */
    0,                          /* tp_richcompare */
    0,                          /* tp_weaklistoffset */
    0,                          /* tp_iter */
    0,                          /* tp_iternext */
    TraceReader_methods,        /* tp_methods */
    0,                          /* tp_members */
    TraceReader_getsetters,     /* tp_getset */
    0,                          /* tp_base */
    0,                          /* tp_dict */
    0,                          /* tp_descr_get */
    0,                          /* tp_descr_set */
    0,                          /* tp_dictoffset */
    (initproc)TraceReader_init, /* tp_init */
    0,                          /* tp_alloc */
    TraceReader_new,            /* tp_new */
};
1372

1373
/**
 * Finalize TraceReaderType and register it on module `m` under the name
 * "TraceReader".
 *
 * @return 0 on success, -1 on failure.
 */
int init_trace_reader(PyObject *m) {
    if (PyType_Ready(&TraceReaderType) < 0) {
        return -1;
    }

    // PyModule_AddObject steals a reference only on success, so take one
    // up front and give it back if registration fails.
    PyObject *type_obj = (PyObject *)&TraceReaderType;
    Py_INCREF(type_obj);
    const int status = PyModule_AddObject(m, "TraceReader", type_obj);
    if (status >= 0) {
        return 0;
    }

    Py_DECREF(type_obj);
    // NOTE(review): this also drops a reference to the module itself;
    // confirm the caller does not release `m` again on a -1 return.
    Py_DECREF(m);
    return -1;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc