• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28398085302

29 Jun 2026 07:43PM UTC coverage: 50.067% (-2.2%) from 52.278%
28398085302

Pull #83

github

web-flow
Merge 9a615085e into 2efed6649
Pull Request #83: refactor and improve code QoL

16342 of 44293 branches covered (36.9%)

Branch coverage included in aggregate %.

613 of 1132 new or added lines in 52 files covered. (54.15%)

687 existing lines in 116 files now uncovered.

21698 of 31685 relevant lines covered (68.48%)

12958.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.68
/src/dftracer/utils/python/trace_reader_iterator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/common/config.h>
4
#include <dftracer/utils/python/batch_byte_size.h>
5
#include <dftracer/utils/python/json.h>
6
#include <dftracer/utils/python/py_type_helpers.h>
7
#include <dftracer/utils/python/trace_reader_iterator.h>
8
#ifdef DFTRACER_UTILS_ENABLE_ARROW
9
#include <nanoarrow/nanoarrow.h>
10

11
using ArrowExportResult =
12
    dftracer::utils::utilities::common::arrow::ArrowExportResult;
13

14
// PyCapsule destructor for the "arrow_schema" capsule created by
// __arrow_c_array__. If the consumer did not take ownership (release is still
// set), run the ArrowSchema release callback; then free the heap struct.
static void release_arrow_schema(PyObject *capsule) {
    ArrowSchema *const owned = static_cast<ArrowSchema *>(
        PyCapsule_GetPointer(capsule, "arrow_schema"));
    if (owned == nullptr) return;
    if (owned->release != nullptr) owned->release(owned);
    delete owned;
}
// PyCapsule destructor for the "arrow_array" capsule created by
// __arrow_c_array__. If the consumer did not take ownership (release is still
// set), run the ArrowArray release callback; then free the heap struct.
static void release_arrow_array(PyObject *capsule) {
    ArrowArray *const owned = static_cast<ArrowArray *>(
        PyCapsule_GetPointer(capsule, "arrow_array"));
    if (owned == nullptr) return;
    if (owned->release != nullptr) owned->release(owned);
    delete owned;
}
// __arrow_c_array__(requested_schema=None)
//
// Exports this batch through the Arrow PyCapsule interface. Returns a
// (schema_capsule, array_capsule) tuple whose capsules are named
// "arrow_schema" and "arrow_array". The ArrowSchema/ArrowArray are moved out
// of `self->result`, so a batch can be exported only once; any later call
// raises RuntimeError. `requested_schema` is parsed but not used.
static PyObject *ArrowBatchCapsule_arrow_c_array(ArrowBatchCapsuleObject *self,
                                                 PyObject *args) {
    PyObject *requested_schema = Py_None;
    if (!PyArg_ParseTuple(args, "|O", &requested_schema)) return NULL;

    if (!self->result || !self->result->valid()) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Arrow data already exported via __arrow_c_array__. "
                        "Each batch can only be exported once.");
        return NULL;
    }

    // Allocate both C structs before any data leaves `result`. A C++
    // exception must not unwind through CPython's C frames. Also, if the
    // second `new` threw, the first allocation would otherwise leak.
    ArrowSchema *schema = nullptr;
    ArrowArray *array = nullptr;
    try {
        schema = new ArrowSchema;
        array = new ArrowArray;
    } catch (...) {
        delete schema;
        return PyErr_NoMemory();
    }

    self->result->release_schema().move(schema);
    self->result->release_array().move(array);

    // Once a capsule exists, its destructor (release_arrow_schema /
    // release_arrow_array) calls `release` if still set and deletes the
    // struct, so ownership follows the capsule.
    PyObject *schema_capsule =
        PyCapsule_New(schema, "arrow_schema", release_arrow_schema);
    if (!schema_capsule) {
        if (schema->release) schema->release(schema);
        delete schema;
        if (array->release) array->release(array);
        delete array;
        return NULL;
    }

    PyObject *array_capsule =
        PyCapsule_New(array, "arrow_array", release_arrow_array);
    if (!array_capsule) {
        Py_DECREF(schema_capsule);  // frees the schema via its destructor
        if (array->release) array->release(array);
        delete array;
        return NULL;
    }

    // PyTuple_Pack takes its own references. On failure it returns NULL with
    // an exception set, and the DECREFs below free both capsules.
    PyObject *tuple = PyTuple_Pack(2, schema_capsule, array_capsule);
    Py_DECREF(schema_capsule);
    Py_DECREF(array_capsule);
    return tuple;
}
// Getter for `num_rows`. Reports 0 when no valid result is attached, e.g.
// after the data was moved out by __arrow_c_array__.
static PyObject *ArrowBatchCapsule_get_num_rows(ArrowBatchCapsuleObject *self,
                                                void *) {
    const bool has_data = self->result != nullptr && self->result->valid();
    return has_data ? PyLong_FromLongLong(self->result->num_rows())
                    : PyLong_FromLong(0);
}
// Getter for `num_columns`. Reports 0 when no valid result is attached, e.g.
// after the data was moved out by __arrow_c_array__.
static PyObject *ArrowBatchCapsule_get_num_columns(
    ArrowBatchCapsuleObject *self, void *) {
    const bool has_data = self->result != nullptr && self->result->valid();
    return has_data ? PyLong_FromLongLong(self->result->num_columns())
                    : PyLong_FromLong(0);
}
// tp_dealloc for _ArrowBatchCapsule. Frees the owned ArrowExportResult
// (deleting nullptr is a no-op), then returns the object's storage to Python.
static void ArrowBatchCapsule_dealloc(ArrowBatchCapsuleObject *self) {
    auto *owned_result = self->result;
    self->result = nullptr;
    delete owned_result;

    PyObject *as_object = reinterpret_cast<PyObject *>(self);
    Py_TYPE(as_object)->tp_free(as_object);
}
static PyMethodDef ArrowBatchCapsule_methods[] = {
93
    {"__arrow_c_array__", (PyCFunction)ArrowBatchCapsule_arrow_c_array,
94
     METH_VARARGS,
95
     "Export as Arrow C Data Interface PyCapsule pair (schema, array)"},
96
    {NULL}};
97

98
static PyGetSetDef ArrowBatchCapsule_getsetters[] = {
99
    {"num_rows", (getter)ArrowBatchCapsule_get_num_rows, NULL, "Number of rows",
100
     NULL},
101
    {"num_columns", (getter)ArrowBatchCapsule_get_num_columns, NULL,
102
     "Number of columns", NULL},
103
    {NULL}};
104

105
PyTypeObject ArrowBatchCapsuleType = {
106
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext._ArrowBatchCapsule",
107
    sizeof(ArrowBatchCapsuleObject),       /* tp_basicsize */
108
    0,                                     /* tp_itemsize */
109
    (destructor)ArrowBatchCapsule_dealloc, /* tp_dealloc */
110
    0,                                     /* tp_vectorcall_offset */
111
    0,                                     /* tp_getattr */
112
    0,                                     /* tp_setattr */
113
    0,                                     /* tp_as_async */
114
    0,                                     /* tp_repr */
115
    0,                                     /* tp_as_number */
116
    0,                                     /* tp_as_sequence */
117
    0,                                     /* tp_as_mapping */
118
    0,                                     /* tp_hash */
119
    0,                                     /* tp_call */
120
    0,                                     /* tp_str */
121
    0,                                     /* tp_getattro */
122
    0,                                     /* tp_setattro */
123
    0,                                     /* tp_as_buffer */
124
    Py_TPFLAGS_DEFAULT,                    /* tp_flags */
125
    "Internal Arrow batch wrapper implementing __arrow_c_array__ protocol",
126
    0,                                     /* tp_traverse */
127
    0,                                     /* tp_clear */
128
    0,                                     /* tp_richcompare */
129
    0,                                     /* tp_weaklistoffset */
130
    0,                                     /* tp_iter */
131
    0,                                     /* tp_iternext */
132
    ArrowBatchCapsule_methods,             /* tp_methods */
133
    0,                                     /* tp_members */
134
    ArrowBatchCapsule_getsetters,          /* tp_getset */
135
};
136

137
#endif                                     // DFTRACER_UTILS_ENABLE_ARROW
138

139
// Stop the producer behind a memoryview-batch iterator and wait for it. This
// sets `cancelled`, closes the channel if one exists, then blocks on the
// producer's future if it is valid. TraceReaderIterator_dealloc calls this
// with the GIL released.
static void cancel_and_wait_batch_state(MemoryViewBatchIteratorState *bs) {
    bs->cancelled.store(true, std::memory_order_release);

    auto &channel = bs->channel;
    if (channel) {
        channel->close();
    }

    auto &future = bs->task_future;
    if (future.valid()) {
        future.wait();
    }
}
// Stop the producer behind a JSON-dict iterator and wait for it. This sets
// `cancelled`, closes the channel if one exists, then blocks on the
// producer's future if it is valid. TraceReaderIterator_dealloc calls this
// with the GIL released.
static void cancel_and_wait_json_dict_state(JsonDictIteratorState *js) {
    js->cancelled.store(true, std::memory_order_release);

    auto &channel = js->channel;
    if (channel) {
        channel->close();
    }

    auto &future = js->task_future;
    if (future.valid()) {
        future.wait();
    }
}
// tp_dealloc for TraceReaderIterator.
//
// The object's storage is returned with tp_free, a C-level free that never
// runs C++ member destructors. Every owning C++ member is therefore released
// explicitly here. Background producers are cancelled first, and their
// futures are waited on with the GIL released so other Python threads can
// keep running meanwhile.
static void TraceReaderIterator_dealloc(TraceReaderIteratorObject *self) {
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (self->arrow_state) {
        self->arrow_state->cancelled.store(true, std::memory_order_release);
        if (self->arrow_state->channel) self->arrow_state->channel->close();
        Py_BEGIN_ALLOW_THREADS
        if (self->arrow_state->task_future.valid()) {
            self->arrow_state->task_future.wait();
        }
        Py_END_ALLOW_THREADS
        self->arrow_state.reset();
    }
#endif
    if (self->json_dict_state) {
        Py_BEGIN_ALLOW_THREADS
        cancel_and_wait_json_dict_state(self->json_dict_state.get());
        Py_END_ALLOW_THREADS
        self->json_dict_state.reset();
    }
    // Drop the iterator's share of a partially consumed JSON batch.
    // Previously this was never reset, so abandoning iteration mid-batch
    // (e.g. `break`) leaked the batch. JsonDictValue objects already handed
    // out hold their own shared_ptr copies and stay valid.
    self->json_dict_current_batch.reset();
    if (self->batch_state) {
        Py_BEGIN_ALLOW_THREADS
        cancel_and_wait_batch_state(self->batch_state.get());
        Py_END_ALLOW_THREADS
        self->batch_state.reset();
    }
    Py_CLEAR(self->current_batch);
    Py_TYPE(self)->tp_free((PyObject *)self);
}
// tp_iter: the iterator is its own iterable, so it returns a new reference
// to itself.
static PyObject *TraceReaderIterator_iter(TraceReaderIteratorObject *self) {
    PyObject *as_object = reinterpret_cast<PyObject *>(self);
    Py_INCREF(as_object);
    return as_object;
}
// Shared end-of-stream handling for every iteration mode. Called when a
// producer channel yields no value. If the producer stored an exception in
// `state->error`, it is raised as RuntimeError; `fallback_msg` is used for
// non-std::exception errors. Always returns NULL. Returning NULL without an
// exception set ends iteration (StopIteration).
template <typename State>
static PyObject *finish_iteration_from_state(State *state,
                                             const char *fallback_msg) {
    std::lock_guard<std::mutex> lock(state->error_mtx);
    if (state->error) {
        try {
            std::rethrow_exception(state->error);
        } catch (const std::exception &e) {
            PyErr_SetString(PyExc_RuntimeError, e.what());
        } catch (...) {
            PyErr_SetString(PyExc_RuntimeError, fallback_msg);
        }
    }
    return NULL;
}

// JSON_DICT mode: yield one JsonDictValue per event in the current batch.
// When the batch is exhausted, receive the next one from the producer.
static PyObject *TraceReaderIterator_next_json_dict(
    TraceReaderIteratorObject *self) {
    while (true) {
        if (self->json_dict_current_batch) {
            auto &events = self->json_dict_current_batch->events;
            Py_ssize_t n = static_cast<Py_ssize_t>(events.size());
            if (self->json_dict_index < n) {
                JsonDictValueObject *obj =
                    (JsonDictValueObject *)JsonDictValueType.tp_alloc(
                        &JsonDictValueType, 0);
                if (!obj) return NULL;
                // `batch` is a C++ member inside Python-allocated storage,
                // so it is constructed in place; the value shares ownership
                // of the batch with this iterator.
                new (&obj->batch) std::shared_ptr<JsonDictBatch>(
                    self->json_dict_current_batch);
                obj->event_index =
                    static_cast<std::size_t>(self->json_dict_index);
                obj->is_args = false;
                self->json_dict_index++;
                return (PyObject *)obj;
            }
            // Current batch exhausted: drop it before fetching another.
            self->json_dict_current_batch.reset();
            self->json_dict_index = 0;
        }

        auto *js = self->json_dict_state.get();
        std::optional<JsonDictBatch> batch;
        Py_BEGIN_ALLOW_THREADS
        batch = js->channel->blocking_receive();
        Py_END_ALLOW_THREADS

        // An empty result is treated as end of stream (or producer failure).
        if (!batch.has_value()) {
            return finish_iteration_from_state(
                js, "Unknown error in json dict iterator");
        }

        auto dequeued_bytes = dftracer::utils::python::byte_size(*batch);
        js->bytes_in_queue.fetch_sub(dequeued_bytes,
                                     std::memory_order_acq_rel);
        self->json_dict_current_batch =
            std::make_shared<JsonDictBatch>(std::move(*batch));
        self->json_dict_index = 0;
    }
}

#ifdef DFTRACER_UTILS_ENABLE_ARROW
// ARROW mode: each received ArrowExportResult becomes one
// _ArrowBatchCapsule object.
static PyObject *TraceReaderIterator_next_arrow(
    TraceReaderIteratorObject *self) {
    auto *astate = self->arrow_state.get();
    std::optional<ArrowExportResult> batch;
    Py_BEGIN_ALLOW_THREADS
    batch = astate->channel->blocking_receive();
    Py_END_ALLOW_THREADS

    if (!batch.has_value()) {
        return finish_iteration_from_state(astate,
                                           "Unknown error in Arrow iterator");
    }

    auto dequeued_bytes = dftracer::utils::python::byte_size(*batch);
    astate->bytes_in_queue.fetch_sub(dequeued_bytes,
                                     std::memory_order_acq_rel);

    ArrowBatchCapsuleObject *obj =
        (ArrowBatchCapsuleObject *)ArrowBatchCapsuleType.tp_alloc(
            &ArrowBatchCapsuleType, 0);
    if (!obj) return NULL;
    obj->result = new ArrowExportResult(std::move(*batch));
    return (PyObject *)obj;
}
#endif

// Default mode: yield one memoryview item per entry of the current
// MemoryViewBatch. When the batch is exhausted, receive the next one.
static PyObject *TraceReaderIterator_next_memoryview(
    TraceReaderIteratorObject *self) {
    using namespace dftracer::utils::python;
    while (true) {
        if (self->current_batch) {
            auto *batch_obj = (MemoryViewBatchObject *)self->current_batch;
            Py_ssize_t n =
                static_cast<Py_ssize_t>(batch_obj->data->num_entries());
            if (self->batch_index < n) {
                PyObject *mv =
                    MemoryViewBatch_item(batch_obj, self->batch_index);
                self->batch_index++;
                return mv;
            }
            Py_CLEAR(self->current_batch);
            self->batch_index = 0;
        }

        auto *bs = self->batch_state.get();
        std::optional<MemoryViewBatchData> batch_data;
        Py_BEGIN_ALLOW_THREADS
        batch_data = bs->channel->blocking_receive();
        Py_END_ALLOW_THREADS

        if (!batch_data.has_value()) {
            return finish_iteration_from_state(
                bs, "Unknown error in batch iterator");
        }

        auto dequeued_bytes = dftracer::utils::python::byte_size(*batch_data);
        bs->bytes_in_queue.fetch_sub(dequeued_bytes, std::memory_order_acq_rel);

        auto *obj = (MemoryViewBatchObject *)MemoryViewBatchType.tp_alloc(
            &MemoryViewBatchType, 0);
        if (!obj) return NULL;
        obj->data = new MemoryViewBatchData(std::move(*batch_data));
        self->current_batch = (PyObject *)obj;
        self->batch_index = 0;
    }
}

// tp_iternext: dispatch on the iterator's mode. Producer batches are
// received with the GIL released. Returning NULL with no exception set
// signals exhaustion.
static PyObject *TraceReaderIterator_next(TraceReaderIteratorObject *self) {
    if (self->mode == IteratorMode::JSON_DICT) {
        return TraceReaderIterator_next_json_dict(self);
    }
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (self->mode == IteratorMode::ARROW) {
        return TraceReaderIterator_next_arrow(self);
    }
#endif
    return TraceReaderIterator_next_memoryview(self);
}
// Type object for TraceReaderIterator. It defines no tp_new, so Python code
// cannot construct it directly; instances come from elsewhere in the
// extension. Slots are positional, so their order must not change.
PyTypeObject TraceReaderIteratorType = {
    PyVarObject_HEAD_INIT(nullptr, 0)
    "dftracer_utils_ext.TraceReaderIterator",  /* tp_name */
    sizeof(TraceReaderIteratorObject),         /* tp_basicsize */
    0,                                         /* tp_itemsize */
    (destructor)TraceReaderIterator_dealloc,   /* tp_dealloc */
    0,                                         /* tp_vectorcall_offset */
    nullptr,                                   /* tp_getattr */
    nullptr,                                   /* tp_setattr */
    nullptr,                                   /* tp_as_async */
    nullptr,                                   /* tp_repr */
    nullptr,                                   /* tp_as_number */
    nullptr,                                   /* tp_as_sequence */
    nullptr,                                   /* tp_as_mapping */
    nullptr,                                   /* tp_hash */
    nullptr,                                   /* tp_call */
    nullptr,                                   /* tp_str */
    nullptr,                                   /* tp_getattro */
    nullptr,                                   /* tp_setattro */
    nullptr,                                   /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT,                        /* tp_flags */
    "Lazy iterator over TraceReader lines or raw chunks", /* tp_doc */
    nullptr,                                   /* tp_traverse */
    nullptr,                                   /* tp_clear */
    nullptr,                                   /* tp_richcompare */
    0,                                         /* tp_weaklistoffset */
    (getiterfunc)TraceReaderIterator_iter,     /* tp_iter */
    (iternextfunc)TraceReaderIterator_next,    /* tp_iternext */
    nullptr,                                   /* tp_methods */
    nullptr,                                   /* tp_members */
    nullptr,                                   /* tp_getset */
    nullptr,                                   /* tp_base */
    nullptr,                                   /* tp_dict */
    nullptr,                                   /* tp_descr_get */
    nullptr,                                   /* tp_descr_set */
    0,                                         /* tp_dictoffset */
    nullptr,                                   /* tp_init */
    nullptr,                                   /* tp_alloc */
    nullptr,                                   /* tp_new */
};
// Registers TraceReaderIterator on module `m`, plus the internal
// _ArrowBatchCapsule type when built with Arrow support. Returns 0 on
// success, or -1 as soon as any registration fails.
int init_trace_reader_iterator(PyObject *m) {
    auto status =
        register_type(m, &TraceReaderIteratorType, "TraceReaderIterator");
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (status >= 0) {
        status =
            register_type(m, &ArrowBatchCapsuleType, "_ArrowBatchCapsule");
    }
#endif
    return status < 0 ? -1 : 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc