• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26043728131

18 May 2026 03:37PM UTC coverage: 51.706% (-0.4%) from 52.076%
26043728131

push

github

hariharan-devarajan
feat(perf): performance improvements for parallel reading, indexing, and aggregation

Indexer
- Streaming parse-and-emit worker pipeline with bounded memory usage
- Concurrent SST artifact ingestion with staging support
- Gzip member slicing for parallel indexing
- Lazy decoding for compressed value counts
- Bypass DOM wrapper for indexer hot path (simdjson on_demand)
- Decoupled write workers from parse workers
- --rebuild-summaries flag and optimized root summary rebuild

Aggregator / MPI
- Task-based DAG execution for aggregator pipeline
- Shared staging for multi-node artifact relocation
- Per-node thread scaling to avoid oversubscription
- Unified distributed aggregation tracking, removed manifest consolidation
- Deterministic aggregation and intra-file parallelism

Trace reader / query
- Compiled predicate evaluation for AND-of-EQ queries
- Uniform-match shortcut for AND-of-EQ queries
- Line-range support for work items and checkpoint processing
- Optimized chunk pruning and checkpoint handling

Replay
- Pipelined replay with coroutines and channels
- JsonParser-based trace processing
- Optimized string handling and i/o buffering

Organize / writer / dft
- Parallel slice creation and merging in organize visitor
- Inline indexer in organize
- Gzip member tracking in writer
- Coroutine-based event dispatcher with extracted parse logic
- Batch flushing in organize visitor

Arrow / call_tree
- Optimized arrow conversion
- Arrow IPC support and improved save/load in call_tree

Build / infrastructure
- zlib-ng option, system simdjson fallback
- cgroup v1/v2 memory limit detection
- Auto-computed per-file memory estimates and batch sizes
- CI: perf branch trigger, formatting

Docs
- Rewritten indexer and trace reader API references

35907 of 90345 branches covered (39.74%)

Branch coverage included in aggregate %.

16869 of 21880 new or added lines in 137 files covered. (77.1%)

273 existing lines in 39 files now uncovered.

32021 of 41028 relevant lines covered (78.05%)

13164.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.57
/src/dftracer/utils/python/arrow_stream_capsule.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#ifdef DFTRACER_UTILS_ENABLE_ARROW
3

4
#define PY_SSIZE_T_CLEAN
5
#include <Python.h>
6
#include <dftracer/utils/python/arrow_stream_capsule.h>
7
#include <dftracer/utils/python/batch_byte_size.h>
8
#include <dftracer/utils/python/schema_reconcile.h>
9
#include <nanoarrow/nanoarrow.h>
10

11
#include <cerrno>
12
#include <cstring>
13
#include <deque>
14
#include <exception>
15
#include <mutex>
16
#include <optional>
17
#include <string>
18

19
// File-wide short name for the Arrow export result type. In this file it is the
// element type of StreamPrivate::pending and the payload received from the
// iterator state's channel.
using ArrowExportResult =
20
    dftracer::utils::utilities::common::arrow::ArrowExportResult;
21

22
namespace {
23

24
// Schema discovery (initialize_stream) keeps draining batches until
// STABLE_BATCHES consecutive batches add no new columns, or until MAX_DRAIN
// batches have been drained, whichever happens first.
25
constexpr int STABLE_BATCHES = 5;
26
constexpr int MAX_DRAIN = 128;
27

28
struct StreamPrivate {
50✔
29
    std::shared_ptr<ArrowIteratorState> state;
30
    dftracer::utils::python::SchemaReconciler reconciler;
31
    // Drained during discovery, emitted first from get_next.
32
    std::deque<ArrowExportResult> pending;
33
    std::string last_error;
34
    bool initialized = false;
25✔
35
    // Sticky: once set, all entry points short-circuit to EIO.
36
    bool error_set = false;
25✔
37
};
38

NEW
39
// Moves the stream into its sticky failed state.
//
// The first message recorded wins: if `last_error` already holds text, `msg`
// is discarded. `error_set` and `initialized` are raised on every call.
static void mark_error(StreamPrivate *p, std::string msg) {
    p->initialized = true;
    p->error_set = true;
    if (!p->last_error.empty()) return;  // keep the earliest diagnostic
    p->last_error = std::move(msg);
}
×
44

45
// Schema discovery: pulls batches off the producer channel, merging each
// batch's schema into the reconciler, until STABLE_BATCHES consecutive batches
// add nothing new, MAX_DRAIN batches have been taken, or the channel reports
// end-of-stream. Every batch taken is parked in `p->pending` for later
// emission by get_next.
//
// Returns 0 on success (also when already initialized) and EIO on failure; on
// failure the stream is left in the sticky error state via mark_error().
static int initialize_stream(StreamPrivate *p) {
    if (p->error_set) return EIO;
    if (p->initialized) return 0;

    auto *source = p->state.get();
    int quiet_streak = 0;

    for (int taken = 0; taken < MAX_DRAIN && quiet_streak < STABLE_BATCHES;
         ++taken) {
        auto item = source->channel->blocking_receive();
        if (!item.has_value()) {
            // Nothing more to receive: either the producer failed or the
            // stream ended cleanly before discovery converged.
            std::lock_guard<std::mutex> guard(source->error_mtx);
            if (!source->error) break;  // clean EOS; finalize what we have
            try {
                std::rethrow_exception(source->error);
            } catch (const std::exception &e) {
                mark_error(p, e.what());
            } catch (...) {
                mark_error(p, "unknown error in Arrow stream");
            }
            return EIO;
        }

        // The batch has left the queue; give its bytes back to the counter.
        source->bytes_in_queue.fetch_sub(
            dftracer::utils::python::byte_size(*item),
            std::memory_order_acq_rel);

        const bool grew = p->reconciler.merge(item->get_schema());
        if (!p->reconciler.last_error().empty()) {
            mark_error(p, p->reconciler.last_error());
            return EIO;
        }
        p->pending.push_back(std::move(*item));

        if (grew) {
            quiet_streak = 0;
        } else {
            ++quiet_streak;
        }
    }

    if (p->reconciler.finalize() != 0) {
        std::string reason = p->reconciler.last_error();
        if (reason.empty()) reason = "failed to finalize schema union";
        mark_error(p, std::move(reason));
        return EIO;
    }

    p->initialized = true;
    return 0;
}
24✔
91

92
static int stream_get_schema(struct ArrowArrayStream *s,
48✔
93
                             struct ArrowSchema *out) {
94
    auto *p = static_cast<StreamPrivate *>(s->private_data);
48✔
95
    int rc = initialize_stream(p);
48✔
96
    if (rc != 0) return rc;
48!
97
    if (p->error_set) return EIO;
48!
98
    if (p->reconciler.copy_schema(out) != 0) {
48!
NEW
99
        mark_error(p, p->reconciler.last_error().empty()
×
NEW
100
                          ? "failed to copy locked schema"
×
NEW
101
                          : p->reconciler.last_error());
×
NEW
102
        return EIO;
×
103
    }
104
    return 0;
48✔
105
}
24✔
106

107
static int stream_get_next(struct ArrowArrayStream *s, struct ArrowArray *out) {
170✔
108
    auto *p = static_cast<StreamPrivate *>(s->private_data);
170✔
109
    if (p->error_set) return EIO;
170!
110
    if (!p->initialized) {
170✔
NEW
111
        int rc = initialize_stream(p);
×
NEW
112
        if (rc != 0) return rc;
×
113
    }
114

115
    // Drain any discovery-phase batches first, then pull from the channel.
116
    std::optional<ArrowExportResult> batch;
170✔
117
    if (!p->pending.empty()) {
170✔
118
        batch = std::move(p->pending.front());
126!
119
        p->pending.pop_front();
126!
120
    } else {
63✔
121
        auto *astate = p->state.get();
44✔
122
        batch = astate->channel->blocking_receive();
44!
123
        if (!batch.has_value()) {
44!
124
            std::lock_guard<std::mutex> lock(astate->error_mtx);
44!
125
            if (astate->error) {
44✔
126
                try {
NEW
127
                    std::rethrow_exception(astate->error);
×
NEW
128
                } catch (const std::exception &e) {
×
NEW
129
                    mark_error(p, e.what());
×
NEW
130
                } catch (...) {
×
NEW
131
                    mark_error(p, "unknown error in Arrow stream");
×
NEW
132
                }
×
NEW
133
                return EIO;
×
134
            }
135
            // End of stream per Arrow C spec: return success with
136
            // out->release == nullptr.
137
            out->release = nullptr;
44✔
138
            return 0;
44✔
139
        }
44✔
NEW
140
        auto dequeued = dftracer::utils::python::byte_size(*batch);
×
NEW
141
        astate->bytes_in_queue.fetch_sub(dequeued, std::memory_order_acq_rel);
×
142
    }
143

144
    if (p->reconciler.reconcile(batch->get_schema(), batch->get_array(), out) !=
126!
145
        0) {
NEW
146
        mark_error(p, p->reconciler.last_error().empty()
×
NEW
147
                          ? "schema reconciliation failed"
×
NEW
148
                          : p->reconciler.last_error());
×
NEW
149
        return EIO;
×
150
    }
151
    return 0;
126✔
152
}
170✔
153

NEW
154
static const char *stream_get_last_error(struct ArrowArrayStream *s) {
×
NEW
155
    auto *p = static_cast<StreamPrivate *>(s->private_data);
×
NEW
156
    if (!p || p->last_error.empty()) return nullptr;
×
NEW
157
    return p->last_error.c_str();
×
158
}
159

160
static void stream_release(struct ArrowArrayStream *s) {
50✔
161
    auto *p = static_cast<StreamPrivate *>(s->private_data);
50✔
162
    if (p) {
50✔
163
        if (p->state) {
50✔
164
            p->state->cancelled.store(true, std::memory_order_release);
50✔
165
            if (p->state->channel) p->state->channel->close();
50✔
166
            if (p->state->task_future.valid()) {
50✔
167
                // Release the GIL if this callback was invoked from a
168
                // Python-holding context (e.g. capsule destructor during
169
                // GC). If the GIL is not held (pyarrow's C reader path),
170
                // _PyThreadState_UncheckedGet() returns null and we wait
171
                // without touching the Python thread state.
172
                if (Py_IsInitialized() && PyGILState_Check()) {
50!
173
                    Py_BEGIN_ALLOW_THREADS p->state->task_future.wait();
2✔
174
                    Py_END_ALLOW_THREADS
2✔
175
                } else {
1✔
176
                    p->state->task_future.wait();
48✔
177
                }
178
            }
25✔
179
        }
25✔
180
        delete p;
50✔
181
    }
25✔
182
    s->private_data = nullptr;
50✔
183
    s->release = nullptr;
50✔
184
}
50✔
185

186
// PyCapsule destructor for the "arrow_array_stream" capsule.
//
// Calls the stream's release callback if it is still set, then frees the
// heap-allocated ArrowArrayStream struct itself.
static void release_stream_capsule(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_array_stream");
    auto *stream = static_cast<ArrowArrayStream *>(raw);
    if (stream != nullptr && stream->release != nullptr) {
        stream->release(stream);
    }
    delete stream;
}
50✔
194

195
// Implements `__arrow_c_stream__(requested_schema=None)`.
//
// Hands ownership of the iterator state to a freshly allocated
// ArrowArrayStream and returns it wrapped in an "arrow_array_stream"
// PyCapsule. Each object can be exported once; a second call raises
// RuntimeError. A non-None `requested_schema` raises NotImplementedError.
// Returns NULL with a Python exception set on any failure.
static PyObject *ArrowBatchStream_arrow_c_stream(ArrowBatchStreamObject *self,
                                                 PyObject *args) {
    PyObject *requested_schema = Py_None;
    if (!PyArg_ParseTuple(args, "|O", &requested_schema)) return NULL;

    // Per the PyCapsule protocol, a non-None `requested_schema` means the
    // caller wants the stream cast to that schema. We only emit our native
    // schema today; reject explicitly so misuse fails loudly instead of
    // silently returning arrays that don't match what the caller asked for.
    if (requested_schema != Py_None) {
        PyErr_SetString(PyExc_NotImplementedError,
                        "iter_arrow_stream does not support "
                        "requested_schema casting; pass None to use the "
                        "native schema.");
        return NULL;
    }

    if (self->consumed || !self->state) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Arrow stream already exported via "
                        "__arrow_c_stream__; each stream can be "
                        "exported only once.");
        return NULL;
    }

    // Do every allocation before touching `self`, and keep C++ exceptions
    // from unwinding into the interpreter: on failure the object is still
    // exportable and nothing has leaked.
    StreamPrivate *priv = nullptr;
    ArrowArrayStream *stream = nullptr;
    try {
        priv = new StreamPrivate;
        stream = new ArrowArrayStream;
    } catch (const std::exception &e) {
        delete priv;  // holds no state yet, so plain delete is enough
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        delete priv;
        PyErr_SetString(PyExc_RuntimeError,
                        "failed to allocate Arrow stream state");
        return NULL;
    }

    // Transfer the iterator state to the stream and mark this object spent.
    priv->state = self->state;
    self->consumed = true;
    self->state.reset();

    std::memset(stream, 0, sizeof(*stream));
    stream->get_schema = stream_get_schema;
    stream->get_next = stream_get_next;
    stream->get_last_error = stream_get_last_error;
    stream->release = stream_release;
    stream->private_data = priv;

    PyObject *capsule =
        PyCapsule_New(stream, "arrow_array_stream", release_stream_capsule);
    if (!capsule) {
        // No capsule means no destructor will run: release (which cancels
        // the producer and frees `priv`) and free the struct ourselves.
        stream->release(stream);
        delete stream;
        return NULL;
    }
    return capsule;
}
26✔
242

243
// tp_dealloc for _ArrowBatchStream.
//
// If the iterator state was never exported (still owned by this object), the
// producer is cancelled, its channel closed, and its task awaited with the GIL
// released. The shared_ptr member is then destroyed explicitly before the
// object memory is returned through tp_free.
static void ArrowBatchStream_dealloc(ArrowBatchStreamObject *self) {
    auto &owned = self->state;
    if (owned) {
        owned->cancelled.store(true, std::memory_order_release);
        if (owned->channel) owned->channel->close();

        Py_BEGIN_ALLOW_THREADS
        if (owned->task_future.valid()) {
            owned->task_future.wait();
        }
        Py_END_ALLOW_THREADS
    }

    self->state.~shared_ptr<ArrowIteratorState>();
    Py_TYPE(self)->tp_free(reinterpret_cast<PyObject *>(self));
}
54✔
255

256
// Method table for _ArrowBatchStream: only `__arrow_c_stream__`, followed by
// the all-null sentinel entry that terminates the table.
static PyMethodDef ArrowBatchStream_methods[] = {
    {"__arrow_c_stream__",
     reinterpret_cast<PyCFunction>(ArrowBatchStream_arrow_c_stream),
     METH_VARARGS, "Export as Arrow C Data Interface stream PyCapsule"},
    {NULL, NULL, 0, NULL}};
260

261
}  // namespace
262

263
// Static type object for `dftracer_utils_ext._ArrowBatchStream`.
// The initializer is positional. The slot names in the trailing comments
// follow the CPython PyTypeObject field order (3.8+ layout assumed — TODO
// confirm against the supported Python versions). Only the name, sizes,
// destructor, flags, docstring and method table are set; every other slot
// is 0.
PyTypeObject ArrowBatchStreamType = {
264
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext._ArrowBatchStream", /* tp_name */
265
    sizeof(ArrowBatchStreamObject), /* tp_basicsize */
266
    0,                              /* tp_itemsize */
267
    (destructor)ArrowBatchStream_dealloc, /* tp_dealloc */
268
    0, /* tp_vectorcall_offset (tp_print before 3.8) */
269
    0, /* tp_getattr */
270
    0, /* tp_setattr */
271
    0, /* tp_as_async */
272
    0, /* tp_repr */
273
    0, /* tp_as_number */
274
    0, /* tp_as_sequence */
275
    0, /* tp_as_mapping */
276
    0, /* tp_hash */
277
    0, /* tp_call */
278
    0, /* tp_str */
279
    0, /* tp_getattro */
280
    0, /* tp_setattro */
281
    0, /* tp_as_buffer */
282
    Py_TPFLAGS_DEFAULT, /* tp_flags */
283
    "Zero-iteration Arrow stream backed by a C++ coroutine channel", /* tp_doc */
284
    0, /* tp_traverse */
285
    0, /* tp_clear */
286
    0, /* tp_richcompare */
287
    0, /* tp_weaklistoffset */
288
    0, /* tp_iter */
289
    0, /* tp_iternext */
290
    ArrowBatchStream_methods, /* tp_methods */
291
    0, /* tp_members */
292
    0, /* tp_getset */
293
    0, /* tp_base */
294
    0, /* tp_dict */
295
    0, /* tp_descr_get */
296
    0, /* tp_descr_set */
297
    0, /* tp_dictoffset */
298
    0, /* tp_init */
299
    0, /* tp_alloc */
300
    0, /* tp_new */
301
};
302

303
// Finalizes ArrowBatchStreamType and registers it on module `m` under the name
// "_ArrowBatchStream". Returns 0 on success and -1 on failure.
int init_arrow_batch_stream(PyObject *m) {
    PyTypeObject *const type = &ArrowBatchStreamType;
    if (PyType_Ready(type) < 0) return -1;

    // Take the reference handed to PyModule_AddObject; drop it again if the
    // registration fails.
    Py_INCREF(type);
    const int rc = PyModule_AddObject(m, "_ArrowBatchStream",
                                      reinterpret_cast<PyObject *>(type));
    if (rc < 0) {
        Py_DECREF(type);
        return -1;
    }
    return 0;
}
1✔
313

314
// Allocates an _ArrowBatchStream object that owns `state`.
//
// Returns a new object, or NULL if tp_alloc fails. The shared_ptr member is
// constructed with placement new; ArrowBatchStream_dealloc destroys it with
// an explicit destructor call.
PyObject *make_arrow_batch_stream(std::shared_ptr<ArrowIteratorState> state) {
    PyObject *raw = ArrowBatchStreamType.tp_alloc(&ArrowBatchStreamType, 0);
    if (raw == nullptr) return NULL;

    auto *obj = reinterpret_cast<ArrowBatchStreamObject *>(raw);
    new (&obj->state) std::shared_ptr<ArrowIteratorState>(std::move(state));
    obj->consumed = false;
    return raw;
}
27✔
322

323
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc