• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.85
/src/dftracer/utils/python/streaming_iterator.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#include <dftracer/utils/python/py_errors.h>
3
#include <dftracer/utils/python/py_runtime_mixin.h>
4
#include <dftracer/utils/python/py_type_helpers.h>
5
#ifdef DFTRACER_UTILS_ENABLE_ARROW
6

7
#define PY_SSIZE_T_CLEAN
8
#include <Python.h>
9
#include <dftracer/utils/python/streaming_iterator.h>
10
#include <dftracer/utils/python/trace_reader_iterator.h>
11

12
namespace dftracer::utils::python {
13

14
static PyObject* ArrowStreamingIterator_new(PyTypeObject* type,
6✔
15
                                            PyObject* /*args*/,
16
                                            PyObject* /*kwds*/) {
17
    ArrowStreamingIteratorObject* self =
3✔
18
        (ArrowStreamingIteratorObject*)type->tp_alloc(type, 0);
6✔
19
    if (self) {
6✔
20
        // Allocate C++ state separately to avoid layout issues
21
        self->cpp_state = new ArrowStreamingIteratorState();
6✔
22
    }
3✔
23
    return (PyObject*)self;
6✔
24
}
25

26
static void ArrowStreamingIterator_dealloc(ArrowStreamingIteratorObject* self) {
6✔
27
    if (self->cpp_state) {
6✔
28
        // Cancel the stream if still running
29
        if (self->cpp_state->cancel) {
6✔
30
            self->cpp_state->cancel();
6✔
31
        }
3✔
32
        delete self->cpp_state;
6✔
33
        self->cpp_state = nullptr;
6✔
34
    }
3✔
35
    Py_TYPE(self)->tp_free((PyObject*)self);
6✔
36
}
6✔
37

38
static PyObject* ArrowStreamingIterator_iter(PyObject* self) {
6!
39
    Py_INCREF(self);
3✔
40
    return self;
6✔
41
}
42

43
static PyObject* ArrowStreamingIterator_next(
16✔
44
    ArrowStreamingIteratorObject* self) {
45
    if (!self->cpp_state || !self->cpp_state->pull_next) {
16!
46
        PyErr_SetString(PyExc_RuntimeError, "Iterator not initialized");
×
47
        return NULL;
×
48
    }
49

50
    std::optional<ArrowExportResult> result;
16✔
51
    if (!run_blocking_r([&] { return self->cpp_state->pull_next(); }, result))
32!
52
        return NULL;
×
53

54
    if (!result.has_value()) {
16✔
55
        // Check for error
56
        if (self->cpp_state->get_error) {
6!
57
            auto ex = self->cpp_state->get_error();
6!
58
            if (ex) {
6✔
59
                try {
60
                    std::rethrow_exception(ex);
×
61
                } catch (const std::exception& e) {
×
62
                    set_typed_py_error(e);
×
63
                    return NULL;
×
64
                } catch (...) {
×
65
                    PyErr_SetString(PyExc_RuntimeError,
×
66
                                    "Unknown error in streaming iterator");
67
                    return NULL;
×
68
                }
×
69
            }
70
        }
6✔
71
        // Normal completion
72
        return NULL;  // StopIteration
6✔
73
    }
74

75
    // Wrap the ArrowExportResult in an ArrowBatchCapsule
76
    ArrowBatchCapsuleObject* obj =
5✔
77
        (ArrowBatchCapsuleObject*)ArrowBatchCapsuleType.tp_alloc(
10!
78
            &ArrowBatchCapsuleType, 0);
79
    if (!obj) return NULL;
10✔
80
    obj->result = new ArrowExportResult(std::move(*result));
10!
81
    return (PyObject*)obj;
10✔
82
}
16✔
83

84
static PyObject* ArrowStreamingIterator_cancel(
×
85
    ArrowStreamingIteratorObject* self, PyObject* Py_UNUSED(args)) {
86
    if (self->cpp_state && self->cpp_state->cancel) {
×
87
        self->cpp_state->cancel();
×
88
    }
89
    Py_RETURN_NONE;
×
90
}
91

92
static PyMethodDef ArrowStreamingIterator_methods[] = {
93
    {"cancel", (PyCFunction)ArrowStreamingIterator_cancel, METH_NOARGS,
94
     "Cancel the streaming iterator."},
95
    {NULL}};
96

97
PyTypeObject ArrowStreamingIteratorType = {
98
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext._ArrowStreamingIterator",
99
    sizeof(ArrowStreamingIteratorObject),       /* tp_basicsize */
100
    0,                                          /* tp_itemsize */
101
    (destructor)ArrowStreamingIterator_dealloc, /* tp_dealloc */
102
    0,                                          /* tp_vectorcall_offset */
103
    0,                                          /* tp_getattr */
104
    0,                                          /* tp_setattr */
105
    0,                                          /* tp_as_async */
106
    0,                                          /* tp_repr */
107
    0,                                          /* tp_as_number */
108
    0,                                          /* tp_as_sequence */
109
    0,                                          /* tp_as_mapping */
110
    0,                                          /* tp_hash */
111
    0,                                          /* tp_call */
112
    0,                                          /* tp_str */
113
    0,                                          /* tp_getattro */
114
    0,                                          /* tp_setattro */
115
    0,                                          /* tp_as_buffer */
116
    Py_TPFLAGS_DEFAULT,                         /* tp_flags */
117
    "Streaming Arrow batch iterator.\n\n"
118
    "Yields ArrowBatch objects as they become available from the C++ "
119
    "pipeline.\n"
120
    "Call cancel() to stop the stream early.", /* tp_doc */
121
    0,                                         /* tp_traverse */
122
    0,                                         /* tp_clear */
123
    0,                                         /* tp_richcompare */
124
    0,                                         /* tp_weaklistoffset */
125
    ArrowStreamingIterator_iter,               /* tp_iter */
126
    (iternextfunc)ArrowStreamingIterator_next, /* tp_iternext */
127
    ArrowStreamingIterator_methods,            /* tp_methods */
128
    0,                                         /* tp_members */
129
    0,                                         /* tp_getset */
130
    0,                                         /* tp_base */
131
    0,                                         /* tp_dict */
132
    0,                                         /* tp_descr_get */
133
    0,                                         /* tp_descr_set */
134
    0,                                         /* tp_dictoffset */
135
    0,                                         /* tp_init */
136
    0,                                         /* tp_alloc */
137
    ArrowStreamingIterator_new,                /* tp_new */
138
};
139

140
int init_arrow_streaming_iterator(PyObject* m) {
2✔
141
    if (register_type(m, &ArrowStreamingIteratorType,
2✔
142
                      "_ArrowStreamingIterator") < 0)
2✔
143
        return -1;
×
144

145
    return 0;
2✔
146
}
1✔
147

148
}  // namespace dftracer::utils::python
149

150
#endif  // DFTRACER_UTILS_ENABLE_ARROW
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc