• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28398085302

29 Jun 2026 07:43PM UTC coverage: 50.067% (-2.2%) from 52.278%
28398085302

Pull #83

github

web-flow
Merge 9a615085e into 2efed6649
Pull Request #83: refactor and improve code QoL

16342 of 44293 branches covered (36.9%)

Branch coverage included in aggregate %.

613 of 1132 new or added lines in 52 files covered. (54.15%)

687 existing lines in 116 files now uncovered.

21698 of 31685 relevant lines covered (68.48%)

12958.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.48
/src/dftracer/utils/python/arrow_stream_capsule.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#include <dftracer/utils/python/py_type_helpers.h>
3
#ifdef DFTRACER_UTILS_ENABLE_ARROW
4

5
#define PY_SSIZE_T_CLEAN
6
#include <Python.h>
7
#include <dftracer/utils/python/arrow_stream_capsule.h>
8
#include <dftracer/utils/python/batch_byte_size.h>
9
#include <dftracer/utils/python/schema_reconcile.h>
10
#include <nanoarrow/nanoarrow.h>
11

12
#include <cerrno>
#include <cstring>
#include <deque>
#include <exception>
#include <memory>
#include <mutex>
#include <new>
#include <optional>
#include <string>
19

20
using ArrowExportResult =
21
    dftracer::utils::utilities::common::arrow::ArrowExportResult;
22

23
namespace {
24

25
// Schema discovery keeps pulling batches until STABLE_BATCHES consecutive
// batches contribute no new columns, and never pulls more than MAX_DRAIN
// batches in total before locking the schema.
constexpr int STABLE_BATCHES{5};
constexpr int MAX_DRAIN{128};
28

29
struct StreamPrivate {
30
    std::shared_ptr<ArrowIteratorState> state;
31
    dftracer::utils::python::SchemaReconciler reconciler;
32
    // Drained during discovery, emitted first from get_next.
33
    std::deque<ArrowExportResult> pending;
34
    std::string last_error;
35
    bool initialized = false;
36
    // Sticky: once set, all entry points short-circuit to EIO.
37
    bool error_set = false;
38
};
39

40
// Puts the stream into its sticky error state. Only the first message is
// kept, so later, secondary failures do not mask the root cause. Marking
// the stream initialized stops any further discovery attempts.
static void mark_error(StreamPrivate *p, std::string msg) {
    p->error_set = true;
    p->initialized = true;
    if (!p->last_error.empty()) return;
    p->last_error = std::move(msg);
}
45

46
/**
 * Lazy schema-discovery phase, run on the first get_schema/get_next call.
 *
 * Pulls batches from the producer channel and merges each batch's schema
 * into the reconciler. Stops when any of these happens:
 *   - STABLE_BATCHES consecutive batches add no new columns;
 *   - MAX_DRAIN batches have been pulled;
 *   - the channel runs dry.
 *
 * Each pulled batch is parked in `pending` so get_next can emit it later.
 * Its byte size is subtracted from the producer's `bytes_in_queue` counter.
 * The merged schema is then locked via `reconciler.finalize()`.
 *
 * @return 0 on success, or immediately if already initialized. EIO if the
 *         stream is already in the error state or discovery fails; the
 *         message is recorded via mark_error.
 */
static int initialize_stream(StreamPrivate *p) {
    if (p->error_set) return EIO;
    if (p->initialized) return 0;

    auto *producer = p->state.get();
    int unchanged_streak = 0;

    for (int pulled = 0;
         unchanged_streak < STABLE_BATCHES && pulled < MAX_DRAIN; ++pulled) {
        auto incoming = producer->channel->blocking_receive();
        if (!incoming.has_value()) {
            // The channel ended before discovery converged. Either this is a
            // clean end-of-stream (finalize with what we have) or the
            // producer failed.
            std::lock_guard<std::mutex> guard(producer->error_mtx);
            if (!producer->error) break;
            try {
                std::rethrow_exception(producer->error);
            } catch (const std::exception &ex) {
                mark_error(p, ex.what());
            } catch (...) {
                mark_error(p, "unknown error in Arrow stream");
            }
            return EIO;
        }

        // The batch has left the channel; keep the producer's queued-bytes
        // counter in sync.
        const auto freed = dftracer::utils::python::byte_size(*incoming);
        producer->bytes_in_queue.fetch_sub(freed, std::memory_order_acq_rel);

        const bool grew = p->reconciler.merge(incoming->get_schema());
        if (!p->reconciler.last_error().empty()) {
            mark_error(p, p->reconciler.last_error());
            return EIO;
        }
        p->pending.push_back(std::move(*incoming));
        unchanged_streak = grew ? 0 : unchanged_streak + 1;
    }

    if (p->reconciler.finalize() == 0) {
        p->initialized = true;
        return 0;
    }
    mark_error(p, p->reconciler.last_error().empty()
                      ? "failed to finalize schema union"
                      : p->reconciler.last_error());
    return EIO;
}
92

93
static int stream_get_schema(struct ArrowArrayStream *s,
24✔
94
                             struct ArrowSchema *out) {
95
    auto *p = static_cast<StreamPrivate *>(s->private_data);
24✔
96
    int rc = initialize_stream(p);
24✔
97
    if (rc != 0) return rc;
24!
98
    if (p->error_set) return EIO;
24!
99
    if (p->reconciler.copy_schema(out) != 0) {
24!
100
        mark_error(p, p->reconciler.last_error().empty()
×
101
                          ? "failed to copy locked schema"
×
102
                          : p->reconciler.last_error());
×
103
        return EIO;
×
104
    }
105
    return 0;
24✔
106
}
107

108
static int stream_get_next(struct ArrowArrayStream *s, struct ArrowArray *out) {
85✔
109
    auto *p = static_cast<StreamPrivate *>(s->private_data);
85✔
110
    if (p->error_set) return EIO;
85!
111
    if (!p->initialized) {
85!
112
        int rc = initialize_stream(p);
×
113
        if (rc != 0) return rc;
×
114
    }
115

116
    // Drain any discovery-phase batches first, then pull from the channel.
117
    std::optional<ArrowExportResult> batch;
85✔
118
    if (!p->pending.empty()) {
85✔
119
        batch = std::move(p->pending.front());
63!
120
        p->pending.pop_front();
63✔
121
    } else {
122
        auto *astate = p->state.get();
22✔
123
        batch = astate->channel->blocking_receive();
22!
124
        if (!batch.has_value()) {
22!
125
            std::lock_guard<std::mutex> lock(astate->error_mtx);
22!
126
            if (astate->error) {
22!
127
                try {
128
                    std::rethrow_exception(astate->error);
×
129
                } catch (const std::exception &e) {
×
130
                    mark_error(p, e.what());
×
131
                } catch (...) {
×
132
                    mark_error(p, "unknown error in Arrow stream");
×
133
                }
×
134
                return EIO;
×
135
            }
136
            // End of stream per Arrow C spec: return success with
137
            // out->release == nullptr.
138
            out->release = nullptr;
22✔
139
            return 0;
22✔
140
        }
22✔
141
        auto dequeued = dftracer::utils::python::byte_size(*batch);
×
142
        astate->bytes_in_queue.fetch_sub(dequeued, std::memory_order_acq_rel);
×
143
    }
144

145
    if (p->reconciler.reconcile(batch->get_schema(), batch->get_array(), out) !=
63!
146
        0) {
147
        mark_error(p, p->reconciler.last_error().empty()
×
148
                          ? "schema reconciliation failed"
×
149
                          : p->reconciler.last_error());
×
150
        return EIO;
×
151
    }
152
    return 0;
63✔
153
}
85✔
154

155
static const char *stream_get_last_error(struct ArrowArrayStream *s) {
×
156
    auto *p = static_cast<StreamPrivate *>(s->private_data);
×
157
    if (!p || p->last_error.empty()) return nullptr;
×
158
    return p->last_error.c_str();
×
159
}
160

161
static void stream_release(struct ArrowArrayStream *s) {
25✔
162
    auto *p = static_cast<StreamPrivate *>(s->private_data);
25✔
163
    if (p) {
25!
164
        if (p->state) {
25!
165
            p->state->cancelled.store(true, std::memory_order_release);
25✔
166
            if (p->state->channel) p->state->channel->close();
25!
167
            if (p->state->task_future.valid()) {
25!
168
                // Release the GIL if this callback was invoked from a
169
                // Python-holding context (e.g. capsule destructor during
170
                // GC). If the GIL is not held (pyarrow's C reader path),
171
                // _PyThreadState_UncheckedGet() returns null and we wait
172
                // without touching the Python thread state.
173
                if (Py_IsInitialized() && PyGILState_Check()) {
25!
174
                    Py_BEGIN_ALLOW_THREADS p->state->task_future.wait();
1✔
175
                    Py_END_ALLOW_THREADS
1✔
176
                } else {
177
                    p->state->task_future.wait();
24✔
178
                }
179
            }
180
        }
181
        delete p;
25!
182
    }
183
    s->private_data = nullptr;
25✔
184
    s->release = nullptr;
25✔
185
}
25✔
186

187
/**
 * PyCapsule destructor for "arrow_array_stream" capsules.
 *
 * Releases the wrapped stream if its release callback is still set, i.e. no
 * consumer has moved it out. It then frees the struct, which
 * ArrowBatchStream_arrow_c_stream allocated with `new`.
 */
static void release_stream_capsule(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_array_stream");
    auto *stream = static_cast<ArrowArrayStream *>(raw);
    const bool still_owned = stream != nullptr && stream->release != nullptr;
    if (still_owned) stream->release(stream);
    delete stream;
}
195

196
/**
 * Implements `__arrow_c_stream__(requested_schema=None)` from the Arrow
 * PyCapsule interface.
 *
 * Moves this object's producer state into a newly allocated ArrowArrayStream
 * and wraps it in a PyCapsule named "arrow_array_stream". The capsule
 * destructor (release_stream_capsule) releases the stream if no consumer
 * took it, then frees the struct.
 *
 * @param self Python wrapper holding the producer state.
 * @param args Optional positional `requested_schema`; only None is accepted.
 * @return A new capsule reference, or NULL with a Python exception set:
 *         - NotImplementedError for a non-None schema;
 *         - RuntimeError if the stream was already exported;
 *         - MemoryError/RuntimeError if allocation fails.
 */
static PyObject *ArrowBatchStream_arrow_c_stream(ArrowBatchStreamObject *self,
                                                 PyObject *args) {
    PyObject *requested_schema = Py_None;
    if (!PyArg_ParseTuple(args, "|O", &requested_schema)) return NULL;

    // Per the PyCapsule protocol, a non-None `requested_schema` means the
    // caller wants the stream cast to that schema. We only emit our native
    // schema today; reject explicitly so misuse fails loudly instead of
    // silently returning arrays that don't match what the caller asked for.
    if (requested_schema != Py_None) {
        PyErr_SetString(PyExc_NotImplementedError,
                        "iter_arrow_stream does not support "
                        "requested_schema casting; pass None to use the "
                        "native schema.");
        return NULL;
    }

    if (self->consumed || !self->state) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Arrow stream already exported via "
                        "__arrow_c_stream__; each stream can be "
                        "exported only once.");
        return NULL;
    }

    // Allocate everything before touching `self`. C++ exceptions must not
    // unwind through the CPython C API. An allocation failure here also
    // leaves the object unconsumed, so the export can be retried.
    std::unique_ptr<StreamPrivate> priv;
    std::unique_ptr<ArrowArrayStream> holder;
    try {
        priv = std::make_unique<StreamPrivate>();
        // Value-initialization zeroes every callback and private_data.
        holder = std::make_unique<ArrowArrayStream>();
    } catch (const std::bad_alloc &) {
        return PyErr_NoMemory();
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "failed to allocate Arrow stream export state");
        return NULL;
    }

    // Point of no return: the producer now belongs to the exported stream.
    // Moving leaves self->state empty.
    priv->state = std::move(self->state);
    self->consumed = true;

    ArrowArrayStream *stream = holder.release();
    stream->get_schema = stream_get_schema;
    stream->get_next = stream_get_next;
    stream->get_last_error = stream_get_last_error;
    stream->release = stream_release;
    // From here on stream_release owns (and deletes) the private data.
    stream->private_data = priv.release();

    PyObject *capsule =
        PyCapsule_New(stream, "arrow_array_stream", release_stream_capsule);
    if (!capsule) {
        // Cancel the producer and free the private data, then the struct.
        stream->release(stream);
        delete stream;
        return NULL;
    }
    // The capsule destructor now owns `stream`.
    return capsule;
}
243

244
/**
 * tp_dealloc for _ArrowBatchStream.
 *
 * If the stream was never exported, this object still owns the producer. In
 * that case it cancels the producer, closes the channel, and waits for the
 * background task with the GIL released. It then destroys the
 * placement-constructed shared_ptr member and frees the object memory.
 */
static void ArrowBatchStream_dealloc(ArrowBatchStreamObject *self) {
    using StatePtr = std::shared_ptr<ArrowIteratorState>;
    StatePtr &owned = self->state;
    if (owned) {
        owned->cancelled.store(true, std::memory_order_release);
        if (owned->channel) owned->channel->close();
        Py_BEGIN_ALLOW_THREADS
        if (owned->task_future.valid()) owned->task_future.wait();
        Py_END_ALLOW_THREADS
    }
    owned.~StatePtr();
    Py_TYPE(self)->tp_free(reinterpret_cast<PyObject *>(self));
}
256

257
// Method table for _ArrowBatchStream. It exposes only the Arrow PyCapsule
// stream-export hook and is terminated by an all-null sentinel entry.
static PyMethodDef ArrowBatchStream_methods[] = {
    {"__arrow_c_stream__",
     reinterpret_cast<PyCFunction>(ArrowBatchStream_arrow_c_stream),
     METH_VARARGS,
     "Export as Arrow C Data Interface stream PyCapsule"},
    {nullptr, nullptr, 0, nullptr},
};
261

262
}  // namespace
263

264
// Python type object for the single-use stream wrapper created by
// make_arrow_batch_stream(). Slots are filled positionally, so their order
// matters. The trailing comments name each slot per the CPython
// PyTypeObject layout.
// NOTE(review): tp_alloc and tp_new are left 0. Presumably PyType_Ready
// (via register_type) inherits tp_alloc from `object`, which
// make_arrow_batch_stream relies on. The null tp_new keeps the type from
// being instantiated directly from Python. Confirm.
PyTypeObject ArrowBatchStreamType = {
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext._ArrowBatchStream",
    sizeof(ArrowBatchStreamObject), /* tp_basicsize */
    0,                              /* tp_itemsize */
    (destructor)ArrowBatchStream_dealloc, /* tp_dealloc */
    0, /* tp_vectorcall_offset (tp_print before 3.8) */
    0, /* tp_getattr */
    0, /* tp_setattr */
    0, /* tp_as_async */
    0, /* tp_repr */
    0, /* tp_as_number */
    0, /* tp_as_sequence */
    0, /* tp_as_mapping */
    0, /* tp_hash */
    0, /* tp_call */
    0, /* tp_str */
    0, /* tp_getattro */
    0, /* tp_setattro */
    0, /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT, /* tp_flags */
    "Zero-iteration Arrow stream backed by a C++ coroutine channel", /* tp_doc */
    0, /* tp_traverse */
    0, /* tp_clear */
    0, /* tp_richcompare */
    0, /* tp_weaklistoffset */
    0, /* tp_iter */
    0, /* tp_iternext */
    ArrowBatchStream_methods, /* tp_methods */
    0, /* tp_members */
    0, /* tp_getset */
    0, /* tp_base */
    0, /* tp_dict */
    0, /* tp_descr_get */
    0, /* tp_descr_set */
    0, /* tp_dictoffset */
    0, /* tp_init */
    0, /* tp_alloc */
    0, /* tp_new */
};
303

304
/**
 * Registers the _ArrowBatchStream type on module `m`.
 *
 * @return 0 on success, -1 if register_type reports failure. Presumably a
 *         Python exception is then set by register_type; verify.
 */
int init_arrow_batch_stream(PyObject *m) {
    const int status =
        register_type(m, &ArrowBatchStreamType, "_ArrowBatchStream");
    return status < 0 ? -1 : 0;
}
309

310
/**
 * Creates a new _ArrowBatchStream that takes ownership of `state`.
 *
 * tp_alloc returns raw storage, so the C++ shared_ptr member is brought to
 * life with placement new. ArrowBatchStream_dealloc destroys it explicitly.
 *
 * @return A new reference, or NULL if tp_alloc fails (presumably with
 *         MemoryError set by the allocator).
 */
PyObject *make_arrow_batch_stream(std::shared_ptr<ArrowIteratorState> state) {
    PyObject *raw = ArrowBatchStreamType.tp_alloc(&ArrowBatchStreamType, 0);
    if (raw == nullptr) return nullptr;

    auto *obj = reinterpret_cast<ArrowBatchStreamObject *>(raw);
    ::new (static_cast<void *>(&obj->state))
        std::shared_ptr<ArrowIteratorState>(std::move(state));
    obj->consumed = false;
    return raw;
}
318

319
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc