• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28356348514

29 Jun 2026 07:40AM UTC coverage: 52.174% (-0.1%) from 52.278%
28356348514

Pull #83

github

web-flow
Merge 278203630 into 2efed6649
Pull Request #83: refactor and improve code QoL

37276 of 92891 branches covered (40.13%)

Branch coverage included in aggregate %.

671 of 1173 new or added lines in 58 files covered. (57.2%)

66 existing lines in 30 files now uncovered.

33619 of 42991 relevant lines covered (78.2%)

20387.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.23
/src/dftracer/utils/python/arrow_stream_capsule.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#include <dftracer/utils/python/py_type_helpers.h>
3
#ifdef DFTRACER_UTILS_ENABLE_ARROW
4

5
#define PY_SSIZE_T_CLEAN
6
#include <Python.h>
7
#include <dftracer/utils/python/arrow_stream_capsule.h>
8
#include <dftracer/utils/python/batch_byte_size.h>
9
#include <dftracer/utils/python/schema_reconcile.h>
10
#include <nanoarrow/nanoarrow.h>
11

12
#include <cerrno>
13
#include <cstring>
14
#include <deque>
15
#include <exception>
16
#include <mutex>
17
#include <optional>
18
#include <string>
19

20
using ArrowExportResult =
21
    dftracer::utils::utilities::common::arrow::ArrowExportResult;
22

23
namespace {
24

25
// Schema discovery keeps draining the channel until STABLE_BATCHES consecutive
// batches add no new columns, and never reads more than MAX_DRAIN batches.
26
constexpr int STABLE_BATCHES = 5;
27
constexpr int MAX_DRAIN = 128;
28

29
struct StreamPrivate {
50✔
30
    std::shared_ptr<ArrowIteratorState> state;
31
    dftracer::utils::python::SchemaReconciler reconciler;
32
    // Drained during discovery, emitted first from get_next.
33
    std::deque<ArrowExportResult> pending;
34
    std::string last_error;
35
    bool initialized = false;
25✔
36
    // Sticky: once set, all entry points short-circuit to EIO.
37
    bool error_set = false;
25✔
38
};
39

40
// Puts the stream into its sticky error state. Only the first message is
// kept; later calls leave `last_error` untouched. The stream is also flagged
// as initialized so discovery is never retried.
static void mark_error(StreamPrivate *p, std::string msg) {
    const bool first_error = p->last_error.empty();
    if (first_error) {
        p->last_error = std::move(msg);
    }
    p->initialized = true;
    p->error_set = true;
}
×
45

46
// One-time schema discovery.
//
// Reads batches from the channel, merging each batch's schema into the
// reconciler, until STABLE_BATCHES batches in a row add nothing new, MAX_DRAIN
// batches have been read, or the channel yields no value. Every batch read
// here is queued in `p->pending`.
//
// Returns 0 on success (including when discovery already ran) and EIO when the
// stream is, or becomes, errored.
static int initialize_stream(StreamPrivate *p) {
    if (p->error_set) return EIO;
    if (p->initialized) return 0;

    auto *astate = p->state.get();
    int quiet_batches = 0;  // consecutive batches that added no columns

    for (int seen = 0; quiet_batches < STABLE_BATCHES && seen < MAX_DRAIN;
         ++seen) {
        auto received = astate->channel->blocking_receive();
        if (!received.has_value()) {
            // End-of-stream or producer error before discovery converged.
            std::lock_guard<std::mutex> guard(astate->error_mtx);
            if (!astate->error) {
                break;  // clean early EOS; finalize with whatever we have
            }
            try {
                std::rethrow_exception(astate->error);
            } catch (const std::exception &e) {
                mark_error(p, e.what());
            } catch (...) {
                mark_error(p, "unknown error in Arrow stream");
            }
            return EIO;
        }

        // The batch has left the queue: give its bytes back to the counter.
        auto freed = dftracer::utils::python::byte_size(*received);
        astate->bytes_in_queue.fetch_sub(freed, std::memory_order_acq_rel);

        bool grew = p->reconciler.merge(received->get_schema());
        if (!p->reconciler.last_error().empty()) {
            mark_error(p, p->reconciler.last_error());
            return EIO;
        }

        p->pending.push_back(std::move(*received));
        if (grew) {
            quiet_batches = 0;
        } else {
            ++quiet_batches;
        }
    }

    if (p->reconciler.finalize() != 0) {
        mark_error(p, p->reconciler.last_error().empty()
                          ? "failed to finalize schema union"
                          : p->reconciler.last_error());
        return EIO;
    }

    p->initialized = true;
    return 0;
}
24✔
92

93
static int stream_get_schema(struct ArrowArrayStream *s,
48✔
94
                             struct ArrowSchema *out) {
95
    auto *p = static_cast<StreamPrivate *>(s->private_data);
48✔
96
    int rc = initialize_stream(p);
48✔
97
    if (rc != 0) return rc;
48!
98
    if (p->error_set) return EIO;
48!
99
    if (p->reconciler.copy_schema(out) != 0) {
48!
100
        mark_error(p, p->reconciler.last_error().empty()
×
101
                          ? "failed to copy locked schema"
×
102
                          : p->reconciler.last_error());
×
103
        return EIO;
×
104
    }
105
    return 0;
48✔
106
}
24✔
107

108
static int stream_get_next(struct ArrowArrayStream *s, struct ArrowArray *out) {
170✔
109
    auto *p = static_cast<StreamPrivate *>(s->private_data);
170✔
110
    if (p->error_set) return EIO;
170!
111
    if (!p->initialized) {
170✔
112
        int rc = initialize_stream(p);
×
113
        if (rc != 0) return rc;
×
114
    }
115

116
    // Drain any discovery-phase batches first, then pull from the channel.
117
    std::optional<ArrowExportResult> batch;
170✔
118
    if (!p->pending.empty()) {
170✔
119
        batch = std::move(p->pending.front());
126!
120
        p->pending.pop_front();
126!
121
    } else {
63✔
122
        auto *astate = p->state.get();
44✔
123
        batch = astate->channel->blocking_receive();
44!
124
        if (!batch.has_value()) {
44!
125
            std::lock_guard<std::mutex> lock(astate->error_mtx);
44!
126
            if (astate->error) {
44✔
127
                try {
128
                    std::rethrow_exception(astate->error);
×
129
                } catch (const std::exception &e) {
×
130
                    mark_error(p, e.what());
×
131
                } catch (...) {
×
132
                    mark_error(p, "unknown error in Arrow stream");
×
133
                }
×
134
                return EIO;
×
135
            }
136
            // End of stream per Arrow C spec: return success with
137
            // out->release == nullptr.
138
            out->release = nullptr;
44✔
139
            return 0;
44✔
140
        }
44✔
141
        auto dequeued = dftracer::utils::python::byte_size(*batch);
×
142
        astate->bytes_in_queue.fetch_sub(dequeued, std::memory_order_acq_rel);
×
143
    }
144

145
    if (p->reconciler.reconcile(batch->get_schema(), batch->get_array(), out) !=
126!
146
        0) {
147
        mark_error(p, p->reconciler.last_error().empty()
×
148
                          ? "schema reconciliation failed"
×
149
                          : p->reconciler.last_error());
×
150
        return EIO;
×
151
    }
152
    return 0;
126✔
153
}
170✔
154

155
static const char *stream_get_last_error(struct ArrowArrayStream *s) {
×
156
    auto *p = static_cast<StreamPrivate *>(s->private_data);
×
157
    if (!p || p->last_error.empty()) return nullptr;
×
158
    return p->last_error.c_str();
×
159
}
160

161
static void stream_release(struct ArrowArrayStream *s) {
50✔
162
    auto *p = static_cast<StreamPrivate *>(s->private_data);
50✔
163
    if (p) {
50✔
164
        if (p->state) {
50✔
165
            p->state->cancelled.store(true, std::memory_order_release);
50✔
166
            if (p->state->channel) p->state->channel->close();
50✔
167
            if (p->state->task_future.valid()) {
50✔
168
                // Release the GIL if this callback was invoked from a
169
                // Python-holding context (e.g. capsule destructor during
170
                // GC). If the GIL is not held (pyarrow's C reader path),
171
                // _PyThreadState_UncheckedGet() returns null and we wait
172
                // without touching the Python thread state.
173
                if (Py_IsInitialized() && PyGILState_Check()) {
50!
174
                    Py_BEGIN_ALLOW_THREADS p->state->task_future.wait();
2✔
175
                    Py_END_ALLOW_THREADS
2✔
176
                } else {
1✔
177
                    p->state->task_future.wait();
48✔
178
                }
179
            }
25✔
180
        }
25✔
181
        delete p;
50✔
182
    }
25✔
183
    s->private_data = nullptr;
50✔
184
    s->release = nullptr;
50✔
185
}
50✔
186

187
// PyCapsule destructor for the "arrow_array_stream" capsule.
// If the stream still has a release callback, it is invoked here; the
// heap-allocated ArrowArrayStream struct is then freed in every case.
static void release_stream_capsule(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_array_stream");
    auto *stream = static_cast<ArrowArrayStream *>(raw);
    if (stream != nullptr && stream->release != nullptr) {
        stream->release(stream);
    }
    delete stream;
}
50✔
195

196
// Implements `__arrow_c_stream__(requested_schema=None)`.
//
// Exports the stream as a PyCapsule named "arrow_array_stream" wrapping a
// heap-allocated ArrowArrayStream. A stream can be exported only once: on
// success the iterator state is moved out of `self` into the exported stream.
//
// Returns a new capsule reference, or NULL with a Python exception set:
//   - NotImplementedError if `requested_schema` is not None,
//   - RuntimeError if the stream was already exported or the export state
//     could not be allocated.
static PyObject *ArrowBatchStream_arrow_c_stream(ArrowBatchStreamObject *self,
                                                 PyObject *args) {
    PyObject *requested_schema = Py_None;
    if (!PyArg_ParseTuple(args, "|O", &requested_schema)) return NULL;

    // Per the PyCapsule protocol, a non-None `requested_schema` means the
    // caller wants the stream cast to that schema. We only emit our native
    // schema today; reject explicitly so misuse fails loudly instead of
    // silently returning arrays that don't match what the caller asked for.
    if (requested_schema != Py_None) {
        PyErr_SetString(PyExc_NotImplementedError,
                        "iter_arrow_stream does not support "
                        "requested_schema casting; pass None to use the "
                        "native schema.");
        return NULL;
    }

    if (self->consumed || !self->state) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Arrow stream already exported via "
                        "__arrow_c_stream__; each stream can be "
                        "exported only once.");
        return NULL;
    }

    // Allocate everything before touching `self`, and never let a C++
    // exception escape into the interpreter. If either allocation fails the
    // object is left untouched and can still be exported later.
    StreamPrivate *priv = nullptr;
    ArrowArrayStream *stream = nullptr;
    try {
        priv = new StreamPrivate;
        stream = new ArrowArrayStream;
    } catch (const std::exception &e) {
        delete priv;  // holds no state yet, so this cannot block
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        delete priv;
        PyErr_SetString(PyExc_RuntimeError,
                        "failed to allocate Arrow stream export state");
        return NULL;
    }

    // Hand the iterator state over to the exported stream.
    priv->state = self->state;
    self->consumed = true;
    self->state.reset();

    std::memset(stream, 0, sizeof(*stream));
    stream->get_schema = stream_get_schema;
    stream->get_next = stream_get_next;
    stream->get_last_error = stream_get_last_error;
    stream->release = stream_release;
    stream->private_data = priv;

    PyObject *capsule =
        PyCapsule_New(stream, "arrow_array_stream", release_stream_capsule);
    if (!capsule) {
        // No capsule owns the stream, so release and free it here.
        stream->release(stream);
        delete stream;
        return NULL;
    }
    return capsule;
}
26✔
243

244
// tp_dealloc for _ArrowBatchStream.
//
// If the object still holds its iterator state (it is reset on export), this
// sets the cancelled flag, closes the channel and waits for the task future
// with the GIL released. The shared_ptr member is then destroyed explicitly
// before the object memory is freed.
static void ArrowBatchStream_dealloc(ArrowBatchStreamObject *self) {
    auto &state = self->state;
    if (state) {
        state->cancelled.store(true, std::memory_order_release);
        if (state->channel) {
            state->channel->close();
        }
        Py_BEGIN_ALLOW_THREADS
        if (state->task_future.valid()) {
            state->task_future.wait();
        }
        Py_END_ALLOW_THREADS
    }
    self->state.~shared_ptr<ArrowIteratorState>();
    Py_TYPE(self)->tp_free(reinterpret_cast<PyObject *>(self));
}
54✔
256

257
// Method table for _ArrowBatchStream: a single positional-args method plus
// the all-NULL sentinel that terminates the table.
static PyMethodDef ArrowBatchStream_methods[] = {
    {
        "__arrow_c_stream__",
        (PyCFunction)ArrowBatchStream_arrow_c_stream,
        METH_VARARGS,
        "Export as Arrow C Data Interface stream PyCapsule",
    },
    {NULL, NULL, 0, NULL},
};
261

262
}  // namespace
263

264
// Static type object for _ArrowBatchStream. The initializer is positional, so
// the order of entries is significant.
// NOTE(review): slot names below follow the CPython 3.8+ PyTypeObject layout;
// confirm against the Python versions this extension targets.
PyTypeObject ArrowBatchStreamType = {
265
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext._ArrowBatchStream",
266
    sizeof(ArrowBatchStreamObject), /* tp_basicsize */
267
    0,                              /* tp_itemsize */
268
    (destructor)ArrowBatchStream_dealloc, /* tp_dealloc */
269
    0, /* tp_vectorcall_offset */
270
    0, /* tp_getattr */
271
    0, /* tp_setattr */
272
    0, /* tp_as_async */
273
    0, /* tp_repr */
274
    0, /* tp_as_number */
275
    0, /* tp_as_sequence */
276
    0, /* tp_as_mapping */
277
    0, /* tp_hash */
278
    0, /* tp_call */
279
    0, /* tp_str */
280
    0, /* tp_getattro */
281
    0, /* tp_setattro */
282
    0, /* tp_as_buffer */
283
    Py_TPFLAGS_DEFAULT, /* tp_flags */
284
    "Zero-iteration Arrow stream backed by a C++ coroutine channel", /* tp_doc */
285
    0, /* tp_traverse */
286
    0, /* tp_clear */
287
    0, /* tp_richcompare */
288
    0, /* tp_weaklistoffset */
289
    0, /* tp_iter */
290
    0, /* tp_iternext */
291
    ArrowBatchStream_methods, /* tp_methods */
292
    0, /* tp_members */
293
    0, /* tp_getset */
294
    0, /* tp_base */
295
    0, /* tp_dict */
296
    0, /* tp_descr_get */
297
    0, /* tp_descr_set */
298
    0, /* tp_dictoffset */
299
    0, /* tp_init */
300
    0, /* tp_alloc */
301
    0, /* tp_new */
302
};
303

304
// Registers ArrowBatchStreamType on module `m` under the name
// "_ArrowBatchStream". Returns 0 on success and -1 if register_type reports a
// negative result.
int init_arrow_batch_stream(PyObject *m) {
    const bool failed =
        register_type(m, &ArrowBatchStreamType, "_ArrowBatchStream") < 0;
    return failed ? -1 : 0;
}
1✔
309

310
// Creates a new _ArrowBatchStream object that takes ownership of `state`.
// Returns a new reference, or NULL if tp_alloc fails.
PyObject *make_arrow_batch_stream(std::shared_ptr<ArrowIteratorState> state) {
    PyObject *raw = ArrowBatchStreamType.tp_alloc(&ArrowBatchStreamType, 0);
    auto *obj = (ArrowBatchStreamObject *)raw;
    if (obj == nullptr) {
        return NULL;
    }
    // The shared_ptr member is constructed in place inside the memory that
    // tp_alloc returned; the dealloc function destroys it explicitly.
    new (&obj->state) std::shared_ptr<ArrowIteratorState>(std::move(state));
    obj->consumed = false;
    return (PyObject *)obj;
}
27✔
318

319
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc