• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28521653886

01 Jul 2026 01:36PM UTC coverage: 50.92% (-1.4%) from 52.278%
28521653886

Pull #83

github

web-flow
Merge 9bdedb1e9 into 2efed6649
Pull Request #83: refactor and improve code QoL

31893 of 80049 branches covered (39.84%)

Branch coverage included in aggregate %.

789 of 1613 new or added lines in 87 files covered. (48.92%)

5007 existing lines in 181 files now uncovered.

32812 of 47024 relevant lines covered (69.78%)

9905.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.0
/src/dftracer/utils/python/streaming_iterator.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#include <dftracer/utils/python/py_errors.h>
3
#include <dftracer/utils/python/py_type_helpers.h>
4
#ifdef DFTRACER_UTILS_ENABLE_ARROW
5

6
#define PY_SSIZE_T_CLEAN
7
#include <Python.h>
8
#include <dftracer/utils/python/streaming_iterator.h>
9
#include <dftracer/utils/python/trace_reader_iterator.h>
10

11
namespace dftracer::utils::python {
12

13
/**
 * tp_new slot for the streaming iterator type.
 *
 * Allocates the Python object through the type's tp_alloc and attaches a
 * freshly constructed ArrowStreamingIteratorState to `cpp_state`. The
 * positional and keyword arguments are ignored.
 *
 * @param type Type object whose tp_alloc performs the allocation.
 * @return The new object, or nullptr when the allocation fails or when
 *         constructing the C++ state throws (a Python error is set in the
 *         latter case).
 */
static PyObject* ArrowStreamingIterator_new(PyTypeObject* type,
                                            PyObject* /*args*/,
                                            PyObject* /*kwds*/) {
    auto* self = reinterpret_cast<ArrowStreamingIteratorObject*>(
        type->tp_alloc(type, 0));
    if (!self) {
        return nullptr;
    }

    // Keep the pointer well-defined so the deallocator's null check is valid
    // even if the construction below fails.
    self->cpp_state = nullptr;

    // The C++ state lives in its own allocation to avoid layout issues.
    // A C++ exception must never propagate through this C slot function, so
    // every failure is converted into a Python error here.
    try {
        self->cpp_state = new ArrowStreamingIteratorState();
    } catch (const std::exception& e) {
        // Release the half-built object first; the error is set afterwards so
        // the deallocator cannot disturb it.
        Py_DECREF(reinterpret_cast<PyObject*>(self));
        set_typed_py_error(e);
        return nullptr;
    } catch (...) {
        Py_DECREF(reinterpret_cast<PyObject*>(self));
        PyErr_SetString(PyExc_RuntimeError,
                        "Unknown error in streaming iterator");
        return nullptr;
    }

    return reinterpret_cast<PyObject*>(self);
}
24

25
/**
 * tp_dealloc slot for the streaming iterator type.
 *
 * If a C++ state is attached, invokes its `cancel` callback (when one is
 * set) so a stream that is still running is stopped, deletes the state, and
 * finally releases the Python object through the type's tp_free.
 *
 * @param self Object being destroyed.
 */
static void ArrowStreamingIterator_dealloc(ArrowStreamingIteratorObject* self) {
    if (auto* state = self->cpp_state) {
        if (state->cancel) {
            // A deallocator has no way to report a failure and must not let a
            // C++ exception escape into the interpreter; swallowing it here
            // guarantees the state and the object are still released.
            try {
                state->cancel();
            } catch (...) {
            }
        }
        delete state;
        self->cpp_state = nullptr;
    }
    Py_TYPE(self)->tp_free(reinterpret_cast<PyObject*>(self));
}
3✔
36

37
/**
 * tp_iter slot: the object is its own iterator.
 *
 * @param self The iterator object.
 * @return `self`, with its reference count incremented.
 */
static PyObject* ArrowStreamingIterator_iter(PyObject* self) {
    PyObject* const same_object = self;
    Py_INCREF(same_object);
    return same_object;
}
41

42
/**
 * tp_iternext slot: pulls the next batch from the C++ stream.
 *
 * Calls the state's `pull_next` callback with the GIL released. An empty
 * result ends the iteration: if the state's `get_error` callback yields a
 * stored exception it is translated into a Python error, otherwise NULL is
 * returned with no error set. A non-empty result is moved into a heap
 * allocated ArrowExportResult owned by a new ArrowBatchCapsule object.
 *
 * @param self The iterator object.
 * @return A new ArrowBatchCapsule object, or NULL on exhaustion (no error
 *         set) or on failure (Python error set).
 */
static PyObject* ArrowStreamingIterator_next(
    ArrowStreamingIteratorObject* self) {
    if (!self->cpp_state || !self->cpp_state->pull_next) {
        PyErr_SetString(PyExc_RuntimeError, "Iterator not initialized");
        return NULL;
    }

    std::optional<ArrowExportResult> result;
    std::exception_ptr pull_failure;

    // Only capture the exception while the GIL is released: no Python API may
    // be called here, and std::current_exception() cannot throw (unlike
    // copying the message into a string).
    // clang-format off
    Py_BEGIN_ALLOW_THREADS
    try {
        result = self->cpp_state->pull_next();
    } catch (...) {
        pull_failure = std::current_exception();
    }
    Py_END_ALLOW_THREADS
    // clang-format on

    if (pull_failure) {
        // Exceptions thrown directly by pull_next are reported as
        // RuntimeError.
        try {
            std::rethrow_exception(pull_failure);
        } catch (const std::exception& e) {
            PyErr_SetString(PyExc_RuntimeError, e.what());
        } catch (...) {
            PyErr_SetString(PyExc_RuntimeError,
                            "Unknown error in streaming iterator");
        }
        return NULL;
    }

    if (!result.has_value()) {
        // The stream ended; find out whether it ended because of an error.
        if (self->cpp_state->get_error) {
            auto ex = self->cpp_state->get_error();
            if (ex) {
                try {
                    std::rethrow_exception(ex);
                } catch (const std::exception& e) {
                    set_typed_py_error(e);
                    return NULL;
                } catch (...) {
                    PyErr_SetString(PyExc_RuntimeError,
                                    "Unknown error in streaming iterator");
                    return NULL;
                }
            }
        }
        // Normal completion: NULL without an error set means StopIteration.
        return NULL;
    }

    // Build the payload before allocating the Python wrapper so that a
    // throwing allocation or move neither leaks a half-initialized capsule
    // nor lets a C++ exception escape into the interpreter.
    ArrowExportResult* payload = nullptr;
    try {
        payload = new ArrowExportResult(std::move(*result));
    } catch (const std::exception& e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Unknown error in streaming iterator");
        return NULL;
    }

    // Wrap the ArrowExportResult in an ArrowBatchCapsule.
    auto* obj = reinterpret_cast<ArrowBatchCapsuleObject*>(
        ArrowBatchCapsuleType.tp_alloc(&ArrowBatchCapsuleType, 0));
    if (!obj) {
        delete payload;
        return NULL;
    }
    obj->result = payload;
    return reinterpret_cast<PyObject*>(obj);
}
8✔
98

99
/**
 * Python method `cancel()`: asks the underlying stream to stop.
 *
 * Invokes the state's `cancel` callback when both the state and the callback
 * are present; otherwise does nothing.
 *
 * @param self The iterator object.
 * @return None on success, or NULL with a Python error set if the callback
 *         throws.
 */
static PyObject* ArrowStreamingIterator_cancel(
    ArrowStreamingIteratorObject* self, PyObject* Py_UNUSED(args)) {
    if (self->cpp_state && self->cpp_state->cancel) {
        // Same guard as the iteration path: a C++ exception must be turned
        // into a Python error instead of unwinding through the interpreter.
        try {
            self->cpp_state->cancel();
        } catch (const std::exception& e) {
            set_typed_py_error(e);
            return NULL;
        } catch (...) {
            PyErr_SetString(PyExc_RuntimeError,
                            "Unknown error in streaming iterator");
            return NULL;
        }
    }
    Py_RETURN_NONE;
}
106

107
// Method table of the streaming iterator type; the all-null entry terminates
// the table.
static PyMethodDef ArrowStreamingIterator_methods[] = {
    {
        "cancel",
        reinterpret_cast<PyCFunction>(ArrowStreamingIterator_cancel),
        METH_NOARGS,
        "Cancel the streaming iterator.",
    },
    {nullptr, nullptr, 0, nullptr},
};
111

112
// Static type object of the streaming iterator. Slots are initialized
// positionally, so the order of the entries below must not change. Unused
// pointer slots are nullptr; integer slots are 0.
PyTypeObject ArrowStreamingIteratorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext._ArrowStreamingIterator",  // tp_name
    sizeof(ArrowStreamingIteratorObject),          // tp_basicsize
    0,                                             // tp_itemsize
    reinterpret_cast<destructor>(
        ArrowStreamingIterator_dealloc),           // tp_dealloc
    0,                                             // tp_vectorcall_offset
    nullptr,                                       // tp_getattr
    nullptr,                                       // tp_setattr
    nullptr,                                       // tp_as_async
    nullptr,                                       // tp_repr
    nullptr,                                       // tp_as_number
    nullptr,                                       // tp_as_sequence
    nullptr,                                       // tp_as_mapping
    nullptr,                                       // tp_hash
    nullptr,                                       // tp_call
    nullptr,                                       // tp_str
    nullptr,                                       // tp_getattro
    nullptr,                                       // tp_setattro
    nullptr,                                       // tp_as_buffer
    Py_TPFLAGS_DEFAULT,                            // tp_flags
    "Streaming Arrow batch iterator.\n"
    "\n"
    "Yields ArrowBatch objects as they become available "
    "from the C++ pipeline.\n"
    "Call cancel() to stop the stream early.",     // tp_doc
    nullptr,                                       // tp_traverse
    nullptr,                                       // tp_clear
    nullptr,                                       // tp_richcompare
    0,                                             // tp_weaklistoffset
    ArrowStreamingIterator_iter,                   // tp_iter
    reinterpret_cast<iternextfunc>(
        ArrowStreamingIterator_next),              // tp_iternext
    ArrowStreamingIterator_methods,                // tp_methods
    nullptr,                                       // tp_members
    nullptr,                                       // tp_getset
    nullptr,                                       // tp_base
    nullptr,                                       // tp_dict
    nullptr,                                       // tp_descr_get
    nullptr,                                       // tp_descr_set
    0,                                             // tp_dictoffset
    nullptr,                                       // tp_init
    nullptr,                                       // tp_alloc
    ArrowStreamingIterator_new,                    // tp_new
};
154

155
/**
 * Registers the streaming iterator type with a module.
 *
 * Forwards to register_type() using the name "_ArrowStreamingIterator".
 *
 * @param m Module object passed through to register_type().
 * @return 0 on success, -1 when register_type() reports a negative result.
 */
int init_arrow_streaming_iterator(PyObject* m) {
    const bool registration_failed =
        register_type(m, &ArrowStreamingIteratorType,
                      "_ArrowStreamingIterator") < 0;
    return registration_failed ? -1 : 0;
}
1✔
162

163
}  // namespace dftracer::utils::python
164

165
#endif  // DFTRACER_UTILS_ENABLE_ARROW
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc