• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23531027933

25 Mar 2026 08:05AM UTC coverage: 48.592% (-1.5%) from 50.098%
23531027933

Pull #57

github

web-flow
Merge d1070e289 into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18900 of 49456 branches covered (38.22%)

Branch coverage included in aggregate %.

1604 of 1954 new or added lines in 25 files covered. (82.09%)

3407 existing lines in 135 files now uncovered.

18487 of 27485 relevant lines covered (67.26%)

240991.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.33
/src/dftracer/utils/python/trace_reader.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/coro/task.h>
4
#include <dftracer/utils/core/utils/string.h>
5
#include <dftracer/utils/python/arrow_helpers.h>
6
#include <dftracer/utils/python/runtime.h>
7
#include <dftracer/utils/python/trace_reader.h>
8
#include <dftracer/utils/python/trace_reader_iterator.h>
9
#include <dftracer/utils/utilities/reader/trace_reader.h>
10

11
#include <cstddef>
12
#include <exception>
13
#include <memory>
14
#include <string>
15
#include <vector>
16

17
#ifdef DFTRACER_UTILS_ENABLE_ARROW
18
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
19
#include <yyjson.h>
20
#endif
21

22
namespace {
23

24
using dftracer::utils::Runtime;
25
using dftracer::utils::coro::CoroTask;
26
using dftracer::utils::utilities::reader::ReadConfig;
27
using dftracer::utils::utilities::reader::TraceReader;
28
using dftracer::utils::utilities::reader::TraceReaderConfig;
29

30
/// Coroutine that streams decoded lines of a trace file into `state->queue`.
///
/// A TraceReader is built from `cfg` and its `read_lines(rc)` generator is
/// drained. The content of each yielded line is copied into a std::string and
/// queued once the queue holds fewer than `max_queue_size` entries. The loop
/// ends early as soon as `state->cancelled` is observed.
///
/// On every exit path a std::nullopt entry is queued and `done` is set; when
/// an exception was thrown it is stored in `state->error` beforehand.
///
/// @param state Shared queue/flags; kept alive by this by-value parameter.
/// @param cfg   Reader configuration, moved into the TraceReader.
/// @param rc    Range/query settings forwarded to `read_lines`.
CoroTask<void> produce_lines(std::shared_ptr<IteratorState> state,
                             TraceReaderConfig cfg, ReadConfig rc) {
    IteratorState *const shared = state.get();
    std::exception_ptr failure;

    try {
        TraceReader reader(std::move(cfg));
        auto line_gen = reader.read_lines(rc);
        for (;;) {
            auto next = co_await line_gen.next();
            if (!next) break;
            if (shared->cancelled.load(std::memory_order_acquire)) break;

            std::string line(next->content);
            bool stop = false;
            {
                std::unique_lock<std::mutex> guard(shared->mtx);
                // Block until the queue has room or cancellation is requested.
                shared->cv_producer.wait(guard, [shared]() -> bool {
                    return shared->queue.size() < shared->max_queue_size ||
                           shared->cancelled.load(std::memory_order_acquire);
                });
                stop = shared->cancelled.load(std::memory_order_acquire);
                if (!stop) shared->queue.push(std::move(line));
            }
            if (stop) break;
            shared->cv_consumer.notify_one();
        }
    } catch (...) {
        // Remember the failure; it is published together with the final
        // std::nullopt entry below.
        failure = std::current_exception();
    }

    {
        std::lock_guard<std::mutex> guard(shared->mtx);
        if (failure) shared->error = failure;
        shared->queue.push(std::nullopt);
        shared->done.store(true, std::memory_order_release);
    }
    shared->cv_consumer.notify_one();
}
6,608✔
65

66
/// Coroutine that streams raw chunks of a trace file into `state->queue`.
///
/// A TraceReader is built from `cfg` and its `read_raw(rc)` generator is
/// drained. Each yielded chunk is copied (`data()`/`size()`) into a
/// std::string and queued once the queue holds fewer than `max_queue_size`
/// entries. The loop ends early as soon as `state->cancelled` is observed.
///
/// On every exit path a std::nullopt entry is queued and `done` is set; when
/// an exception was thrown it is stored in `state->error` beforehand.
///
/// @param state Shared queue/flags; kept alive by this by-value parameter.
/// @param cfg   Reader configuration, moved into the TraceReader.
/// @param rc    Range/query settings forwarded to `read_raw`.
CoroTask<void> produce_raw(std::shared_ptr<IteratorState> state,
                           TraceReaderConfig cfg, ReadConfig rc) {
    IteratorState *const shared = state.get();
    std::exception_ptr failure;

    try {
        TraceReader reader(std::move(cfg));
        auto chunk_gen = reader.read_raw(rc);
        for (;;) {
            auto next = co_await chunk_gen.next();
            if (!next) break;
            if (shared->cancelled.load(std::memory_order_acquire)) break;

            std::string chunk(next->data(), next->size());
            bool stop = false;
            {
                std::unique_lock<std::mutex> guard(shared->mtx);
                // Block until the queue has room or cancellation is requested.
                shared->cv_producer.wait(guard, [shared]() -> bool {
                    return shared->queue.size() < shared->max_queue_size ||
                           shared->cancelled.load(std::memory_order_acquire);
                });
                stop = shared->cancelled.load(std::memory_order_acquire);
                if (!stop) shared->queue.push(std::move(chunk));
            }
            if (stop) break;
            shared->cv_consumer.notify_one();
        }
    } catch (...) {
        // Remember the failure; it is published together with the final
        // std::nullopt entry below.
        failure = std::current_exception();
    }

    {
        std::lock_guard<std::mutex> guard(shared->mtx);
        if (failure) shared->error = failure;
        shared->queue.push(std::nullopt);
        shared->done.store(true, std::memory_order_release);
    }
    shared->cv_consumer.notify_one();
}
869✔
101

102
#ifdef DFTRACER_UTILS_ENABLE_ARROW
103

104
using dftracer::utils::utilities::common::arrow::ColumnType;
105
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
106

107
CoroTask<void> produce_arrow_batches(std::shared_ptr<ArrowIteratorState> state,
1,786!
108
                                     TraceReaderConfig cfg, ReadConfig rc,
109
                                     std::size_t batch_size) {
14!
110
    auto *sp = state.get();
14✔
111
    try {
112
        TraceReader reader(std::move(cfg));
14!
113
        auto gen = reader.read_lines(rc);
14!
114
        RecordBatchBuilder builder;
14✔
115
        builder.reserve(batch_size);
14!
116

117
        // Keep yyjson docs alive until finish() since string columns hold
118
        // string_views into doc memory. Serialized object/array values are
119
        // stored as owned strings in held_serialized.
120
        std::vector<yyjson_doc *> held_docs;
14✔
121
        std::vector<std::string> held_serialized;
14✔
122
        held_docs.reserve(batch_size);
14!
123

124
        while (auto opt = co_await gen.next()) {
1,758!
125
            if (sp->cancelled.load(std::memory_order_acquire)) break;
422!
126

127
            const char *trimmed;
422✔
128
            std::size_t trimmed_length;
422✔
129
            if (!dftracer::utils::json_trim_and_validate(
422!
130
                    opt->content.data(), opt->content.size(), trimmed,
422✔
131
                    trimmed_length)) {
132
                continue;
26✔
133
            }
134

135
            yyjson_doc *doc = yyjson_read(trimmed, trimmed_length, 0);
396!
136
            if (!doc) continue;
396!
137

138
            yyjson_val *root = yyjson_doc_get_root(doc);
396!
139
            if (!root || !yyjson_is_obj(root)) {
396!
UNCOV
140
                yyjson_doc_free(doc);
×
UNCOV
141
                continue;
×
142
            }
143

144
            yyjson_obj_iter iter;
396✔
145
            yyjson_obj_iter_init(root, &iter);
396!
146
            yyjson_val *key;
396✔
147
            while ((key = yyjson_obj_iter_next(&iter))) {
1,980!
148
                yyjson_val *val = yyjson_obj_iter_get_val(key);
1,584!
149
                const char *key_str = yyjson_get_str(key);
1,584!
150
                std::size_t key_len = yyjson_get_len(key);
1,584!
151
                std::string_view key_sv(key_str, key_len);
1,584✔
152

153
                if (yyjson_is_int(val)) {
1,584!
154
                    std::size_t idx =
396✔
155
                        builder.add_or_get_column(key_sv, ColumnType::INT64);
396!
156
                    builder.append_int64(idx, yyjson_get_sint(val));
396!
157
                } else if (yyjson_is_uint(val)) {
1,584!
UNCOV
158
                    std::size_t idx =
×
UNCOV
159
                        builder.add_or_get_column(key_sv, ColumnType::UINT64);
×
UNCOV
160
                    builder.append_uint64(idx, yyjson_get_uint(val));
×
161
                } else if (yyjson_is_real(val)) {
1,188!
UNCOV
162
                    std::size_t idx =
×
UNCOV
163
                        builder.add_or_get_column(key_sv, ColumnType::DOUBLE);
×
UNCOV
164
                    builder.append_double(idx, yyjson_get_real(val));
×
165
                } else if (yyjson_is_bool(val)) {
1,188!
UNCOV
166
                    std::size_t idx =
×
UNCOV
167
                        builder.add_or_get_column(key_sv, ColumnType::BOOL);
×
UNCOV
168
                    builder.append_bool(idx, yyjson_get_bool(val));
×
169
                } else if (yyjson_is_str(val)) {
1,188!
170
                    std::size_t idx =
1,188✔
171
                        builder.add_or_get_column(key_sv, ColumnType::STRING);
1,188!
172
                    // string_view into doc memory — doc kept alive in
173
                    // held_docs until finish()
174
                    builder.append_string(
2,376!
175
                        idx, std::string_view(yyjson_get_str(val),
3,564!
176
                                              yyjson_get_len(val)));
1,188!
177
                } else if (yyjson_is_null(val)) {
1,188!
178
                    // Only append null to an existing column; skip if the
179
                    // column is new — we don't know its type yet and creating
180
                    // it as STRING would corrupt later typed appends.
UNCOV
181
                    auto existing = builder.find_column(key_sv);
×
UNCOV
182
                    if (existing) builder.append_null(*existing);
×
UNCOV
183
                } else {
×
184
                    // object/array: serialize to JSON string
UNCOV
185
                    std::size_t json_len;
×
UNCOV
186
                    char *json_str = yyjson_val_write(val, 0, &json_len);
×
UNCOV
187
                    std::size_t idx =
×
UNCOV
188
                        builder.add_or_get_column(key_sv, ColumnType::STRING);
×
UNCOV
189
                    if (json_str) {
×
UNCOV
190
                        held_serialized.emplace_back(json_str, json_len);
×
UNCOV
191
                        free(json_str);
×
UNCOV
192
                        builder.append_string(idx, held_serialized.back());
×
UNCOV
193
                    } else {
×
UNCOV
194
                        builder.append_null(idx);
×
195
                    }
UNCOV
196
                }
×
197
            }
1,584✔
198
            builder.end_row();
396!
199
            held_docs.push_back(doc);
396!
200

201
            if (builder.num_rows() >= batch_size) {
396✔
202
                auto result = builder.finish();
7!
203
                for (auto *d : held_docs) yyjson_doc_free(d);
177!
204
                held_docs.clear();
7✔
205
                held_serialized.clear();
7✔
206

207
                {
208
                    std::unique_lock<std::mutex> lock(sp->mtx);
7!
209
                    sp->cv_producer.wait(lock, [sp] {
14!
210
                        return sp->queue.size() < sp->max_queue_size ||
7!
UNCOV
211
                               sp->cancelled.load(std::memory_order_acquire);
×
212
                    });
213
                    if (sp->cancelled.load(std::memory_order_acquire)) break;
7!
214
                    sp->queue.push(std::move(result));
7!
215
                }
7!
216
                sp->cv_consumer.notify_one();
7✔
217
                builder.reset(false);
7!
218
                builder.reserve(batch_size);
7!
219
            }
7!
220
        }
436✔
221

222
        // Flush remaining rows
223
        if (builder.num_rows() > 0) {
14!
224
            auto result = builder.finish();
14!
225
            for (auto *d : held_docs) yyjson_doc_free(d);
240!
226
            held_docs.clear();
14✔
227
            held_serialized.clear();
14✔
228
            {
229
                std::lock_guard<std::mutex> lock(sp->mtx);
14!
230
                sp->queue.push(std::move(result));
14!
231
            }
14✔
232
            sp->cv_consumer.notify_one();
14✔
233
        } else {
14✔
UNCOV
234
            for (auto *d : held_docs) yyjson_doc_free(d);
×
235
        }
236
    } catch (...) {
886✔
UNCOV
237
        std::lock_guard<std::mutex> lock(sp->mtx);
×
UNCOV
238
        sp->error = std::current_exception();
×
UNCOV
239
        sp->queue.push(std::nullopt);
×
UNCOV
240
        sp->done.store(true, std::memory_order_release);
×
UNCOV
241
        sp->cv_consumer.notify_one();
×
UNCOV
242
        co_return;
×
UNCOV
243
    }
×
244
    {
245
        std::lock_guard<std::mutex> lock(sp->mtx);
14!
246
        sp->queue.push(std::nullopt);
14!
247
        sp->done.store(true, std::memory_order_release);
14✔
248
    }
14✔
249
    sp->cv_consumer.notify_one();
14✔
250
}
2,630✔
251

252
#endif  // DFTRACER_UTILS_ENABLE_ARROW
253

254
/// Thrown by build_config() when the Python-side string members cannot be
/// read, e.g. because `__init__` never ran on the object.
struct TraceReaderStateError : std::exception {
    const char *what() const noexcept override {
        return "TraceReader is not initialized (file_path/index_dir "
               "unavailable)";
    }
};

/// Build a TraceReaderConfig from the values stored on the Python object.
///
/// @param self TraceReader instance whose members are copied.
/// @return Configuration with file_path, index_dir, checkpoint_size,
///         auto_build_index and index_threshold filled in.
/// @throws TraceReaderStateError (a std::exception) when `file_path` or
///         `index_dir` is NULL or cannot be converted to UTF-8. Any pending
///         Python error from the failed conversion is cleared first, so the
///         caller can report the C++ exception on a clean error state.
TraceReaderConfig build_config(TraceReaderObject *self) {
    const auto utf8_or_throw = [](PyObject *text) -> const char * {
        // PyUnicode_AsUTF8 must not be handed NULL, and a NULL result must
        // not be assigned to the configuration strings.
        const char *utf8 = text ? PyUnicode_AsUTF8(text) : nullptr;
        if (!utf8) {
            PyErr_Clear();
            throw TraceReaderStateError();
        }
        return utf8;
    };

    TraceReaderConfig cfg;
    cfg.file_path = utf8_or_throw(self->file_path);
    cfg.index_dir = utf8_or_throw(self->index_dir);
    cfg.checkpoint_size = self->checkpoint_size;
    cfg.auto_build_index = self->auto_build_index != 0;
    cfg.index_threshold = self->index_threshold;
    return cfg;
}
109!
264

265
/// Pick the Runtime used to run producers for this reader: the one wrapped by
/// `self->runtime_obj` when set, otherwise `get_default_runtime()`.
static Runtime *get_runtime(TraceReaderObject *self) {
    if (!self->runtime_obj) return get_default_runtime();
    auto *wrapper = reinterpret_cast<RuntimeObject *>(self->runtime_obj);
    return wrapper->runtime.get();
}
90✔
271

272
/// Allocate a TraceReaderIterator bound to `state` and operating in `mode`.
///
/// The object memory comes from `tp_alloc`, not from a C++ constructor, so
/// the shared_ptr members are constructed in place with placement new.
///
/// @return The new iterator, or NULL when `tp_alloc` fails.
static TraceReaderIteratorObject *make_iterator(
    std::shared_ptr<IteratorState> state, IteratorMode mode) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) return NULL;

    auto *iterator = reinterpret_cast<TraceReaderIteratorObject *>(raw);
    new (&iterator->state) std::shared_ptr<IteratorState>(std::move(state));
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // Left empty for the non-Arrow modes.
    new (&iterator->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif
    iterator->mode = mode;
    return iterator;
}
76✔
285

286
#ifdef DFTRACER_UTILS_ENABLE_ARROW
287
/// Allocate a TraceReaderIterator in ARROW mode bound to `state`.
///
/// The object memory comes from `tp_alloc`, not from a C++ constructor, so
/// the shared_ptr members are constructed in place with placement new; the
/// line/raw `state` member is left empty.
///
/// @return The new iterator, or NULL when `tp_alloc` fails.
static TraceReaderIteratorObject *make_arrow_iterator(
    std::shared_ptr<ArrowIteratorState> state) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) return NULL;

    auto *iterator = reinterpret_cast<TraceReaderIteratorObject *>(raw);
    new (&iterator->state) std::shared_ptr<IteratorState>();
    new (&iterator->arrow_state)
        std::shared_ptr<ArrowIteratorState>(std::move(state));
    iterator->mode = IteratorMode::ARROW;
    return iterator;
}
14✔
299
#endif
300

301
}  // namespace
302

303
using dftracer::utils::python::wrap_arrow_table;
304

305
/// tp_dealloc: drop the references held by the object (each may be NULL) and
/// hand the memory back through the type's `tp_free`.
static void TraceReader_dealloc(TraceReaderObject *self) {
    PyObject *const owned[] = {self->file_path, self->index_dir,
                               self->runtime_obj};
    for (PyObject *ref : owned) {
        Py_XDECREF(ref);
    }
    Py_TYPE(self)->tp_free(reinterpret_cast<PyObject *>(self));
}
102✔
311

312
/// tp_new: allocate a TraceReader and give every member its default value.
/// `args` and `kwds` are not inspected here.
///
/// @return The new object, or NULL when `tp_alloc` fails.
static PyObject *TraceReader_new(PyTypeObject *type, PyObject *args,
                                 PyObject *kwds) {
    PyObject *raw = type->tp_alloc(type, 0);
    if (raw == NULL) return NULL;

    auto *reader = reinterpret_cast<TraceReaderObject *>(raw);
    reader->file_path = NULL;
    reader->index_dir = NULL;
    reader->runtime_obj = NULL;
    reader->checkpoint_size = 32 * 1024 * 1024;
    reader->index_threshold = 8 * 1024 * 1024;
    reader->auto_build_index = 0;
    reader->has_index = 0;
    return raw;
}
326

327
static int TraceReader_init(TraceReaderObject *self, PyObject *args,
102✔
328
                            PyObject *kwds) {
329
    static const char *kwlist[] = {"file_path",
330
                                   "index_dir",
331
                                   "checkpoint_size",
332
                                   "auto_build_index",
333
                                   "index_threshold",
334
                                   "runtime",
335
                                   NULL};
336

337
    const char *file_path;
338
    const char *index_dir = "";
102✔
339
    std::size_t checkpoint_size = 32 * 1024 * 1024;
102✔
340
    int auto_build_index = 0;
102✔
341
    std::size_t index_threshold = 8 * 1024 * 1024;
102✔
342
    PyObject *runtime_arg = NULL;
102✔
343

344
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|snpnO", (char **)kwlist,
102!
345
                                     &file_path, &index_dir, &checkpoint_size,
346
                                     &auto_build_index, &index_threshold,
347
                                     &runtime_arg)) {
348
        return -1;
×
349
    }
350

351
    if (runtime_arg && runtime_arg != Py_None) {
102!
352
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
7!
353
            // Direct C++ Runtime object
354
            Py_INCREF(runtime_arg);
×
355
            self->runtime_obj = runtime_arg;
×
UNCOV
356
        } else {
×
357
            // Python wrapper, extract _native attribute
358
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
7✔
359
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
7!
360
                self->runtime_obj = native;  // already incref'd by GetAttr
7✔
361
            } else {
7✔
362
                Py_XDECREF(native);
×
363
                PyErr_SetString(PyExc_TypeError,
×
364
                                "runtime must be a Runtime instance or None");
365
                return -1;
×
366
            }
367
        }
368
    }
7✔
369

370
    self->file_path = PyUnicode_FromString(file_path);
102✔
371
    if (!self->file_path) return -1;
102!
372

373
    self->index_dir = PyUnicode_FromString(index_dir);
102✔
374
    if (!self->index_dir) {
102!
375
        Py_DECREF(self->file_path);
×
376
        self->file_path = NULL;
×
377
        return -1;
×
378
    }
379

380
    self->checkpoint_size = checkpoint_size;
102✔
381
    self->auto_build_index = auto_build_index;
102✔
382
    self->index_threshold = index_threshold;
102✔
383

384
    try {
385
        TraceReaderConfig cfg;
102✔
386
        cfg.file_path = file_path;
102!
387
        cfg.index_dir = index_dir;
102!
388
        cfg.checkpoint_size = checkpoint_size;
102✔
389
        cfg.auto_build_index = auto_build_index != 0;
102✔
390
        cfg.index_threshold = index_threshold;
102✔
391
        TraceReader probe(std::move(cfg));
102!
392
        self->has_index = probe.has_index() ? 1 : 0;
102!
393
    } catch (const std::exception &e) {
102!
394
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
395
        Py_DECREF(self->file_path);
×
396
        Py_DECREF(self->index_dir);
×
397
        self->file_path = NULL;
×
398
        self->index_dir = NULL;
×
399
        return -1;
×
400
    }
×
401

402
    return 0;
102✔
403
}
102✔
404

405
/// Python method `iter_lines(start_line=0, end_line=0, start_byte=0,
/// end_byte=0, buffer_size=4MiB, query=None)`.
///
/// Submits a `produce_lines` coroutine to the reader's runtime and returns an
/// iterator in LINES mode that shares the producer's IteratorState.
///
/// @return New iterator object, or NULL with a Python exception set
///         (ValueError for negative ranges or non-positive buffer_size,
///         RuntimeError when configuration or submission throws, or the
///         allocation error when the iterator cannot be created).
static PyObject *TraceReader_iter_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "query",
                                   NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnz", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &query_str)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str) rc.query = query_str;

    auto state = std::make_shared<IteratorState>();

    Runtime *rt = get_runtime(self);
    try {
        rt->submit(produce_lines(state, cfg, rc), "iter_lines");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    TraceReaderIteratorObject *it = make_iterator(state, IteratorMode::LINES);
    if (!it) {
        // The producer is already running but nothing will ever drain the
        // queue: request cancellation and wake it so it does not block
        // forever once the queue is full.
        {
            std::lock_guard<std::mutex> lock(state->mtx);
            state->cancelled.store(true, std::memory_order_release);
        }
        state->cv_producer.notify_all();
        return NULL;
    }
    return (PyObject *)it;
}
59✔
458

459
/// Python method `iter_raw(start_line=0, end_line=0, start_byte=0,
/// end_byte=0, buffer_size=4MiB, line_aligned=True, multi_line=True,
/// query=None)`.
///
/// Submits a `produce_raw` coroutine to the reader's runtime and returns an
/// iterator in RAW mode that shares the producer's IteratorState.
///
/// @return New iterator object, or NULL with a Python exception set
///         (ValueError for negative ranges or non-positive buffer_size,
///         RuntimeError when configuration or submission throws, or the
///         allocation error when the iterator cannot be created).
static PyObject *TraceReader_iter_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "line_aligned",
                                   "multi_line", "query",       NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    int line_aligned = 1;
    int multi_line = 1;
    const char *query_str = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnppz", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &line_aligned,
                                     &multi_line, &query_str)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    rc.line_aligned = line_aligned != 0;
    rc.multi_line = multi_line != 0;
    if (query_str) rc.query = query_str;

    auto state = std::make_shared<IteratorState>();

    Runtime *rt = get_runtime(self);
    try {
        rt->submit(produce_raw(state, cfg, rc), "iter_raw");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    TraceReaderIteratorObject *it = make_iterator(state, IteratorMode::RAW);
    if (!it) {
        // The producer is already running but nothing will ever drain the
        // queue: request cancellation and wake it so it does not block
        // forever once the queue is full.
        {
            std::lock_guard<std::mutex> lock(state->mtx);
            state->cancelled.store(true, std::memory_order_release);
        }
        state->cv_producer.notify_all();
        return NULL;
    }
    return (PyObject *)it;
}
11✔
517

518
/// Python method `read_lines(...)`: same arguments as `iter_lines`, but the
/// iterator is drained into a list before returning.
///
/// @return New list, or NULL with a Python exception set.
static PyObject *TraceReader_read_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *line_iter = TraceReader_iter_lines(self, args, kwds);
    if (line_iter == NULL) return NULL;

    PyObject *lines = PySequence_List(line_iter);
    Py_DECREF(line_iter);
    return lines;
}
45✔
526

527
/// Python method `read_raw(...)`: same arguments as `iter_raw`, but the
/// iterator is drained into a list before returning.
///
/// @return New list, or NULL with a Python exception set.
static PyObject *TraceReader_read_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    PyObject *chunk_iter = TraceReader_iter_raw(self, args, kwds);
    if (chunk_iter == NULL) return NULL;

    PyObject *chunks = PySequence_List(chunk_iter);
    Py_DECREF(chunk_iter);
    return chunks;
}
4✔
535

536
/// Python method `iter_lines_json(start_line=0, end_line=0, start_byte=0,
/// end_byte=0, buffer_size=4MiB, query=None)`.
///
/// Submits a `produce_lines` coroutine to the reader's runtime and returns an
/// iterator in JSON mode that shares the producer's IteratorState.
///
/// @return New iterator object, or NULL with a Python exception set
///         (ValueError for negative ranges or non-positive buffer_size,
///         RuntimeError when configuration or submission throws, or the
///         allocation error when the iterator cannot be created).
static PyObject *TraceReader_iter_lines_json(TraceReaderObject *self,
                                             PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "query",
                                   NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnz", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &query_str)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str) rc.query = query_str;

    auto state = std::make_shared<IteratorState>();

    Runtime *rt = get_runtime(self);
    try {
        rt->submit(produce_lines(state, cfg, rc), "iter_lines_json");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    TraceReaderIteratorObject *it = make_iterator(state, IteratorMode::JSON);
    if (!it) {
        // The producer is already running but nothing will ever drain the
        // queue: request cancellation and wake it so it does not block
        // forever once the queue is full.
        {
            std::lock_guard<std::mutex> lock(state->mtx);
            state->cancelled.store(true, std::memory_order_release);
        }
        state->cv_producer.notify_all();
        return NULL;
    }
    return (PyObject *)it;
}
10✔
589

590
/// Python method `read_lines_json(...)`: same arguments as
/// `iter_lines_json`, but the iterator is drained into a list before
/// returning.
///
/// @return New list, or NULL with a Python exception set.
static PyObject *TraceReader_read_lines_json(TraceReaderObject *self,
                                             PyObject *args, PyObject *kwds) {
    PyObject *json_iter = TraceReader_iter_lines_json(self, args, kwds);
    if (json_iter == NULL) return NULL;

    PyObject *records = PySequence_List(json_iter);
    Py_DECREF(json_iter);
    return records;
}
7✔
598

599
#ifdef DFTRACER_UTILS_ENABLE_ARROW
600

601
/// Python method `iter_arrow(batch_size=10000, start_line=0, end_line=0,
/// start_byte=0, end_byte=0, buffer_size=4MiB, query=None)`.
///
/// Submits a `produce_arrow_batches` coroutine to the reader's runtime and
/// returns an iterator in ARROW mode that shares the producer's
/// ArrowIteratorState.
///
/// @return New iterator object, or NULL with a Python exception set
///         (ValueError for a non-positive batch_size/buffer_size or negative
///         ranges, RuntimeError when configuration or submission throws, or
///         the allocation error when the iterator cannot be created).
static PyObject *TraceReader_iter_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {"batch_size", "start_line", "end_line",
                                   "start_byte", "end_byte",   "buffer_size",
                                   "query",      NULL};
    Py_ssize_t batch_size = 10000;
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nnnnnnz", (char **)kwlist, &batch_size, &start_line,
            &end_line, &start_byte, &end_byte, &buffer_size, &query_str)) {
        return NULL;
    }

    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return NULL;
    }
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str) rc.query = query_str;

    auto state = std::make_shared<ArrowIteratorState>();

    Runtime *rt = get_runtime(self);
    try {
        rt->submit(produce_arrow_batches(state, cfg, rc,
                                         static_cast<std::size_t>(batch_size)),
                   "iter_arrow");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    // Pass a copy so `state` is still usable for cancellation below.
    TraceReaderIteratorObject *it = make_arrow_iterator(state);
    if (!it) {
        // The producer is already running but nothing will ever drain the
        // queue: request cancellation and wake it so it does not block
        // forever once the queue is full.
        {
            std::lock_guard<std::mutex> lock(state->mtx);
            state->cancelled.store(true, std::memory_order_release);
        }
        state->cv_producer.notify_all();
        return NULL;
    }
    return (PyObject *)it;
}
14✔
661

662
/// Python method `read_arrow(...)`: same arguments as `iter_arrow`; the
/// iterator is drained into a list of batches which is then passed to
/// `wrap_arrow_table`.
///
/// NOTE(review): the list reference is not released here, so this presumably
/// relies on `wrap_arrow_table` taking ownership of it; confirm.
///
/// @return Whatever `wrap_arrow_table` returns, or NULL with a Python
///         exception set when iteration fails.
static PyObject *TraceReader_read_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *batch_iter = TraceReader_iter_arrow(self, args, kwds);
    if (batch_iter == NULL) return NULL;

    PyObject *batches = PySequence_List(batch_iter);
    Py_DECREF(batch_iter);

    return batches != NULL ? wrap_arrow_table(batches) : NULL;
}
5✔
672

673
#endif  // DFTRACER_UTILS_ENABLE_ARROW
674

675
/// Context-manager entry: returns a new reference to the reader itself.
static PyObject *TraceReader_enter(TraceReaderObject *self,
                                   PyObject *Py_UNUSED(ignored)) {
    PyObject *same = reinterpret_cast<PyObject *>(self);
    Py_INCREF(same);
    return same;
}
680

681
/// Context-manager exit: does nothing with `self` or `args` and returns None.
static PyObject *TraceReader_exit(TraceReaderObject *self, PyObject *args) {
    Py_INCREF(Py_None);
    return Py_None;
}
684

685
/// Getter for `file_path`.
///
/// @return New reference to the stored path string, or NULL with
///         AttributeError set when the member is still NULL (it is NULL
///         after tp_new until tp_init has stored a value).
static PyObject *TraceReader_get_file_path(TraceReaderObject *self,
                                           void *closure) {
    if (!self->file_path) {
        // Py_INCREF must not be applied to NULL.
        PyErr_SetString(PyExc_AttributeError, "file_path");
        return NULL;
    }
    Py_INCREF(self->file_path);
    return self->file_path;
}
690

691
/// Getter for `index_dir`.
///
/// @return New reference to the stored directory string, or NULL with
///         AttributeError set when the member is still NULL (it is NULL
///         after tp_new until tp_init has stored a value).
static PyObject *TraceReader_get_index_dir(TraceReaderObject *self,
                                           void *closure) {
    if (!self->index_dir) {
        // Py_INCREF must not be applied to NULL.
        PyErr_SetString(PyExc_AttributeError, "index_dir");
        return NULL;
    }
    Py_INCREF(self->index_dir);
    return self->index_dir;
}
696

697
/// Getter for `has_index`: True when the stored flag is non-zero, else False.
static PyObject *TraceReader_get_has_index(TraceReaderObject *self,
                                           void *closure) {
    if (self->has_index) {
        Py_RETURN_TRUE;
    }
    Py_RETURN_FALSE;
}
701

702
/// Getter for the line-count property.
///
/// First asks a freshly constructed TraceReader for `get_num_lines()`. When
/// that yields zero, or anything throws, the count is obtained by reading all
/// lines through `TraceReader_read_lines` with no arguments and taking the
/// length of the resulting list.
///
/// @return New int object, or NULL with a Python exception set.
static PyObject *TraceReader_get_num_lines_prop(TraceReaderObject *self,
                                                void *closure) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        const std::size_t reported = reader.get_num_lines();
        if (reported != 0) return PyLong_FromSize_t(reported);
    } catch (...) {
        // Best effort only: fall back to counting by reading the file.
    }

    PyObject *no_args = PyTuple_New(0);
    if (no_args == NULL) return NULL;
    PyObject *lines = TraceReader_read_lines(self, no_args, NULL);
    Py_DECREF(no_args);
    if (lines == NULL) return NULL;

    const Py_ssize_t counted = PyList_GET_SIZE(lines);
    Py_DECREF(lines);
    return PyLong_FromSsize_t(counted);
}
4✔
720

721
/**
 * Python method `get_max_bytes()`.
 *
 * Builds a TraceReader from this object's configuration and returns the
 * value of its get_max_bytes() query.
 *
 * @param self The TraceReader instance.
 * @return A new Python int, or NULL with RuntimeError set if the underlying
 *         reader throws.
 */
static PyObject *TraceReader_get_max_bytes(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        return PyLong_FromSize_t(reader.get_max_bytes());
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
    } catch (...) {
        // A C++ exception must never unwind into the interpreter's C
        // frames; translate anything not derived from std::exception too.
        PyErr_SetString(PyExc_RuntimeError,
                        "Unknown error while querying max bytes");
    }
    return NULL;
}
13✔
732

733
/**
 * Python method `get_num_lines()`.
 *
 * Builds a TraceReader from this object's configuration and returns the
 * value of its get_num_lines() query. Unlike the `num_lines` property, this
 * does not fall back to reading the file when the count is zero.
 *
 * @param self The TraceReader instance.
 * @return A new Python int, or NULL with RuntimeError set if the underlying
 *         reader throws.
 */
static PyObject *TraceReader_get_num_lines(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        return PyLong_FromSize_t(reader.get_num_lines());
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
    } catch (...) {
        // A C++ exception must never unwind into the interpreter's C
        // frames; translate anything not derived from std::exception too.
        PyErr_SetString(PyExc_RuntimeError,
                        "Unknown error while querying number of lines");
    }
    return NULL;
}
2✔
744

745
// Method table for the Python TraceReader type (installed through the
// tp_methods slot of TraceReaderType). Each entry is
// {python name, C implementation, calling convention flags, docstring};
// the table is terminated by a {NULL} sentinel.
static PyMethodDef TraceReader_methods[] = {
    // --- Line / raw-chunk readers: iterator and list-returning variants ---
    {"iter_lines", (PyCFunction)TraceReader_iter_lines,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over decoded lines.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"iter_raw", (PyCFunction)TraceReader_iter_raw,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over raw byte chunks.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
    {"read_lines", (PyCFunction)TraceReader_read_lines,
     METH_VARARGS | METH_KEYWORDS,
     "Read all lines and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"read_raw", (PyCFunction)TraceReader_read_raw,
     METH_VARARGS | METH_KEYWORDS,
     "Read all raw chunks and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
    // --- JSON readers ---
    {"iter_lines_json", (PyCFunction)TraceReader_iter_lines_json,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over parsed JSON objects.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"read_lines_json", (PyCFunction)TraceReader_read_lines_json,
     METH_VARARGS | METH_KEYWORDS,
     "Read all lines as parsed JSON objects.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    // --- Arrow readers: only registered when Arrow support is compiled in ---
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    {"iter_arrow", (PyCFunction)TraceReader_iter_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over Arrow record batches.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch.\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"read_arrow", (PyCFunction)TraceReader_read_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "Read all events as a materialized ArrowTable.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch.\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
#endif
    // --- Metadata queries (no arguments) ---
    {"get_max_bytes", (PyCFunction)TraceReader_get_max_bytes, METH_NOARGS,
     "Get the maximum byte position (0 if unknown for compressed\n"
     "files without index)."},
    {"get_num_lines", (PyCFunction)TraceReader_get_num_lines, METH_NOARGS,
     "Get the total number of lines (0 if unknown for files without\n"
     "index)."},
    // --- Context-manager protocol ---
    {"__enter__", (PyCFunction)TraceReader_enter, METH_NOARGS,
     "Enter the runtime context for the with statement."},
    {"__exit__", (PyCFunction)TraceReader_exit, METH_VARARGS,
     "Exit the runtime context for the with statement."},
    {NULL}};  // sentinel
845

846
// Read-only attribute table for the Python TraceReader type (installed via
// the tp_getset slot). Every entry has a getter and no setter; the list is
// closed by an all-null sentinel entry.
static PyGetSetDef TraceReader_getsetters[] = {
    {"file_path", reinterpret_cast<getter>(TraceReader_get_file_path),
     nullptr, "Path to the trace file", nullptr},
    {"index_dir", reinterpret_cast<getter>(TraceReader_get_index_dir),
     nullptr, "Directory for index files", nullptr},
    {"has_index", reinterpret_cast<getter>(TraceReader_get_has_index),
     nullptr, "True if a checkpoint index was found", nullptr},
    {"num_lines", reinterpret_cast<getter>(TraceReader_get_num_lines_prop),
     nullptr, "Total line count (reads all lines if needed)", nullptr},
    {nullptr, nullptr, nullptr, nullptr, nullptr},  // sentinel
};
856

857
// Static type object backing the Python class
// `dftracer_utils_ext.TraceReader`.
//
// The slots are initialized positionally, so the order of the entries below
// must match the PyTypeObject layout exactly; each line carries the name of
// the slot it fills. The type is subclassable (Py_TPFLAGS_BASETYPE) and is
// registered with the module by init_trace_reader().
PyTypeObject TraceReaderType = {
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext.TraceReader",
    sizeof(TraceReaderObject),                /* tp_basicsize */
    0,                                        /* tp_itemsize */
    (destructor)TraceReader_dealloc,          /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    0,                                        /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    // The signature lines up to the "--\n" marker form the text signature
    // that Python extracts into __text_signature__; the rest is __doc__.
    "TraceReader(file_path: str, index_dir: str = '',\n"
    "            checkpoint_size: int = 33554432,\n"
    "            auto_build_index: bool = False,\n"
    "            index_threshold: int = 8388608,\n"
    "            runtime: Runtime | None = None)\n"
    "--\n"
    "\n"
    "Smart trace file reader that auto-selects sequential or indexed\n"
    "reading based on whether an ``.idx`` sidecar exists.\n"
    "\n"
    "Args:\n"
    "    file_path (str): Path to the trace file (.pfw.gz or plain "
    "text).\n"
    "    index_dir (str): Directory to search for ``.idx`` sidecar "
    "files.\n"
    "        Empty string (default) searches next to the trace file.\n"
    "    checkpoint_size (int): Checkpoint interval in bytes for index\n"
    "        building (default 32 MB).\n"
    "    auto_build_index (bool): If True, automatically build an "
    "index\n"
    "        when none exists and the file exceeds *index_threshold*.\n"
    "    index_threshold (int): Minimum file size in bytes before\n"
    "        auto-indexing is triggered (default 8 MB).\n"
    "    runtime (Runtime or None): Runtime instance for thread pool "
    "control.\n"
    "        If None, uses the default global Runtime.\n"
    "\n"
    "Raises:\n"
    "    RuntimeError: If *file_path* does not exist or cannot be "
    "opened.\n",                /* tp_doc */
    0,                          /* tp_traverse */
    0,                          /* tp_clear */
    0,                          /* tp_richcompare */
    0,                          /* tp_weaklistoffset */
    0,                          /* tp_iter */
    0,                          /* tp_iternext */
    TraceReader_methods,        /* tp_methods */
    0,                          /* tp_members */
    TraceReader_getsetters,     /* tp_getset */
    0,                          /* tp_base */
    0,                          /* tp_dict */
    0,                          /* tp_descr_get */
    0,                          /* tp_descr_set */
    0,                          /* tp_dictoffset */
    (initproc)TraceReader_init, /* tp_init */
    0,                          /* tp_alloc */
    TraceReader_new,            /* tp_new */
};
925

926
/**
 * Finalize TraceReaderType and expose it on module @p m as `TraceReader`.
 *
 * @param m The extension module object to register the type on.
 * @return 0 on success, -1 on failure (with a Python exception set by the
 *         failing C-API call).
 */
int init_trace_reader(PyObject *m) {
    PyObject *type_obj = (PyObject *)&TraceReaderType;

    if (PyType_Ready(&TraceReaderType) < 0) {
        return -1;
    }

    // PyModule_AddObject steals a reference only on success, so take one
    // up front and give it back ourselves if the call fails.
    Py_INCREF(type_obj);
    const int status = PyModule_AddObject(m, "TraceReader", type_obj);
    if (status >= 0) {
        return 0;
    }

    Py_DECREF(type_obj);
    // NOTE(review): this also drops a reference to the module itself. That
    // is only correct if the caller does not release `m` again on failure
    // -- confirm against the module init function.
    Py_DECREF(m);
    return -1;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc