• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28356348514

29 Jun 2026 07:40AM UTC coverage: 52.174% (-0.1%) from 52.278%
28356348514

Pull #83

github

web-flow
Merge 278203630 into 2efed6649
Pull Request #83: refactor and improve code QoL

37276 of 92891 branches covered (40.13%)

Branch coverage included in aggregate %.

671 of 1173 new or added lines in 58 files covered. (57.2%)

66 existing lines in 30 files now uncovered.

33619 of 42991 relevant lines covered (78.2%)

20387.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.59
/src/dftracer/utils/python/trace_reader_iterator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/common/config.h>
4
#include <dftracer/utils/python/batch_byte_size.h>
5
#include <dftracer/utils/python/json.h>
6
#include <dftracer/utils/python/py_type_helpers.h>
7
#include <dftracer/utils/python/trace_reader_iterator.h>
8
#ifdef DFTRACER_UTILS_ENABLE_ARROW
9
#include <nanoarrow/nanoarrow.h>
10

11
using ArrowExportResult =
12
    dftracer::utils::utilities::common::arrow::ArrowExportResult;
13

14
/*
 * PyCapsule destructor for "arrow_schema" capsules.
 *
 * If the struct still owns data (a non-NULL release callback), that callback
 * is invoked first. The heap-allocated ArrowSchema itself is then deleted.
 * A consumer that moved the schema out leaves release == NULL, so only the
 * struct is freed in that case.
 */
static void release_arrow_schema(PyObject *capsule) {
    ArrowSchema *schema = static_cast<ArrowSchema *>(
        PyCapsule_GetPointer(capsule, "arrow_schema"));
    const bool owns_data = schema != NULL && schema->release != NULL;
    if (owns_data) schema->release(schema);
    delete schema;  // deleting NULL is a no-op
}
131✔
22

23
/*
 * PyCapsule destructor for "arrow_array" capsules.
 *
 * If the struct still owns data (a non-NULL release callback), that callback
 * is invoked first. The heap-allocated ArrowArray itself is then deleted.
 * A consumer that moved the array out leaves release == NULL, so only the
 * struct is freed in that case.
 */
static void release_arrow_array(PyObject *capsule) {
    ArrowArray *array = static_cast<ArrowArray *>(
        PyCapsule_GetPointer(capsule, "arrow_array"));
    const bool owns_data = array != NULL && array->release != NULL;
    if (owns_data) array->release(array);
    delete array;  // deleting NULL is a no-op
}
131✔
31

32
/*
 * __arrow_c_array__([requested_schema])
 *
 * Export the wrapped batch as an Arrow PyCapsule pair
 * (schema_capsule, array_capsule), named "arrow_schema" / "arrow_array".
 * requested_schema is parsed for protocol compatibility but otherwise ignored.
 *
 * Each batch can be exported once: on success the schema and array are moved
 * out of self->result, and later calls raise RuntimeError.
 *
 * The export is transactional. Both capsules and the result tuple are created
 * around empty structs (release == NULL) before any data leaves self->result.
 * A Python-side allocation failure therefore leaves the batch intact and still
 * exportable, instead of silently destroying it.
 *
 * Returns a new reference to the tuple, or NULL with an exception set.
 */
static PyObject *ArrowBatchCapsule_arrow_c_array(ArrowBatchCapsuleObject *self,
                                                 PyObject *args) {
    PyObject *requested_schema = Py_None;
    if (!PyArg_ParseTuple(args, "|O", &requested_schema)) return NULL;

    if (!self->result || !self->result->valid()) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Arrow data already exported via __arrow_c_array__. "
                        "Each batch can only be exported once.");
        return NULL;
    }

    // nothrow: a std::bad_alloc must never unwind through CPython's C frames.
    auto *schema = new (std::nothrow) ArrowSchema;
    auto *array = new (std::nothrow) ArrowArray;
    if (!schema || !array) {
        delete schema;
        delete array;
        return PyErr_NoMemory();
    }
    // Mark both structs as empty so the capsule destructors only free the
    // struct if we bail out before data is moved in.
    schema->release = NULL;
    array->release = NULL;

    PyObject *schema_capsule =
        PyCapsule_New(schema, "arrow_schema", release_arrow_schema);
    if (!schema_capsule) {
        delete schema;
        delete array;
        return NULL;
    }
    // From here on schema_capsule owns `schema`.

    PyObject *array_capsule =
        PyCapsule_New(array, "arrow_array", release_arrow_array);
    if (!array_capsule) {
        Py_DECREF(schema_capsule);
        delete array;
        return NULL;
    }

    PyObject *tuple = PyTuple_Pack(2, schema_capsule, array_capsule);
    if (tuple) {
        // Point of no return: hand the batch's data to the capsules. On
        // failure above, self->result is untouched and can be exported later.
        self->result->release_schema().move(schema);
        self->result->release_array().move(array);
    }
    Py_DECREF(schema_capsule);
    Py_DECREF(array_capsule);
    return tuple;
}
60✔
74

75
/*
 * Getter for `num_rows`.
 *
 * Reports the wrapped result's row count. Reports 0 when there is no result
 * or the result is no longer valid (e.g. after __arrow_c_array__ moved its
 * data out).
 */
static PyObject *ArrowBatchCapsule_get_num_rows(ArrowBatchCapsuleObject *self,
                                                void *) {
    if (self->result && self->result->valid())
        return PyLong_FromLongLong(self->result->num_rows());
    return PyLong_FromLong(0);
}
47✔
80

81
/*
 * Getter for `num_columns`.
 *
 * Reports the wrapped result's column count. Reports 0 when there is no
 * result or the result is no longer valid (e.g. after __arrow_c_array__
 * moved its data out).
 */
static PyObject *ArrowBatchCapsule_get_num_columns(
    ArrowBatchCapsuleObject *self, void *) {
    if (self->result && self->result->valid())
        return PyLong_FromLongLong(self->result->num_columns());
    return PyLong_FromLong(0);
}
2✔
86

87
/*
 * tp_dealloc for _ArrowBatchCapsule: destroy the owned export result (if any)
 * and release the object's memory through the type's tp_free.
 */
static void ArrowBatchCapsule_dealloc(ArrowBatchCapsuleObject *self) {
    auto *owned = self->result;
    self->result = NULL;
    delete owned;  // deleting NULL is a no-op

    PyTypeObject *type = Py_TYPE(self);
    type->tp_free((PyObject *)self);
}
223✔
91

92
/* Method table for _ArrowBatchCapsule (NULL-terminated). */
static PyMethodDef ArrowBatchCapsule_methods[] = {
    {
        /* ml_name  */ "__arrow_c_array__",
        /* ml_meth  */ (PyCFunction)ArrowBatchCapsule_arrow_c_array,
        /* ml_flags */ METH_VARARGS,
        /* ml_doc   */
        "Export as Arrow C Data Interface PyCapsule pair (schema, array)",
    },
    {NULL, NULL, 0, NULL} /* sentinel */
};
97

98
/* Read-only attributes of _ArrowBatchCapsule (no setters; NULL-terminated). */
static PyGetSetDef ArrowBatchCapsule_getsetters[] = {
    /* name, getter, setter, doc, closure */
    {"num_rows",
     (getter)ArrowBatchCapsule_get_num_rows,
     NULL,
     "Number of rows",
     NULL},
    {"num_columns",
     (getter)ArrowBatchCapsule_get_num_columns,
     NULL,
     "Number of columns",
     NULL},
    {NULL, NULL, NULL, NULL, NULL} /* sentinel */
};
104

105
/*
 * Static type object for the internal _ArrowBatchCapsule wrapper, which hands
 * one Arrow batch to consumers through __arrow_c_array__.
 *
 * Slots are positional in tp_* order; every slot after tp_getset is
 * zero-initialised. No tp_new is provided here; instances are created with
 * tp_alloc from C++ by the trace reader iterator in Arrow mode.
 */
PyTypeObject ArrowBatchCapsuleType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    /* tp_name              */ "dftracer_utils_ext._ArrowBatchCapsule",
    /* tp_basicsize         */ sizeof(ArrowBatchCapsuleObject),
    /* tp_itemsize          */ 0,
    /* tp_dealloc           */ (destructor)ArrowBatchCapsule_dealloc,
    /* tp_vectorcall_offset */ 0,
    /* tp_getattr           */ 0,
    /* tp_setattr           */ 0,
    /* tp_as_async          */ 0,
    /* tp_repr              */ 0,
    /* tp_as_number         */ 0,
    /* tp_as_sequence       */ 0,
    /* tp_as_mapping        */ 0,
    /* tp_hash              */ 0,
    /* tp_call              */ 0,
    /* tp_str               */ 0,
    /* tp_getattro          */ 0,
    /* tp_setattro          */ 0,
    /* tp_as_buffer         */ 0,
    /* tp_flags             */ Py_TPFLAGS_DEFAULT,
    /* tp_doc               */
    "Internal Arrow batch wrapper implementing __arrow_c_array__ protocol",
    /* tp_traverse          */ 0,
    /* tp_clear             */ 0,
    /* tp_richcompare       */ 0,
    /* tp_weaklistoffset    */ 0,
    /* tp_iter              */ 0,
    /* tp_iternext          */ 0,
    /* tp_methods           */ ArrowBatchCapsule_methods,
    /* tp_members           */ 0,
    /* tp_getset            */ ArrowBatchCapsule_getsetters,
};
136

137
#endif                                     // DFTRACER_UTILS_ENABLE_ARROW
138

139
/*
 * Stop the memory-view producer and block until its task is done.
 *
 * Raises the `cancelled` flag (release ordering), closes the channel if one
 * exists, then waits on the task future if it is still valid. Within this
 * file it is called from the iterator's dealloc with the GIL released.
 */
static void cancel_and_wait_batch_state(MemoryViewBatchIteratorState *bs) {
    auto &state = *bs;
    state.cancelled.store(true, std::memory_order_release);
    if (state.channel) {
        state.channel->close();
    }
    if (state.task_future.valid()) {
        state.task_future.wait();
    }
}
142✔
144

145
/*
 * Stop the JSON-dict producer and block until its task is done.
 *
 * Raises the `cancelled` flag (release ordering), closes the channel if one
 * exists, then waits on the task future if it is still valid. Within this
 * file it is called from the iterator's dealloc with the GIL released.
 */
static void cancel_and_wait_json_dict_state(JsonDictIteratorState *js) {
    auto &state = *js;
    state.cancelled.store(true, std::memory_order_release);
    if (state.channel) {
        state.channel->close();
    }
    if (state.task_future.valid()) {
        state.task_future.wait();
    }
}
14✔
150

151
/*
 * tp_dealloc for TraceReaderIterator.
 *
 * For each producer state that exists:
 *   - mark it cancelled and close its channel;
 *   - wait for its background task with the GIL released;
 *   - drop the state.
 * Then release the iterator's own references to any in-flight batch and free
 * the object.
 *
 * tp_free only releases raw memory and runs no C++ destructors. Every smart
 * pointer member must therefore be reset explicitly here.
 */
static void TraceReaderIterator_dealloc(TraceReaderIteratorObject *self) {
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (self->arrow_state) {
        auto *astate = self->arrow_state.get();
        astate->cancelled.store(true, std::memory_order_release);
        if (astate->channel) astate->channel->close();
        // The wait can block until the producer notices the cancellation,
        // so do not hold the GIL across it.
        Py_BEGIN_ALLOW_THREADS
        if (astate->task_future.valid()) astate->task_future.wait();
        Py_END_ALLOW_THREADS
        self->arrow_state.reset();
    }
#endif
    if (self->json_dict_state) {
        Py_BEGIN_ALLOW_THREADS
        cancel_and_wait_json_dict_state(self->json_dict_state.get());
        Py_END_ALLOW_THREADS
        self->json_dict_state.reset();
    }
    if (self->batch_state) {
        Py_BEGIN_ALLOW_THREADS
        cancel_and_wait_batch_state(self->batch_state.get());
        Py_END_ALLOW_THREADS
        self->batch_state.reset();
    }
    // The iterator's reference to a partially consumed JSON batch was never
    // released before, so dropping the iterator mid-iteration leaked it.
    // Value objects already handed out keep their own shared_ptr copies.
    self->json_dict_current_batch.reset();
    Py_CLEAR(self->current_batch);
    Py_TYPE(self)->tp_free((PyObject *)self);
}
210✔
176

177
/* tp_iter: the iterator is its own iterator; return a new reference to it. */
static PyObject *TraceReaderIterator_iter(TraceReaderIteratorObject *self) {
    PyObject *it = (PyObject *)self;
    Py_INCREF(it);
    return it;
}
181

182
static PyObject *TraceReaderIterator_next(TraceReaderIteratorObject *self) {
3,072✔
183
    if (self->mode == IteratorMode::JSON_DICT) {
3,072✔
184
        while (true) {
262✔
185
            if (self->json_dict_current_batch) {
524✔
186
                auto &events = self->json_dict_current_batch->events;
510✔
187
                Py_ssize_t n = static_cast<Py_ssize_t>(events.size());
510✔
188
                if (self->json_dict_index < n) {
510✔
189
                    JsonDictValueObject *obj =
240✔
190
                        (JsonDictValueObject *)JsonDictValueType.tp_alloc(
480!
191
                            &JsonDictValueType, 0);
192
                    if (!obj) return NULL;
727✔
193
                    new (&obj->batch) std::shared_ptr<JsonDictBatch>(
720✔
194
                        self->json_dict_current_batch);
480✔
195
                    obj->event_index =
480✔
196
                        static_cast<std::size_t>(self->json_dict_index);
480✔
197
                    obj->is_args = false;
480✔
198
                    self->json_dict_index++;
480✔
199
                    return (PyObject *)obj;
480✔
200
                }
201
                self->json_dict_current_batch.reset();
30✔
202
                self->json_dict_index = 0;
30✔
203
            }
15✔
204

205
            auto *js = self->json_dict_state.get();
44✔
206
            std::optional<JsonDictBatch> batch;
44✔
207
            Py_BEGIN_ALLOW_THREADS batch = js->channel->blocking_receive();
44!
208
            Py_END_ALLOW_THREADS
44!
209

210
                if (!batch.has_value()) {
44✔
211
                std::lock_guard<std::mutex> lock(js->error_mtx);
14!
212
                if (js->error) {
14✔
213
                    try {
214
                        std::rethrow_exception(js->error);
×
215
                    } catch (const std::exception &e) {
×
216
                        PyErr_SetString(PyExc_RuntimeError, e.what());
×
217
                        return NULL;
×
218
                    } catch (...) {
×
219
                        PyErr_SetString(PyExc_RuntimeError,
×
220
                                        "Unknown error in json dict iterator");
221
                        return NULL;
×
222
                    }
×
223
                }
224
                return NULL;
14✔
225
            }
14✔
226

227
            auto dequeued_bytes = dftracer::utils::python::byte_size(*batch);
30!
228
            js->bytes_in_queue.fetch_sub(dequeued_bytes,
30✔
229
                                         std::memory_order_acq_rel);
230
            self->json_dict_current_batch =
15✔
231
                std::make_shared<JsonDictBatch>(std::move(*batch));
30!
232
            self->json_dict_index = 0;
30✔
233
        }
59✔
234
    }
235

236
#ifdef DFTRACER_UTILS_ENABLE_ARROW
237
    if (self->mode == IteratorMode::ARROW) {
2,578✔
238
        auto *astate = self->arrow_state.get();
142✔
239
        std::optional<ArrowExportResult> batch;
142✔
240
        Py_BEGIN_ALLOW_THREADS batch = astate->channel->blocking_receive();
142!
241
        Py_END_ALLOW_THREADS
142!
242

243
            if (!batch.has_value()) {
142✔
244
            std::lock_guard<std::mutex> lock(astate->error_mtx);
50!
245
            if (astate->error) {
50✔
246
                try {
247
                    std::rethrow_exception(astate->error);
×
248
                } catch (const std::exception &e) {
×
249
                    PyErr_SetString(PyExc_RuntimeError, e.what());
×
250
                    return NULL;
×
251
                } catch (...) {
×
252
                    PyErr_SetString(PyExc_RuntimeError,
×
253
                                    "Unknown error in Arrow iterator");
254
                    return NULL;
×
255
                }
×
256
            }
257
            return NULL;
50✔
258
        }
50✔
259

260
        auto dequeued_bytes = dftracer::utils::python::byte_size(*batch);
92!
261
        astate->bytes_in_queue.fetch_sub(dequeued_bytes,
92✔
262
                                         std::memory_order_acq_rel);
263

264
        ArrowBatchCapsuleObject *obj =
46✔
265
            (ArrowBatchCapsuleObject *)ArrowBatchCapsuleType.tp_alloc(
92!
266
                &ArrowBatchCapsuleType, 0);
267
        if (!obj) return NULL;
92!
268
        obj->result = new ArrowExportResult(std::move(*batch));
92!
269
        return (PyObject *)obj;
92✔
270
    }
142✔
271
#endif
272

273
    using namespace dftracer::utils::python;
274
    while (true) {
1,470✔
275
        if (self->current_batch) {
2,940✔
276
            auto *batch_obj = (MemoryViewBatchObject *)self->current_batch;
2,802✔
277
            Py_ssize_t n =
1,401✔
278
                static_cast<Py_ssize_t>(batch_obj->data->num_entries());
2,802✔
279
            if (self->batch_index < n) {
2,802✔
280
                PyObject *mv =
1,151✔
281
                    MemoryViewBatch_item(batch_obj, self->batch_index);
2,302!
282
                self->batch_index++;
2,302✔
283
                return mv;
2,369✔
284
            }
285
            Py_DECREF(self->current_batch);
500✔
286
            self->current_batch = NULL;
500✔
287
            self->batch_index = 0;
500✔
288
        }
250✔
289

290
        auto *bs = self->batch_state.get();
638✔
291
        std::optional<MemoryViewBatchData> batch_data;
638✔
292
        Py_BEGIN_ALLOW_THREADS batch_data = bs->channel->blocking_receive();
638!
293
        Py_END_ALLOW_THREADS
638!
294

295
            if (!batch_data.has_value()) {
638✔
296
            std::lock_guard<std::mutex> lock(bs->error_mtx);
134!
297
            if (bs->error) {
134✔
298
                try {
299
                    std::rethrow_exception(bs->error);
3!
300
                } catch (const std::exception &e) {
2!
301
                    PyErr_SetString(PyExc_RuntimeError, e.what());
2✔
302
                    return NULL;
2✔
303
                } catch (...) {
2!
304
                    PyErr_SetString(PyExc_RuntimeError,
×
305
                                    "Unknown error in batch iterator");
306
                    return NULL;
×
307
                }
1!
308
            }
309
            return NULL;
132✔
310
        }
134✔
311

312
        auto dequeued_bytes = dftracer::utils::python::byte_size(*batch_data);
504!
313
        bs->bytes_in_queue.fetch_sub(dequeued_bytes, std::memory_order_acq_rel);
504✔
314

315
        auto *obj = (MemoryViewBatchObject *)MemoryViewBatchType.tp_alloc(
504!
316
            &MemoryViewBatchType, 0);
317
        if (!obj) return NULL;
504✔
318
        obj->data = new MemoryViewBatchData(std::move(*batch_data));
504!
319
        self->current_batch = (PyObject *)obj;
504✔
320
        self->batch_index = 0;
504✔
321
    }
890✔
322
}
1,537✔
323

324
/*
 * Static type object for TraceReaderIterator. It supports the iterator
 * protocol (tp_iter returns self; tp_iternext yields items).
 *
 * Slots are positional in tp_* order. No tp_new or tp_init is provided here;
 * instances are created from C++ elsewhere in the extension.
 */
PyTypeObject TraceReaderIteratorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    /* tp_name              */ "dftracer_utils_ext.TraceReaderIterator",
    /* tp_basicsize         */ sizeof(TraceReaderIteratorObject),
    /* tp_itemsize          */ 0,
    /* tp_dealloc           */ (destructor)TraceReaderIterator_dealloc,
    /* tp_vectorcall_offset */ 0,
    /* tp_getattr           */ 0,
    /* tp_setattr           */ 0,
    /* tp_as_async          */ 0,
    /* tp_repr              */ 0,
    /* tp_as_number         */ 0,
    /* tp_as_sequence       */ 0,
    /* tp_as_mapping        */ 0,
    /* tp_hash              */ 0,
    /* tp_call              */ 0,
    /* tp_str               */ 0,
    /* tp_getattro          */ 0,
    /* tp_setattro          */ 0,
    /* tp_as_buffer         */ 0,
    /* tp_flags             */ Py_TPFLAGS_DEFAULT,
    /* tp_doc               */
    "Lazy iterator over TraceReader lines or raw chunks",
    /* tp_traverse          */ 0,
    /* tp_clear             */ 0,
    /* tp_richcompare       */ 0,
    /* tp_weaklistoffset    */ 0,
    /* tp_iter              */ (getiterfunc)TraceReaderIterator_iter,
    /* tp_iternext          */ (iternextfunc)TraceReaderIterator_next,
    /* tp_methods           */ 0,
    /* tp_members           */ 0,
    /* tp_getset            */ 0,
    /* tp_base              */ 0,
    /* tp_dict              */ 0,
    /* tp_descr_get         */ 0,
    /* tp_descr_set         */ 0,
    /* tp_dictoffset        */ 0,
    /* tp_init              */ 0,
    /* tp_alloc             */ 0,
    /* tp_new               */ 0,
};
363

364
/*
 * Register this file's Python types on module `m`:
 *   - TraceReaderIterator always;
 *   - _ArrowBatchCapsule when Arrow support is compiled in.
 * Registration stops at the first failure.
 *
 * Returns 0 on success, -1 if any register_type call reports failure (< 0).
 */
int init_trace_reader_iterator(PyObject *m) {
    int rc = register_type(m, &TraceReaderIteratorType, "TraceReaderIterator");
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (rc >= 0)
        rc = register_type(m, &ArrowBatchCapsuleType, "_ArrowBatchCapsule");
#endif
    return rc < 0 ? -1 : 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc