• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23958653450

03 Apr 2026 07:09PM UTC coverage: 51.201% (-0.3%) from 51.498%
23958653450

Pull #63

github

web-flow
Merge 39b9b58d3 into 773a62661
Pull Request #63: feat(aggregator): support profile/system counter aggregation and custom metric Arrow output

23379 of 58959 branches covered (39.65%)

Branch coverage included in aggregate %.

347 of 574 new or added lines in 8 files covered. (60.45%)

9 existing lines in 4 files now uncovered.

20301 of 26352 relevant lines covered (77.04%)

13075.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.45
/src/dftracer/utils/python/trace_reader.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/coro/task.h>
4
#include <dftracer/utils/core/utils/string.h>
5
#include <dftracer/utils/python/arrow_helpers.h>
6
#include <dftracer/utils/python/runtime.h>
7
#include <dftracer/utils/python/trace_reader.h>
8
#include <dftracer/utils/python/trace_reader_iterator.h>
9
#include <dftracer/utils/utilities/reader/trace_reader.h>
10

11
#include <algorithm>
12
#include <cctype>
13
#include <cinttypes>
14
#include <cstddef>
15
#include <cstdio>
16
#include <cstring>
17
#include <exception>
18
#include <memory>
19
#include <string>
20
#include <vector>
21

22
#ifdef DFTRACER_UTILS_ENABLE_ARROW
23
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
24
#include <yyjson.h>
25
#endif
26

27
namespace {
28

29
using dftracer::utils::Runtime;
30
using dftracer::utils::coro::CoroTask;
31
using dftracer::utils::utilities::reader::ReadConfig;
32
using dftracer::utils::utilities::reader::TraceReader;
33
using dftracer::utils::utilities::reader::TraceReaderConfig;
34

NEW
35
#ifdef DFTRACER_UTILS_ENABLE_ARROW
// Convert a yyjson integer value to int64_t.
//
// yyjson_is_int() is true for both the signed and the unsigned integer
// subtype, and yyjson_get_sint() returns the stored integer cast to int64_t
// for either subtype (and 0 for NULL or non-integer values).  A single call
// therefore yields exactly what the former two-step version returned.
// Unsigned values above INT64_MAX wrap to a negative number, as before.
//
// The helper is compiled only with DFTRACER_UTILS_ENABLE_ARROW because this
// file includes <yyjson.h> only under that macro; its only visible caller,
// normalize_row(), is inside the same guard.
int64_t json_to_int64(yyjson_val *value) {
    return yyjson_get_sint(value);
}
#endif  // DFTRACER_UTILS_ENABLE_ARROW
39

40
// Coroutine producer for line iteration.
//
// Builds a TraceReader from `cfg`, pulls items from read_lines(rc) and pushes
// a copy of each item's `content` onto state->queue.  Before every push it
// waits on cv_producer until the queue holds fewer than max_queue_size
// entries or `cancelled` is set.  Whether the loop ends normally, through
// cancellation or through an exception, a std::nullopt sentinel is pushed and
// `done` is set; on an exception the exception_ptr is stored in
// state->error first.
CoroTask<void> produce_lines(std::shared_ptr<IteratorState> state,
                             TraceReaderConfig cfg, ReadConfig rc) {
    auto *const shared = state.get();
    try {
        TraceReader reader(std::move(cfg));
        auto lines = reader.read_lines(rc);
        for (;;) {
            auto next = co_await lines.next();
            if (!next) break;
            if (shared->cancelled.load(std::memory_order_acquire)) break;

            // Copy the payload before taking the lock.
            std::string payload(next->content);
            {
                std::unique_lock<std::mutex> guard(shared->mtx);
                // Back-pressure: block while the queue is full, unless the
                // consumer has cancelled.
                shared->cv_producer.wait(guard, [shared] {
                    if (shared->queue.size() < shared->max_queue_size)
                        return true;
                    return shared->cancelled.load(std::memory_order_acquire);
                });
                if (shared->cancelled.load(std::memory_order_acquire)) break;
                shared->queue.push(std::move(payload));
            }
            shared->cv_consumer.notify_one();
        }
    } catch (...) {
        // Failure path: record the exception, then publish the sentinel and
        // wake the consumer while still holding the mutex.
        std::lock_guard<std::mutex> guard(shared->mtx);
        shared->error = std::current_exception();
        shared->queue.push(std::nullopt);
        shared->done.store(true, std::memory_order_release);
        shared->cv_consumer.notify_one();
        co_return;
    }

    // Normal (or cancelled) completion: publish the end-of-stream sentinel
    // under the lock, notify after releasing it.
    {
        std::lock_guard<std::mutex> guard(shared->mtx);
        shared->queue.push(std::nullopt);
        shared->done.store(true, std::memory_order_release);
    }
    shared->cv_consumer.notify_one();
}
7,286!
75

76
// Coroutine producer for raw-chunk iteration.
//
// Same protocol as the line producer, but the items come from
// TraceReader::read_raw(rc) and each one is copied into a std::string from
// its data()/size() pair.  A std::nullopt sentinel is always pushed and
// `done` is always set when the coroutine finishes; an exception is stored
// in state->error before the sentinel is published.
CoroTask<void> produce_raw(std::shared_ptr<IteratorState> state,
                           TraceReaderConfig cfg, ReadConfig rc) {
    auto *const shared = state.get();
    try {
        TraceReader reader(std::move(cfg));
        auto chunks = reader.read_raw(rc);
        for (;;) {
            auto next = co_await chunks.next();
            if (!next) break;
            if (shared->cancelled.load(std::memory_order_acquire)) break;

            // Copy the bytes before taking the lock.
            std::string payload(next->data(), next->size());
            {
                std::unique_lock<std::mutex> guard(shared->mtx);
                // Back-pressure: block while the queue is full, unless the
                // consumer has cancelled.
                shared->cv_producer.wait(guard, [shared] {
                    if (shared->queue.size() < shared->max_queue_size)
                        return true;
                    return shared->cancelled.load(std::memory_order_acquire);
                });
                if (shared->cancelled.load(std::memory_order_acquire)) break;
                shared->queue.push(std::move(payload));
            }
            shared->cv_consumer.notify_one();
        }
    } catch (...) {
        // Failure path: record the exception, then publish the sentinel and
        // wake the consumer while still holding the mutex.
        std::lock_guard<std::mutex> guard(shared->mtx);
        shared->error = std::current_exception();
        shared->queue.push(std::nullopt);
        shared->done.store(true, std::memory_order_release);
        shared->cv_consumer.notify_one();
        co_return;
    }

    // Normal (or cancelled) completion: publish the end-of-stream sentinel
    // under the lock, notify after releasing it.
    {
        std::lock_guard<std::mutex> guard(shared->mtx);
        shared->queue.push(std::nullopt);
        shared->done.store(true, std::memory_order_release);
    }
    shared->cv_consumer.notify_one();
}
985!
111

112
#ifdef DFTRACER_UTILS_ENABLE_ARROW
113

114
using dftracer::utils::utilities::common::arrow::ColumnType;
115
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
116

117
// Bump arena for string_views that must survive until builder.finish().
//
// Bytes are copied into fixed-size blocks (BLOCK_SIZE, or larger when a
// single string needs more).  Views returned by push() point into a block's
// heap buffer, so they stay valid when further blocks are added and become
// invalid only after clear() or destruction.
struct StringArena {
    static constexpr std::size_t BLOCK_SIZE = 64 * 1024;
    std::vector<std::vector<char>> blocks;
    std::size_t pos = 0;  // write offset inside blocks.back()

    StringArena() { blocks.emplace_back(BLOCK_SIZE); }

    // Copy `len` bytes from `data` into the arena and return a view of the
    // copy.  `data` may be null only when `len` is 0.
    std::string_view push(const char *data, std::size_t len) {
        if (pos + len > blocks.back().size()) {
            // Does not fit in the current block: open a new one that is big
            // enough for this string on its own.
            blocks.emplace_back(std::max(BLOCK_SIZE, len));
            pos = 0;
        }
        char *dst = blocks.back().data() + pos;
        // memcpy with a null source is undefined even for zero bytes, so an
        // empty push (e.g. from a default-constructed string_view) skips it.
        if (len != 0) std::memcpy(dst, data, len);
        pos += len;
        return {dst, len};
    }

    // Drop every block except the first and rewind the write offset.
    // All previously returned views must be considered dangling afterwards.
    void clear() {
        if (blocks.size() > 1) blocks.resize(1);
        pos = 0;
    }
};
141

142
// --- Row type constants (must match Python TYPE_* constants) ---
// Written to the "type" column by normalize_row(); the classification rules
// noted per value are the ones normalize_row() applies to "ph"/"name"/"cat".
enum RowType : int8_t {
    ROW_EVENT = 0,          // "ph" is neither "M" nor "C"
    ROW_FILE_HASH = 1,      // "ph" == "M", "name" == "FH"
    ROW_HOST_HASH = 2,      // "ph" == "M", "name" == "HH"
    ROW_STRING_HASH = 3,    // "ph" == "M", "name" == "SH"
    ROW_METADATA = 4,       // "ph" == "M", any other name
    ROW_PROC_METADATA = 5,  // "ph" == "M", "name" == "PR"
    ROW_PROFILE = 6,        // "ph" == "C", "cat" is not "sys"
    ROW_SYSTEM = 7,         // "ph" == "C", "cat" equals "sys" ignoring case
};
153

154
// --- IO category constants (must match Python IOCategory values) ---
enum IOCat : int8_t {
    IO_READ = 1,
    IO_WRITE = 2,
    IO_METADATA = 3,
    IO_PCTL = 4,
    IO_IPC = 5,
    IO_OTHER = 6,
    IO_SYNC = 7,
};

// Map a function name to its IOCat value.
//
// The match is exact and case-sensitive.  Every name appears in exactly one
// category, so the lookup order has no effect on the result.  Names that are
// not listed yield IO_OTHER.
static int8_t get_io_cat(std::string_view func) {
    struct Entry {
        std::string_view name;
        IOCat cat;
    };
    static constexpr Entry kKnown[] = {
        // READ
        {"fread", IO_READ},
        {"pread", IO_READ},
        {"preadv", IO_READ},
        {"read", IO_READ},
        {"readv", IO_READ},
        // WRITE
        {"fwrite", IO_WRITE},
        {"pwrite", IO_WRITE},
        {"pwritev", IO_WRITE},
        {"write", IO_WRITE},
        {"writev", IO_WRITE},
        // SYNC
        {"fsync", IO_SYNC},
        {"fdatasync", IO_SYNC},
        {"msync", IO_SYNC},
        {"sync", IO_SYNC},
        // PCTL
        {"exec", IO_PCTL},
        {"exit", IO_PCTL},
        {"fork", IO_PCTL},
        {"kill", IO_PCTL},
        {"pipe", IO_PCTL},
        {"wait", IO_PCTL},
        // IPC
        {"msgctl", IO_IPC},
        {"msgget", IO_IPC},
        {"msgrcv", IO_IPC},
        {"msgsnd", IO_IPC},
        {"semctl", IO_IPC},
        {"semget", IO_IPC},
        {"semop", IO_IPC},
        {"shmat", IO_IPC},
        {"shmctl", IO_IPC},
        {"shmdt", IO_IPC},
        {"shmget", IO_IPC},
        // METADATA
        {"__fxstat", IO_METADATA},
        {"__fxstat64", IO_METADATA},
        {"__lxstat", IO_METADATA},
        {"__lxstat64", IO_METADATA},
        {"__xstat", IO_METADATA},
        {"__xstat64", IO_METADATA},
        {"access", IO_METADATA},
        {"close", IO_METADATA},
        {"closedir", IO_METADATA},
        {"fclose", IO_METADATA},
        {"fcntl", IO_METADATA},
        {"fopen", IO_METADATA},
        {"fopen64", IO_METADATA},
        {"fseek", IO_METADATA},
        {"fstat", IO_METADATA},
        {"fstatat", IO_METADATA},
        {"ftell", IO_METADATA},
        {"ftruncate", IO_METADATA},
        {"link", IO_METADATA},
        {"lseek", IO_METADATA},
        {"lseek64", IO_METADATA},
        {"mkdir", IO_METADATA},
        {"open", IO_METADATA},
        {"open64", IO_METADATA},
        {"opendir", IO_METADATA},
        {"readdir", IO_METADATA},
        {"readlink", IO_METADATA},
        {"remove", IO_METADATA},
        {"rename", IO_METADATA},
        {"rmdir", IO_METADATA},
        {"seek", IO_METADATA},
        {"stat", IO_METADATA},
        {"unlink", IO_METADATA},
    };

    for (const Entry &entry : kKnown) {
        if (entry.name == func) return entry.cat;
    }
    return IO_OTHER;
}
203

NEW
204
// Case-insensitive equality between `a` and the NUL-terminated string `b`.
//
// Both sides are folded with std::tolower (bytes are passed as unsigned char,
// as std::tolower requires).  Previously only `a` was folded, so any
// upper-case byte in `b` could never match; results for an all-lower-case
// `b` are unchanged.
//
// Returns true when the lengths are equal and every byte pair matches after
// folding.  `b` must not be null.
static bool str_iequal(std::string_view a, const char *b) {
    const std::string_view rhs(b);
    if (a.size() != rhs.size()) return false;
    return std::equal(a.begin(), a.end(), rhs.begin(), [](char x, char y) {
        return std::tolower(static_cast<unsigned char>(x)) ==
               std::tolower(static_cast<unsigned char>(y));
    });
}
214

NEW
215
// Report whether `s` contains `needle` when the bytes of `s` are lower-cased.
//
// Only `s` is folded (std::tolower on each byte as unsigned char); `needle`
// is compared as given, so it has to be lower-case already to match letters.
// An empty `needle` matches every `s`, including an empty one.
static bool str_contains_lower(std::string_view s, const char *needle) {
    const std::string_view pattern(needle);
    // std::search on an empty haystack cannot signal "found", so the
    // empty-pattern case is answered up front.
    if (pattern.empty()) return true;

    const auto hit = std::search(
        s.begin(), s.end(), pattern.begin(), pattern.end(),
        [](char hay, char pat) {
            return std::tolower(static_cast<unsigned char>(hay)) ==
                   static_cast<unsigned char>(pat);
        });
    return hit != s.end();
}
231

232
// Normalize a raw JSON row (already parsed into yyjson) into the semantic
233
// output schema.  Appends one row to `builder` with the full set of output
234
// columns.  Returns false if the row should be skipped (no valid name).
NEW
235
static bool normalize_row(RecordBatchBuilder &builder, StringArena &arena,
×
236
                          yyjson_val *root) {
237
    // --- Extract top-level fields ---
NEW
238
    yyjson_val *v_ph = yyjson_obj_get(root, "ph");
×
NEW
239
    yyjson_val *v_name = yyjson_obj_get(root, "name");
×
NEW
240
    yyjson_val *v_cat = yyjson_obj_get(root, "cat");
×
NEW
241
    yyjson_val *v_pid = yyjson_obj_get(root, "pid");
×
NEW
242
    yyjson_val *v_tid = yyjson_obj_get(root, "tid");
×
NEW
243
    yyjson_val *v_ts = yyjson_obj_get(root, "ts");
×
NEW
244
    yyjson_val *v_dur = yyjson_obj_get(root, "dur");
×
NEW
245
    yyjson_val *v_args = yyjson_obj_get(root, "args");
×
246

247
    std::string_view ph =
NEW
248
        v_ph && yyjson_is_str(v_ph)
×
NEW
249
            ? std::string_view(yyjson_get_str(v_ph), yyjson_get_len(v_ph))
×
NEW
250
            : std::string_view();
×
251
    std::string_view name_sv =
NEW
252
        v_name && yyjson_is_str(v_name)
×
NEW
253
            ? std::string_view(yyjson_get_str(v_name), yyjson_get_len(v_name))
×
NEW
254
            : std::string_view();
×
255
    std::string_view cat_sv =
NEW
256
        v_cat && yyjson_is_str(v_cat)
×
NEW
257
            ? std::string_view(yyjson_get_str(v_cat), yyjson_get_len(v_cat))
×
NEW
258
            : std::string_view();
×
259

260
    // Helper to get args fields
NEW
261
    auto args_str = [&](const char *key) -> std::string_view {
×
NEW
262
        if (!v_args) return {};
×
NEW
263
        yyjson_val *v = yyjson_obj_get(v_args, key);
×
NEW
264
        if (!v) return {};
×
NEW
265
        if (yyjson_is_str(v)) return {yyjson_get_str(v), yyjson_get_len(v)};
×
NEW
266
        return {};
×
NEW
267
    };
×
NEW
268
    auto args_int = [&](const char *key) -> std::pair<bool, int64_t> {
×
NEW
269
        if (!v_args) return {false, 0};
×
NEW
270
        yyjson_val *v = yyjson_obj_get(v_args, key);
×
NEW
271
        if (!v) return {false, 0};
×
NEW
272
        if (yyjson_is_int(v)) return {true, yyjson_get_sint(v)};
×
NEW
273
        if (yyjson_is_uint(v))
×
NEW
274
            return {true, static_cast<int64_t>(yyjson_get_uint(v))};
×
NEW
275
        if (yyjson_is_real(v))
×
NEW
276
            return {true, static_cast<int64_t>(yyjson_get_real(v))};
×
NEW
277
        return {false, 0};
×
NEW
278
    };
×
NEW
279
    auto args_float = [&](const char *key) -> std::pair<bool, double> {
×
NEW
280
        if (!v_args) return {false, 0.0};
×
NEW
281
        yyjson_val *v = yyjson_obj_get(v_args, key);
×
NEW
282
        if (!v) return {false, 0.0};
×
NEW
283
        if (yyjson_is_real(v)) return {true, yyjson_get_real(v)};
×
NEW
284
        if (yyjson_is_int(v))
×
NEW
285
            return {true, static_cast<double>(yyjson_get_sint(v))};
×
NEW
286
        if (yyjson_is_uint(v))
×
NEW
287
            return {true, static_cast<double>(yyjson_get_uint(v))};
×
NEW
288
        return {false, 0.0};
×
NEW
289
    };
×
290

291
    // --- Type classification ---
NEW
292
    bool is_M = (ph == "M");
×
NEW
293
    bool is_C = (ph == "C");
×
NEW
294
    bool is_event = !is_M && !is_C;
×
295

NEW
296
    int8_t row_type = ROW_EVENT;
×
NEW
297
    if (is_M) {
×
NEW
298
        if (name_sv == "FH")
×
NEW
299
            row_type = ROW_FILE_HASH;
×
NEW
300
        else if (name_sv == "HH")
×
NEW
301
            row_type = ROW_HOST_HASH;
×
NEW
302
        else if (name_sv == "SH")
×
NEW
303
            row_type = ROW_STRING_HASH;
×
NEW
304
        else if (name_sv == "PR")
×
NEW
305
            row_type = ROW_PROC_METADATA;
×
306
        else
NEW
307
            row_type = ROW_METADATA;
×
NEW
308
    } else if (is_C) {
×
NEW
309
        row_type = str_iequal(cat_sv, "sys") ? ROW_SYSTEM : ROW_PROFILE;
×
310
    }
NEW
311
    bool is_hash = (row_type >= ROW_FILE_HASH && row_type <= ROW_STRING_HASH) ||
×
312
                   row_type == ROW_PROC_METADATA;
NEW
313
    bool is_profile = (row_type == ROW_PROFILE);
×
NEW
314
    bool is_sys = (row_type == ROW_SYSTEM);
×
315

316
    // Name: metadata rows use args.name if available
NEW
317
    std::string_view out_name = name_sv;
×
NEW
318
    if (is_M) {
×
NEW
319
        auto an = args_str("name");
×
NEW
320
        if (!an.empty()) out_name = an;
×
321
    }
NEW
322
    if (out_name.empty()) return false;  // skip rows without name
×
323

324
    // --- Declare all output columns (lazy — add_or_get_column handles
325
    // first-time creation) --- We use a fixed schema so column indices are
326
    // stable across rows. The builder backfills nulls for columns not touched
327
    // via end_row().
328

NEW
329
    auto ci_type = builder.add_or_get_column("type", ColumnType::INT64);
×
NEW
330
    auto ci_cat = builder.add_or_get_column("cat", ColumnType::STRING);
×
NEW
331
    auto ci_name = builder.add_or_get_column("name", ColumnType::STRING);
×
NEW
332
    auto ci_pid = builder.add_or_get_column("pid", ColumnType::INT64);
×
NEW
333
    auto ci_tid = builder.add_or_get_column("tid", ColumnType::INT64);
×
NEW
334
    auto ci_hash = builder.add_or_get_column("hash", ColumnType::STRING);
×
NEW
335
    auto ci_value = builder.add_or_get_column("value", ColumnType::STRING);
×
336
    auto ci_host_hash =
NEW
337
        builder.add_or_get_column("host_hash", ColumnType::STRING);
×
338
    auto ci_file_hash =
NEW
339
        builder.add_or_get_column("file_hash", ColumnType::STRING);
×
NEW
340
    auto ci_epoch = builder.add_or_get_column("epoch", ColumnType::INT64);
×
NEW
341
    auto ci_step = builder.add_or_get_column("step", ColumnType::INT64);
×
NEW
342
    auto ci_ts = builder.add_or_get_column("ts", ColumnType::INT64);
×
NEW
343
    auto ci_dur = builder.add_or_get_column("dur", ColumnType::INT64);
×
NEW
344
    auto ci_te = builder.add_or_get_column("te", ColumnType::INT64);
×
NEW
345
    auto ci_trange = builder.add_or_get_column("trange", ColumnType::INT64);
×
NEW
346
    auto ci_io_cat = builder.add_or_get_column("io_cat", ColumnType::INT64);
×
NEW
347
    auto ci_size = builder.add_or_get_column("size", ColumnType::INT64);
×
NEW
348
    auto ci_offset = builder.add_or_get_column("offset", ColumnType::INT64);
×
NEW
349
    auto ci_image_id = builder.add_or_get_column("image_id", ColumnType::INT64);
×
350

351
    // --- Populate core columns ---
NEW
352
    builder.append_int64(ci_type, row_type);
×
353

354
    // cat (lowercased) — write into arena
NEW
355
    if (!cat_sv.empty()) {
×
356
        char lbuf[256];
NEW
357
        std::size_t clen = std::min(cat_sv.size(), sizeof(lbuf));
×
NEW
358
        for (std::size_t i = 0; i < clen; ++i)
×
NEW
359
            lbuf[i] = static_cast<char>(
×
NEW
360
                std::tolower(static_cast<unsigned char>(cat_sv[i])));
×
NEW
361
        builder.append_string(ci_cat, arena.push(lbuf, clen));
×
362
    } else {
NEW
363
        builder.append_null(ci_cat);
×
364
    }
365

NEW
366
    builder.append_string(ci_name, out_name);
×
367

NEW
368
    if (v_pid && (yyjson_is_int(v_pid) || yyjson_is_uint(v_pid)))
×
NEW
369
        builder.append_int64(ci_pid, json_to_int64(v_pid));
×
370
    // else: null via end_row backfill
371

NEW
372
    if (v_tid && (yyjson_is_int(v_tid) || yyjson_is_uint(v_tid)))
×
NEW
373
        builder.append_int64(ci_tid, json_to_int64(v_tid));
×
374

375
    // hash / value
NEW
376
    auto a_value = args_str("value");
×
NEW
377
    if (is_hash && !a_value.empty()) builder.append_string(ci_hash, a_value);
×
NEW
378
    if (row_type == ROW_METADATA && !a_value.empty())
×
NEW
379
        builder.append_string(ci_value, a_value);
×
380

381
    // host_hash / file_hash
NEW
382
    auto a_hhash = args_str("hhash");
×
NEW
383
    if (!a_hhash.empty()) builder.append_string(ci_host_hash, a_hhash);
×
NEW
384
    auto a_fhash = args_str("fhash");
×
NEW
385
    if (!a_fhash.empty()) builder.append_string(ci_file_hash, a_fhash);
×
386

387
    // epoch / step
NEW
388
    auto [has_epoch, epoch_v] = args_int("epoch");
×
NEW
389
    if (has_epoch && epoch_v >= 0) builder.append_int64(ci_epoch, epoch_v);
×
NEW
390
    auto [has_step, step_v] = args_int("step");
×
NEW
391
    if (has_step && step_v >= 0) builder.append_int64(ci_step, step_v);
×
392

393
    // --- Temporal ---
NEW
394
    bool has_ts = (is_event || is_C) && v_ts &&
×
NEW
395
                  (yyjson_is_int(v_ts) || yyjson_is_uint(v_ts));
×
NEW
396
    bool has_dur = v_dur && (yyjson_is_int(v_dur) || yyjson_is_uint(v_dur));
×
NEW
397
    int64_t ts_val = 0, dur_val = 0;
×
NEW
398
    if (has_ts) {
×
NEW
399
        ts_val = json_to_int64(v_ts);
×
NEW
400
        builder.append_int64(ci_ts, ts_val);
×
401
    }
NEW
402
    if (is_event && has_ts && has_dur) {
×
NEW
403
        dur_val = json_to_int64(v_dur);
×
NEW
404
        builder.append_int64(ci_dur, dur_val);
×
NEW
405
        builder.append_int64(ci_te, ts_val + dur_val);
×
406
    }
407

408
    // --- IO columns (events only) ---
NEW
409
    if (is_event) {
×
410
        bool is_posix_stdio =
NEW
411
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
NEW
412
        int8_t io_cat = IO_OTHER;
×
413

414
        // size priority: size_sum > POSIX ret > image_size
NEW
415
        auto [has_ss, ss_val] = args_int("size_sum");
×
NEW
416
        if (has_ss) {
×
NEW
417
            builder.append_int64(ci_size, ss_val);
×
NEW
418
            if (is_posix_stdio) io_cat = get_io_cat(name_sv);
×
NEW
419
        } else if (is_posix_stdio) {
×
NEW
420
            io_cat = get_io_cat(name_sv);
×
NEW
421
            auto [has_ret, ret_val] = args_int("ret");
×
NEW
422
            if (has_ret && ret_val > 0 &&
×
NEW
423
                (io_cat == IO_READ || io_cat == IO_WRITE))
×
NEW
424
                builder.append_int64(ci_size, ret_val);
×
NEW
425
            auto [has_ofs, ofs_val] = args_int("offset");
×
NEW
426
            if (has_ofs && ofs_val >= 0)
×
NEW
427
                builder.append_int64(ci_offset, ofs_val);
×
428
        } else {
NEW
429
            auto [has_img, img_val] = args_int("image_idx");
×
NEW
430
            if (has_img && img_val > 0)
×
NEW
431
                builder.append_int64(ci_image_id, img_val);
×
NEW
432
            auto [has_ims, ims_val] = args_int("image_size");
×
NEW
433
            if (has_ims && ims_val > 0 && !str_contains_lower(name_sv, "open"))
×
NEW
434
                builder.append_int64(ci_size, ims_val);
×
435
        }
NEW
436
        builder.append_int64(ci_io_cat, io_cat);
×
437
    }
438

439
    // --- Profile columns ---
NEW
440
    if (is_profile) {
×
441
        bool is_posix_stdio =
NEW
442
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
NEW
443
        int8_t io_cat = is_posix_stdio ? get_io_cat(name_sv) : IO_OTHER;
×
NEW
444
        builder.append_int64(ci_io_cat, io_cat);
×
445

446
        static const char *profile_keys[] = {
447
            "count",      "count_max",  "count_min",  "count_sum",
448
            "dft_cnt",    "dur",        "dur_max",    "dur_min",
449
            "dur_sum",    "epoch",      "flags",      "offset",
450
            "offset_max", "offset_min", "offset_sum", "ret",
451
            "ret_max",    "ret_min",    "ret_sum",    "whence",
452
            "whence_max", "whence_min", "whence_sum", nullptr};
NEW
453
        for (const char **pk = profile_keys; *pk; ++pk) {
×
NEW
454
            auto [has_v, val] = args_int(*pk);
×
NEW
455
            if (has_v) {
×
NEW
456
                auto idx = builder.add_or_get_column(*pk, ColumnType::INT64);
×
NEW
457
                builder.append_int64(idx, val);
×
458
            }
459
        }
460
    }
461

462
    // --- System columns ---
NEW
463
    if (is_sys) {
×
464
        static const char *sys_keys[] = {
465
            "user_pct", "system_pct",  "iowait_pct",   "idle_pct",
466
            "irq_pct",  "softirq_pct", "MemAvailable", "MemFree",
467
            "Cached",   "Dirty",       "Active",       nullptr};
NEW
468
        for (const char **sk = sys_keys; *sk; ++sk) {
×
NEW
469
            auto [has_v, val] = args_float(*sk);
×
NEW
470
            if (has_v) {
×
NEW
471
                auto idx = builder.add_or_get_column(*sk, ColumnType::DOUBLE);
×
NEW
472
                builder.append_double(idx, val);
×
473
            }
474
        }
475
    }
476

NEW
477
    builder.end_row();
×
NEW
478
    return true;
×
479
}
480

481
// Flatten a yyjson object into "prefix.key" columns using native types.
482
// On type mismatch (same key, different type across rows), appends null.
483
static void flatten_object_into(RecordBatchBuilder &builder, StringArena &arena,
×
484
                                std::string_view prefix, yyjson_val *obj) {
485
    char key_buf[512];
486

487
    yyjson_obj_iter sub_iter;
488
    yyjson_obj_iter_init(obj, &sub_iter);
489
    yyjson_val *sub_key;
490
    while ((sub_key = yyjson_obj_iter_next(&sub_iter))) {
×
491
        yyjson_val *sub_val = yyjson_obj_iter_get_val(sub_key);
×
492
        const char *sk_str = yyjson_get_str(sub_key);
×
493
        std::size_t sk_len = yyjson_get_len(sub_key);
494

495
        std::size_t needed = prefix.size() + 1 + sk_len;
496
        if (needed >= sizeof(key_buf)) continue;
×
497
        std::memcpy(key_buf, prefix.data(), prefix.size());
498
        key_buf[prefix.size()] = '.';
499
        std::memcpy(key_buf + prefix.size() + 1, sk_str, sk_len);
500
        std::string_view full_key(key_buf, needed);
501

502
        if (yyjson_is_int(sub_val)) {
×
503
            auto idx = builder.add_or_get_column(full_key, ColumnType::INT64);
×
504
            if (builder.column_type(idx) == ColumnType::INT64)
×
505
                builder.append_int64(idx, yyjson_get_sint(sub_val));
×
506
            else
507
                builder.append_null(idx);
×
508
        } else if (yyjson_is_uint(sub_val)) {
×
509
            auto idx = builder.add_or_get_column(full_key, ColumnType::UINT64);
×
510
            if (builder.column_type(idx) == ColumnType::UINT64)
×
511
                builder.append_uint64(idx, yyjson_get_uint(sub_val));
×
512
            else
513
                builder.append_null(idx);
×
514
        } else if (yyjson_is_real(sub_val)) {
×
515
            auto idx = builder.add_or_get_column(full_key, ColumnType::DOUBLE);
×
516
            if (builder.column_type(idx) == ColumnType::DOUBLE)
×
517
                builder.append_double(idx, yyjson_get_real(sub_val));
×
518
            else
519
                builder.append_null(idx);
×
520
        } else if (yyjson_is_bool(sub_val)) {
×
521
            auto idx = builder.add_or_get_column(full_key, ColumnType::BOOL);
×
522
            if (builder.column_type(idx) == ColumnType::BOOL)
×
523
                builder.append_bool(idx, yyjson_get_bool(sub_val));
×
524
            else
525
                builder.append_null(idx);
×
526
        } else if (yyjson_is_str(sub_val)) {
×
527
            auto idx = builder.add_or_get_column(full_key, ColumnType::STRING);
×
528
            if (builder.column_type(idx) == ColumnType::STRING)
×
529
                builder.append_string(
×
530
                    idx, std::string_view(yyjson_get_str(sub_val),
531
                                          yyjson_get_len(sub_val)));
532
            else
533
                builder.append_null(idx);
×
534
        } else if (yyjson_is_null(sub_val)) {
×
535
            auto existing = builder.find_column(full_key);
×
536
            if (existing) builder.append_null(*existing);
×
537
        } else {
538
            // nested object/array: serialize
539
            std::size_t json_len;
540
            char *json_str = yyjson_val_write(sub_val, 0, &json_len);
541
            auto idx = builder.add_or_get_column(full_key, ColumnType::STRING);
×
542
            if (json_str) {
×
543
                builder.append_string(idx, arena.push(json_str, json_len));
×
544
                free(json_str);
545
            } else {
546
                builder.append_null(idx);
×
547
            }
548
        }
549
    }
550
}
551

552
CoroTask<void> produce_arrow_batches(std::shared_ptr<ArrowIteratorState> state,
1,800!
553
                                     TraceReaderConfig cfg, ReadConfig rc,
554
                                     std::size_t batch_size,
555
                                     bool flatten_objects = false,
556
                                     bool normalize = false) {
14!
557
    auto *sp = state.get();
14✔
558
    try {
559
        TraceReader reader(std::move(cfg));
14!
560
        auto gen = reader.read_lines(rc);
14!
561
        RecordBatchBuilder builder;
14✔
562
        builder.reserve(batch_size);
14!
563

564
        std::vector<yyjson_doc *> held_docs;
14✔
565
        StringArena arena;
14!
566
        held_docs.reserve(batch_size);
14!
567

568
        while (auto opt = co_await gen.next()) {
1,758!
569
            if (sp->cancelled.load(std::memory_order_acquire)) break;
422!
570

571
            const char *trimmed;
422✔
572
            std::size_t trimmed_length;
422✔
573
            if (!dftracer::utils::json_trim_and_validate(
422!
574
                    opt->content.data(), opt->content.size(), trimmed,
422✔
575
                    trimmed_length)) {
576
                continue;
26✔
577
            }
578

579
            yyjson_doc *doc = yyjson_read(trimmed, trimmed_length, 0);
396!
580
            if (!doc) continue;
396!
581

582
            yyjson_val *root = yyjson_doc_get_root(doc);
396!
583
            if (!root || !yyjson_is_obj(root)) {
396!
584
                yyjson_doc_free(doc);
×
585
                continue;
586
            }
587

588
            if (normalize) {
396!
589
                // Produce the semantic output schema directly.
590
                // normalize_row calls end_row() internally.
591
                if (!normalize_row(builder, arena, root)) {
×
592
                    yyjson_doc_free(doc);
×
593
                    continue;
594
                }
595
                held_docs.push_back(doc);
×
596
            } else {
597
                yyjson_obj_iter iter;
396✔
598
                yyjson_obj_iter_init(root, &iter);
396!
599
                yyjson_val *key;
396✔
600
                while ((key = yyjson_obj_iter_next(&iter))) {
3,564!
601
                    yyjson_val *val = yyjson_obj_iter_get_val(key);
3,168!
602
                    const char *key_str = yyjson_get_str(key);
3,168!
603
                    std::size_t key_len = yyjson_get_len(key);
3,168!
604
                    std::string_view key_sv(key_str, key_len);
3,168✔
605

606
                    if (yyjson_is_int(val)) {
3,168!
607
                        std::size_t idx = builder.add_or_get_column(
3,168!
608
                            key_sv, ColumnType::INT64);
1,584✔
609
                        builder.append_int64(idx, yyjson_get_sint(val));
1,584!
610
                    } else if (yyjson_is_uint(val)) {
3,168!
611
                        std::size_t idx = builder.add_or_get_column(
×
612
                            key_sv, ColumnType::UINT64);
613
                        builder.append_uint64(idx, yyjson_get_uint(val));
×
614
                    } else if (yyjson_is_real(val)) {
1,584!
615
                        std::size_t idx = builder.add_or_get_column(
×
616
                            key_sv, ColumnType::DOUBLE);
617
                        builder.append_double(idx, yyjson_get_real(val));
×
618
                    } else if (yyjson_is_bool(val)) {
1,584!
619
                        std::size_t idx =
620
                            builder.add_or_get_column(key_sv, ColumnType::BOOL);
×
621
                        builder.append_bool(idx, yyjson_get_bool(val));
×
622
                    } else if (yyjson_is_str(val)) {
1,584!
623
                        std::size_t idx = builder.add_or_get_column(
2,376!
624
                            key_sv, ColumnType::STRING);
1,188✔
625
                        builder.append_string(
2,376!
626
                            idx, std::string_view(yyjson_get_str(val),
3,564!
627
                                                  yyjson_get_len(val)));
1,188!
628
                    } else if (yyjson_is_null(val)) {
1,584!
629
                        auto existing = builder.find_column(key_sv);
×
630
                        if (existing) builder.append_null(*existing);
×
631
                    } else {
632
                        std::size_t json_len;
396✔
633
                        char *json_str = yyjson_val_write(val, 0, &json_len);
396!
634
                        std::size_t idx = builder.add_or_get_column(
792!
635
                            key_sv, ColumnType::STRING);
396✔
636
                        if (json_str) {
396!
637
                            builder.append_string(
396!
638
                                idx, arena.push(json_str, json_len));
792!
639
                            free(json_str);
396!
640
                        } else {
396✔
641
                            builder.append_null(idx);
×
642
                        }
643
                    }
396✔
644
                }
3,168✔
645
                builder.end_row();
396!
646
                held_docs.push_back(doc);
396!
647
            }  // end else (raw path)
396✔
648

649
            if (builder.num_rows() >= batch_size) {
396✔
650
                auto result = builder.finish();
7!
651
                for (auto *d : held_docs) yyjson_doc_free(d);
177!
652
                held_docs.clear();
7✔
653
                arena.clear();
7!
654

655
                {
656
                    std::unique_lock<std::mutex> lock(sp->mtx);
7!
657
                    sp->cv_producer.wait(lock, [sp] {
28!
658
                        return sp->queue.size() < sp->max_queue_size ||
14!
659
                               sp->cancelled.load(std::memory_order_acquire);
7✔
660
                    });
661
                    if (sp->cancelled.load(std::memory_order_acquire)) break;
7!
662
                    sp->queue.push(std::move(result));
7!
663
                }
7!
664
                sp->cv_consumer.notify_one();
7✔
665
                builder.reset(false);
7!
666
                builder.reserve(batch_size);
7!
667
            }
7!
668
        }
436✔
669

670
        if (builder.num_rows() > 0) {
14!
671
            auto result = builder.finish();
14!
672
            for (auto *d : held_docs) yyjson_doc_free(d);
240!
673
            held_docs.clear();
14✔
674
            arena.clear();
14!
675
            {
676
                std::lock_guard<std::mutex> lock(sp->mtx);
14!
677
                sp->queue.push(std::move(result));
14!
678
            }
14✔
679
            sp->cv_consumer.notify_one();
14✔
680
        } else {
14✔
681
            for (auto *d : held_docs) yyjson_doc_free(d);
×
682
        }
683
    } catch (...) {
886✔
684
        std::lock_guard<std::mutex> lock(sp->mtx);
×
685
        sp->error = std::current_exception();
686
        sp->queue.push(std::nullopt);
×
687
        sp->done.store(true, std::memory_order_release);
688
        sp->cv_consumer.notify_one();
689
        co_return;
690
    }
×
691
    {
692
        std::lock_guard<std::mutex> lock(sp->mtx);
14!
693
        sp->queue.push(std::nullopt);
14!
694
        sp->done.store(true, std::memory_order_release);
14✔
695
    }
14✔
696
    sp->cv_consumer.notify_one();
14✔
697
}
2,658!
698

699
#endif  // DFTRACER_UTILS_ENABLE_ARROW
700

701
/**
 * Build a TraceReaderConfig from the settings stored on a Python TraceReader.
 *
 * `self->file_path` and `self->index_dir` are NULL until `__init__` has
 * completed successfully (tp_new sets them to NULL), so both are checked
 * before being handed to PyUnicode_AsUTF8. A missing or non-convertible
 * value leaves the corresponding config field at its default instead of
 * dereferencing a null pointer or assigning a null `const char *` to a
 * `std::string`.
 *
 * @param self Reader object whose stored settings are copied.
 * @return A config populated with the path, index directory, checkpoint size,
 *         auto-build flag and index threshold.
 */
TraceReaderConfig build_config(TraceReaderObject *self) {
    TraceReaderConfig cfg;

    if (self->file_path) {
        if (const char *path = PyUnicode_AsUTF8(self->file_path)) {
            cfg.file_path = path;
        } else {
            // Do not leave a stale Python exception behind; callers report
            // their own error if the resulting config turns out unusable.
            PyErr_Clear();
        }
    }

    if (self->index_dir) {
        if (const char *idx = PyUnicode_AsUTF8(self->index_dir)) {
            cfg.index_dir = idx;
        } else {
            PyErr_Clear();
        }
    }

    cfg.checkpoint_size = self->checkpoint_size;
    cfg.auto_build_index = self->auto_build_index != 0;
    cfg.index_threshold = self->index_threshold;
    return cfg;
}
109!
711

712
/**
 * Pick the Runtime that should execute work for this reader.
 *
 * @return The Runtime held by the reader's `runtime_obj` when one was
 *         supplied, otherwise whatever get_default_runtime() returns.
 */
static Runtime *get_runtime(TraceReaderObject *self) {
    if (!self->runtime_obj) {
        return get_default_runtime();
    }
    RuntimeObject *wrapper = (RuntimeObject *)self->runtime_obj;
    return wrapper->runtime.get();
}
90✔
718

719
/**
 * Allocate a TraceReaderIterator bound to a line/raw/JSON producer state.
 *
 * @param state Shared producer/consumer state; ownership is moved into the
 *              new iterator object.
 * @param mode  Iteration mode stored on the iterator.
 * @return The new iterator, or NULL if tp_alloc failed.
 */
static TraceReaderIteratorObject *make_iterator(
    std::shared_ptr<IteratorState> state, IteratorMode mode) {
    PyObject *raw = TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) return NULL;

    TraceReaderIteratorObject *iter_obj = (TraceReaderIteratorObject *)raw;

    // tp_alloc does not run C++ constructors, so the shared_ptr members are
    // constructed in place with placement new.
    new (&iter_obj->state) std::shared_ptr<IteratorState>(std::move(state));
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    new (&iter_obj->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif
    iter_obj->mode = mode;
    return iter_obj;
}
76✔
732

733
#ifdef DFTRACER_UTILS_ENABLE_ARROW
734
/**
 * Allocate a TraceReaderIterator in ARROW mode.
 *
 * The plain `state` member is constructed empty; only `arrow_state` receives
 * the supplied producer state.
 *
 * @param state Shared Arrow producer/consumer state; moved into the iterator.
 * @return The new iterator, or NULL if tp_alloc failed.
 */
static TraceReaderIteratorObject *make_arrow_iterator(
    std::shared_ptr<ArrowIteratorState> state) {
    PyObject *raw = TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) return NULL;

    TraceReaderIteratorObject *iter_obj = (TraceReaderIteratorObject *)raw;

    // Both shared_ptr members need in-place construction because tp_alloc
    // does not invoke C++ constructors.
    new (&iter_obj->state) std::shared_ptr<IteratorState>();
    new (&iter_obj->arrow_state)
        std::shared_ptr<ArrowIteratorState>(std::move(state));
    iter_obj->mode = IteratorMode::ARROW;
    return iter_obj;
}
14✔
746
#endif
747

748
}  // namespace
749

750
using dftracer::utils::python::wrap_arrow_table;
751

752
/**
 * tp_dealloc: drop the references held by the reader, then free the object.
 * Any of the three members may still be NULL if `__init__` never completed.
 */
static void TraceReader_dealloc(TraceReaderObject *self) {
    PyObject *owned_refs[] = {self->file_path, self->index_dir,
                              self->runtime_obj};
    for (PyObject *ref : owned_refs) {
        Py_XDECREF(ref);
    }

    PyTypeObject *type = Py_TYPE(self);
    type->tp_free((PyObject *)self);
}
204✔
758

759
/**
 * tp_new: allocate a TraceReader and put every field into its default state.
 *
 * Object members start as NULL; numeric settings get the same defaults that
 * `__init__` uses when the corresponding argument is omitted.
 *
 * @return The new object, or NULL if allocation failed.
 */
static PyObject *TraceReader_new(PyTypeObject *type, PyObject *args,
                                 PyObject *kwds) {
    PyObject *allocated = type->tp_alloc(type, 0);
    if (allocated == NULL) return NULL;

    TraceReaderObject *reader = (TraceReaderObject *)allocated;
    reader->file_path = NULL;
    reader->index_dir = NULL;
    reader->runtime_obj = NULL;
    reader->has_index = 0;
    reader->auto_build_index = 0;
    reader->checkpoint_size = 32 * 1024 * 1024;
    reader->index_threshold =
        dftracer::utils::constants::indexer::DEFAULT_INDEX_SIZE_THRESHOLD;
    return allocated;
}
774

775
static int TraceReader_init(TraceReaderObject *self, PyObject *args,
204✔
776
                            PyObject *kwds) {
777
    static const char *kwlist[] = {"file_path",
778
                                   "index_dir",
779
                                   "checkpoint_size",
780
                                   "auto_build_index",
781
                                   "index_threshold",
782
                                   "runtime",
783
                                   NULL};
784

785
    const char *file_path;
786
    const char *index_dir = "";
204✔
787
    std::size_t checkpoint_size = 32 * 1024 * 1024;
204✔
788
    int auto_build_index = 0;
204✔
789
    std::size_t index_threshold =
204✔
790
        dftracer::utils::constants::indexer::DEFAULT_INDEX_SIZE_THRESHOLD;
791
    PyObject *runtime_arg = NULL;
204✔
792

793
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|snpnO", (char **)kwlist,
204!
794
                                     &file_path, &index_dir, &checkpoint_size,
795
                                     &auto_build_index, &index_threshold,
796
                                     &runtime_arg)) {
797
        return -1;
×
798
    }
799

800
    if (runtime_arg && runtime_arg != Py_None) {
204✔
801
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
14!
802
            // Direct C++ Runtime object
803
            Py_INCREF(runtime_arg);
×
804
            self->runtime_obj = runtime_arg;
×
805
        } else {
806
            // Python wrapper, extract _native attribute
807
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
14!
808
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
14!
809
                self->runtime_obj = native;  // already incref'd by GetAttr
14✔
810
            } else {
7✔
811
                Py_XDECREF(native);
×
812
                PyErr_SetString(PyExc_TypeError,
×
813
                                "runtime must be a Runtime instance or None");
814
                return -1;
×
815
            }
816
        }
817
    }
7✔
818

819
    self->file_path = PyUnicode_FromString(file_path);
204!
820
    if (!self->file_path) return -1;
204✔
821

822
    self->index_dir = PyUnicode_FromString(index_dir);
204!
823
    if (!self->index_dir) {
204✔
824
        Py_DECREF(self->file_path);
×
825
        self->file_path = NULL;
×
826
        return -1;
×
827
    }
828

829
    self->checkpoint_size = checkpoint_size;
204✔
830
    self->auto_build_index = auto_build_index;
204✔
831
    self->index_threshold = index_threshold;
204✔
832

833
    try {
834
        TraceReaderConfig cfg;
204✔
835
        cfg.file_path = file_path;
204!
836
        cfg.index_dir = index_dir;
204!
837
        cfg.checkpoint_size = checkpoint_size;
204✔
838
        cfg.auto_build_index = auto_build_index != 0;
204✔
839
        cfg.index_threshold = index_threshold;
204✔
840
        TraceReader probe(std::move(cfg));
204!
841
        self->has_index = probe.has_index() ? 1 : 0;
204!
842
    } catch (const std::exception &e) {
204!
843
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
844
        Py_DECREF(self->file_path);
×
845
        Py_DECREF(self->index_dir);
×
846
        self->file_path = NULL;
×
847
        self->index_dir = NULL;
×
848
        return -1;
×
849
    }
×
850

851
    return 0;
204✔
852
}
102✔
853

854
/**
 * Python method ``iter_lines``: start a `produce_lines` task on the reader's
 * runtime and return an iterator (LINES mode) attached to its shared state.
 *
 * Keyword arguments: start_line, end_line, start_byte, end_byte, buffer_size,
 * query. Raises ValueError for negative ranges or a non-positive buffer size,
 * and RuntimeError if building the config or submitting the task throws a
 * std::exception.
 *
 * @return New iterator object, or NULL with a Python exception set.
 */
static PyObject *TraceReader_iter_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "query",
                                   NULL};

    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnz", (char **)kwlist, &start_line, &end_line,
        &start_byte, &end_byte, &buffer_size, &query_str);
    if (!parsed) return NULL;

    const bool negative_range =
        start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0;
    if (negative_range || buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &err) {
        PyErr_SetString(PyExc_RuntimeError, err.what());
        return NULL;
    }

    // The Py_ssize_t values were validated as non-negative above.
    ReadConfig read_cfg;
    read_cfg.start_line = static_cast<std::size_t>(start_line);
    read_cfg.end_line = static_cast<std::size_t>(end_line);
    read_cfg.start_byte = static_cast<std::size_t>(start_byte);
    read_cfg.end_byte = static_cast<std::size_t>(end_byte);
    read_cfg.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str != NULL) read_cfg.query = query_str;

    auto shared_state = std::make_shared<IteratorState>();
    Runtime *runtime = get_runtime(self);
    try {
        runtime->submit(produce_lines(shared_state, cfg, read_cfg),
                        "iter_lines");
    } catch (const std::exception &err) {
        PyErr_SetString(PyExc_RuntimeError, err.what());
        return NULL;
    }

    return (PyObject *)make_iterator(shared_state, IteratorMode::LINES);
}
115✔
907

908
/**
 * Python method ``iter_raw``: start a `produce_raw` task on the reader's
 * runtime and return an iterator (RAW mode) attached to its shared state.
 *
 * Keyword arguments: start_line, end_line, start_byte, end_byte, buffer_size,
 * line_aligned (default true), multi_line (default true), query. Raises
 * ValueError for negative ranges or a non-positive buffer size, and
 * RuntimeError if building the config or submitting the task throws a
 * std::exception.
 *
 * @return New iterator object, or NULL with a Python exception set.
 */
static PyObject *TraceReader_iter_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "line_aligned",
                                   "multi_line", "query",       NULL};

    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    int line_aligned = 1;
    int multi_line = 1;
    const char *query_str = NULL;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnppz", (char **)kwlist, &start_line, &end_line,
        &start_byte, &end_byte, &buffer_size, &line_aligned, &multi_line,
        &query_str);
    if (!parsed) return NULL;

    const bool negative_range =
        start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0;
    if (negative_range || buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &err) {
        PyErr_SetString(PyExc_RuntimeError, err.what());
        return NULL;
    }

    // The Py_ssize_t values were validated as non-negative above.
    ReadConfig read_cfg;
    read_cfg.start_line = static_cast<std::size_t>(start_line);
    read_cfg.end_line = static_cast<std::size_t>(end_line);
    read_cfg.start_byte = static_cast<std::size_t>(start_byte);
    read_cfg.end_byte = static_cast<std::size_t>(end_byte);
    read_cfg.buffer_size = static_cast<std::size_t>(buffer_size);
    read_cfg.line_aligned = line_aligned != 0;
    read_cfg.multi_line = multi_line != 0;
    if (query_str != NULL) read_cfg.query = query_str;

    auto shared_state = std::make_shared<IteratorState>();
    Runtime *runtime = get_runtime(self);
    try {
        runtime->submit(produce_raw(shared_state, cfg, read_cfg), "iter_raw");
    } catch (const std::exception &err) {
        PyErr_SetString(PyExc_RuntimeError, err.what());
        return NULL;
    }

    return (PyObject *)make_iterator(shared_state, IteratorMode::RAW);
}
21✔
966

967
/**
 * Python method ``read_lines``: run ``iter_lines`` with the same arguments
 * and collect everything it yields into a list.
 *
 * @return New list, or NULL with a Python exception set.
 */
static PyObject *TraceReader_read_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *line_iter = TraceReader_iter_lines(self, args, kwds);
    if (line_iter == NULL) return NULL;

    PyObject *collected = PySequence_List(line_iter);
    Py_DECREF(line_iter);
    return collected;
}
45✔
975

976
/**
 * Python method ``read_raw``: run ``iter_raw`` with the same arguments and
 * collect everything it yields into a list.
 *
 * @return New list, or NULL with a Python exception set.
 */
static PyObject *TraceReader_read_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    PyObject *chunk_iter = TraceReader_iter_raw(self, args, kwds);
    if (chunk_iter == NULL) return NULL;

    PyObject *collected = PySequence_List(chunk_iter);
    Py_DECREF(chunk_iter);
    return collected;
}
4✔
984

985
/**
 * Python method ``iter_lines_json``: start a `produce_lines` task on the
 * reader's runtime and return an iterator in JSON mode attached to its shared
 * state.
 *
 * Keyword arguments: start_line, end_line, start_byte, end_byte, buffer_size,
 * query. Raises ValueError for negative ranges or a non-positive buffer size,
 * and RuntimeError if building the config or submitting the task throws a
 * std::exception.
 *
 * @return New iterator object, or NULL with a Python exception set.
 */
static PyObject *TraceReader_iter_lines_json(TraceReaderObject *self,
                                             PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "query",
                                   NULL};

    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnz", (char **)kwlist, &start_line, &end_line,
        &start_byte, &end_byte, &buffer_size, &query_str);
    if (!parsed) return NULL;

    const bool negative_range =
        start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0;
    if (negative_range || buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &err) {
        PyErr_SetString(PyExc_RuntimeError, err.what());
        return NULL;
    }

    // The Py_ssize_t values were validated as non-negative above.
    ReadConfig read_cfg;
    read_cfg.start_line = static_cast<std::size_t>(start_line);
    read_cfg.end_line = static_cast<std::size_t>(end_line);
    read_cfg.start_byte = static_cast<std::size_t>(start_byte);
    read_cfg.end_byte = static_cast<std::size_t>(end_byte);
    read_cfg.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str != NULL) read_cfg.query = query_str;

    auto shared_state = std::make_shared<IteratorState>();
    Runtime *runtime = get_runtime(self);
    try {
        runtime->submit(produce_lines(shared_state, cfg, read_cfg),
                        "iter_lines_json");
    } catch (const std::exception &err) {
        PyErr_SetString(PyExc_RuntimeError, err.what());
        return NULL;
    }

    return (PyObject *)make_iterator(shared_state, IteratorMode::JSON);
}
20✔
1038

1039
/**
 * Python method ``read_lines_json``: run ``iter_lines_json`` with the same
 * arguments and collect everything it yields into a list.
 *
 * @return New list, or NULL with a Python exception set.
 */
static PyObject *TraceReader_read_lines_json(TraceReaderObject *self,
                                             PyObject *args, PyObject *kwds) {
    PyObject *json_iter = TraceReader_iter_lines_json(self, args, kwds);
    if (json_iter == NULL) return NULL;

    PyObject *collected = PySequence_List(json_iter);
    Py_DECREF(json_iter);
    return collected;
}
7✔
1047

1048
#ifdef DFTRACER_UTILS_ENABLE_ARROW
1049

1050
/**
 * Python method ``iter_arrow``: start a `produce_arrow_batches` task on the
 * reader's runtime and return an ARROW-mode iterator attached to its shared
 * state.
 *
 * Keyword arguments: batch_size (default 10000), start_line, end_line,
 * start_byte, end_byte, buffer_size, query, flatten_objects, normalize.
 * Raises ValueError for a non-positive batch size, negative ranges or a
 * non-positive buffer size, and RuntimeError if building the config or
 * submitting the task throws a std::exception.
 *
 * @return New iterator object, or NULL with a Python exception set.
 */
static PyObject *TraceReader_iter_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {
        "batch_size", "start_line",  "end_line", "start_byte",
        "end_byte",   "buffer_size", "query",    "flatten_objects",
        "normalize",  NULL};

    Py_ssize_t batch_size = 10000;
    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    int flatten_objects = 0;
    int normalize = 0;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnnzpp", (char **)kwlist, &batch_size, &start_line,
        &end_line, &start_byte, &end_byte, &buffer_size, &query_str,
        &flatten_objects, &normalize);
    if (!parsed) return NULL;

    // batch_size is checked first so its dedicated message takes precedence.
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return NULL;
    }

    const bool negative_range =
        start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0;
    if (negative_range || buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &err) {
        PyErr_SetString(PyExc_RuntimeError, err.what());
        return NULL;
    }

    // The Py_ssize_t values were validated as non-negative above.
    ReadConfig read_cfg;
    read_cfg.start_line = static_cast<std::size_t>(start_line);
    read_cfg.end_line = static_cast<std::size_t>(end_line);
    read_cfg.start_byte = static_cast<std::size_t>(start_byte);
    read_cfg.end_byte = static_cast<std::size_t>(end_byte);
    read_cfg.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str != NULL) read_cfg.query = query_str;

    auto shared_state = std::make_shared<ArrowIteratorState>();
    Runtime *runtime = get_runtime(self);
    try {
        const std::size_t rows_per_batch = static_cast<std::size_t>(batch_size);
        const bool flatten = flatten_objects != 0;
        const bool normalized = normalize != 0;
        runtime->submit(
            produce_arrow_batches(shared_state, cfg, read_cfg, rows_per_batch,
                                  flatten, normalized),
            "iter_arrow");
    } catch (const std::exception &err) {
        PyErr_SetString(PyExc_RuntimeError, err.what());
        return NULL;
    }

    return (PyObject *)make_arrow_iterator(std::move(shared_state));
}
28✔
1115

1116
/**
 * Python method ``read_arrow``: run ``iter_arrow`` with the same arguments,
 * collect the yielded batches into a list and pass that list to
 * wrap_arrow_table().
 *
 * @return Whatever wrap_arrow_table() returns, or NULL with a Python
 *         exception set if iteration or list building failed.
 */
static PyObject *TraceReader_read_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *batch_iter = TraceReader_iter_arrow(self, args, kwds);
    if (batch_iter == NULL) return NULL;

    PyObject *batches = PySequence_List(batch_iter);
    Py_DECREF(batch_iter);
    if (batches == NULL) return NULL;

    // NOTE(review): `batches` is not DECREF'd here, so wrap_arrow_table()
    // presumably takes ownership of the list -- confirm against its
    // definition in arrow_helpers.
    return wrap_arrow_table(batches);
}
5✔
1126

1127
#endif  // DFTRACER_UTILS_ENABLE_ARROW
1128

1129
/**
 * ``__enter__``: return the reader itself as a new reference.
 */
static PyObject *TraceReader_enter(TraceReaderObject *self,
                                   PyObject *Py_UNUSED(ignored)) {
    PyObject *me = (PyObject *)self;
    Py_INCREF(me);
    return me;
}
1134

1135
/**
 * ``__exit__``: performs no cleanup and returns None, so an exception raised
 * inside the ``with`` body is not suppressed.
 */
static PyObject *TraceReader_exit(TraceReaderObject *self, PyObject *args) {
    (void)self;
    (void)args;
    Py_RETURN_NONE;
}
1138

1139
/**
 * Getter for the ``file_path`` property.
 *
 * `self->file_path` is NULL until `__init__` has succeeded, so the member is
 * checked before taking a reference. An unset value raises AttributeError
 * instead of dereferencing a null pointer.
 *
 * @return New reference to the stored path string, or NULL with
 *         AttributeError set when the reader is uninitialised.
 */
static PyObject *TraceReader_get_file_path(TraceReaderObject *self,
                                           void *closure) {
    (void)closure;
    if (self->file_path == NULL) {
        PyErr_SetString(PyExc_AttributeError, "file_path");
        return NULL;
    }
    Py_INCREF(self->file_path);
    return self->file_path;
}
1144

1145
/**
 * Getter for the ``index_dir`` property.
 *
 * `self->index_dir` is NULL until `__init__` has succeeded, so the member is
 * checked before taking a reference. An unset value raises AttributeError
 * instead of dereferencing a null pointer.
 *
 * @return New reference to the stored directory string, or NULL with
 *         AttributeError set when the reader is uninitialised.
 */
static PyObject *TraceReader_get_index_dir(TraceReaderObject *self,
                                           void *closure) {
    (void)closure;
    if (self->index_dir == NULL) {
        PyErr_SetString(PyExc_AttributeError, "index_dir");
        return NULL;
    }
    Py_INCREF(self->index_dir);
    return self->index_dir;
}
1150

1151
/**
 * Getter for the ``has_index`` property: True when the flag recorded by
 * `__init__` is non-zero, False otherwise.
 */
static PyObject *TraceReader_get_has_index(TraceReaderObject *self,
                                           void *closure) {
    if (self->has_index) {
        Py_RETURN_TRUE;
    }
    Py_RETURN_FALSE;
}
1155

1156
/**
 * Getter for the ``num_lines`` property.
 *
 * First asks a freshly constructed TraceReader for its line count and returns
 * it when it is non-zero. If that reports zero or throws, the lines are
 * counted by iterating over ``iter_lines()`` with default arguments. Each
 * line is released as soon as it has been counted, so the fallback does not
 * keep the whole file in memory as a Python list.
 *
 * @return Python int with the line count, or NULL with an exception set.
 */
static PyObject *TraceReader_get_num_lines_prop(TraceReaderObject *self,
                                                void *closure) {
    (void)closure;
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        std::size_t n = reader.get_num_lines();
        if (n > 0) return PyLong_FromSize_t(n);
    } catch (...) {
        // Deliberate best effort: any failure falls through to counting by
        // iteration. Make sure no Python exception is left pending first.
        PyErr_Clear();
    }

    PyObject *empty_args = PyTuple_New(0);
    if (!empty_args) return NULL;
    PyObject *iterable = TraceReader_iter_lines(self, empty_args, NULL);
    Py_DECREF(empty_args);
    if (!iterable) return NULL;

    PyObject *it = PyObject_GetIter(iterable);
    Py_DECREF(iterable);
    if (!it) return NULL;

    Py_ssize_t count = 0;
    PyObject *line = NULL;
    while ((line = PyIter_Next(it)) != NULL) {
        Py_DECREF(line);
        ++count;
    }
    Py_DECREF(it);

    // PyIter_Next returns NULL both on exhaustion and on error.
    if (PyErr_Occurred()) return NULL;
    return PyLong_FromSsize_t(count);
}
4✔
1174

1175
/**
 * Python method ``get_max_bytes``: construct a TraceReader from the stored
 * settings and return its get_max_bytes() value as a Python int.
 *
 * @return Python int, or NULL with RuntimeError set if a std::exception was
 *         thrown.
 */
static PyObject *TraceReader_get_max_bytes(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    PyObject *result = NULL;
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        result = PyLong_FromSize_t(reader.get_max_bytes());
    } catch (const std::exception &err) {
        PyErr_SetString(PyExc_RuntimeError, err.what());
    }
    return result;
}
13✔
1186

1187
/**
 * Python method ``get_num_lines``: construct a TraceReader from the stored
 * settings and return its get_num_lines() value as a Python int.
 *
 * @return Python int, or NULL with RuntimeError set if a std::exception was
 *         thrown.
 */
static PyObject *TraceReader_get_num_lines(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    PyObject *result = NULL;
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        result = PyLong_FromSize_t(reader.get_num_lines());
    } catch (const std::exception &err) {
        PyErr_SetString(PyExc_RuntimeError, err.what());
    }
    return result;
}
2✔
1198

1199
// Method table for TraceReader. Every reading method accepts keyword
// arguments; the docstrings list each keyword the matching C function parses
// so that help() agrees with what the method actually accepts.
static PyMethodDef TraceReader_methods[] = {
    {"iter_lines", (PyCFunction)TraceReader_iter_lines,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over decoded lines.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed to the reader.\n"},
    {"iter_raw", (PyCFunction)TraceReader_iter_raw,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over raw byte chunks.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"
     "    query (str or None): Optional query string passed to the reader.\n"},
    {"read_lines", (PyCFunction)TraceReader_read_lines,
     METH_VARARGS | METH_KEYWORDS,
     "Read all lines and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed to the reader.\n"},
    {"read_raw", (PyCFunction)TraceReader_read_raw,
     METH_VARARGS | METH_KEYWORDS,
     "Read all raw chunks and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"
     "    query (str or None): Optional query string passed to the reader.\n"},
    {"iter_lines_json", (PyCFunction)TraceReader_iter_lines_json,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over parsed JSON objects.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed to the reader.\n"},
    {"read_lines_json", (PyCFunction)TraceReader_read_lines_json,
     METH_VARARGS | METH_KEYWORDS,
     "Read all lines as parsed JSON objects.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed to the reader.\n"},
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    {"iter_arrow", (PyCFunction)TraceReader_iter_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over Arrow record batches.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch.\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed to the reader.\n"
     "    flatten_objects (bool): Passed through to the Arrow batch\n"
     "        producer (default False).\n"
     "    normalize (bool): Passed through to the Arrow batch producer\n"
     "        (default False).\n"},
    {"read_arrow", (PyCFunction)TraceReader_read_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "Read all events as a materialized ArrowTable.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch.\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str or None): Optional query string passed to the reader.\n"
     "    flatten_objects (bool): Passed through to the Arrow batch\n"
     "        producer (default False).\n"
     "    normalize (bool): Passed through to the Arrow batch producer\n"
     "        (default False).\n"},
#endif
    {"get_max_bytes", (PyCFunction)TraceReader_get_max_bytes, METH_NOARGS,
     "Get the maximum byte position (0 if unknown for compressed\n"
     "files without index)."},
    {"get_num_lines", (PyCFunction)TraceReader_get_num_lines, METH_NOARGS,
     "Get the total number of lines (0 if unknown for files without\n"
     "index)."},
    {"__enter__", (PyCFunction)TraceReader_enter, METH_NOARGS,
     "Enter the runtime context for the with statement."},
    {"__exit__", (PyCFunction)TraceReader_exit, METH_VARARGS,
     "Exit the runtime context for the with statement."},
    {NULL}};
1299

1300
// Read-only properties exposed on TraceReader. No setters are registered.
// Entry layout: {name, getter, setter, doc, closure}.
static PyGetSetDef TraceReader_getsetters[] = {
    {"file_path",
     (getter)TraceReader_get_file_path,
     NULL,
     "Path to the trace file",
     NULL},
    {"index_dir",
     (getter)TraceReader_get_index_dir,
     NULL,
     "Directory for index files",
     NULL},
    {"has_index",
     (getter)TraceReader_get_has_index,
     NULL,
     "True if a checkpoint index was found",
     NULL},
    {"num_lines",
     (getter)TraceReader_get_num_lines_prop,
     NULL,
     "Total line count (reads all lines if needed)",
     NULL},
    {NULL}  // sentinel
};
1310

1311
// Static type object for dftracer_utils_ext.TraceReader. Slots are filled
// positionally; unused pointer slots are NULL and unused integer slots are 0.
//
// NOTE(review): the signature line of the docstring gives
// index_threshold = 1048576 while the Args section says "default 8 MB". The
// real default is DEFAULT_INDEX_SIZE_THRESHOLD, whose value is not visible
// here -- confirm which figure is right and align the text.
PyTypeObject TraceReaderType = {
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext.TraceReader",
    sizeof(TraceReaderObject),                /* tp_basicsize */
    0,                                        /* tp_itemsize */
    (destructor)TraceReader_dealloc,          /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    NULL,                                     /* tp_getattr */
    NULL,                                     /* tp_setattr */
    NULL,                                     /* tp_as_async */
    NULL,                                     /* tp_repr */
    NULL,                                     /* tp_as_number */
    NULL,                                     /* tp_as_sequence */
    NULL,                                     /* tp_as_mapping */
    NULL,                                     /* tp_hash */
    NULL,                                     /* tp_call */
    NULL,                                     /* tp_str */
    NULL,                                     /* tp_getattro */
    NULL,                                     /* tp_setattro */
    NULL,                                     /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    "TraceReader(file_path: str, index_dir: str = '',\n"
    "            checkpoint_size: int = 33554432,\n"
    "            auto_build_index: bool = False,\n"
    "            index_threshold: int = 1048576,\n"
    "            runtime: Runtime | None = None)\n"
    "--\n"
    "\n"
    "Smart trace file reader that auto-selects sequential or indexed\n"
    "reading based on whether an ``.idx`` sidecar exists.\n"
    "\n"
    "Args:\n"
    "    file_path (str): Path to the trace file (.pfw.gz or plain "
    "text).\n"
    "    index_dir (str): Directory to search for ``.idx`` sidecar "
    "files.\n"
    "        Empty string (default) searches next to the trace file.\n"
    "    checkpoint_size (int): Checkpoint interval in bytes for index\n"
    "        building (default 32 MB).\n"
    "    auto_build_index (bool): If True, automatically build an "
    "index\n"
    "        when none exists and the file exceeds *index_threshold*.\n"
    "    index_threshold (int): Minimum file size in bytes before\n"
    "        auto-indexing is triggered (default 8 MB).\n"
    "    runtime (Runtime or None): Runtime instance for thread pool "
    "control.\n"
    "        If None, uses the default global Runtime.\n"
    "\n"
    "Raises:\n"
    "    RuntimeError: If *file_path* does not exist or cannot be "
    "opened.\n",                /* tp_doc */
    NULL,                       /* tp_traverse */
    NULL,                       /* tp_clear */
    NULL,                       /* tp_richcompare */
    0,                          /* tp_weaklistoffset */
    NULL,                       /* tp_iter */
    NULL,                       /* tp_iternext */
    TraceReader_methods,        /* tp_methods */
    NULL,                       /* tp_members */
    TraceReader_getsetters,     /* tp_getset */
    NULL,                       /* tp_base */
    NULL,                       /* tp_dict */
    NULL,                       /* tp_descr_get */
    NULL,                       /* tp_descr_set */
    0,                          /* tp_dictoffset */
    (initproc)TraceReader_init, /* tp_init */
    NULL,                       /* tp_alloc */
    TraceReader_new,            /* tp_new */
};
1379

1380
/**
 * Finalise TraceReaderType and register it on module `m` as "TraceReader".
 *
 * @param m Module object to add the type to.
 * @return 0 on success, -1 if PyType_Ready or PyModule_AddObject failed.
 */
int init_trace_reader(PyObject *m) {
    if (PyType_Ready(&TraceReaderType) < 0) return -1;

    PyObject *type_obj = (PyObject *)&TraceReaderType;
    Py_INCREF(type_obj);
    if (PyModule_AddObject(m, "TraceReader", type_obj) >= 0) {
        return 0;
    }

    // PyModule_AddObject steals the reference only on success, so the one
    // taken above has to be released here.
    Py_DECREF(type_obj);
    // NOTE(review): this also drops a reference to the caller's module
    // object; confirm the caller does not DECREF `m` again on failure.
    Py_DECREF(m);
    return -1;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc