• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28521653886

01 Jul 2026 01:36PM UTC coverage: 50.92% (-1.4%) from 52.278%
28521653886

Pull #83

github

web-flow
Merge 9bdedb1e9 into 2efed6649
Pull Request #83: refactor and improve code QoL

31893 of 80049 branches covered (39.84%)

Branch coverage included in aggregate %.

789 of 1613 new or added lines in 87 files covered. (48.92%)

5007 existing lines in 181 files now uncovered.

32812 of 47024 relevant lines covered (69.78%)

9905.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.06
/src/dftracer/utils/python/trace_reader_iterator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/common/config.h>
4
#include <dftracer/utils/python/batch_byte_size.h>
5
#include <dftracer/utils/python/json.h>
6
#include <dftracer/utils/python/py_errors.h>
7
#include <dftracer/utils/python/py_type_helpers.h>
8
#include <dftracer/utils/python/trace_reader_iterator.h>
9
#ifdef DFTRACER_UTILS_ENABLE_ARROW
10
#include <nanoarrow/nanoarrow.h>
11

12
using ArrowExportResult =
13
    dftracer::utils::utilities::common::arrow::ArrowExportResult;
14

15
// PyCapsule destructor for capsules named "arrow_schema".
// If the wrapped ArrowSchema still has a release callback set, that callback
// is invoked first; the heap-allocated struct itself is then deleted.
static void release_arrow_schema(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_schema");
    auto *owned = static_cast<ArrowSchema *>(raw);
    if (owned != nullptr && owned->release != nullptr) {
        owned->release(owned);
    }
    // Deleting a null pointer is a no-op, so no extra guard is needed.
    delete owned;
}
60✔
23

24
// PyCapsule destructor for capsules named "arrow_array".
// If the wrapped ArrowArray still has a release callback set, that callback
// is invoked first; the heap-allocated struct itself is then deleted.
static void release_arrow_array(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_array");
    auto *owned = static_cast<ArrowArray *>(raw);
    if (owned != nullptr && owned->release != nullptr) {
        owned->release(owned);
    }
    // Deleting a null pointer is a no-op, so no extra guard is needed.
    delete owned;
}
60✔
32

33
// Implements `__arrow_c_array__(requested_schema=None)`.
//
// Moves the schema and array out of `self->result` into freshly allocated
// ArrowSchema / ArrowArray structs, wraps each in a PyCapsule ("arrow_schema"
// and "arrow_array") and returns them as a 2-tuple.
//
// Returns NULL with a Python exception set when argument parsing fails, when
// `self->result` is missing or no longer valid (RuntimeError), or when a
// capsule cannot be created.
static PyObject *ArrowBatchCapsule_arrow_c_array(ArrowBatchCapsuleObject *self,
                                                 PyObject *args) {
    // The optional argument is parsed but not consulted afterwards.
    PyObject *requested_schema = Py_None;
    if (!PyArg_ParseTuple(args, "|O", &requested_schema)) {
        return NULL;
    }

    const bool exportable = self->result && self->result->valid();
    if (!exportable) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Arrow data already exported via __arrow_c_array__. "
                        "Each batch can only be exported once.");
        return NULL;
    }

    auto *schema = new ArrowSchema;
    auto *array = new ArrowArray;

    self->result->release_schema().move(schema);
    self->result->release_array().move(array);

    // Shared cleanup for the array while no capsule owns it yet.
    auto discard_array = [array]() {
        if (array->release) {
            array->release(array);
        }
        delete array;
    };

    PyObject *schema_capsule =
        PyCapsule_New(schema, "arrow_schema", release_arrow_schema);
    if (schema_capsule == NULL) {
        // Neither struct is owned by a capsule yet: free both by hand.
        if (schema->release) {
            schema->release(schema);
        }
        delete schema;
        discard_array();
        return NULL;
    }

    PyObject *array_capsule =
        PyCapsule_New(array, "arrow_array", release_arrow_array);
    if (array_capsule == NULL) {
        // Dropping the schema capsule runs release_arrow_schema on `schema`.
        Py_DECREF(schema_capsule);
        discard_array();
        return NULL;
    }

    // PyTuple_Pack takes its own references; drop ours whether or not it
    // succeeded so the capsules are owned solely by the tuple (or destroyed).
    PyObject *pair = PyTuple_Pack(2, schema_capsule, array_capsule);
    Py_DECREF(schema_capsule);
    Py_DECREF(array_capsule);
    return pair;
}
60✔
75

76
// Getter for the `num_rows` attribute.
// Returns 0 when `self->result` is missing or no longer valid, otherwise the
// row count reported by the result.
static PyObject *ArrowBatchCapsule_get_num_rows(ArrowBatchCapsuleObject *self,
                                                void *) {
    const bool has_data = self->result && self->result->valid();
    if (has_data) {
        return PyLong_FromLongLong(self->result->num_rows());
    }
    return PyLong_FromLong(0);
}
47✔
81

82
// Getter for the `num_columns` attribute.
// Returns 0 when `self->result` is missing or no longer valid, otherwise the
// column count reported by the result.
static PyObject *ArrowBatchCapsule_get_num_columns(
    ArrowBatchCapsuleObject *self, void *) {
    const bool has_data = self->result && self->result->valid();
    if (has_data) {
        return PyLong_FromLongLong(self->result->num_columns());
    }
    return PyLong_FromLong(0);
}
2✔
87

88
// tp_dealloc for _ArrowBatchCapsule: deletes the heap-allocated export result
// (a null pointer is fine) and returns the object's memory to the type's
// allocator.
static void ArrowBatchCapsule_dealloc(ArrowBatchCapsuleObject *self) {
    delete self->result;
    PyObject *as_object = (PyObject *)self;
    Py_TYPE(as_object)->tp_free(as_object);
}
106✔
92

93
// Method table for _ArrowBatchCapsule; terminated by an all-null sentinel.
static PyMethodDef ArrowBatchCapsule_methods[] = {
    {
        "__arrow_c_array__",
        (PyCFunction)ArrowBatchCapsule_arrow_c_array,
        METH_VARARGS,
        "Export as Arrow C Data Interface PyCapsule pair (schema, array)",
    },
    {NULL, NULL, 0, NULL},  // sentinel
};
98

99
// Read-only attribute table for _ArrowBatchCapsule (no setters are
// registered); terminated by an all-null sentinel.
static PyGetSetDef ArrowBatchCapsule_getsetters[] = {
    {
        "num_rows",
        (getter)ArrowBatchCapsule_get_num_rows,
        NULL,
        "Number of rows",
        NULL,
    },
    {
        "num_columns",
        (getter)ArrowBatchCapsule_get_num_columns,
        NULL,
        "Number of columns",
        NULL,
    },
    {NULL, NULL, NULL, NULL, NULL},  // sentinel
};
105

106
// Type object for the internal Arrow batch wrapper. Slots are initialised
// positionally, so the order of entries below must follow PyTypeObject's
// field order; trailing slots not listed are zero-initialised.
PyTypeObject ArrowBatchCapsuleType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext._ArrowBatchCapsule", /* tp_name */
    sizeof(ArrowBatchCapsuleObject),         /* tp_basicsize */
    0,                                       /* tp_itemsize */
    (destructor)ArrowBatchCapsule_dealloc,   /* tp_dealloc */
    0,                                       /* tp_vectorcall_offset */
    0,                                       /* tp_getattr */
    0,                                       /* tp_setattr */
    0,                                       /* tp_as_async */
    0,                                       /* tp_repr */
    0,                                       /* tp_as_number */
    0,                                       /* tp_as_sequence */
    0,                                       /* tp_as_mapping */
    0,                                       /* tp_hash */
    0,                                       /* tp_call */
    0,                                       /* tp_str */
    0,                                       /* tp_getattro */
    0,                                       /* tp_setattro */
    0,                                       /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT,                      /* tp_flags */
    /* tp_doc */
    "Internal Arrow batch wrapper implementing __arrow_c_array__ protocol",
    0,                                       /* tp_traverse */
    0,                                       /* tp_clear */
    0,                                       /* tp_richcompare */
    0,                                       /* tp_weaklistoffset */
    0,                                       /* tp_iter */
    0,                                       /* tp_iternext */
    ArrowBatchCapsule_methods,               /* tp_methods */
    0,                                       /* tp_members */
    ArrowBatchCapsule_getsetters,            /* tp_getset */
};
137

138
#endif                                     // DFTRACER_UTILS_ENABLE_ARROW
139

140
// Shuts down the producer side of a memoryview-batch iteration:
//   1. sets the `cancelled` flag (release ordering),
//   2. closes the channel if one exists,
//   3. waits on `task_future` if it is valid.
// The function itself touches no Python objects; the caller visible in this
// file invokes it with the GIL released.
static void cancel_and_wait_batch_state(MemoryViewBatchIteratorState *bs) {
    bs->cancelled.store(true, std::memory_order_release);
    if (bs->channel) {
        bs->channel->close();
    }
    if (bs->task_future.valid()) {
        bs->task_future.wait();
    }
}
71✔
145

146
// Shuts down the producer side of a JSON-dict iteration:
//   1. sets the `cancelled` flag (release ordering),
//   2. closes the channel if one exists,
//   3. waits on `task_future` if it is valid.
// The function itself touches no Python objects; the caller visible in this
// file invokes it with the GIL released.
static void cancel_and_wait_json_dict_state(JsonDictIteratorState *js) {
    js->cancelled.store(true, std::memory_order_release);
    if (js->channel) {
        js->channel->close();
    }
    if (js->task_future.valid()) {
        js->task_future.wait();
    }
}
7✔
151

152
// tp_dealloc for TraceReaderIterator.
//
// For every iterator state that exists (Arrow, JSON-dict, memoryview-batch)
// the producer is cancelled, its channel closed and its task future awaited;
// the wait happens with the GIL released so a producer that needs the GIL to
// finish cannot deadlock against us. Each state pointer is then reset.
//
// tp_free only releases the object's memory and does not run C++ destructors,
// so every smart-pointer member must be released explicitly here. That
// includes `json_dict_current_batch`: without the reset below, an iteration
// abandoned in the middle of a batch would keep that batch alive forever.
static void TraceReaderIterator_dealloc(TraceReaderIteratorObject *self) {
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (self->arrow_state) {
        self->arrow_state->cancelled.store(true, std::memory_order_release);
        if (self->arrow_state->channel) {
            self->arrow_state->channel->close();
        }
        Py_BEGIN_ALLOW_THREADS
        if (self->arrow_state->task_future.valid()) {
            self->arrow_state->task_future.wait();
        }
        Py_END_ALLOW_THREADS
        self->arrow_state.reset();
    }
#endif
    if (self->json_dict_state) {
        Py_BEGIN_ALLOW_THREADS
        cancel_and_wait_json_dict_state(self->json_dict_state.get());
        Py_END_ALLOW_THREADS
        self->json_dict_state.reset();
    }
    // Drop the partially consumed JSON batch, if any (no-op when empty).
    self->json_dict_current_batch.reset();

    if (self->batch_state) {
        Py_BEGIN_ALLOW_THREADS
        cancel_and_wait_batch_state(self->batch_state.get());
        Py_END_ALLOW_THREADS
        self->batch_state.reset();
    }

    Py_XDECREF(self->current_batch);
    self->current_batch = NULL;
    Py_TYPE(self)->tp_free((PyObject *)self);
}
105✔
177

178
// tp_iter: the object is its own iterator, so hand back a new strong
// reference to `self`.
static PyObject *TraceReaderIterator_iter(TraceReaderIteratorObject *self) {
    PyObject *as_object = (PyObject *)self;
    Py_INCREF(as_object);
    return as_object;
}
182

183
static PyObject *TraceReaderIterator_next(TraceReaderIteratorObject *self) {
1,536✔
184
    if (self->mode == IteratorMode::JSON_DICT) {
1,536✔
185
        while (true) {
262✔
186
            if (self->json_dict_current_batch) {
262✔
187
                auto &events = self->json_dict_current_batch->events;
255✔
188
                Py_ssize_t n = static_cast<Py_ssize_t>(events.size());
255✔
189
                if (self->json_dict_index < n) {
255✔
190
                    JsonDictValueObject *obj =
240✔
191
                        (JsonDictValueObject *)JsonDictValueType.tp_alloc(
240✔
192
                            &JsonDictValueType, 0);
193
                    if (!obj) return NULL;
240!
194
                    new (&obj->batch) std::shared_ptr<JsonDictBatch>(
480✔
195
                        self->json_dict_current_batch);
240✔
196
                    obj->event_index =
240✔
197
                        static_cast<std::size_t>(self->json_dict_index);
240✔
198
                    obj->is_args = false;
240✔
199
                    self->json_dict_index++;
240✔
200
                    return (PyObject *)obj;
240✔
201
                }
202
                self->json_dict_current_batch.reset();
15✔
203
                self->json_dict_index = 0;
15✔
204
            }
15✔
205

206
            auto *js = self->json_dict_state.get();
22✔
207
            std::optional<JsonDictBatch> batch;
22✔
208
            Py_BEGIN_ALLOW_THREADS batch = js->channel->blocking_receive();
22!
209
            Py_END_ALLOW_THREADS
22!
210

211
                if (!batch.has_value()) {
22✔
212
                std::lock_guard<std::mutex> lock(js->error_mtx);
7!
213
                if (js->error) {
7!
214
                    try {
215
                        std::rethrow_exception(js->error);
×
216
                    } catch (const std::exception &e) {
×
NEW
217
                        set_typed_py_error(e);
×
218
                        return NULL;
×
219
                    } catch (...) {
×
220
                        PyErr_SetString(PyExc_RuntimeError,
×
221
                                        "Unknown error in json dict iterator");
222
                        return NULL;
×
223
                    }
×
UNCOV
224
                }
×
225
                return NULL;
7✔
226
            }
7✔
227

228
            auto dequeued_bytes = dftracer::utils::python::byte_size(*batch);
15!
229
            js->bytes_in_queue.fetch_sub(dequeued_bytes,
15✔
230
                                         std::memory_order_acq_rel);
231
            self->json_dict_current_batch =
15✔
232
                std::make_shared<JsonDictBatch>(std::move(*batch));
15!
233
            self->json_dict_index = 0;
15✔
234
        }
22✔
235
    }
236

237
#ifdef DFTRACER_UTILS_ENABLE_ARROW
238
    if (self->mode == IteratorMode::ARROW) {
1,289✔
239
        auto *astate = self->arrow_state.get();
71✔
240
        std::optional<ArrowExportResult> batch;
71✔
241
        Py_BEGIN_ALLOW_THREADS batch = astate->channel->blocking_receive();
71!
242
        Py_END_ALLOW_THREADS
71!
243

244
            if (!batch.has_value()) {
71✔
245
            std::lock_guard<std::mutex> lock(astate->error_mtx);
25!
246
            if (astate->error) {
25!
247
                try {
248
                    std::rethrow_exception(astate->error);
×
249
                } catch (const std::exception &e) {
×
NEW
250
                    set_typed_py_error(e);
×
251
                    return NULL;
×
252
                } catch (...) {
×
253
                    PyErr_SetString(PyExc_RuntimeError,
×
254
                                    "Unknown error in Arrow iterator");
255
                    return NULL;
×
256
                }
×
UNCOV
257
            }
×
258
            return NULL;
25✔
259
        }
25✔
260

261
        auto dequeued_bytes = dftracer::utils::python::byte_size(*batch);
46!
262
        astate->bytes_in_queue.fetch_sub(dequeued_bytes,
46✔
263
                                         std::memory_order_acq_rel);
264

265
        ArrowBatchCapsuleObject *obj =
46✔
266
            (ArrowBatchCapsuleObject *)ArrowBatchCapsuleType.tp_alloc(
46!
267
                &ArrowBatchCapsuleType, 0);
268
        if (!obj) return NULL;
46!
269
        obj->result = new ArrowExportResult(std::move(*batch));
46!
270
        return (PyObject *)obj;
46✔
271
    }
71✔
272
#endif
273

274
    using namespace dftracer::utils::python;
275
    while (true) {
1,470✔
276
        if (self->current_batch) {
1,470✔
277
            auto *batch_obj = (MemoryViewBatchObject *)self->current_batch;
1,401✔
278
            Py_ssize_t n =
1,401✔
279
                static_cast<Py_ssize_t>(batch_obj->data->num_entries());
1,401✔
280
            if (self->batch_index < n) {
1,401✔
281
                PyObject *mv =
1,151✔
282
                    MemoryViewBatch_item(batch_obj, self->batch_index);
1,151✔
283
                self->batch_index++;
1,151✔
284
                return mv;
1,151✔
285
            }
286
            Py_DECREF(self->current_batch);
250✔
287
            self->current_batch = NULL;
250✔
288
            self->batch_index = 0;
250✔
289
        }
250✔
290

291
        auto *bs = self->batch_state.get();
319✔
292
        std::optional<MemoryViewBatchData> batch_data;
319✔
293
        Py_BEGIN_ALLOW_THREADS batch_data = bs->channel->blocking_receive();
319!
294
        Py_END_ALLOW_THREADS
319!
295

296
            if (!batch_data.has_value()) {
319✔
297
            std::lock_guard<std::mutex> lock(bs->error_mtx);
67!
298
            if (bs->error) {
67✔
299
                try {
300
                    std::rethrow_exception(bs->error);
1!
301
                } catch (const std::exception &e) {
1!
302
                    set_typed_py_error(e);
1!
303
                    return NULL;
1✔
304
                } catch (...) {
1!
305
                    PyErr_SetString(PyExc_RuntimeError,
×
306
                                    "Unknown error in batch iterator");
307
                    return NULL;
×
308
                }
1!
UNCOV
309
            }
×
310
            return NULL;
66✔
311
        }
67✔
312

313
        auto dequeued_bytes = dftracer::utils::python::byte_size(*batch_data);
252!
314
        bs->bytes_in_queue.fetch_sub(dequeued_bytes, std::memory_order_acq_rel);
252✔
315

316
        auto *obj = (MemoryViewBatchObject *)MemoryViewBatchType.tp_alloc(
252!
317
            &MemoryViewBatchType, 0);
318
        if (!obj) return NULL;
252!
319
        obj->data = new MemoryViewBatchData(std::move(*batch_data));
252!
320
        self->current_batch = (PyObject *)obj;
252✔
321
        self->batch_index = 0;
252✔
322
    }
319✔
323
}
1,537✔
324

325
// Type object for TraceReaderIterator. Slots are initialised positionally, so
// the order of entries below must follow PyTypeObject's field order; trailing
// slots not listed are zero-initialised.
PyTypeObject TraceReaderIteratorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext.TraceReaderIterator", /* tp_name */
    sizeof(TraceReaderIteratorObject),        /* tp_basicsize */
    0,                                        /* tp_itemsize */
    (destructor)TraceReaderIterator_dealloc,  /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    0,                                        /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT,                       /* tp_flags */
    /* tp_doc */
    "Lazy iterator over TraceReader lines or raw chunks",
    0,                                        /* tp_traverse */
    0,                                        /* tp_clear */
    0,                                        /* tp_richcompare */
    0,                                        /* tp_weaklistoffset */
    (getiterfunc)TraceReaderIterator_iter,    /* tp_iter */
    (iternextfunc)TraceReaderIterator_next,   /* tp_iternext */
    0,                                        /* tp_methods */
    0,                                        /* tp_members */
    0,                                        /* tp_getset */
    0,                                        /* tp_base */
    0,                                        /* tp_dict */
    0,                                        /* tp_descr_get */
    0,                                        /* tp_descr_set */
    0,                                        /* tp_dictoffset */
    0,                                        /* tp_init */
    0,                                        /* tp_alloc */
    0,                                        /* tp_new */
};
364

365
// Registers the iterator types on module `m` through register_type().
//
// TraceReaderIterator is always registered; _ArrowBatchCapsule only when the
// build defines DFTRACER_UTILS_ENABLE_ARROW.
//
// Returns 0 on success, -1 as soon as any registration reports a negative
// result.
int init_trace_reader_iterator(PyObject *m) {
    const int iterator_status =
        register_type(m, &TraceReaderIteratorType, "TraceReaderIterator");
    if (iterator_status < 0) {
        return -1;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    const int capsule_status =
        register_type(m, &ArrowBatchCapsuleType, "_ArrowBatchCapsule");
    if (capsule_status < 0) {
        return -1;
    }
#endif

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc