• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23531917822

25 Mar 2026 08:31AM UTC coverage: 50.205% (+0.1%) from 50.098%
23531917822

push

github

rayandrew
chore(docs): update utility behavior descriptions

19928 of 51702 branches covered (38.54%)

Branch coverage included in aggregate %.

17727 of 23300 relevant lines covered (76.08%)

474762.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.63
/src/dftracer/utils/python/trace_reader.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/coro/task.h>
4
#include <dftracer/utils/core/utils/string.h>
5
#include <dftracer/utils/python/arrow_helpers.h>
6
#include <dftracer/utils/python/runtime.h>
7
#include <dftracer/utils/python/trace_reader.h>
8
#include <dftracer/utils/python/trace_reader_iterator.h>
9
#include <dftracer/utils/utilities/reader/trace_reader.h>
10

11
#include <cstddef>
12
#include <exception>
13
#include <memory>
14
#include <string>
15
#include <vector>
16

17
#ifdef DFTRACER_UTILS_ENABLE_ARROW
18
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
19
#include <yyjson.h>
20
#endif
21

22
namespace {
23

24
using dftracer::utils::Runtime;
25
using dftracer::utils::coro::CoroTask;
26
using dftracer::utils::utilities::reader::ReadConfig;
27
using dftracer::utils::utilities::reader::TraceReader;
28
using dftracer::utils::utilities::reader::TraceReaderConfig;
29

30
// Producer coroutine behind the line-oriented iterators.
//
// Pulls lines from TraceReader::read_lines and pushes a copy of each line's
// content into state->queue, waiting on cv_producer while the queue already
// holds max_queue_size entries. Production stops early once state->cancelled
// is observed. The stream is always terminated by pushing std::nullopt and
// setting state->done; an exception raised while reading is recorded in
// state->error before the terminator is pushed.
CoroTask<void> produce_lines(std::shared_ptr<IteratorState> state,
                             TraceReaderConfig cfg, ReadConfig rc) {
    IteratorState *const shared = state.get();

    const auto stop_requested = [shared] {
        return shared->cancelled.load(std::memory_order_acquire);
    };
    const auto may_push = [shared] {
        return shared->queue.size() < shared->max_queue_size ||
               shared->cancelled.load(std::memory_order_acquire);
    };

    std::exception_ptr failure;
    try {
        TraceReader reader(std::move(cfg));
        auto lines = reader.read_lines(rc);
        for (;;) {
            auto next = co_await lines.next();
            if (!next) break;
            if (stop_requested()) break;

            std::string line(next->content);
            {
                std::unique_lock<std::mutex> guard(shared->mtx);
                shared->cv_producer.wait(guard, may_push);
                // The wait also wakes on cancellation; do not enqueue then.
                if (stop_requested()) break;
                shared->queue.push(std::move(line));
            }
            shared->cv_consumer.notify_one();
        }
    } catch (...) {
        failure = std::current_exception();
    }

    {
        std::lock_guard<std::mutex> guard(shared->mtx);
        if (failure) shared->error = failure;
        shared->queue.push(std::nullopt);  // end-of-stream marker
        shared->done.store(true, std::memory_order_release);
    }
    shared->cv_consumer.notify_one();
}
6,800!
65

66
// Producer coroutine behind the raw-chunk iterator.
//
// Pulls chunks from TraceReader::read_raw and pushes each one, copied into a
// std::string, into state->queue, waiting on cv_producer while the queue
// already holds max_queue_size entries. Production stops early once
// state->cancelled is observed. The stream is always terminated by pushing
// std::nullopt and setting state->done; an exception raised while reading is
// recorded in state->error before the terminator is pushed.
CoroTask<void> produce_raw(std::shared_ptr<IteratorState> state,
                           TraceReaderConfig cfg, ReadConfig rc) {
    IteratorState *const shared = state.get();

    const auto stop_requested = [shared] {
        return shared->cancelled.load(std::memory_order_acquire);
    };
    const auto may_push = [shared] {
        return shared->queue.size() < shared->max_queue_size ||
               shared->cancelled.load(std::memory_order_acquire);
    };

    std::exception_ptr failure;
    try {
        TraceReader reader(std::move(cfg));
        auto chunks = reader.read_raw(rc);
        for (;;) {
            auto next = co_await chunks.next();
            if (!next) break;
            if (stop_requested()) break;

            std::string chunk(next->data(), next->size());
            {
                std::unique_lock<std::mutex> guard(shared->mtx);
                shared->cv_producer.wait(guard, may_push);
                // The wait also wakes on cancellation; do not enqueue then.
                if (stop_requested()) break;
                shared->queue.push(std::move(chunk));
            }
            shared->cv_consumer.notify_one();
        }
    } catch (...) {
        failure = std::current_exception();
    }

    {
        std::lock_guard<std::mutex> guard(shared->mtx);
        if (failure) shared->error = failure;
        shared->queue.push(std::nullopt);  // end-of-stream marker
        shared->done.store(true, std::memory_order_release);
    }
    shared->cv_consumer.notify_one();
}
901!
101

102
#ifdef DFTRACER_UTILS_ENABLE_ARROW
103

104
using dftracer::utils::utilities::common::arrow::ColumnType;
105
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
106

107
CoroTask<void> produce_arrow_batches(std::shared_ptr<ArrowIteratorState> state,
1,800!
108
                                     TraceReaderConfig cfg, ReadConfig rc,
109
                                     std::size_t batch_size) {
14!
110
    auto *sp = state.get();
14✔
111
    try {
112
        TraceReader reader(std::move(cfg));
14!
113
        auto gen = reader.read_lines(rc);
14!
114
        RecordBatchBuilder builder;
14✔
115
        builder.reserve(batch_size);
14!
116

117
        // Keep yyjson docs alive until finish() since string columns hold
118
        // string_views into doc memory. Serialized object/array values are
119
        // stored as owned strings in held_serialized.
120
        std::vector<yyjson_doc *> held_docs;
14✔
121
        std::vector<std::string> held_serialized;
14✔
122
        held_docs.reserve(batch_size);
14!
123

124
        while (auto opt = co_await gen.next()) {
1,758!
125
            if (sp->cancelled.load(std::memory_order_acquire)) break;
422!
126

127
            const char *trimmed;
422✔
128
            std::size_t trimmed_length;
422✔
129
            if (!dftracer::utils::json_trim_and_validate(
422!
130
                    opt->content.data(), opt->content.size(), trimmed,
422✔
131
                    trimmed_length)) {
132
                continue;
26✔
133
            }
134

135
            yyjson_doc *doc = yyjson_read(trimmed, trimmed_length, 0);
396!
136
            if (!doc) continue;
396!
137

138
            yyjson_val *root = yyjson_doc_get_root(doc);
396!
139
            if (!root || !yyjson_is_obj(root)) {
396!
140
                yyjson_doc_free(doc);
×
141
                continue;
142
            }
143

144
            yyjson_obj_iter iter;
396✔
145
            yyjson_obj_iter_init(root, &iter);
396!
146
            yyjson_val *key;
396✔
147
            while ((key = yyjson_obj_iter_next(&iter))) {
1,980!
148
                yyjson_val *val = yyjson_obj_iter_get_val(key);
1,584!
149
                const char *key_str = yyjson_get_str(key);
1,584!
150
                std::size_t key_len = yyjson_get_len(key);
1,584!
151
                std::string_view key_sv(key_str, key_len);
1,584✔
152

153
                if (yyjson_is_int(val)) {
1,584!
154
                    std::size_t idx =
396✔
155
                        builder.add_or_get_column(key_sv, ColumnType::INT64);
396!
156
                    builder.append_int64(idx, yyjson_get_sint(val));
396!
157
                } else if (yyjson_is_uint(val)) {
1,584!
158
                    std::size_t idx =
159
                        builder.add_or_get_column(key_sv, ColumnType::UINT64);
×
160
                    builder.append_uint64(idx, yyjson_get_uint(val));
×
161
                } else if (yyjson_is_real(val)) {
1,188!
162
                    std::size_t idx =
163
                        builder.add_or_get_column(key_sv, ColumnType::DOUBLE);
×
164
                    builder.append_double(idx, yyjson_get_real(val));
×
165
                } else if (yyjson_is_bool(val)) {
1,188!
166
                    std::size_t idx =
167
                        builder.add_or_get_column(key_sv, ColumnType::BOOL);
×
168
                    builder.append_bool(idx, yyjson_get_bool(val));
×
169
                } else if (yyjson_is_str(val)) {
1,188!
170
                    std::size_t idx =
1,188✔
171
                        builder.add_or_get_column(key_sv, ColumnType::STRING);
1,188!
172
                    // string_view into doc memory — doc kept alive in
173
                    // held_docs until finish()
174
                    builder.append_string(
2,376!
175
                        idx, std::string_view(yyjson_get_str(val),
3,564!
176
                                              yyjson_get_len(val)));
1,188!
177
                } else if (yyjson_is_null(val)) {
1,188!
178
                    // Only append null to an existing column; skip if the
179
                    // column is new — we don't know its type yet and creating
180
                    // it as STRING would corrupt later typed appends.
181
                    auto existing = builder.find_column(key_sv);
×
182
                    if (existing) builder.append_null(*existing);
×
183
                } else {
184
                    // object/array: serialize to JSON string
185
                    std::size_t json_len;
186
                    char *json_str = yyjson_val_write(val, 0, &json_len);
×
187
                    std::size_t idx =
188
                        builder.add_or_get_column(key_sv, ColumnType::STRING);
×
189
                    if (json_str) {
×
190
                        held_serialized.emplace_back(json_str, json_len);
×
191
                        free(json_str);
×
192
                        builder.append_string(idx, held_serialized.back());
×
193
                    } else {
194
                        builder.append_null(idx);
×
195
                    }
196
                }
197
            }
1,584✔
198
            builder.end_row();
396!
199
            held_docs.push_back(doc);
396!
200

201
            if (builder.num_rows() >= batch_size) {
396✔
202
                auto result = builder.finish();
7!
203
                for (auto *d : held_docs) yyjson_doc_free(d);
177!
204
                held_docs.clear();
7✔
205
                held_serialized.clear();
7✔
206

207
                {
208
                    std::unique_lock<std::mutex> lock(sp->mtx);
7!
209
                    sp->cv_producer.wait(lock, [sp] {
28!
210
                        return sp->queue.size() < sp->max_queue_size ||
14!
211
                               sp->cancelled.load(std::memory_order_acquire);
7✔
212
                    });
213
                    if (sp->cancelled.load(std::memory_order_acquire)) break;
7!
214
                    sp->queue.push(std::move(result));
7!
215
                }
7!
216
                sp->cv_consumer.notify_one();
7✔
217
                builder.reset(false);
7!
218
                builder.reserve(batch_size);
7!
219
            }
7!
220
        }
436✔
221

222
        // Flush remaining rows
223
        if (builder.num_rows() > 0) {
14!
224
            auto result = builder.finish();
14!
225
            for (auto *d : held_docs) yyjson_doc_free(d);
240!
226
            held_docs.clear();
14✔
227
            held_serialized.clear();
14✔
228
            {
229
                std::lock_guard<std::mutex> lock(sp->mtx);
14!
230
                sp->queue.push(std::move(result));
14!
231
            }
14✔
232
            sp->cv_consumer.notify_one();
14✔
233
        } else {
14✔
234
            for (auto *d : held_docs) yyjson_doc_free(d);
×
235
        }
236
    } catch (...) {
886✔
237
        std::lock_guard<std::mutex> lock(sp->mtx);
×
238
        sp->error = std::current_exception();
239
        sp->queue.push(std::nullopt);
×
240
        sp->done.store(true, std::memory_order_release);
241
        sp->cv_consumer.notify_one();
242
        co_return;
243
    }
×
244
    {
245
        std::lock_guard<std::mutex> lock(sp->mtx);
14!
246
        sp->queue.push(std::nullopt);
14!
247
        sp->done.store(true, std::memory_order_release);
14✔
248
    }
14✔
249
    sp->cv_consumer.notify_one();
14✔
250
}
2,658!
251

252
#endif  // DFTRACER_UTILS_ENABLE_ARROW
253

254
// Builds a TraceReaderConfig from the values stored on the Python object.
//
// A path attribute that is unset (NULL), or for which PyUnicode_AsUTF8
// returns NULL, is left at the config's default-constructed value rather
// than being assigned from a null pointer. The numeric settings are copied
// verbatim; auto_build_index is converted to bool.
//
// NOTE(review): when PyUnicode_AsUTF8 itself fails it leaves a Python
// exception pending; this function does not clear it (the original index_dir
// handling behaved the same way).
TraceReaderConfig build_config(TraceReaderObject *self) {
    TraceReaderConfig cfg;

    const char *path =
        self->file_path ? PyUnicode_AsUTF8(self->file_path) : nullptr;
    if (path) cfg.file_path = path;

    const char *idx =
        self->index_dir ? PyUnicode_AsUTF8(self->index_dir) : nullptr;
    if (idx) cfg.index_dir = idx;

    cfg.checkpoint_size = self->checkpoint_size;
    cfg.auto_build_index = self->auto_build_index != 0;
    cfg.index_threshold = self->index_threshold;
    return cfg;
}
109!
264

265
// Picks the Runtime that producer tasks are submitted to: the one held by
// self->runtime_obj when it is set, otherwise get_default_runtime().
static Runtime *get_runtime(TraceReaderObject *self) {
    if (!self->runtime_obj) {
        return get_default_runtime();
    }
    RuntimeObject *wrapper = (RuntimeObject *)self->runtime_obj;
    return wrapper->runtime.get();
}
90✔
271

272
// Allocates a TraceReaderIteratorObject for a line/raw/JSON stream.
//
// The object comes from tp_alloc, so its shared_ptr members are constructed
// in place with placement new: `state` is moved into it->state and, in Arrow
// builds, it->arrow_state is constructed empty. Returns NULL when tp_alloc
// fails.
static TraceReaderIteratorObject *make_iterator(
    std::shared_ptr<IteratorState> state, IteratorMode mode) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) return NULL;

    TraceReaderIteratorObject *iterator = (TraceReaderIteratorObject *)raw;
    new (&iterator->state) std::shared_ptr<IteratorState>(std::move(state));
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    new (&iterator->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif
    iterator->mode = mode;
    return iterator;
}
76✔
285

286
#ifdef DFTRACER_UTILS_ENABLE_ARROW
287
// Allocates a TraceReaderIteratorObject for an Arrow batch stream.
//
// The object comes from tp_alloc, so its shared_ptr members are constructed
// in place with placement new: it->state is constructed empty and `state` is
// moved into it->arrow_state. The mode is fixed to IteratorMode::ARROW.
// Returns NULL when tp_alloc fails.
static TraceReaderIteratorObject *make_arrow_iterator(
    std::shared_ptr<ArrowIteratorState> state) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) return NULL;

    TraceReaderIteratorObject *iterator = (TraceReaderIteratorObject *)raw;
    new (&iterator->state) std::shared_ptr<IteratorState>();
    new (&iterator->arrow_state)
        std::shared_ptr<ArrowIteratorState>(std::move(state));
    iterator->mode = IteratorMode::ARROW;
    return iterator;
}
14✔
299
#endif
300

301
}  // namespace
302

303
using dftracer::utils::python::wrap_arrow_table;
304

305
// tp_dealloc: drops the references held in file_path, index_dir and
// runtime_obj (each may be NULL) and then frees the object via tp_free.
static void TraceReader_dealloc(TraceReaderObject *self) {
    PyObject *const held[] = {self->file_path, self->index_dir,
                              self->runtime_obj};
    for (PyObject *ref : held) {
        Py_XDECREF(ref);
    }
    Py_TYPE(self)->tp_free((PyObject *)self);
}
204✔
311

312
// tp_new: allocates a TraceReaderObject and puts every field into its
// pre-__init__ state (NULL object pointers, 32 MiB checkpoint size, index
// building off, default index threshold, has_index = 0). `args` and `kwds`
// are not inspected. Returns NULL when tp_alloc fails.
static PyObject *TraceReader_new(PyTypeObject *type, PyObject *args,
                                 PyObject *kwds) {
    PyObject *allocated = type->tp_alloc(type, 0);
    if (allocated == NULL) return NULL;

    TraceReaderObject *reader = (TraceReaderObject *)allocated;
    reader->file_path = NULL;
    reader->index_dir = NULL;
    reader->runtime_obj = NULL;
    reader->checkpoint_size = 32 * 1024 * 1024;
    reader->auto_build_index = 0;
    reader->index_threshold =
        dftracer::utils::constants::indexer::DEFAULT_INDEX_SIZE_THRESHOLD;
    reader->has_index = 0;
    return allocated;
}
327

328
static int TraceReader_init(TraceReaderObject *self, PyObject *args,
204✔
329
                            PyObject *kwds) {
330
    static const char *kwlist[] = {"file_path",
331
                                   "index_dir",
332
                                   "checkpoint_size",
333
                                   "auto_build_index",
334
                                   "index_threshold",
335
                                   "runtime",
336
                                   NULL};
337

338
    const char *file_path;
339
    const char *index_dir = "";
204✔
340
    std::size_t checkpoint_size = 32 * 1024 * 1024;
204✔
341
    int auto_build_index = 0;
204✔
342
    std::size_t index_threshold =
204✔
343
        dftracer::utils::constants::indexer::DEFAULT_INDEX_SIZE_THRESHOLD;
344
    PyObject *runtime_arg = NULL;
204✔
345

346
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|snpnO", (char **)kwlist,
204!
347
                                     &file_path, &index_dir, &checkpoint_size,
348
                                     &auto_build_index, &index_threshold,
349
                                     &runtime_arg)) {
350
        return -1;
×
351
    }
352

353
    if (runtime_arg && runtime_arg != Py_None) {
204✔
354
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
14!
355
            // Direct C++ Runtime object
356
            Py_INCREF(runtime_arg);
×
357
            self->runtime_obj = runtime_arg;
×
358
        } else {
359
            // Python wrapper, extract _native attribute
360
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
14!
361
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
14!
362
                self->runtime_obj = native;  // already incref'd by GetAttr
14✔
363
            } else {
7✔
364
                Py_XDECREF(native);
×
365
                PyErr_SetString(PyExc_TypeError,
×
366
                                "runtime must be a Runtime instance or None");
367
                return -1;
×
368
            }
369
        }
370
    }
7✔
371

372
    self->file_path = PyUnicode_FromString(file_path);
204!
373
    if (!self->file_path) return -1;
204✔
374

375
    self->index_dir = PyUnicode_FromString(index_dir);
204!
376
    if (!self->index_dir) {
204✔
377
        Py_DECREF(self->file_path);
×
378
        self->file_path = NULL;
×
379
        return -1;
×
380
    }
381

382
    self->checkpoint_size = checkpoint_size;
204✔
383
    self->auto_build_index = auto_build_index;
204✔
384
    self->index_threshold = index_threshold;
204✔
385

386
    try {
387
        TraceReaderConfig cfg;
204✔
388
        cfg.file_path = file_path;
204!
389
        cfg.index_dir = index_dir;
204!
390
        cfg.checkpoint_size = checkpoint_size;
204✔
391
        cfg.auto_build_index = auto_build_index != 0;
204✔
392
        cfg.index_threshold = index_threshold;
204✔
393
        TraceReader probe(std::move(cfg));
204!
394
        self->has_index = probe.has_index() ? 1 : 0;
204!
395
    } catch (const std::exception &e) {
204!
396
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
397
        Py_DECREF(self->file_path);
×
398
        Py_DECREF(self->index_dir);
×
399
        self->file_path = NULL;
×
400
        self->index_dir = NULL;
×
401
        return -1;
×
402
    }
×
403

404
    return 0;
204✔
405
}
102✔
406

407
// iter_lines(start_line=0, end_line=0, start_byte=0, end_byte=0,
//            buffer_size=4 MiB, query=None)
//
// Validates the range arguments, submits a produce_lines task to the
// reader's runtime and returns an iterator object in LINES mode that
// consumes the task's queue.
//
// Returns NULL with ValueError set for negative ranges or a non-positive
// buffer_size, with RuntimeError set when building the config or submitting
// the task throws, or when the iterator object cannot be allocated. In the
// last case the already-submitted producer is cancelled so it does not stay
// blocked on a queue nobody will drain.
static PyObject *TraceReader_iter_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "query",
                                   NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnz", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &query_str)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str) rc.query = query_str;

    auto state = std::make_shared<IteratorState>();

    Runtime *rt = get_runtime(self);
    try {
        rt->submit(produce_lines(state, cfg, rc), "iter_lines");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    TraceReaderIteratorObject *it = make_iterator(state, IteratorMode::LINES);
    if (!it) {
        // Set the flag under the mutex so the producer cannot miss the
        // wake-up between evaluating its wait predicate and blocking.
        {
            std::lock_guard<std::mutex> lock(state->mtx);
            state->cancelled.store(true, std::memory_order_release);
        }
        state->cv_producer.notify_all();
        return NULL;
    }
    return (PyObject *)it;
}
115✔
460

461
// iter_raw(start_line=0, end_line=0, start_byte=0, end_byte=0,
//          buffer_size=4 MiB, line_aligned=True, multi_line=True, query=None)
//
// Validates the range arguments, submits a produce_raw task to the reader's
// runtime and returns an iterator object in RAW mode that consumes the
// task's queue.
//
// Returns NULL with ValueError set for negative ranges or a non-positive
// buffer_size, with RuntimeError set when building the config or submitting
// the task throws, or when the iterator object cannot be allocated. In the
// last case the already-submitted producer is cancelled so it does not stay
// blocked on a queue nobody will drain.
static PyObject *TraceReader_iter_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "line_aligned",
                                   "multi_line", "query",       NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    int line_aligned = 1;
    int multi_line = 1;
    const char *query_str = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnppz", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &line_aligned,
                                     &multi_line, &query_str)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    rc.line_aligned = line_aligned != 0;
    rc.multi_line = multi_line != 0;
    if (query_str) rc.query = query_str;

    auto state = std::make_shared<IteratorState>();

    Runtime *rt = get_runtime(self);
    try {
        rt->submit(produce_raw(state, cfg, rc), "iter_raw");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    TraceReaderIteratorObject *it = make_iterator(state, IteratorMode::RAW);
    if (!it) {
        // Set the flag under the mutex so the producer cannot miss the
        // wake-up between evaluating its wait predicate and blocking.
        {
            std::lock_guard<std::mutex> lock(state->mtx);
            state->cancelled.store(true, std::memory_order_release);
        }
        state->cv_producer.notify_all();
        return NULL;
    }
    return (PyObject *)it;
}
21✔
519

520
// read_lines(...): eager form of iter_lines. Forwards args/kwds to
// TraceReader_iter_lines and collects everything the iterator yields into a
// new list. Returns NULL (exception already set) when either step fails.
static PyObject *TraceReader_read_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *collected = NULL;
    if (PyObject *stream = TraceReader_iter_lines(self, args, kwds)) {
        collected = PySequence_List(stream);
        Py_DECREF(stream);
    }
    return collected;
}
45✔
528

529
// read_raw(...): eager form of iter_raw. Forwards args/kwds to
// TraceReader_iter_raw and collects everything the iterator yields into a
// new list. Returns NULL (exception already set) when either step fails.
static PyObject *TraceReader_read_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    PyObject *collected = NULL;
    if (PyObject *stream = TraceReader_iter_raw(self, args, kwds)) {
        collected = PySequence_List(stream);
        Py_DECREF(stream);
    }
    return collected;
}
4✔
537

538
// iter_lines_json(start_line=0, end_line=0, start_byte=0, end_byte=0,
//                 buffer_size=4 MiB, query=None)
//
// Same pipeline as iter_lines (a produce_lines task feeding a queue), but
// the returned iterator object is created in JSON mode.
//
// Returns NULL with ValueError set for negative ranges or a non-positive
// buffer_size, with RuntimeError set when building the config or submitting
// the task throws, or when the iterator object cannot be allocated. In the
// last case the already-submitted producer is cancelled so it does not stay
// blocked on a queue nobody will drain.
static PyObject *TraceReader_iter_lines_json(TraceReaderObject *self,
                                             PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "query",
                                   NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnz", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &query_str)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str) rc.query = query_str;

    auto state = std::make_shared<IteratorState>();

    Runtime *rt = get_runtime(self);
    try {
        rt->submit(produce_lines(state, cfg, rc), "iter_lines_json");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    TraceReaderIteratorObject *it = make_iterator(state, IteratorMode::JSON);
    if (!it) {
        // Set the flag under the mutex so the producer cannot miss the
        // wake-up between evaluating its wait predicate and blocking.
        {
            std::lock_guard<std::mutex> lock(state->mtx);
            state->cancelled.store(true, std::memory_order_release);
        }
        state->cv_producer.notify_all();
        return NULL;
    }
    return (PyObject *)it;
}
20✔
591

592
// read_lines_json(...): eager form of iter_lines_json. Forwards args/kwds to
// TraceReader_iter_lines_json and collects everything the iterator yields
// into a new list. Returns NULL (exception already set) when either step
// fails.
static PyObject *TraceReader_read_lines_json(TraceReaderObject *self,
                                             PyObject *args, PyObject *kwds) {
    PyObject *collected = NULL;
    if (PyObject *stream = TraceReader_iter_lines_json(self, args, kwds)) {
        collected = PySequence_List(stream);
        Py_DECREF(stream);
    }
    return collected;
}
7✔
600

601
#ifdef DFTRACER_UTILS_ENABLE_ARROW
602

603
// iter_arrow(batch_size=10000, start_line=0, end_line=0, start_byte=0,
//            end_byte=0, buffer_size=4 MiB, query=None)
//
// Validates the arguments, submits a produce_arrow_batches task to the
// reader's runtime and returns an iterator object in ARROW mode that
// consumes the task's queue of finished batches.
//
// Returns NULL with ValueError set for a non-positive batch_size, negative
// ranges or a non-positive buffer_size, with RuntimeError set when building
// the config or submitting the task throws, or when the iterator object
// cannot be allocated. In the last case the already-submitted producer is
// cancelled so it does not stay blocked on a queue nobody will drain.
static PyObject *TraceReader_iter_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {"batch_size", "start_line", "end_line",
                                   "start_byte", "end_byte",   "buffer_size",
                                   "query",      NULL};
    Py_ssize_t batch_size = 10000;
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nnnnnnz", (char **)kwlist, &batch_size, &start_line,
            &end_line, &start_byte, &end_byte, &buffer_size, &query_str)) {
        return NULL;
    }

    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return NULL;
    }
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    ReadConfig rc;
    rc.start_line = static_cast<std::size_t>(start_line);
    rc.end_line = static_cast<std::size_t>(end_line);
    rc.start_byte = static_cast<std::size_t>(start_byte);
    rc.end_byte = static_cast<std::size_t>(end_byte);
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str) rc.query = query_str;

    auto state = std::make_shared<ArrowIteratorState>();

    Runtime *rt = get_runtime(self);
    try {
        rt->submit(produce_arrow_batches(state, cfg, rc,
                                         static_cast<std::size_t>(batch_size)),
                   "iter_arrow");
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    // Pass a copy so `state` is still available for cancellation below.
    TraceReaderIteratorObject *it = make_arrow_iterator(state);
    if (!it) {
        // Set the flag under the mutex so the producer cannot miss the
        // wake-up between evaluating its wait predicate and blocking.
        {
            std::lock_guard<std::mutex> lock(state->mtx);
            state->cancelled.store(true, std::memory_order_release);
        }
        state->cv_producer.notify_all();
        return NULL;
    }
    return (PyObject *)it;
}
28✔
663

664
// read_arrow(...): eager form of iter_arrow. Forwards args/kwds to
// TraceReader_iter_arrow, collects every yielded batch into a list and
// returns the result of wrap_arrow_table() on that list. Returns NULL
// (exception already set) when creating the iterator or the list fails.
//
// NOTE(review): the list reference is handed to wrap_arrow_table and not
// released here — confirm that wrap_arrow_table takes ownership of it.
static PyObject *TraceReader_read_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *stream = TraceReader_iter_arrow(self, args, kwds);
    if (stream == NULL) return NULL;

    PyObject *batches = PySequence_List(stream);
    Py_DECREF(stream);

    return batches ? wrap_arrow_table(batches) : NULL;
}
5✔
674

675
#endif  // DFTRACER_UTILS_ENABLE_ARROW
676

677
// __enter__: returns the reader itself as a new reference.
static PyObject *TraceReader_enter(TraceReaderObject *self,
                                   PyObject *Py_UNUSED(ignored)) {
    PyObject *me = (PyObject *)self;
    Py_INCREF(me);
    return me;
}
682

683
// __exit__: does nothing with the reader or the exception arguments and
// returns None.
static PyObject *TraceReader_exit(TraceReaderObject *self, PyObject *args) {
    Py_INCREF(Py_None);
    return Py_None;
}
686

687
// Getter for `file_path`: returns the stored path object as a new
// reference, or None when the attribute is unset (tp_new leaves it NULL
// until __init__ succeeds).
static PyObject *TraceReader_get_file_path(TraceReaderObject *self,
                                           void *closure) {
    if (!self->file_path) Py_RETURN_NONE;
    Py_INCREF(self->file_path);
    return self->file_path;
}
692

693
// Getter for `index_dir`: returns the stored directory object as a new
// reference, or None when the attribute is unset (tp_new leaves it NULL
// until __init__ succeeds).
static PyObject *TraceReader_get_index_dir(TraceReaderObject *self,
                                           void *closure) {
    if (!self->index_dir) Py_RETURN_NONE;
    Py_INCREF(self->index_dir);
    return self->index_dir;
}
698

699
// Getter for `has_index`: True when the stored has_index flag is non-zero,
// False otherwise.
static PyObject *TraceReader_get_has_index(TraceReaderObject *self,
                                           void *closure) {
    if (self->has_index) {
        Py_RETURN_TRUE;
    }
    Py_RETURN_FALSE;
}
703

704
// Getter for the line-count property.
//
// First asks a freshly constructed TraceReader for get_num_lines() and
// returns it when it is non-zero. If that reports 0 or throws, falls back
// to counting the items produced by iter_lines() with default arguments.
// The fallback counts while iterating and releases each line immediately,
// so it never holds the whole file as a Python list.
//
// Returns NULL with an exception set if the fallback iteration fails.
static PyObject *TraceReader_get_num_lines_prop(TraceReaderObject *self,
                                                void *closure) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        std::size_t n = reader.get_num_lines();
        if (n > 0) return PyLong_FromSize_t(n);
    } catch (...) {
        // Deliberately best-effort: any failure here just means we count
        // the lines by reading them below.
    }

    PyObject *empty_args = PyTuple_New(0);
    if (!empty_args) return NULL;
    PyObject *stream = TraceReader_iter_lines(self, empty_args, NULL);
    Py_DECREF(empty_args);
    if (!stream) return NULL;

    PyObject *iter = PyObject_GetIter(stream);
    Py_DECREF(stream);
    if (!iter) return NULL;

    Py_ssize_t count = 0;
    while (PyObject *line = PyIter_Next(iter)) {
        Py_DECREF(line);
        ++count;
    }
    Py_DECREF(iter);

    // PyIter_Next returns NULL both at exhaustion and on error; only the
    // latter leaves an exception set.
    if (PyErr_Occurred()) return NULL;
    return PyLong_FromSsize_t(count);
}
4✔
722

723
// Python method ``TraceReader.get_max_bytes()``.
//
// Builds a fresh TraceReader from this object's configuration and returns the
// value of its get_max_bytes() as a Python int.
//
// Returns NULL with RuntimeError set if anything throws. Exceptions derived
// from std::exception carry their what() text; anything else is reported with
// a generic message.
static PyObject *TraceReader_get_max_bytes(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        return PyLong_FromSize_t(reader.get_max_bytes());
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
    } catch (...) {
        // A C++ exception must never unwind into the interpreter's C frames,
        // so convert anything not derived from std::exception as well.
        PyErr_SetString(
            PyExc_RuntimeError,
            "TraceReader.get_max_bytes failed with an unknown C++ exception");
    }
    return NULL;
}
13✔
734

735
// Python method ``TraceReader.get_num_lines()``.
//
// Builds a fresh TraceReader from this object's configuration and returns the
// value of its get_num_lines() as a Python int. Unlike the ``num_lines``
// property, this does not fall back to reading the file.
//
// Returns NULL with RuntimeError set if anything throws. Exceptions derived
// from std::exception carry their what() text; anything else is reported with
// a generic message.
static PyObject *TraceReader_get_num_lines(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        return PyLong_FromSize_t(reader.get_num_lines());
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
    } catch (...) {
        // A C++ exception must never unwind into the interpreter's C frames,
        // so convert anything not derived from std::exception as well.
        PyErr_SetString(
            PyExc_RuntimeError,
            "TraceReader.get_num_lines failed with an unknown C++ exception");
    }
    return NULL;
}
2✔
746

747
// Method table for the Python-visible TraceReader type. Each entry gives the
// Python method name, the C function implementing it, the calling-convention
// flags, and the docstring. The iter_*/read_* entries accept positional and
// keyword arguments (METH_VARARGS | METH_KEYWORDS); the Arrow entries are
// compiled in only when DFTRACER_UTILS_ENABLE_ARROW is defined. The trailing
// {NULL} entry is the sentinel that terminates the table.
static PyMethodDef TraceReader_methods[] = {
748
    {"iter_lines", (PyCFunction)TraceReader_iter_lines,
749
     METH_VARARGS | METH_KEYWORDS,
750
     "Return an iterator over decoded lines.\n"
751
     "\n"
752
     "Args:\n"
753
     "    start_line (int): First line (0 = beginning).\n"
754
     "    end_line (int): Last line (0 = end of file).\n"
755
     "    start_byte (int): First byte offset (0 = beginning).\n"
756
     "    end_byte (int): Last byte offset (0 = end of file).\n"
757
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
758
    {"iter_raw", (PyCFunction)TraceReader_iter_raw,
759
     METH_VARARGS | METH_KEYWORDS,
760
     "Return an iterator over raw byte chunks.\n"
761
     "\n"
762
     "Args:\n"
763
     "    start_line (int): First line (0 = beginning).\n"
764
     "    end_line (int): Last line (0 = end of file).\n"
765
     "    start_byte (int): First byte offset (0 = beginning).\n"
766
     "    end_byte (int): Last byte offset (0 = end of file).\n"
767
     "    buffer_size (int): Internal read buffer size in bytes.\n"
768
     "    line_aligned (bool): Align chunks to line boundaries.\n"
769
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
770
    {"read_lines", (PyCFunction)TraceReader_read_lines,
771
     METH_VARARGS | METH_KEYWORDS,
772
     "Read all lines and return as list.\n"
773
     "\n"
774
     "Args:\n"
775
     "    start_line (int): First line (0 = beginning).\n"
776
     "    end_line (int): Last line (0 = end of file).\n"
777
     "    start_byte (int): First byte offset (0 = beginning).\n"
778
     "    end_byte (int): Last byte offset (0 = end of file).\n"
779
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
780
    {"read_raw", (PyCFunction)TraceReader_read_raw,
781
     METH_VARARGS | METH_KEYWORDS,
782
     "Read all raw chunks and return as list.\n"
783
     "\n"
784
     "Args:\n"
785
     "    start_line (int): First line (0 = beginning).\n"
786
     "    end_line (int): Last line (0 = end of file).\n"
787
     "    start_byte (int): First byte offset (0 = beginning).\n"
788
     "    end_byte (int): Last byte offset (0 = end of file).\n"
789
     "    buffer_size (int): Internal read buffer size in bytes.\n"
790
     "    line_aligned (bool): Align chunks to line boundaries.\n"
791
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
792
    {"iter_lines_json", (PyCFunction)TraceReader_iter_lines_json,
793
     METH_VARARGS | METH_KEYWORDS,
794
     "Return an iterator over parsed JSON objects.\n"
795
     "\n"
796
     "Args:\n"
797
     "    start_line (int): First line (0 = beginning).\n"
798
     "    end_line (int): Last line (0 = end of file).\n"
799
     "    start_byte (int): First byte offset (0 = beginning).\n"
800
     "    end_byte (int): Last byte offset (0 = end of file).\n"
801
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
802
    {"read_lines_json", (PyCFunction)TraceReader_read_lines_json,
803
     METH_VARARGS | METH_KEYWORDS,
804
     "Read all lines as parsed JSON objects.\n"
805
     "\n"
806
     "Args:\n"
807
     "    start_line (int): First line (0 = beginning).\n"
808
     "    end_line (int): Last line (0 = end of file).\n"
809
     "    start_byte (int): First byte offset (0 = beginning).\n"
810
     "    end_byte (int): Last byte offset (0 = end of file).\n"
811
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
812
#ifdef DFTRACER_UTILS_ENABLE_ARROW
813
    {"iter_arrow", (PyCFunction)TraceReader_iter_arrow,
814
     METH_VARARGS | METH_KEYWORDS,
815
     "Return an iterator over Arrow record batches.\n"
816
     "\n"
817
     "Args:\n"
818
     "    batch_size (int): Maximum rows per Arrow batch.\n"
819
     "    start_line (int): First line (0 = beginning).\n"
820
     "    end_line (int): Last line (0 = end of file).\n"
821
     "    start_byte (int): First byte offset (0 = beginning).\n"
822
     "    end_byte (int): Last byte offset (0 = end of file).\n"
823
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
824
    {"read_arrow", (PyCFunction)TraceReader_read_arrow,
825
     METH_VARARGS | METH_KEYWORDS,
826
     "Read all events as a materialized ArrowTable.\n"
827
     "\n"
828
     "Args:\n"
829
     "    batch_size (int): Maximum rows per Arrow batch.\n"
830
     "    start_line (int): First line (0 = beginning).\n"
831
     "    end_line (int): Last line (0 = end of file).\n"
832
     "    start_byte (int): First byte offset (0 = beginning).\n"
833
     "    end_byte (int): Last byte offset (0 = end of file).\n"
834
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
835
#endif
836
    {"get_max_bytes", (PyCFunction)TraceReader_get_max_bytes, METH_NOARGS,
837
     "Get the maximum byte position (0 if unknown for compressed\n"
838
     "files without index)."},
839
    {"get_num_lines", (PyCFunction)TraceReader_get_num_lines, METH_NOARGS,
840
     "Get the total number of lines (0 if unknown for files without\n"
841
     "index)."},
842
    {"__enter__", (PyCFunction)TraceReader_enter, METH_NOARGS,
843
     "Enter the runtime context for the with statement."},
844
    {"__exit__", (PyCFunction)TraceReader_exit, METH_VARARGS,
845
     "Exit the runtime context for the with statement."},
846
    {NULL}};
847

848
// Property table for the Python-visible TraceReader type. Every entry supplies
// a getter and a NULL setter, so the attributes are read-only from Python; the
// closure pointer is NULL for all of them. The trailing {NULL} entry is the
// sentinel that terminates the table.
static PyGetSetDef TraceReader_getsetters[] = {
849
    {"file_path", (getter)TraceReader_get_file_path, NULL,
850
     "Path to the trace file", NULL},
851
    {"index_dir", (getter)TraceReader_get_index_dir, NULL,
852
     "Directory for index files", NULL},
853
    {"has_index", (getter)TraceReader_get_has_index, NULL,
854
     "True if a checkpoint index was found", NULL},
855
    // Unlike the get_num_lines() method, this getter falls back to reading
    // the whole file when the reader cannot report a count.
    {"num_lines", (getter)TraceReader_get_num_lines_prop, NULL,
856
     "Total line count (reads all lines if needed)", NULL},
857
    {NULL}};
858

859
// Static type object for ``dftracer_utils_ext.TraceReader``.
//
// The initializer is positional, so entries must stay in PyTypeObject field
// order; each line's trailing comment names the slot it fills and a 0 leaves
// that slot unset. The slots actually populated are the deallocator, the
// flags, the docstring, the method and getset tables, tp_init and tp_new.
PyTypeObject TraceReaderType = {
860
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext.TraceReader",
861
    sizeof(TraceReaderObject),                /* tp_basicsize */
862
    0,                                        /* tp_itemsize */
863
    (destructor)TraceReader_dealloc,          /* tp_dealloc */
864
    0,                                        /* tp_vectorcall_offset */
865
    0,                                        /* tp_getattr */
866
    0,                                        /* tp_setattr */
867
    0,                                        /* tp_as_async */
868
    0,                                        /* tp_repr */
869
    0,                                        /* tp_as_number */
870
    0,                                        /* tp_as_sequence */
871
    0,                                        /* tp_as_mapping */
872
    0,                                        /* tp_hash */
873
    0,                                        /* tp_call */
874
    0,                                        /* tp_str */
875
    0,                                        /* tp_getattro */
876
    0,                                        /* tp_setattro */
877
    0,                                        /* tp_as_buffer */
878
    // Py_TPFLAGS_BASETYPE permits subclassing this type from Python.
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
879
    "TraceReader(file_path: str, index_dir: str = '',\n"
880
    "            checkpoint_size: int = 33554432,\n"
881
    "            auto_build_index: bool = False,\n"
882
    "            index_threshold: int = 8388608,\n"
883
    "            runtime: Runtime | None = None)\n"
884
    "--\n"
885
    "\n"
886
    "Smart trace file reader that auto-selects sequential or indexed\n"
887
    "reading based on whether an ``.idx`` sidecar exists.\n"
888
    "\n"
889
    "Args:\n"
890
    "    file_path (str): Path to the trace file (.pfw.gz or plain "
891
    "text).\n"
892
    "    index_dir (str): Directory to search for ``.idx`` sidecar "
893
    "files.\n"
894
    "        Empty string (default) searches next to the trace file.\n"
895
    "    checkpoint_size (int): Checkpoint interval in bytes for index\n"
896
    "        building (default 32 MB).\n"
897
    "    auto_build_index (bool): If True, automatically build an "
898
    "index\n"
899
    "        when none exists and the file exceeds *index_threshold*.\n"
900
    "    index_threshold (int): Minimum file size in bytes before\n"
901
    "        auto-indexing is triggered (default 8 MB).\n"
902
    "    runtime (Runtime or None): Runtime instance for thread pool "
903
    "control.\n"
904
    "        If None, uses the default global Runtime.\n"
905
    "\n"
906
    "Raises:\n"
907
    "    RuntimeError: If *file_path* does not exist or cannot be "
908
    "opened.\n",                /* tp_doc */
909
    0,                          /* tp_traverse */
910
    0,                          /* tp_clear */
911
    0,                          /* tp_richcompare */
912
    0,                          /* tp_weaklistoffset */
913
    0,                          /* tp_iter */
914
    0,                          /* tp_iternext */
915
    TraceReader_methods,        /* tp_methods */
916
    0,                          /* tp_members */
917
    TraceReader_getsetters,     /* tp_getset */
918
    0,                          /* tp_base */
919
    0,                          /* tp_dict */
920
    0,                          /* tp_descr_get */
921
    0,                          /* tp_descr_set */
922
    0,                          /* tp_dictoffset */
923
    (initproc)TraceReader_init, /* tp_init */
924
    0,                          /* tp_alloc */
925
    TraceReader_new,            /* tp_new */
926
};
927

928
// Finalizes TraceReaderType and publishes it on module ``m`` under the name
// "TraceReader".
//
// Returns 0 on success and -1 on failure (with the Python error left set by
// the failing C-API call).
//
// NOTE(review): the failure path after PyModule_AddObject also releases one
// reference to ``m`` -- confirm the caller does not release it again.
int init_trace_reader(PyObject *m) {
    if (PyType_Ready(&TraceReaderType) < 0) {
        return -1;
    }

    // PyModule_AddObject steals this reference only when it succeeds.
    Py_INCREF(&TraceReaderType);
    const int add_status =
        PyModule_AddObject(m, "TraceReader", (PyObject *)&TraceReaderType);
    if (add_status >= 0) {
        return 0;
    }

    Py_DECREF(&TraceReaderType);
    Py_DECREF(m);
    return -1;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc