• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28423703495

30 Jun 2026 05:59AM UTC coverage: 51.998% (-0.3%) from 52.278%
28423703495

Pull #83

github

web-flow
Merge fb542a938 into 2efed6649
Pull Request #83: refactor and improve code QoL

37282 of 93303 branches covered (39.96%)

Branch coverage included in aggregate %.

801 of 1525 new or added lines in 78 files covered. (52.52%)

98 existing lines in 37 files now uncovered.

33674 of 43157 relevant lines covered (78.03%)

20306.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.64
/src/dftracer/utils/python/streaming_iterator.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#include <dftracer/utils/python/py_type_helpers.h>
3
#ifdef DFTRACER_UTILS_ENABLE_ARROW
4

5
#define PY_SSIZE_T_CLEAN
6
#include <Python.h>
7
#include <dftracer/utils/python/streaming_iterator.h>
8
#include <dftracer/utils/python/trace_reader_iterator.h>
9

10
namespace dftracer::utils::python {
11

12
/**
 * tp_new slot for _ArrowStreamingIterator.
 *
 * Allocates the Python object through the type's tp_alloc and attaches a
 * freshly constructed ArrowStreamingIteratorState to it. The C++ state is
 * heap-allocated separately from the Python object (rather than embedded in
 * it) to avoid layout issues.
 *
 * @param type Type object being instantiated; its tp_alloc is used.
 * @return New reference to the iterator object, or NULL with a Python
 *         exception set if either allocation fails.
 */
static PyObject* ArrowStreamingIterator_new(PyTypeObject* type,
                                            PyObject* /*args*/,
                                            PyObject* /*kwds*/) {
    // Construct the C++ state first. A C++ exception must never unwind
    // through the CPython C API, so any failure here is converted into a
    // Python MemoryError instead of being allowed to propagate.
    ArrowStreamingIteratorState* state = nullptr;
    try {
        state = new ArrowStreamingIteratorState();
    } catch (...) {
        return PyErr_NoMemory();
    }

    ArrowStreamingIteratorObject* self =
        reinterpret_cast<ArrowStreamingIteratorObject*>(
            type->tp_alloc(type, 0));
    if (!self) {
        // tp_alloc has already set the Python error; just release the state.
        delete state;
        return NULL;
    }

    self->cpp_state = state;
    return reinterpret_cast<PyObject*>(self);
}
23

24
/**
 * tp_dealloc slot for _ArrowStreamingIterator.
 *
 * If a C++ state is attached, invokes its `cancel` callable (when one is
 * set) so a stream that is still running is cancelled, then deletes the
 * state and finally releases the Python object through tp_free.
 *
 * @param self Object being destroyed.
 */
static void ArrowStreamingIterator_dealloc(ArrowStreamingIteratorObject* self) {
    if (self->cpp_state) {
        // Cancel the stream if still running
        if (self->cpp_state->cancel) {
            try {
                self->cpp_state->cancel();
            } catch (...) {
                // A deallocator has no way to report an error and must not
                // let a C++ exception escape into the interpreter. Swallow
                // it so the state and the object below are still released.
            }
        }
        delete self->cpp_state;
        self->cpp_state = nullptr;
    }
    Py_TYPE(self)->tp_free(reinterpret_cast<PyObject*>(self));
}
6✔
35

36
/**
 * tp_iter slot: the object is its own iterator.
 *
 * @param iterator The iterator object itself.
 * @return A new reference to the same object.
 */
static PyObject* ArrowStreamingIterator_iter(PyObject* iterator) {
    // The caller receives an owned reference, so bump the count first.
    Py_INCREF(iterator);
    return iterator;
}
40

41
/**
 * tp_iternext slot: fetch the next batch from the stream.
 *
 * Calls the state's `pull_next` callable with the GIL released. The outcome
 * is mapped to Python as follows:
 *   - `pull_next` throws: RuntimeError carrying the exception message.
 *   - `pull_next` returns an empty optional and `get_error` yields a stored
 *     exception: RuntimeError carrying that exception's message.
 *   - `pull_next` returns an empty optional with no stored exception: NULL
 *     with no error set, which ends iteration.
 *   - `pull_next` returns a value: a new ArrowBatchCapsule object that owns
 *     a heap-allocated ArrowExportResult moved out of the optional.
 *
 * @param self Iterator object.
 * @return New reference to an ArrowBatchCapsule, or NULL (with or without a
 *         Python exception set, as described above).
 */
static PyObject* ArrowStreamingIterator_next(
    ArrowStreamingIteratorObject* self) {
    if (!self->cpp_state || !self->cpp_state->pull_next) {
        PyErr_SetString(PyExc_RuntimeError, "Iterator not initialized");
        return NULL;
    }

    std::optional<ArrowExportResult> result;
    bool had_error = false;
    std::string error_msg;

    // Release the GIL while waiting on the pipeline. No Python API may be
    // called inside this region, so a failure is only recorded here and is
    // turned into a Python exception after the GIL has been re-acquired.
    Py_BEGIN_ALLOW_THREADS
    try {
        result = self->cpp_state->pull_next();
    } catch (const std::exception& e) {
        had_error = true;
        error_msg = e.what();
    } catch (...) {
        had_error = true;
        error_msg = "Unknown error in streaming iterator";
    }
    Py_END_ALLOW_THREADS

    if (had_error) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    if (!result.has_value()) {
        // Check for error
        if (self->cpp_state->get_error) {
            auto ex = self->cpp_state->get_error();
            if (ex) {
                // Rethrow purely to recover the message of the stored
                // exception; both handlers convert it to a Python error.
                try {
                    std::rethrow_exception(ex);
                } catch (const std::exception& e) {
                    PyErr_SetString(PyExc_RuntimeError, e.what());
                    return NULL;
                } catch (...) {
                    PyErr_SetString(PyExc_RuntimeError,
                                    "Unknown error in streaming iterator");
                    return NULL;
                }
            }
        }
        // Normal completion
        return NULL;  // StopIteration
    }

    // Build the heap payload before allocating the Python wrapper. If the
    // allocation or move throws, nothing has been handed to Python yet, so
    // there is no half-initialised capsule to clean up and no C++ exception
    // crosses the C API boundary.
    ArrowExportResult* payload = nullptr;
    try {
        payload = new ArrowExportResult(std::move(*result));
    } catch (...) {
        return PyErr_NoMemory();
    }

    // Wrap the ArrowExportResult in an ArrowBatchCapsule
    ArrowBatchCapsuleObject* obj =
        reinterpret_cast<ArrowBatchCapsuleObject*>(
            ArrowBatchCapsuleType.tp_alloc(&ArrowBatchCapsuleType, 0));
    if (!obj) {
        // tp_alloc has already set the Python error; avoid leaking payload.
        delete payload;
        return NULL;
    }
    obj->result = payload;
    return reinterpret_cast<PyObject*>(obj);
}
16✔
97

98
/**
 * Python method `cancel()`: ask the stream to stop.
 *
 * Invokes the state's `cancel` callable when both the state and the
 * callable are present; otherwise does nothing.
 *
 * @param self Iterator object.
 * @return None on success, or NULL with RuntimeError set if the `cancel`
 *         callable throws.
 */
static PyObject* ArrowStreamingIterator_cancel(
    ArrowStreamingIteratorObject* self, PyObject* Py_UNUSED(args)) {
    if (self->cpp_state && self->cpp_state->cancel) {
        // A C++ exception must not unwind into the interpreter; report it
        // the same way the iteration path reports pipeline failures.
        try {
            self->cpp_state->cancel();
        } catch (const std::exception& e) {
            PyErr_SetString(PyExc_RuntimeError, e.what());
            return NULL;
        } catch (...) {
            PyErr_SetString(PyExc_RuntimeError,
                            "Unknown error in streaming iterator");
            return NULL;
        }
    }
    Py_RETURN_NONE;
}
105

106
// Method table for _ArrowStreamingIterator; the all-null row terminates it.
static PyMethodDef ArrowStreamingIterator_methods[] = {
    {
        "cancel",
        reinterpret_cast<PyCFunction>(ArrowStreamingIterator_cancel),
        METH_NOARGS,
        "Cancel the streaming iterator.",
    },
    {nullptr, nullptr, 0, nullptr},
};
110

111
// Static type object for _ArrowStreamingIterator. Slots are filled
// positionally, so the order of the entries below must not change. Only
// deallocation, iteration, the method table and tp_new are provided; every
// other slot is left empty.
PyTypeObject ArrowStreamingIteratorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext._ArrowStreamingIterator",  // tp_name
    sizeof(ArrowStreamingIteratorObject),          // tp_basicsize
    0,                                             // tp_itemsize
    reinterpret_cast<destructor>(
        ArrowStreamingIterator_dealloc),           // tp_dealloc
    0,                                             // tp_vectorcall_offset
    nullptr,                                       // tp_getattr
    nullptr,                                       // tp_setattr
    nullptr,                                       // tp_as_async
    nullptr,                                       // tp_repr
    nullptr,                                       // tp_as_number
    nullptr,                                       // tp_as_sequence
    nullptr,                                       // tp_as_mapping
    nullptr,                                       // tp_hash
    nullptr,                                       // tp_call
    nullptr,                                       // tp_str
    nullptr,                                       // tp_getattro
    nullptr,                                       // tp_setattro
    nullptr,                                       // tp_as_buffer
    Py_TPFLAGS_DEFAULT,                            // tp_flags
    // tp_doc
    "Streaming Arrow batch iterator.\n\n"
    "Yields ArrowBatch objects as they become available from the C++ "
    "pipeline.\n"
    "Call cancel() to stop the stream early.",
    nullptr,                                       // tp_traverse
    nullptr,                                       // tp_clear
    nullptr,                                       // tp_richcompare
    0,                                             // tp_weaklistoffset
    ArrowStreamingIterator_iter,                   // tp_iter
    reinterpret_cast<iternextfunc>(
        ArrowStreamingIterator_next),              // tp_iternext
    ArrowStreamingIterator_methods,                // tp_methods
    nullptr,                                       // tp_members
    nullptr,                                       // tp_getset
    nullptr,                                       // tp_base
    nullptr,                                       // tp_dict
    nullptr,                                       // tp_descr_get
    nullptr,                                       // tp_descr_set
    0,                                             // tp_dictoffset
    nullptr,                                       // tp_init
    nullptr,                                       // tp_alloc
    ArrowStreamingIterator_new,                    // tp_new
};
153

154
/**
 * Registers ArrowStreamingIteratorType with module `m` under the name
 * "_ArrowStreamingIterator" by delegating to register_type (declared
 * outside this file).
 *
 * @param m Module object passed through to register_type.
 * @return 0 on success, -1 if register_type reports a negative status.
 */
int init_arrow_streaming_iterator(PyObject* m) {
    const auto status =
        register_type(m, &ArrowStreamingIteratorType, "_ArrowStreamingIterator");
    return (status < 0) ? -1 : 0;
}
1✔
161

162
}  // namespace dftracer::utils::python
163

164
#endif  // DFTRACER_UTILS_ENABLE_ARROW
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc