• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27052412546

06 Jun 2026 04:20AM UTC coverage: 50.862% (+1.0%) from 49.905%
27052412546

Pull #73

github

web-flow
Merge 734572730 into 88a3c8457
Pull Request #73: add portable dependencies wheel support

31801 of 79859 branches covered (39.82%)

Branch coverage included in aggregate %.

32491 of 46545 relevant lines covered (69.81%)

9947.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.5
/src/dftracer/utils/python/streaming_iterator.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#ifdef DFTRACER_UTILS_ENABLE_ARROW
3

4
#define PY_SSIZE_T_CLEAN
5
#include <Python.h>
6
#include <dftracer/utils/python/streaming_iterator.h>
7
#include <dftracer/utils/python/trace_reader_iterator.h>
8

9
namespace dftracer::utils::python {
10

11
static PyObject* ArrowStreamingIterator_new(PyTypeObject* type,
3✔
12
                                            PyObject* /*args*/,
13
                                            PyObject* /*kwds*/) {
14
    ArrowStreamingIteratorObject* self =
3✔
15
        (ArrowStreamingIteratorObject*)type->tp_alloc(type, 0);
3✔
16
    if (self) {
3!
17
        // Allocate C++ state separately to avoid layout issues
18
        self->cpp_state = new ArrowStreamingIteratorState();
3✔
19
    }
3✔
20
    return (PyObject*)self;
3✔
21
}
22

23
static void ArrowStreamingIterator_dealloc(ArrowStreamingIteratorObject* self) {
3✔
24
    if (self->cpp_state) {
3!
25
        // Cancel the stream if still running
26
        if (self->cpp_state->cancel) {
3!
27
            self->cpp_state->cancel();
3✔
28
        }
3✔
29
        delete self->cpp_state;
3!
30
        self->cpp_state = nullptr;
3✔
31
    }
3✔
32
    Py_TYPE(self)->tp_free((PyObject*)self);
3✔
33
}
3✔
34

35
static PyObject* ArrowStreamingIterator_iter(PyObject* self) {
3✔
36
    Py_INCREF(self);
3✔
37
    return self;
3✔
38
}
39

40
static PyObject* ArrowStreamingIterator_next(
8✔
41
    ArrowStreamingIteratorObject* self) {
42
    if (!self->cpp_state || !self->cpp_state->pull_next) {
8!
43
        PyErr_SetString(PyExc_RuntimeError, "Iterator not initialized");
×
44
        return NULL;
×
45
    }
46

47
    std::optional<ArrowExportResult> result;
8✔
48
    bool had_error = false;
8✔
49
    std::string error_msg;
8✔
50

51
    Py_BEGIN_ALLOW_THREADS try {
8!
52
        result = self->cpp_state->pull_next();
8!
53
    } catch (const std::exception& e) {
8!
54
        had_error = true;
×
55
        error_msg = e.what();
×
56
    } catch (...) {
×
57
        had_error = true;
×
58
        error_msg = "Unknown error in streaming iterator";
×
59
    }
×
60
    Py_END_ALLOW_THREADS
8!
61

62
        if (had_error) {
8!
63
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
64
        return NULL;
×
65
    }
66

67
    if (!result.has_value()) {
8✔
68
        // Check for error
69
        if (self->cpp_state->get_error) {
3!
70
            auto ex = self->cpp_state->get_error();
3!
71
            if (ex) {
3!
72
                try {
73
                    std::rethrow_exception(ex);
×
74
                } catch (const std::exception& e) {
×
75
                    PyErr_SetString(PyExc_RuntimeError, e.what());
×
76
                    return NULL;
×
77
                } catch (...) {
×
78
                    PyErr_SetString(PyExc_RuntimeError,
×
79
                                    "Unknown error in streaming iterator");
80
                    return NULL;
×
81
                }
×
82
            }
×
83
        }
3!
84
        // Normal completion
85
        return NULL;  // StopIteration
3✔
86
    }
87

88
    // Wrap the ArrowExportResult in an ArrowBatchCapsule
89
    ArrowBatchCapsuleObject* obj =
5✔
90
        (ArrowBatchCapsuleObject*)ArrowBatchCapsuleType.tp_alloc(
5!
91
            &ArrowBatchCapsuleType, 0);
92
    if (!obj) return NULL;
5!
93
    obj->result = new ArrowExportResult(std::move(*result));
5!
94
    return (PyObject*)obj;
5✔
95
}
8✔
96

97
static PyObject* ArrowStreamingIterator_cancel(
×
98
    ArrowStreamingIteratorObject* self, PyObject* Py_UNUSED(args)) {
99
    if (self->cpp_state && self->cpp_state->cancel) {
×
100
        self->cpp_state->cancel();
×
101
    }
×
102
    Py_RETURN_NONE;
×
103
}
104

105
static PyMethodDef ArrowStreamingIterator_methods[] = {
106
    {"cancel", (PyCFunction)ArrowStreamingIterator_cancel, METH_NOARGS,
107
     "Cancel the streaming iterator."},
108
    {NULL}};
109

110
PyTypeObject ArrowStreamingIteratorType = {
111
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext._ArrowStreamingIterator",
112
    sizeof(ArrowStreamingIteratorObject),       /* tp_basicsize */
113
    0,                                          /* tp_itemsize */
114
    (destructor)ArrowStreamingIterator_dealloc, /* tp_dealloc */
115
    0,                                          /* tp_vectorcall_offset */
116
    0,                                          /* tp_getattr */
117
    0,                                          /* tp_setattr */
118
    0,                                          /* tp_as_async */
119
    0,                                          /* tp_repr */
120
    0,                                          /* tp_as_number */
121
    0,                                          /* tp_as_sequence */
122
    0,                                          /* tp_as_mapping */
123
    0,                                          /* tp_hash */
124
    0,                                          /* tp_call */
125
    0,                                          /* tp_str */
126
    0,                                          /* tp_getattro */
127
    0,                                          /* tp_setattro */
128
    0,                                          /* tp_as_buffer */
129
    Py_TPFLAGS_DEFAULT,                         /* tp_flags */
130
    "Streaming Arrow batch iterator.\n\n"
131
    "Yields ArrowBatch objects as they become available from the C++ "
132
    "pipeline.\n"
133
    "Call cancel() to stop the stream early.", /* tp_doc */
134
    0,                                         /* tp_traverse */
135
    0,                                         /* tp_clear */
136
    0,                                         /* tp_richcompare */
137
    0,                                         /* tp_weaklistoffset */
138
    ArrowStreamingIterator_iter,               /* tp_iter */
139
    (iternextfunc)ArrowStreamingIterator_next, /* tp_iternext */
140
    ArrowStreamingIterator_methods,            /* tp_methods */
141
    0,                                         /* tp_members */
142
    0,                                         /* tp_getset */
143
    0,                                         /* tp_base */
144
    0,                                         /* tp_dict */
145
    0,                                         /* tp_descr_get */
146
    0,                                         /* tp_descr_set */
147
    0,                                         /* tp_dictoffset */
148
    0,                                         /* tp_init */
149
    0,                                         /* tp_alloc */
150
    ArrowStreamingIterator_new,                /* tp_new */
151
};
152

153
int init_arrow_streaming_iterator(PyObject* m) {
1✔
154
    if (PyType_Ready(&ArrowStreamingIteratorType) < 0) return -1;
1!
155

156
    Py_INCREF(&ArrowStreamingIteratorType);
1✔
157
    if (PyModule_AddObject(m, "_ArrowStreamingIterator",
2!
158
                           (PyObject*)&ArrowStreamingIteratorType) < 0) {
1✔
159
        Py_DECREF(&ArrowStreamingIteratorType);
×
160
        return -1;
×
161
    }
162

163
    return 0;
1✔
164
}
1✔
165

166
}  // namespace dftracer::utils::python
167

168
#endif  // DFTRACER_UTILS_ENABLE_ARROW
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc