• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28398085302

29 Jun 2026 07:43PM UTC coverage: 50.067% (-2.2%) from 52.278%
28398085302

Pull #83

github

web-flow
Merge 9a615085e into 2efed6649
Pull Request #83: refactor and improve code QoL

16342 of 44293 branches covered (36.9%)

Branch coverage included in aggregate %.

613 of 1132 new or added lines in 52 files covered. (54.15%)

687 existing lines in 116 files now uncovered.

21698 of 31685 relevant lines covered (68.48%)

12958.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.67
/src/dftracer/utils/python/streaming_iterator.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#include <dftracer/utils/python/py_type_helpers.h>
3
#ifdef DFTRACER_UTILS_ENABLE_ARROW
4

5
#define PY_SSIZE_T_CLEAN
6
#include <Python.h>
7
#include <dftracer/utils/python/streaming_iterator.h>
8
#include <dftracer/utils/python/trace_reader_iterator.h>
9

10
namespace dftracer::utils::python {
11

12
/**
 * tp_new slot: allocate an ArrowStreamingIterator together with its C++ state.
 *
 * The positional and keyword arguments are ignored.
 *
 * @param type  Type object whose tp_alloc is used for the allocation.
 * @return A new reference on success; NULL with a Python exception set when
 *         either the Python object or the C++ state cannot be created.
 */
static PyObject* ArrowStreamingIterator_new(PyTypeObject* type,
                                            PyObject* /*args*/,
                                            PyObject* /*kwds*/) {
    ArrowStreamingIteratorObject* self =
        (ArrowStreamingIteratorObject*)type->tp_alloc(type, 0);
    if (!self) {
        // tp_alloc is expected to have set the Python error already.
        return NULL;
    }

    // Allocate C++ state separately to avoid layout issues.
    // Start from a well-defined null so that dealloc is safe if the
    // construction below fails.
    self->cpp_state = nullptr;
    try {
        self->cpp_state = new ArrowStreamingIteratorState();
    } catch (const std::exception& e) {
        // A C++ exception must never unwind through the interpreter's C
        // frames; report it as a Python error instead.
        PyErr_SetString(PyExc_RuntimeError, e.what());
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to create streaming iterator state");
    }

    if (!self->cpp_state) {
        // Drop the half-built object; dealloc skips the null cpp_state.
        Py_DECREF((PyObject*)self);
        return NULL;
    }
    return (PyObject*)self;
}
23

24
/**
 * tp_dealloc slot: cancel the stream if a cancel callback is installed,
 * destroy the C++ state, and release the Python object's memory.
 *
 * @param self  Object being destroyed; its cpp_state may be null.
 */
static void ArrowStreamingIterator_dealloc(ArrowStreamingIteratorObject* self) {
    if (self->cpp_state) {
        // Cancel the stream if still running
        if (self->cpp_state->cancel) {
            try {
                self->cpp_state->cancel();
            } catch (...) {
                // A deallocator has no way to report failure, and letting the
                // exception escape would skip the cleanup below and unwind
                // into the interpreter. Best effort: ignore it.
            }
        }
        delete self->cpp_state;
        self->cpp_state = nullptr;
    }
    Py_TYPE(self)->tp_free((PyObject*)self);
}
3✔
35

36
/**
 * tp_iter slot: the object is its own iterator.
 *
 * @param self  The iterator object.
 * @return A new reference to `self`.
 */
static PyObject* ArrowStreamingIterator_iter(PyObject* self) {
    PyObject* const same_object = self;
    Py_INCREF(same_object);  // caller receives an owned reference
    return same_object;
}
40

41
static PyObject* ArrowStreamingIterator_next(
8✔
42
    ArrowStreamingIteratorObject* self) {
43
    if (!self->cpp_state || !self->cpp_state->pull_next) {
8!
44
        PyErr_SetString(PyExc_RuntimeError, "Iterator not initialized");
×
45
        return NULL;
×
46
    }
47

48
    std::optional<ArrowExportResult> result;
8✔
49
    bool had_error = false;
8✔
50
    std::string error_msg;
8✔
51

52
    Py_BEGIN_ALLOW_THREADS try {
8!
53
        result = self->cpp_state->pull_next();
8!
UNCOV
54
    } catch (const std::exception& e) {
×
55
        had_error = true;
×
56
        error_msg = e.what();
×
57
    } catch (...) {
×
58
        had_error = true;
×
59
        error_msg = "Unknown error in streaming iterator";
×
60
    }
×
61
    Py_END_ALLOW_THREADS
8!
62

63
        if (had_error) {
8!
64
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
65
        return NULL;
×
66
    }
67

68
    if (!result.has_value()) {
8✔
69
        // Check for error
70
        if (self->cpp_state->get_error) {
3!
71
            auto ex = self->cpp_state->get_error();
3!
72
            if (ex) {
3!
73
                try {
74
                    std::rethrow_exception(ex);
×
75
                } catch (const std::exception& e) {
×
76
                    PyErr_SetString(PyExc_RuntimeError, e.what());
×
77
                    return NULL;
×
78
                } catch (...) {
×
79
                    PyErr_SetString(PyExc_RuntimeError,
×
80
                                    "Unknown error in streaming iterator");
81
                    return NULL;
×
82
                }
×
83
            }
84
        }
3!
85
        // Normal completion
86
        return NULL;  // StopIteration
3✔
87
    }
88

89
    // Wrap the ArrowExportResult in an ArrowBatchCapsule
90
    ArrowBatchCapsuleObject* obj =
91
        (ArrowBatchCapsuleObject*)ArrowBatchCapsuleType.tp_alloc(
5!
92
            &ArrowBatchCapsuleType, 0);
93
    if (!obj) return NULL;
5!
94
    obj->result = new ArrowExportResult(std::move(*result));
5!
95
    return (PyObject*)obj;
5✔
96
}
8✔
97

98
/**
 * Python method `cancel()`: invoke the state's cancel callback, if any.
 *
 * Does nothing when the iterator has no state or no cancel callback.
 *
 * @param self  The iterator object.
 * @return None on success; NULL with a RuntimeError set if the callback
 *         throws a C++ exception.
 */
static PyObject* ArrowStreamingIterator_cancel(
    ArrowStreamingIteratorObject* self, PyObject* Py_UNUSED(args)) {
    if (self->cpp_state && self->cpp_state->cancel) {
        try {
            self->cpp_state->cancel();
        } catch (const std::exception& e) {
            // Never let a C++ exception unwind into the interpreter.
            PyErr_SetString(PyExc_RuntimeError, e.what());
            return NULL;
        } catch (...) {
            PyErr_SetString(PyExc_RuntimeError,
                            "Unknown error in streaming iterator");
            return NULL;
        }
    }
    Py_RETURN_NONE;
}
105

106
// Method table for _ArrowStreamingIterator: a single no-argument `cancel`
// method followed by the all-null sentinel entry that terminates the table.
static PyMethodDef ArrowStreamingIterator_methods[] = {
    {"cancel",
     reinterpret_cast<PyCFunction>(ArrowStreamingIterator_cancel),
     METH_NOARGS,
     "Cancel the streaming iterator."},
    {nullptr, nullptr, 0, nullptr},  // sentinel
};
110

111
// Static type object for the streaming Arrow batch iterator. Slots are given
// positionally in PyTypeObject declaration order; only dealloc, flags, doc,
// iter, iternext, methods and new are populated, everything else is left
// null/zero.
PyTypeObject ArrowStreamingIteratorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext._ArrowStreamingIterator",  /* tp_name */
    sizeof(ArrowStreamingIteratorObject),          /* tp_basicsize */
    0,                                             /* tp_itemsize */
    reinterpret_cast<destructor>(
        ArrowStreamingIterator_dealloc),           /* tp_dealloc */
    0,                                             /* tp_vectorcall_offset */
    nullptr,                                       /* tp_getattr */
    nullptr,                                       /* tp_setattr */
    nullptr,                                       /* tp_as_async */
    nullptr,                                       /* tp_repr */
    nullptr,                                       /* tp_as_number */
    nullptr,                                       /* tp_as_sequence */
    nullptr,                                       /* tp_as_mapping */
    nullptr,                                       /* tp_hash */
    nullptr,                                       /* tp_call */
    nullptr,                                       /* tp_str */
    nullptr,                                       /* tp_getattro */
    nullptr,                                       /* tp_setattro */
    nullptr,                                       /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT,                            /* tp_flags */
    "Streaming Arrow batch iterator.\n"
    "\n"
    "Yields ArrowBatch objects as they become available "
    "from the C++ pipeline.\n"
    "Call cancel() to stop the stream early.",     /* tp_doc */
    nullptr,                                       /* tp_traverse */
    nullptr,                                       /* tp_clear */
    nullptr,                                       /* tp_richcompare */
    0,                                             /* tp_weaklistoffset */
    ArrowStreamingIterator_iter,                   /* tp_iter */
    reinterpret_cast<iternextfunc>(
        ArrowStreamingIterator_next),              /* tp_iternext */
    ArrowStreamingIterator_methods,                /* tp_methods */
    nullptr,                                       /* tp_members */
    nullptr,                                       /* tp_getset */
    nullptr,                                       /* tp_base */
    nullptr,                                       /* tp_dict */
    nullptr,                                       /* tp_descr_get */
    nullptr,                                       /* tp_descr_set */
    0,                                             /* tp_dictoffset */
    nullptr,                                       /* tp_init */
    nullptr,                                       /* tp_alloc */
    ArrowStreamingIterator_new,                    /* tp_new */
};
153

154
/**
 * Register ArrowStreamingIteratorType on module `m` under the name
 * "_ArrowStreamingIterator" via register_type (defined outside this file).
 *
 * @param m  Module object to register the type on.
 * @return 0 on success, -1 if register_type reports a negative status.
 */
int init_arrow_streaming_iterator(PyObject* m) {
    const auto status = register_type(m, &ArrowStreamingIteratorType,
                                      "_ArrowStreamingIterator");
    return (status < 0) ? -1 : 0;
}
161

162
}  // namespace dftracer::utils::python
163

164
#endif  // DFTRACER_UTILS_ENABLE_ARROW
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc