• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27052412546

06 Jun 2026 04:20AM UTC coverage: 50.862% (+1.0%) from 49.905%
27052412546

Pull #73

github

web-flow
Merge 734572730 into 88a3c8457
Pull Request #73: add portable dependencies wheel support

31801 of 79859 branches covered (39.82%)

Branch coverage included in aggregate %.

32491 of 46545 relevant lines covered (69.81%)

9947.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.26
/src/dftracer/utils/python/arrow_stream_capsule.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#ifdef DFTRACER_UTILS_ENABLE_ARROW
3

4
#define PY_SSIZE_T_CLEAN
5
#include <Python.h>
6
#include <dftracer/utils/python/arrow_stream_capsule.h>
7
#include <dftracer/utils/python/batch_byte_size.h>
8
#include <dftracer/utils/python/schema_reconcile.h>
9
#include <nanoarrow/nanoarrow.h>
10

11
#include <cerrno>
#include <cstring>
#include <deque>
#include <exception>
#include <memory>
#include <mutex>
#include <new>
#include <optional>
#include <string>
18

19
using ArrowExportResult =
20
    dftracer::utils::utilities::common::arrow::ArrowExportResult;
21

22
namespace {
23

24
// Drain until K consecutive batches add no new columns, bounded by MAX.
25
// Schema discovery stops after this many consecutive batches add no new
// columns to the merged schema.
constexpr int STABLE_BATCHES = 5;
26
// Upper bound on the number of batches drained during schema discovery,
// regardless of whether the schema has stabilized.
constexpr int MAX_DRAIN = 128;
27

28
struct StreamPrivate {
50✔
29
    std::shared_ptr<ArrowIteratorState> state;
30
    dftracer::utils::python::SchemaReconciler reconciler;
31
    // Drained during discovery, emitted first from get_next.
32
    std::deque<ArrowExportResult> pending;
33
    std::string last_error;
34
    bool initialized = false;
25✔
35
    // Sticky: once set, all entry points short-circuit to EIO.
36
    bool error_set = false;
25✔
37
};
38

39
/**
 * Puts the stream into its sticky error state.
 *
 * The first recorded message wins: `msg` is stored only when no earlier
 * message exists. The stream is also flagged as initialized so that later
 * calls do not attempt schema discovery again.
 *
 * @param p   Stream state to update.
 * @param msg Human-readable description of the failure.
 */
static void mark_error(StreamPrivate *p, std::string msg) {
    p->initialized = true;
    p->error_set = true;
    if (p->last_error.empty()) {
        p->last_error = std::move(msg);
    }
}
×
44

45
/**
 * Runs schema discovery for the stream (at most once).
 *
 * Batches are pulled from the producer channel, their schemas merged into the
 * reconciler, and the batches themselves parked in `pending`. Discovery ends
 * when STABLE_BATCHES consecutive batches add nothing new, when MAX_DRAIN
 * batches have been read, or when the channel yields no more batches. The
 * merged schema is then finalized.
 *
 * @param p Stream state.
 * @return 0 on success or when already initialized; EIO when the stream is in
 *         the sticky error state or discovery fails (the reason is recorded
 *         via mark_error).
 */
static int initialize_stream(StreamPrivate *p) {
    if (p->error_set) return EIO;
    if (p->initialized) return 0;

    auto *producer = p->state.get();
    auto &reconciler = p->reconciler;

    for (int quiet_streak = 0, seen = 0;
         quiet_streak < STABLE_BATCHES && seen < MAX_DRAIN; ++seen) {
        auto received = producer->channel->blocking_receive();

        if (!received.has_value()) {
            // No batch: either the producer failed or the stream ended
            // before discovery converged. The error slot is read under its
            // mutex.
            std::lock_guard<std::mutex> guard(producer->error_mtx);
            if (!producer->error) {
                break;  // clean early end-of-stream; finalize what we have
            }
            try {
                std::rethrow_exception(producer->error);
            } catch (const std::exception &ex) {
                mark_error(p, ex.what());
            } catch (...) {
                mark_error(p, "unknown error in Arrow stream");
            }
            return EIO;
        }

        // The batch has left the channel, so take its size off the queue
        // accounting.
        const auto freed = dftracer::utils::python::byte_size(*received);
        producer->bytes_in_queue.fetch_sub(freed, std::memory_order_acq_rel);

        const bool grew = reconciler.merge(received->get_schema());
        if (!reconciler.last_error().empty()) {
            mark_error(p, reconciler.last_error());
            return EIO;
        }

        p->pending.push_back(std::move(*received));
        quiet_streak = grew ? 0 : quiet_streak + 1;
    }

    if (reconciler.finalize() != 0) {
        std::string reason = reconciler.last_error();
        if (reason.empty()) reason = "failed to finalize schema union";
        mark_error(p, std::move(reason));
        return EIO;
    }

    p->initialized = true;
    return 0;
}
24✔
91

92
static int stream_get_schema(struct ArrowArrayStream *s,
24✔
93
                             struct ArrowSchema *out) {
94
    auto *p = static_cast<StreamPrivate *>(s->private_data);
24✔
95
    int rc = initialize_stream(p);
24✔
96
    if (rc != 0) return rc;
24!
97
    if (p->error_set) return EIO;
24!
98
    if (p->reconciler.copy_schema(out) != 0) {
24!
99
        mark_error(p, p->reconciler.last_error().empty()
×
100
                          ? "failed to copy locked schema"
×
101
                          : p->reconciler.last_error());
×
102
        return EIO;
×
103
    }
104
    return 0;
24✔
105
}
24✔
106

107
static int stream_get_next(struct ArrowArrayStream *s, struct ArrowArray *out) {
85✔
108
    auto *p = static_cast<StreamPrivate *>(s->private_data);
85✔
109
    if (p->error_set) return EIO;
85!
110
    if (!p->initialized) {
85!
111
        int rc = initialize_stream(p);
×
112
        if (rc != 0) return rc;
×
113
    }
×
114

115
    // Drain any discovery-phase batches first, then pull from the channel.
116
    std::optional<ArrowExportResult> batch;
85✔
117
    if (!p->pending.empty()) {
85✔
118
        batch = std::move(p->pending.front());
63!
119
        p->pending.pop_front();
63!
120
    } else {
63✔
121
        auto *astate = p->state.get();
22✔
122
        batch = astate->channel->blocking_receive();
22!
123
        if (!batch.has_value()) {
22!
124
            std::lock_guard<std::mutex> lock(astate->error_mtx);
22!
125
            if (astate->error) {
22!
126
                try {
127
                    std::rethrow_exception(astate->error);
×
128
                } catch (const std::exception &e) {
×
129
                    mark_error(p, e.what());
×
130
                } catch (...) {
×
131
                    mark_error(p, "unknown error in Arrow stream");
×
132
                }
×
133
                return EIO;
×
134
            }
135
            // End of stream per Arrow C spec: return success with
136
            // out->release == nullptr.
137
            out->release = nullptr;
22✔
138
            return 0;
22✔
139
        }
22✔
140
        auto dequeued = dftracer::utils::python::byte_size(*batch);
×
141
        astate->bytes_in_queue.fetch_sub(dequeued, std::memory_order_acq_rel);
×
142
    }
143

144
    if (p->reconciler.reconcile(batch->get_schema(), batch->get_array(), out) !=
63!
145
        0) {
146
        mark_error(p, p->reconciler.last_error().empty()
×
147
                          ? "schema reconciliation failed"
×
148
                          : p->reconciler.last_error());
×
149
        return EIO;
×
150
    }
151
    return 0;
63✔
152
}
85✔
153

154
static const char *stream_get_last_error(struct ArrowArrayStream *s) {
×
155
    auto *p = static_cast<StreamPrivate *>(s->private_data);
×
156
    if (!p || p->last_error.empty()) return nullptr;
×
157
    return p->last_error.c_str();
×
158
}
×
159

160
static void stream_release(struct ArrowArrayStream *s) {
25✔
161
    auto *p = static_cast<StreamPrivate *>(s->private_data);
25✔
162
    if (p) {
25!
163
        if (p->state) {
25!
164
            p->state->cancelled.store(true, std::memory_order_release);
25✔
165
            if (p->state->channel) p->state->channel->close();
25!
166
            if (p->state->task_future.valid()) {
25!
167
                // Release the GIL if this callback was invoked from a
168
                // Python-holding context (e.g. capsule destructor during
169
                // GC). If the GIL is not held (pyarrow's C reader path),
170
                // _PyThreadState_UncheckedGet() returns null and we wait
171
                // without touching the Python thread state.
172
                if (Py_IsInitialized() && PyGILState_Check()) {
25!
173
                    Py_BEGIN_ALLOW_THREADS p->state->task_future.wait();
1✔
174
                    Py_END_ALLOW_THREADS
1✔
175
                } else {
1✔
176
                    p->state->task_future.wait();
24✔
177
                }
178
            }
25✔
179
        }
25✔
180
        delete p;
25!
181
    }
25✔
182
    s->private_data = nullptr;
25✔
183
    s->release = nullptr;
25✔
184
}
25✔
185

186
/**
 * PyCapsule destructor for "arrow_array_stream" capsules.
 *
 * Releases the stream if it still has a release callback (i.e. it has not
 * already been released), then frees the heap-allocated ArrowArrayStream.
 *
 * @param capsule Capsule being destroyed.
 */
static void release_stream_capsule(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_array_stream");
    auto *exported = static_cast<ArrowArrayStream *>(raw);
    if (exported == nullptr) {
        return;
    }
    if (exported->release != nullptr) {
        exported->release(exported);
    }
    delete exported;
}
25✔
194

195
/**
 * Implements `__arrow_c_stream__(requested_schema=None)`.
 *
 * Exports the underlying producer as an ArrowArrayStream wrapped in a
 * PyCapsule named "arrow_array_stream". A stream can be exported only once:
 * on success the producer state is handed over to the capsule and this
 * object is marked consumed.
 *
 * @param self The _ArrowBatchStream instance.
 * @param args Positional arguments; an optional `requested_schema`.
 * @return A new capsule reference, or NULL with a Python exception set:
 *         NotImplementedError for a non-None `requested_schema`,
 *         RuntimeError when already exported, MemoryError/RuntimeError when
 *         the export state cannot be allocated.
 */
static PyObject *ArrowBatchStream_arrow_c_stream(ArrowBatchStreamObject *self,
                                                 PyObject *args) {
    PyObject *requested_schema = Py_None;
    if (!PyArg_ParseTuple(args, "|O", &requested_schema)) return NULL;

    // Per the PyCapsule protocol, a non-None `requested_schema` means the
    // caller wants the stream cast to that schema. We only emit our native
    // schema today; reject explicitly so misuse fails loudly instead of
    // silently returning arrays that don't match what the caller asked for.
    if (requested_schema != Py_None) {
        PyErr_SetString(PyExc_NotImplementedError,
                        "iter_arrow_stream does not support "
                        "requested_schema casting; pass None to use the "
                        "native schema.");
        return NULL;
    }

    if (self->consumed || !self->state) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Arrow stream already exported via "
                        "__arrow_c_stream__; each stream can be "
                        "exported only once.");
        return NULL;
    }

    // Allocate everything that can fail before touching `self`, and never let
    // a C++ exception escape into the interpreter. If allocation fails the
    // object is left untouched and can still be exported later.
    std::unique_ptr<StreamPrivate> priv;
    std::unique_ptr<ArrowArrayStream> stream;
    try {
        priv = std::make_unique<StreamPrivate>();
        // Value-initialization zeroes every field of the C struct.
        stream = std::make_unique<ArrowArrayStream>();
    } catch (const std::bad_alloc &) {
        PyErr_NoMemory();
        return NULL;
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "failed to allocate Arrow stream state");
        return NULL;
    }

    // Hand the producer state over to the exported stream.
    priv->state = std::move(self->state);
    self->state.reset();
    self->consumed = true;

    stream->get_schema = stream_get_schema;
    stream->get_next = stream_get_next;
    stream->get_last_error = stream_get_last_error;
    stream->release = stream_release;
    stream->private_data = priv.release();  // now owned by stream_release

    PyObject *capsule = PyCapsule_New(stream.get(), "arrow_array_stream",
                                      release_stream_capsule);
    if (!capsule) {
        // Frees the private state; `stream` itself is freed by unique_ptr.
        stream->release(stream.get());
        return NULL;
    }
    // The capsule destructor owns the ArrowArrayStream from here on.
    (void)stream.release();
    return capsule;
}
26✔
242

243
/**
 * tp_dealloc for _ArrowBatchStream.
 *
 * If the producer state was never exported, cancels the producer, closes its
 * channel and waits (with the GIL released) for the producer task to finish.
 * The shared_ptr member is then destroyed explicitly before the object
 * memory is freed.
 *
 * @param self Object being deallocated.
 */
static void ArrowBatchStream_dealloc(ArrowBatchStreamObject *self) {
    using StatePtr = std::shared_ptr<ArrowIteratorState>;

    if (ArrowIteratorState *shared = self->state.get()) {
        shared->cancelled.store(true, std::memory_order_release);
        if (shared->channel) shared->channel->close();

        // Release the GIL for the duration of the wait.
        PyThreadState *saved = PyEval_SaveThread();
        if (shared->task_future.valid()) {
            shared->task_future.wait();
        }
        PyEval_RestoreThread(saved);
    }

    // Run the member's destructor by hand before handing the memory back.
    self->state.~StatePtr();
    Py_TYPE(self)->tp_free(reinterpret_cast<PyObject *>(self));
}
27✔
255

256
// Method table for _ArrowBatchStream: only the Arrow PyCapsule export hook.
static PyMethodDef ArrowBatchStream_methods[] = {
    {"__arrow_c_stream__",
     reinterpret_cast<PyCFunction>(ArrowBatchStream_arrow_c_stream),
     METH_VARARGS, "Export as Arrow C Data Interface stream PyCapsule"},
    {nullptr, nullptr, 0, nullptr},  // sentinel terminating the table
};
260

261
}  // namespace
262

263
// Static type object for dftracer_utils_ext._ArrowBatchStream. Slots are
// given positionally in PyTypeObject declaration order; only the name, size,
// deallocator, flags, docstring and method table are populated.
PyTypeObject ArrowBatchStreamType = {
    PyVarObject_HEAD_INIT(nullptr, 0)
    "dftracer_utils_ext._ArrowBatchStream",                /* tp_name */
    sizeof(ArrowBatchStreamObject),                        /* tp_basicsize */
    0,                                                     /* tp_itemsize */
    reinterpret_cast<destructor>(ArrowBatchStream_dealloc), /* tp_dealloc */
    0,                  /* tp_vectorcall_offset (tp_print before 3.8) */
    nullptr,            /* tp_getattr */
    nullptr,            /* tp_setattr */
    nullptr,            /* tp_as_async */
    nullptr,            /* tp_repr */
    nullptr,            /* tp_as_number */
    nullptr,            /* tp_as_sequence */
    nullptr,            /* tp_as_mapping */
    nullptr,            /* tp_hash */
    nullptr,            /* tp_call */
    nullptr,            /* tp_str */
    nullptr,            /* tp_getattro */
    nullptr,            /* tp_setattro */
    nullptr,            /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT, /* tp_flags */
    "Zero-iteration Arrow stream backed by a C++ coroutine channel", /* tp_doc */
    nullptr,            /* tp_traverse */
    nullptr,            /* tp_clear */
    nullptr,            /* tp_richcompare */
    0,                  /* tp_weaklistoffset */
    nullptr,            /* tp_iter */
    nullptr,            /* tp_iternext */
    ArrowBatchStream_methods, /* tp_methods */
    nullptr,            /* tp_members */
    nullptr,            /* tp_getset */
    nullptr,            /* tp_base */
    nullptr,            /* tp_dict */
    nullptr,            /* tp_descr_get */
    nullptr,            /* tp_descr_set */
    0,                  /* tp_dictoffset */
    nullptr,            /* tp_init */
    nullptr,            /* tp_alloc */
    nullptr,            /* tp_new */
};
302

303
/**
 * Finalizes ArrowBatchStreamType and registers it on module `m` under the
 * name "_ArrowBatchStream".
 *
 * @param m Module object receiving the type.
 * @return 0 on success, -1 on failure (with a Python exception set by the
 *         failing CPython call).
 */
int init_arrow_batch_stream(PyObject *m) {
    if (PyType_Ready(&ArrowBatchStreamType) < 0) {
        return -1;
    }

    auto *type_obj = reinterpret_cast<PyObject *>(&ArrowBatchStreamType);

    // PyModule_AddObject steals a reference only on success, so take one up
    // front and give it back if registration fails.
    Py_INCREF(type_obj);
    const int rc = PyModule_AddObject(m, "_ArrowBatchStream", type_obj);
    if (rc < 0) {
        Py_DECREF(type_obj);
        return -1;
    }
    return 0;
}
1✔
313

314
/**
 * Creates a new _ArrowBatchStream Python object that owns `state`.
 *
 * @param state Producer state to attach to the new object.
 * @return A new reference, or NULL if allocation through tp_alloc failed.
 */
PyObject *make_arrow_batch_stream(std::shared_ptr<ArrowIteratorState> state) {
    PyObject *raw = ArrowBatchStreamType.tp_alloc(&ArrowBatchStreamType, 0);
    if (raw == nullptr) {
        return nullptr;
    }

    auto *stream_obj = reinterpret_cast<ArrowBatchStreamObject *>(raw);
    // tp_alloc hands back raw storage, so the shared_ptr member has to be
    // constructed in place before it is used.
    ::new (static_cast<void *>(&stream_obj->state))
        std::shared_ptr<ArrowIteratorState>(std::move(state));
    stream_obj->consumed = false;
    return raw;
}
27✔
322

323
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc