• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23531027933

25 Mar 2026 08:05AM UTC coverage: 48.592% (-1.5%) from 50.098%
23531027933

Pull #57

github

web-flow
Merge d1070e289 into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18900 of 49456 branches covered (38.22%)

Branch coverage included in aggregate %.

1604 of 1954 new or added lines in 25 files covered. (82.09%)

3407 existing lines in 135 files now uncovered.

18487 of 27485 relevant lines covered (67.26%)

240991.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.48
/src/dftracer/utils/python/trace_reader_iterator.cpp
1
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <dftracer/utils/core/utils/string.h>
#include <dftracer/utils/python/json.h>
#include <dftracer/utils/python/trace_reader_iterator.h>

#include <exception>
#include <mutex>
#include <new>
#include <utility>
6

7
#ifdef DFTRACER_UTILS_ENABLE_ARROW
8
#include <nanoarrow/nanoarrow.h>
9

10
using ArrowExportResult =
11
    dftracer::utils::utilities::common::arrow::ArrowExportResult;
12

13
// PyCapsule destructor for "arrow_schema" capsules.
// Invokes the struct's own release callback if the consumer has not already
// moved it out (release still non-NULL), then frees the heap struct itself,
// which was allocated with `new` by __arrow_c_array__.
static void release_arrow_schema(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_schema");
    ArrowSchema *schema = static_cast<ArrowSchema *>(raw);
    if (schema != NULL && schema->release != NULL) schema->release(schema);
    delete schema;  // delete on NULL is a no-op
}
5✔
21

22
// PyCapsule destructor for "arrow_array" capsules.
// Invokes the struct's own release callback if the consumer has not already
// moved it out (release still non-NULL), then frees the heap struct itself,
// which was allocated with `new` by __arrow_c_array__.
static void release_arrow_array(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_array");
    ArrowArray *array = static_cast<ArrowArray *>(raw);
    if (array != NULL && array->release != NULL) array->release(array);
    delete array;  // delete on NULL is a no-op
}
5✔
30

31
// __arrow_c_array__(requested_schema=None) for _ArrowBatchCapsule.
//
// Moves the batch's schema and array out of `self->result` into two
// heap-allocated C structs and returns them as an ("arrow_schema",
// "arrow_array") PyCapsule pair. `requested_schema` is parsed but ignored.
// Because the data is moved out, a batch can be exported only once; later
// calls raise RuntimeError.
//
// Returns a new reference to a 2-tuple, or NULL with an exception set.
static PyObject *ArrowBatchCapsule_arrow_c_array(ArrowBatchCapsuleObject *self,
                                                 PyObject *args) {
    PyObject *requested_schema = Py_None;
    if (!PyArg_ParseTuple(args, "|O", &requested_schema)) return NULL;

    if (!self->result || !self->result->valid()) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Arrow data already exported via __arrow_c_array__. "
                        "Each batch can only be exported once.");
        return NULL;
    }

    // Allocate both structs BEFORE moving anything out of `result`, so an
    // allocation failure leaves the batch intact and exportable later.
    // std::nothrow keeps std::bad_alloc from unwinding through CPython's C
    // frames; the capsule destructors free these with `delete`.
    // Value-initialization zeroes the release callbacks.
    auto *schema = new (std::nothrow) ArrowSchema();
    auto *array = new (std::nothrow) ArrowArray();
    if (!schema || !array) {
        delete schema;
        delete array;
        return PyErr_NoMemory();
    }

    self->result->release_schema().move(schema);
    self->result->release_array().move(array);

    PyObject *schema_capsule =
        PyCapsule_New(schema, "arrow_schema", release_arrow_schema);
    if (!schema_capsule) {
        if (schema->release) schema->release(schema);
        delete schema;
        if (array->release) array->release(array);
        delete array;
        return NULL;
    }

    PyObject *array_capsule =
        PyCapsule_New(array, "arrow_array", release_arrow_array);
    if (!array_capsule) {
        // The schema capsule's destructor releases and frees `schema`.
        Py_DECREF(schema_capsule);
        if (array->release) array->release(array);
        delete array;
        return NULL;
    }

    // PyTuple_Pack takes its own references; dropping ours leaves the tuple
    // as sole owner (or destroys both capsules if packing failed).
    PyObject *tuple = PyTuple_Pack(2, schema_capsule, array_capsule);
    Py_DECREF(schema_capsule);
    Py_DECREF(array_capsule);
    return tuple;
}
5✔
73

74
// Getter for `num_rows`: the batch's row count, or 0 once the batch has no
// valid result (e.g. after it was exported).
static PyObject *ArrowBatchCapsule_get_num_rows(ArrowBatchCapsuleObject *self,
                                                void *) {
    const bool has_data = self->result != NULL && self->result->valid();
    if (has_data) return PyLong_FromLongLong(self->result->num_rows());
    return PyLong_FromLong(0);
}
23✔
79

80
// Getter for `num_columns`: the batch's column count, or 0 once the batch
// has no valid result (e.g. after it was exported).
static PyObject *ArrowBatchCapsule_get_num_columns(
    ArrowBatchCapsuleObject *self, void *) {
    const bool has_data = self->result != NULL && self->result->valid();
    if (has_data) return PyLong_FromLongLong(self->result->num_columns());
    return PyLong_FromLong(0);
}
3✔
85

86
// tp_dealloc: frees the owned export result (may be NULL; deleting NULL is a
// no-op), then returns the object's memory through the type's tp_free.
static void ArrowBatchCapsule_dealloc(ArrowBatchCapsuleObject *self) {
    PyTypeObject *type = Py_TYPE(self);
    delete self->result;
    type->tp_free(reinterpret_cast<PyObject *>(self));
}
35✔
90

91
// Method table for _ArrowBatchCapsule: only the Arrow PyCapsule export hook.
static PyMethodDef ArrowBatchCapsule_methods[] = {
    {"__arrow_c_array__",
     reinterpret_cast<PyCFunction>(ArrowBatchCapsule_arrow_c_array),
     METH_VARARGS,
     "Export as Arrow C Data Interface PyCapsule pair (schema, array)"},
    {NULL, NULL, 0, NULL}  // sentinel
};
96

97
// Read-only properties exposed on _ArrowBatchCapsule (no setters).
static PyGetSetDef ArrowBatchCapsule_getsetters[] = {
    {"num_rows", reinterpret_cast<getter>(ArrowBatchCapsule_get_num_rows),
     NULL, "Number of rows", NULL},
    {"num_columns", reinterpret_cast<getter>(ArrowBatchCapsule_get_num_columns),
     NULL, "Number of columns", NULL},
    {NULL, NULL, NULL, NULL, NULL}  // sentinel
};
103

104
// Internal wrapper type yielded by TraceReaderIterator in ARROW mode.
// tp_new is unset, so Python code cannot instantiate it directly; instances
// are created with tp_alloc by the iterator's __next__. Slots are positional
// and must stay in CPython's PyTypeObject declaration order.
PyTypeObject ArrowBatchCapsuleType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext._ArrowBatchCapsule",                        // tp_name
    sizeof(ArrowBatchCapsuleObject),                                // tp_basicsize
    0,                                                              // tp_itemsize
    reinterpret_cast<destructor>(ArrowBatchCapsule_dealloc),        // tp_dealloc
    0,                                                              // tp_vectorcall_offset
    0,                                                              // tp_getattr
    0,                                                              // tp_setattr
    0,                                                              // tp_as_async
    0,                                                              // tp_repr
    0,                                                              // tp_as_number
    0,                                                              // tp_as_sequence
    0,                                                              // tp_as_mapping
    0,                                                              // tp_hash
    0,                                                              // tp_call
    0,                                                              // tp_str
    0,                                                              // tp_getattro
    0,                                                              // tp_setattro
    0,                                                              // tp_as_buffer
    Py_TPFLAGS_DEFAULT,                                             // tp_flags
    "Internal Arrow batch wrapper implementing __arrow_c_array__ protocol",  // tp_doc
    0,                                                              // tp_traverse
    0,                                                              // tp_clear
    0,                                                              // tp_richcompare
    0,                                                              // tp_weaklistoffset
    0,                                                              // tp_iter
    0,                                                              // tp_iternext
    ArrowBatchCapsule_methods,                                      // tp_methods
    0,                                                              // tp_members
    ArrowBatchCapsule_getsetters,                                   // tp_getset
    0,                                                              // tp_base
    0,                                                              // tp_dict
    0,                                                              // tp_descr_get
    0,                                                              // tp_descr_set
    0,                                                              // tp_dictoffset
    0,                                                              // tp_init
    0,                                                              // tp_alloc
    0,                                                              // tp_new
};
135

136
#endif                                     // DFTRACER_UTILS_ENABLE_ARROW
137

138
// Marks a producer/consumer state as cancelled and wakes every waiter.
// The flag is stored while holding `mtx`: a thread that has just evaluated
// its condition-variable predicate (seeing cancelled == false) but has not
// yet started waiting would otherwise miss the notify below and block
// forever (lost wakeup).
template <typename State>
static void cancel_and_wake(State *state) {
    {
        std::lock_guard<std::mutex> lock(state->mtx);
        state->cancelled.store(true, std::memory_order_release);
    }
    state->cv_producer.notify_all();
    state->cv_consumer.notify_all();  // wake blocked __next__
}

// tp_dealloc: cancels whichever producer state this iterator owns (ARROW
// and/or line-based), drops the iterator's reference to it, then frees the
// Python object.
static void TraceReaderIterator_dealloc(TraceReaderIteratorObject *self) {
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (self->arrow_state) {
        cancel_and_wake(self->arrow_state.get());
        self->arrow_state.reset();
    }
#endif
    if (self->state) {
        cancel_and_wake(self->state.get());
        self->state.reset();
    }
    Py_TYPE(self)->tp_free((PyObject *)self);
}
90✔
155

156
// tp_iter: an iterator is its own iterable, so hand back a new reference to
// self.
static PyObject *TraceReaderIterator_iter(TraceReaderIteratorObject *self) {
    PyObject *as_object = reinterpret_cast<PyObject *>(self);
    Py_INCREF(as_object);
    return as_object;
}
160

161
static PyObject *TraceReaderIterator_next(TraceReaderIteratorObject *self) {
1,239✔
162
#ifdef DFTRACER_UTILS_ENABLE_ARROW
163
    if (self->mode == IteratorMode::ARROW) {
1,239✔
164
        auto *astate = self->arrow_state.get();
35✔
165
        ArrowIteratorState::BatchItem batch;
35✔
166
        bool cancelled = false;
35✔
167
        {
168
            Py_BEGIN_ALLOW_THREADS std::unique_lock<std::mutex> lock(
35!
169
                astate->mtx);
35✔
170
            astate->cv_consumer.wait(lock, [astate] {
92!
171
                return !astate->queue.empty() ||
79✔
172
                       astate->cancelled.load(std::memory_order_acquire) ||
22!
173
                       astate->done.load(std::memory_order_acquire);
22✔
174
            });
175
            cancelled = astate->cancelled.load(std::memory_order_acquire) &&
35!
176
                        astate->queue.empty();
×
177
            if (!cancelled) {
35!
178
                batch = std::move(astate->queue.front());
35!
179
                astate->queue.pop();
35!
180
            }
35✔
181
            Py_END_ALLOW_THREADS
35!
182
        }
183
        if (cancelled) return NULL;  // StopIteration
35!
184
        astate->cv_producer.notify_one();
35✔
185

186
        if (!batch.has_value()) {
35✔
187
            if (astate->error) {
14!
188
                try {
189
                    std::rethrow_exception(astate->error);
×
190
                } catch (const std::exception &e) {
×
191
                    PyErr_SetString(PyExc_RuntimeError, e.what());
×
192
                    return NULL;
×
193
                } catch (...) {
×
194
                    PyErr_SetString(PyExc_RuntimeError,
×
195
                                    "Unknown error in Arrow iterator");
196
                    return NULL;
×
197
                }
×
UNCOV
198
            }
×
199
            return NULL;  // StopIteration
14✔
200
        }
201

202
        ArrowBatchCapsuleObject *obj =
21✔
203
            (ArrowBatchCapsuleObject *)ArrowBatchCapsuleType.tp_alloc(
21!
204
                &ArrowBatchCapsuleType, 0);
205
        if (!obj) return NULL;
21!
206
        obj->result = new ArrowExportResult(std::move(*batch));
21!
207
        return (PyObject *)obj;
21✔
208
    }
35✔
209
#endif
210

211
    auto *state = self->state.get();
1,204✔
212

213
    // Loop to skip non-JSON lines without recursion (avoids stack overflow
214
    // on files with many delimiter lines like "[" and "]").
215
    while (true) {
1,204✔
216
        std::optional<std::string> item;
1,219✔
217
        bool cancelled = false;
1,219✔
218

219
        {
220
            Py_BEGIN_ALLOW_THREADS std::unique_lock<std::mutex> lock(
1,219!
221
                state->mtx);
1,219✔
222
            state->cv_consumer.wait(lock, [state] {
3,085!
223
                return !state->queue.empty() ||
2,513✔
224
                       state->cancelled.load(std::memory_order_acquire) ||
647!
225
                       state->done.load(std::memory_order_acquire);
647✔
226
            });
227
            cancelled = state->cancelled.load(std::memory_order_acquire) &&
1,219!
228
                        state->queue.empty();
×
229
            if (!cancelled) {
1,219!
230
                item = std::move(state->queue.front());
1,219!
231
                state->queue.pop();
1,219!
232
            }
1,219✔
233
            Py_END_ALLOW_THREADS
1,219!
234
        }
235
        if (cancelled) return NULL;  // StopIteration
1,219!
236
        state->cv_producer.notify_one();
1,219✔
237

238
        if (!item.has_value()) {
1,219✔
239
            if (state->error) {
71✔
240
                try {
241
                    std::rethrow_exception(state->error);
1!
242
                } catch (const std::exception &e) {
1!
243
                    PyErr_SetString(PyExc_RuntimeError, e.what());
1!
244
                    return NULL;
1✔
245
                } catch (...) {
1!
246
                    PyErr_SetString(PyExc_RuntimeError,
×
247
                                    "Unknown error in TraceReaderIterator");
248
                    return NULL;
×
249
                }
1!
UNCOV
250
            }
×
251
            return NULL;  // StopIteration
70✔
252
        }
253

254
        switch (self->mode) {
1,148!
255
            case IteratorMode::LINES:
256
                return PyUnicode_FromStringAndSize(
837!
257
                    item->data(), static_cast<Py_ssize_t>(item->size()));
837✔
258
            case IteratorMode::JSON: {
259
                const char *trimmed;
260
                std::size_t trimmed_length;
261
                if (!dftracer::utils::json_trim_and_validate(
177!
262
                        item->data(), item->size(), trimmed, trimmed_length)) {
177✔
263
                    continue;  // skip non-JSON delimiter lines
15✔
264
                }
265
                PyObject *json_obj = JSON_from_data(trimmed, trimmed_length);
162!
266
                if (!json_obj) {
162!
267
                    PyErr_Clear();
×
268
                    continue;  // skip unparseable lines
×
269
                }
270
                return json_obj;
162✔
271
            }
272
            case IteratorMode::RAW:
134✔
273
            default:
274
                return PyBytes_FromStringAndSize(
134!
275
                    item->data(), static_cast<Py_ssize_t>(item->size()));
134✔
276
        }
277
    }
1,219!
278
}
1,240✔
279

280
// Python type for the lazy trace iterator: implements only the iterator
// protocol (tp_iter / tp_iternext). tp_new is unset, so Python code cannot
// instantiate it directly (presumably the reader bindings create instances;
// confirm). Slots are positional in CPython's declaration order; every slot
// after tp_iternext is left to aggregate zero-initialization.
PyTypeObject TraceReaderIteratorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext.TraceReaderIterator",                    // tp_name
    sizeof(TraceReaderIteratorObject),                           // tp_basicsize
    0,                                                           // tp_itemsize
    reinterpret_cast<destructor>(TraceReaderIterator_dealloc),   // tp_dealloc
    0,                                                           // tp_vectorcall_offset
    0,                                                           // tp_getattr
    0,                                                           // tp_setattr
    0,                                                           // tp_as_async
    0,                                                           // tp_repr
    0,                                                           // tp_as_number
    0,                                                           // tp_as_sequence
    0,                                                           // tp_as_mapping
    0,                                                           // tp_hash
    0,                                                           // tp_call
    0,                                                           // tp_str
    0,                                                           // tp_getattro
    0,                                                           // tp_setattro
    0,                                                           // tp_as_buffer
    Py_TPFLAGS_DEFAULT,                                          // tp_flags
    "Lazy iterator over TraceReader lines or raw chunks",        // tp_doc
    0,                                                           // tp_traverse
    0,                                                           // tp_clear
    0,                                                           // tp_richcompare
    0,                                                           // tp_weaklistoffset
    reinterpret_cast<getiterfunc>(TraceReaderIterator_iter),     // tp_iter
    reinterpret_cast<iternextfunc>(TraceReaderIterator_next),    // tp_iternext
};
319

320
// Readies the iterator type(s) and registers them on module `m`:
// "TraceReaderIterator" always, "_ArrowBatchCapsule" when Arrow support is
// compiled in. Returns 0 on success, -1 with a Python exception set.
//
// `m` is never released here on any failure path. Cleanup of the module is
// left to the caller (assumes the caller owns `m`; confirm against the
// module init). The old code decref'd `m` on only one of its two failure
// paths, which a caller cannot handle consistently.
int init_trace_reader_iterator(PyObject *m) {
    if (PyType_Ready(&TraceReaderIteratorType) < 0) return -1;

    // PyModule_AddObject steals the reference only on success, so the extra
    // reference taken here must be dropped by hand if it fails.
    Py_INCREF(&TraceReaderIteratorType);
    if (PyModule_AddObject(m, "TraceReaderIterator",
                           (PyObject *)&TraceReaderIteratorType) < 0) {
        Py_DECREF(&TraceReaderIteratorType);
        return -1;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (PyType_Ready(&ArrowBatchCapsuleType) < 0) return -1;
    Py_INCREF(&ArrowBatchCapsuleType);
    if (PyModule_AddObject(m, "_ArrowBatchCapsule",
                           (PyObject *)&ArrowBatchCapsuleType) < 0) {
        Py_DECREF(&ArrowBatchCapsuleType);
        return -1;
    }
#endif

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc