• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27052412546

06 Jun 2026 04:20AM UTC coverage: 50.862% (+1.0%) from 49.905%
27052412546

Pull #73

github

web-flow
Merge 734572730 into 88a3c8457
Pull Request #73: add portable dependencies wheel support

31801 of 79859 branches covered (39.82%)

Branch coverage included in aggregate %.

32491 of 46545 relevant lines covered (69.81%)

9947.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.85
/src/dftracer/utils/python/trace_reader_iterator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/common/config.h>
4
#include <dftracer/utils/python/batch_byte_size.h>
5
#include <dftracer/utils/python/json.h>
6
#include <dftracer/utils/python/trace_reader_iterator.h>
7
#ifdef DFTRACER_UTILS_ENABLE_ARROW
8
#include <nanoarrow/nanoarrow.h>
9

10
using ArrowExportResult =
11
    dftracer::utils::utilities::common::arrow::ArrowExportResult;
12

13
// PyCapsule destructor for capsules named "arrow_schema".
//
// If the wrapped ArrowSchema still has a release callback set, that callback
// is invoked first; afterwards the ArrowSchema struct itself is deleted.
static void release_arrow_schema(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_schema");
    auto *owned = static_cast<ArrowSchema *>(raw);
    if (owned != nullptr && owned->release != nullptr) {
        owned->release(owned);
    }
    delete owned;
}
60✔
21

22
// PyCapsule destructor for capsules named "arrow_array".
//
// If the wrapped ArrowArray still has a release callback set, that callback
// is invoked first; afterwards the ArrowArray struct itself is deleted.
static void release_arrow_array(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_array");
    auto *owned = static_cast<ArrowArray *>(raw);
    if (owned != nullptr && owned->release != nullptr) {
        owned->release(owned);
    }
    delete owned;
}
60✔
30

31
// Implements `__arrow_c_array__(requested_schema=None)`.
//
// Moves the schema and array out of `self->result` into freshly allocated
// structs, wraps each in a PyCapsule ("arrow_schema" / "arrow_array") and
// returns them as a 2-tuple. `requested_schema` is parsed but not otherwise
// used. Raises RuntimeError when there is no result or the result is no
// longer valid. Returns NULL with a Python exception set on failure.
static PyObject *ArrowBatchCapsule_arrow_c_array(ArrowBatchCapsuleObject *self,
                                                 PyObject *args) {
    PyObject *requested_schema = Py_None;
    if (!PyArg_ParseTuple(args, "|O", &requested_schema)) {
        return NULL;
    }

    const bool exportable = self->result && self->result->valid();
    if (!exportable) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Arrow data already exported via __arrow_c_array__. "
                        "Each batch can only be exported once.");
        return NULL;
    }

    // Heap-allocated targets: ownership passes to the capsules below.
    ArrowSchema *out_schema = new ArrowSchema;
    ArrowArray *out_array = new ArrowArray;
    self->result->release_schema().move(out_schema);
    self->result->release_array().move(out_array);

    PyObject *schema_capsule =
        PyCapsule_New(out_schema, "arrow_schema", release_arrow_schema);
    if (schema_capsule == NULL) {
        // No capsule owns anything yet, so both structs are cleaned up here.
        if (out_schema->release) {
            out_schema->release(out_schema);
        }
        delete out_schema;
        if (out_array->release) {
            out_array->release(out_array);
        }
        delete out_array;
        return NULL;
    }

    PyObject *array_capsule =
        PyCapsule_New(out_array, "arrow_array", release_arrow_array);
    if (array_capsule == NULL) {
        // Dropping the schema capsule runs its destructor; the array is still
        // unowned and must be released manually.
        Py_DECREF(schema_capsule);
        if (out_array->release) {
            out_array->release(out_array);
        }
        delete out_array;
        return NULL;
    }

    // PyTuple_Pack takes its own references, so ours are dropped either way.
    PyObject *pair = PyTuple_Pack(2, schema_capsule, array_capsule);
    Py_DECREF(schema_capsule);
    Py_DECREF(array_capsule);
    return pair;
}
60✔
73

74
// Getter for `num_rows`: the result's row count, or 0 when there is no
// result or it is no longer valid.
static PyObject *ArrowBatchCapsule_get_num_rows(ArrowBatchCapsuleObject *self,
                                                void *) {
    if (self->result && self->result->valid()) {
        return PyLong_FromLongLong(self->result->num_rows());
    }
    return PyLong_FromLong(0);
}
47✔
79

80
// Getter for `num_columns`: the result's column count, or 0 when there is no
// result or it is no longer valid.
static PyObject *ArrowBatchCapsule_get_num_columns(
    ArrowBatchCapsuleObject *self, void *) {
    if (self->result && self->result->valid()) {
        return PyLong_FromLongLong(self->result->num_columns());
    }
    return PyLong_FromLong(0);
}
2✔
85

86
// tp_dealloc: deletes the owned export result, then hands the object's
// memory back through the type's tp_free slot.
static void ArrowBatchCapsule_dealloc(ArrowBatchCapsuleObject *self) {
    delete self->result;
    PyObject *as_object = (PyObject *)self;
    Py_TYPE(as_object)->tp_free(as_object);
}
106✔
90

91
// Method table for _ArrowBatchCapsule; terminated by an all-null sentinel.
static PyMethodDef ArrowBatchCapsule_methods[] = {
    {"__arrow_c_array__",
     reinterpret_cast<PyCFunction>(ArrowBatchCapsule_arrow_c_array),
     METH_VARARGS,
     "Export as Arrow C Data Interface PyCapsule pair (schema, array)"},
    {nullptr, nullptr, 0, nullptr},
};
96

97
// Read-only attribute table for _ArrowBatchCapsule (no setters); terminated
// by an all-null sentinel.
static PyGetSetDef ArrowBatchCapsule_getsetters[] = {
    {"num_rows", reinterpret_cast<getter>(ArrowBatchCapsule_get_num_rows),
     nullptr, "Number of rows", nullptr},
    {"num_columns", reinterpret_cast<getter>(ArrowBatchCapsule_get_num_columns),
     nullptr, "Number of columns", nullptr},
    {nullptr, nullptr, nullptr, nullptr, nullptr},
};
103

104
// Type object for the internal Arrow batch wrapper. Only tp_dealloc,
// tp_methods and tp_getset are populated; pointer slots that are unused are
// spelled nullptr, integer slots 0. Slots after tp_getset are left to
// zero-initialisation.
PyTypeObject ArrowBatchCapsuleType = {
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext._ArrowBatchCapsule",
    sizeof(ArrowBatchCapsuleObject),       // tp_basicsize
    0,                                     // tp_itemsize
    (destructor)ArrowBatchCapsule_dealloc, // tp_dealloc
    0,                                     // tp_vectorcall_offset
    nullptr,                               // tp_getattr
    nullptr,                               // tp_setattr
    nullptr,                               // tp_as_async
    nullptr,                               // tp_repr
    nullptr,                               // tp_as_number
    nullptr,                               // tp_as_sequence
    nullptr,                               // tp_as_mapping
    nullptr,                               // tp_hash
    nullptr,                               // tp_call
    nullptr,                               // tp_str
    nullptr,                               // tp_getattro
    nullptr,                               // tp_setattro
    nullptr,                               // tp_as_buffer
    Py_TPFLAGS_DEFAULT,                    // tp_flags
    "Internal Arrow batch wrapper implementing __arrow_c_array__ protocol",
    nullptr,                               // tp_traverse
    nullptr,                               // tp_clear
    nullptr,                               // tp_richcompare
    0,                                     // tp_weaklistoffset
    nullptr,                               // tp_iter
    nullptr,                               // tp_iternext
    ArrowBatchCapsule_methods,             // tp_methods
    nullptr,                               // tp_members
    ArrowBatchCapsule_getsetters,          // tp_getset
};
135

136
#endif                                     // DFTRACER_UTILS_ENABLE_ARROW
137

138
// Sets the cancelled flag on the batch iterator state, closes its channel
// (when present) and waits on the task future (when valid).
static void cancel_and_wait_batch_state(MemoryViewBatchIteratorState *bs) {
    bs->cancelled.store(true, std::memory_order_release);

    if (bs->channel) {
        bs->channel->close();
    }

    if (bs->task_future.valid()) {
        bs->task_future.wait();
    }
}
71✔
143

144
// Sets the cancelled flag on the JSON-dict iterator state, closes its channel
// (when present) and waits on the task future (when valid).
static void cancel_and_wait_json_dict_state(JsonDictIteratorState *js) {
    js->cancelled.store(true, std::memory_order_release);

    if (js->channel) {
        js->channel->close();
    }

    if (js->task_future.valid()) {
        js->task_future.wait();
    }
}
7✔
149

150
// tp_dealloc for TraceReaderIterator.
//
// For each per-mode state that exists, the producer is cancelled, its channel
// closed and its task future awaited (the wait happens with the GIL released),
// after which the state is reset. The cached batches are then dropped and the
// object's memory is returned through tp_free.
static void TraceReaderIterator_dealloc(TraceReaderIteratorObject *self) {
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (self->arrow_state) {
        self->arrow_state->cancelled.store(true, std::memory_order_release);
        if (self->arrow_state->channel) self->arrow_state->channel->close();
        Py_BEGIN_ALLOW_THREADS
        if (self->arrow_state->task_future.valid()) {
            self->arrow_state->task_future.wait();
        }
        Py_END_ALLOW_THREADS
        self->arrow_state.reset();
    }
#endif
    if (self->json_dict_state) {
        Py_BEGIN_ALLOW_THREADS
        cancel_and_wait_json_dict_state(self->json_dict_state.get());
        Py_END_ALLOW_THREADS
        self->json_dict_state.reset();
    }
    // tp_free does not run C++ destructors, so the partially consumed JSON
    // batch must be released explicitly; otherwise an iterator dropped
    // mid-batch keeps its shared reference alive forever.
    self->json_dict_current_batch.reset();
    if (self->batch_state) {
        Py_BEGIN_ALLOW_THREADS
        cancel_and_wait_batch_state(self->batch_state.get());
        Py_END_ALLOW_THREADS
        self->batch_state.reset();
    }
    Py_XDECREF(self->current_batch);
    self->current_batch = NULL;
    Py_TYPE(self)->tp_free((PyObject *)self);
}
105✔
175

176
// tp_iter: the iterator is its own iterable, so return a new reference to it.
static PyObject *TraceReaderIterator_iter(TraceReaderIteratorObject *self) {
    PyObject *same = (PyObject *)self;
    Py_INCREF(same);
    return same;
}
180

181
static PyObject *TraceReaderIterator_next(TraceReaderIteratorObject *self) {
1,536✔
182
    if (self->mode == IteratorMode::JSON_DICT) {
1,536✔
183
        while (true) {
262✔
184
            if (self->json_dict_current_batch) {
262✔
185
                auto &events = self->json_dict_current_batch->events;
255✔
186
                Py_ssize_t n = static_cast<Py_ssize_t>(events.size());
255✔
187
                if (self->json_dict_index < n) {
255✔
188
                    JsonDictValueObject *obj =
240✔
189
                        (JsonDictValueObject *)JsonDictValueType.tp_alloc(
240✔
190
                            &JsonDictValueType, 0);
191
                    if (!obj) return NULL;
240!
192
                    new (&obj->batch) std::shared_ptr<JsonDictBatch>(
480✔
193
                        self->json_dict_current_batch);
240✔
194
                    obj->event_index =
240✔
195
                        static_cast<std::size_t>(self->json_dict_index);
240✔
196
                    obj->is_args = false;
240✔
197
                    self->json_dict_index++;
240✔
198
                    return (PyObject *)obj;
240✔
199
                }
200
                self->json_dict_current_batch.reset();
15✔
201
                self->json_dict_index = 0;
15✔
202
            }
15✔
203

204
            auto *js = self->json_dict_state.get();
22✔
205
            std::optional<JsonDictBatch> batch;
22✔
206
            Py_BEGIN_ALLOW_THREADS batch = js->channel->blocking_receive();
22!
207
            Py_END_ALLOW_THREADS
22!
208

209
                if (!batch.has_value()) {
22✔
210
                std::lock_guard<std::mutex> lock(js->error_mtx);
7!
211
                if (js->error) {
7!
212
                    try {
213
                        std::rethrow_exception(js->error);
×
214
                    } catch (const std::exception &e) {
×
215
                        PyErr_SetString(PyExc_RuntimeError, e.what());
×
216
                        return NULL;
×
217
                    } catch (...) {
×
218
                        PyErr_SetString(PyExc_RuntimeError,
×
219
                                        "Unknown error in json dict iterator");
220
                        return NULL;
×
221
                    }
×
222
                }
×
223
                return NULL;
7✔
224
            }
7✔
225

226
            auto dequeued_bytes = dftracer::utils::python::byte_size(*batch);
15!
227
            js->bytes_in_queue.fetch_sub(dequeued_bytes,
15✔
228
                                         std::memory_order_acq_rel);
229
            self->json_dict_current_batch =
15✔
230
                std::make_shared<JsonDictBatch>(std::move(*batch));
15!
231
            self->json_dict_index = 0;
15✔
232
        }
22✔
233
    }
234

235
#ifdef DFTRACER_UTILS_ENABLE_ARROW
236
    if (self->mode == IteratorMode::ARROW) {
1,289✔
237
        auto *astate = self->arrow_state.get();
71✔
238
        std::optional<ArrowExportResult> batch;
71✔
239
        Py_BEGIN_ALLOW_THREADS batch = astate->channel->blocking_receive();
71!
240
        Py_END_ALLOW_THREADS
71!
241

242
            if (!batch.has_value()) {
71✔
243
            std::lock_guard<std::mutex> lock(astate->error_mtx);
25!
244
            if (astate->error) {
25!
245
                try {
246
                    std::rethrow_exception(astate->error);
×
247
                } catch (const std::exception &e) {
×
248
                    PyErr_SetString(PyExc_RuntimeError, e.what());
×
249
                    return NULL;
×
250
                } catch (...) {
×
251
                    PyErr_SetString(PyExc_RuntimeError,
×
252
                                    "Unknown error in Arrow iterator");
253
                    return NULL;
×
254
                }
×
255
            }
×
256
            return NULL;
25✔
257
        }
25✔
258

259
        auto dequeued_bytes = dftracer::utils::python::byte_size(*batch);
46!
260
        astate->bytes_in_queue.fetch_sub(dequeued_bytes,
46✔
261
                                         std::memory_order_acq_rel);
262

263
        ArrowBatchCapsuleObject *obj =
46✔
264
            (ArrowBatchCapsuleObject *)ArrowBatchCapsuleType.tp_alloc(
46!
265
                &ArrowBatchCapsuleType, 0);
266
        if (!obj) return NULL;
46!
267
        obj->result = new ArrowExportResult(std::move(*batch));
46!
268
        return (PyObject *)obj;
46✔
269
    }
71✔
270
#endif
271

272
    using namespace dftracer::utils::python;
273
    while (true) {
1,470✔
274
        if (self->current_batch) {
1,470✔
275
            auto *batch_obj = (MemoryViewBatchObject *)self->current_batch;
1,401✔
276
            Py_ssize_t n =
1,401✔
277
                static_cast<Py_ssize_t>(batch_obj->data->num_entries());
1,401✔
278
            if (self->batch_index < n) {
1,401✔
279
                PyObject *mv =
1,151✔
280
                    MemoryViewBatch_item(batch_obj, self->batch_index);
1,151✔
281
                self->batch_index++;
1,151✔
282
                return mv;
1,151✔
283
            }
284
            Py_DECREF(self->current_batch);
250✔
285
            self->current_batch = NULL;
250✔
286
            self->batch_index = 0;
250✔
287
        }
250✔
288

289
        auto *bs = self->batch_state.get();
319✔
290
        std::optional<MemoryViewBatchData> batch_data;
319✔
291
        Py_BEGIN_ALLOW_THREADS batch_data = bs->channel->blocking_receive();
319!
292
        Py_END_ALLOW_THREADS
319!
293

294
            if (!batch_data.has_value()) {
319✔
295
            std::lock_guard<std::mutex> lock(bs->error_mtx);
67!
296
            if (bs->error) {
67✔
297
                try {
298
                    std::rethrow_exception(bs->error);
1!
299
                } catch (const std::exception &e) {
1!
300
                    PyErr_SetString(PyExc_RuntimeError, e.what());
1!
301
                    return NULL;
1✔
302
                } catch (...) {
1!
303
                    PyErr_SetString(PyExc_RuntimeError,
×
304
                                    "Unknown error in batch iterator");
305
                    return NULL;
×
306
                }
1!
307
            }
×
308
            return NULL;
66✔
309
        }
67✔
310

311
        auto dequeued_bytes = dftracer::utils::python::byte_size(*batch_data);
252!
312
        bs->bytes_in_queue.fetch_sub(dequeued_bytes, std::memory_order_acq_rel);
252✔
313

314
        auto *obj = (MemoryViewBatchObject *)MemoryViewBatchType.tp_alloc(
252!
315
            &MemoryViewBatchType, 0);
316
        if (!obj) return NULL;
252!
317
        obj->data = new MemoryViewBatchData(std::move(*batch_data));
252!
318
        self->current_batch = (PyObject *)obj;
252✔
319
        self->batch_index = 0;
252✔
320
    }
319✔
321
}
1,537✔
322

323
// Type object for TraceReaderIterator. Only tp_dealloc, tp_iter and
// tp_iternext are populated; pointer slots that are unused are spelled
// nullptr, integer slots 0.
PyTypeObject TraceReaderIteratorType = {
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext.TraceReaderIterator",
    sizeof(TraceReaderIteratorObject),       // tp_basicsize
    0,                                       // tp_itemsize
    (destructor)TraceReaderIterator_dealloc, // tp_dealloc
    0,                                       // tp_vectorcall_offset
    nullptr,                                 // tp_getattr
    nullptr,                                 // tp_setattr
    nullptr,                                 // tp_as_async
    nullptr,                                 // tp_repr
    nullptr,                                 // tp_as_number
    nullptr,                                 // tp_as_sequence
    nullptr,                                 // tp_as_mapping
    nullptr,                                 // tp_hash
    nullptr,                                 // tp_call
    nullptr,                                 // tp_str
    nullptr,                                 // tp_getattro
    nullptr,                                 // tp_setattro
    nullptr,                                 // tp_as_buffer
    Py_TPFLAGS_DEFAULT,                      // tp_flags
    "Lazy iterator over TraceReader lines or raw chunks",
    nullptr,                                 // tp_traverse
    nullptr,                                 // tp_clear
    nullptr,                                 // tp_richcompare
    0,                                       // tp_weaklistoffset
    (getiterfunc)TraceReaderIterator_iter,   // tp_iter
    (iternextfunc)TraceReaderIterator_next,  // tp_iternext
    nullptr,                                 // tp_methods
    nullptr,                                 // tp_members
    nullptr,                                 // tp_getset
    nullptr,                                 // tp_base
    nullptr,                                 // tp_dict
    nullptr,                                 // tp_descr_get
    nullptr,                                 // tp_descr_set
    0,                                       // tp_dictoffset
    nullptr,                                 // tp_init
    nullptr,                                 // tp_alloc
    nullptr,                                 // tp_new
};
362

363
// Readies the iterator type(s) and registers them on module `m`.
//
// Registers "TraceReaderIterator" and, when Arrow support is compiled in,
// "_ArrowBatchCapsule". Returns 0 on success and -1 on failure.
//
// `m` is never released here: this function does not acquire a reference to
// the module, so every failure path leaves it untouched.
// NOTE(review): the previous version called Py_DECREF(m) on one failure path
// only; confirm the caller is responsible for releasing the module on error.
int init_trace_reader_iterator(PyObject *m) {
    if (PyType_Ready(&TraceReaderIteratorType) < 0) return -1;

    // PyModule_AddObject steals a reference only on success, so the extra
    // reference taken here must be dropped again if it fails.
    Py_INCREF(&TraceReaderIteratorType);
    if (PyModule_AddObject(m, "TraceReaderIterator",
                           (PyObject *)&TraceReaderIteratorType) < 0) {
        Py_DECREF(&TraceReaderIteratorType);
        return -1;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (PyType_Ready(&ArrowBatchCapsuleType) < 0) return -1;
    Py_INCREF(&ArrowBatchCapsuleType);
    if (PyModule_AddObject(m, "_ArrowBatchCapsule",
                           (PyObject *)&ArrowBatchCapsuleType) < 0) {
        Py_DECREF(&ArrowBatchCapsuleType);
        return -1;
    }
#endif

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc