• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26043728131

18 May 2026 03:37PM UTC coverage: 51.706% (-0.4%) from 52.076%
26043728131

push

github

hariharan-devarajan
feat(perf): performance improvements for parallel reading, indexing, and aggregation

Indexer
- Streaming parse-and-emit worker pipeline with bounded memory usage
- Concurrent SST artifact ingestion with staging support
- Gzip member slicing for parallel indexing
- Lazy decoding for compressed value counts
- Bypass DOM wrapper for indexer hot path (simdjson on_demand)
- Decoupled write workers from parse workers
- --rebuild-summaries flag and optimized root summary rebuild

Aggregator / MPI
- Task-based DAG execution for aggregator pipeline
- Shared staging for multi-node artifact relocation
- Per-node thread scaling to avoid oversubscription
- Unified distributed aggregation tracking, removed manifest consolidation
- Deterministic aggregation and intra-file parallelism

Trace reader / query
- Compiled predicate evaluation for AND-of-EQ queries
- Uniform-match shortcut for AND-of-EQ queries
- Line-range support for work items and checkpoint processing
- Optimized chunk pruning and checkpoint handling

Replay
- Pipelined replay with coroutines and channels
- JsonParser-based trace processing
- Optimized string handling and I/O buffering

Organize / writer / dft
- Parallel slice creation and merging in organize visitor
- Inline indexer in organize
- Gzip member tracking in writer
- Coroutine-based event dispatcher with extracted parse logic
- Batch flushing in organize visitor

Arrow / call_tree
- Optimized arrow conversion
- Arrow IPC support and improved save/load in call_tree

Build / infrastructure
- zlib-ng option, system simdjson fallback
- cgroup v1/v2 memory limit detection
- Auto-computed per-file memory estimates and batch sizes
- CI: perf branch trigger, formatting

Docs
- Rewritten indexer and trace reader API references

35907 of 90345 branches covered (39.74%)

Branch coverage included in aggregate %.

16869 of 21880 new or added lines in 137 files covered. (77.1%)

273 existing lines in 39 files now uncovered.

32021 of 41028 relevant lines covered (78.05%)

13164.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.55
/src/dftracer/utils/python/trace_reader_iterator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/common/config.h>
4
#include <dftracer/utils/python/batch_byte_size.h>
5
#include <dftracer/utils/python/json.h>
6
#include <dftracer/utils/python/trace_reader_iterator.h>
7
#ifdef DFTRACER_UTILS_ENABLE_ARROW
8
#include <nanoarrow/nanoarrow.h>
9

10
using ArrowExportResult =
11
    dftracer::utils::utilities::common::arrow::ArrowExportResult;
12

13
// PyCapsule destructor for capsules named "arrow_schema".
// Runs the ArrowSchema's own release callback if one is still installed, then
// frees the heap struct that the capsule was wrapping.
static void release_arrow_schema(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_schema");
    auto *owned = static_cast<ArrowSchema *>(raw);
    if (owned == nullptr) {
        // Nothing to free (name mismatch or empty capsule).
        return;
    }
    if (owned->release != nullptr) {
        owned->release(owned);
    }
    delete owned;
}
124✔
21

22
// PyCapsule destructor for capsules named "arrow_array".
// Runs the ArrowArray's own release callback if one is still installed, then
// frees the heap struct that the capsule was wrapping.
static void release_arrow_array(PyObject *capsule) {
    void *raw = PyCapsule_GetPointer(capsule, "arrow_array");
    auto *owned = static_cast<ArrowArray *>(raw);
    if (owned == nullptr) {
        // Nothing to free (name mismatch or empty capsule).
        return;
    }
    if (owned->release != nullptr) {
        owned->release(owned);
    }
    delete owned;
}
124✔
30

31
// Method body for `__arrow_c_array__(requested_schema=None)`.
//
// Moves the wrapped result's schema and array into freshly heap-allocated
// ArrowSchema / ArrowArray structs and returns them as a 2-tuple of PyCapsules
// named "arrow_schema" and "arrow_array". `requested_schema` is parsed but not
// consulted. Raises RuntimeError when the result is missing or no longer
// valid; returns NULL with an exception set on any failure.
static PyObject *ArrowBatchCapsule_arrow_c_array(ArrowBatchCapsuleObject *self,
                                                 PyObject *args) {
    PyObject *requested_schema = Py_None;
    if (!PyArg_ParseTuple(args, "|O", &requested_schema)) {
        return NULL;
    }

    const bool exportable = self->result && self->result->valid();
    if (!exportable) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Arrow data already exported via __arrow_c_array__. "
                        "Each batch can only be exported once.");
        return NULL;
    }

    // Heap structs whose ownership is handed to the capsules created below.
    ArrowSchema *c_schema = new ArrowSchema;
    ArrowArray *c_array = new ArrowArray;
    self->result->release_schema().move(c_schema);
    self->result->release_array().move(c_array);

    // Cleanup for the array struct while no capsule owns it yet.
    auto discard_array = [c_array]() {
        if (c_array->release) c_array->release(c_array);
        delete c_array;
    };

    PyObject *schema_capsule =
        PyCapsule_New(c_schema, "arrow_schema", release_arrow_schema);
    if (schema_capsule == NULL) {
        // Neither struct is owned by a capsule: free both by hand.
        if (c_schema->release) c_schema->release(c_schema);
        delete c_schema;
        discard_array();
        return NULL;
    }

    PyObject *array_capsule =
        PyCapsule_New(c_array, "arrow_array", release_arrow_array);
    if (array_capsule == NULL) {
        // The schema is freed through its capsule's destructor.
        Py_DECREF(schema_capsule);
        discard_array();
        return NULL;
    }

    // The tuple takes its own references; dropping ours leaves it as the sole
    // owner (or destroys both capsules if packing failed).
    PyObject *pair = PyTuple_Pack(2, schema_capsule, array_capsule);
    Py_DECREF(schema_capsule);
    Py_DECREF(array_capsule);
    return pair;
}
57✔
73

74
// Getter for `num_rows`: the wrapped result's row count, or 0 when the result
// is absent or reports itself as no longer valid.
static PyObject *ArrowBatchCapsule_get_num_rows(ArrowBatchCapsuleObject *self,
                                                void *) {
    const bool live = self->result && self->result->valid();
    if (live) {
        return PyLong_FromLongLong(self->result->num_rows());
    }
    return PyLong_FromLong(0);
}
47✔
79

80
// Getter for `num_columns`: the wrapped result's column count, or 0 when the
// result is absent or reports itself as no longer valid.
static PyObject *ArrowBatchCapsule_get_num_columns(
    ArrowBatchCapsuleObject *self, void *) {
    const bool live = self->result && self->result->valid();
    if (live) {
        return PyLong_FromLongLong(self->result->num_columns());
    }
    return PyLong_FromLong(0);
}
2✔
85

86
// tp_dealloc for _ArrowBatchCapsule: destroys the heap-allocated result the
// object owns, then returns the object's storage through the type's tp_free.
static void ArrowBatchCapsule_dealloc(ArrowBatchCapsuleObject *self) {
    PyObject *as_object = (PyObject *)self;
    delete self->result;
    freefunc release_storage = Py_TYPE(as_object)->tp_free;
    release_storage(as_object);
}
216✔
90

91
// Method table for _ArrowBatchCapsule; the all-null entry terminates it.
static PyMethodDef ArrowBatchCapsule_methods[] = {
    {
        "__arrow_c_array__",
        reinterpret_cast<PyCFunction>(ArrowBatchCapsule_arrow_c_array),
        METH_VARARGS,
        "Export as Arrow C Data Interface PyCapsule pair (schema, array)",
    },
    {nullptr, nullptr, 0, nullptr},
};
96

97
// Read-only attribute table for _ArrowBatchCapsule (no setters are provided);
// the all-null entry terminates it.
static PyGetSetDef ArrowBatchCapsule_getsetters[] = {
    {
        "num_rows",
        reinterpret_cast<getter>(ArrowBatchCapsule_get_num_rows),
        nullptr,
        "Number of rows",
        nullptr,
    },
    {
        "num_columns",
        reinterpret_cast<getter>(ArrowBatchCapsule_get_num_columns),
        nullptr,
        "Number of columns",
        nullptr,
    },
    {nullptr, nullptr, nullptr, nullptr, nullptr},
};
103

104
// Static type object for dftracer_utils_ext._ArrowBatchCapsule.
// Only the deallocator, flags, docstring, method table and getset table are
// populated; slots after tp_getset are left to zero-initialisation.
PyTypeObject ArrowBatchCapsuleType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext._ArrowBatchCapsule",                  // tp_name
    sizeof(ArrowBatchCapsuleObject),                          // tp_basicsize
    0,                                                        // tp_itemsize
    reinterpret_cast<destructor>(ArrowBatchCapsule_dealloc),  // tp_dealloc
    0,                             // tp_vectorcall_offset
    nullptr,                       // tp_getattr
    nullptr,                       // tp_setattr
    nullptr,                       // tp_as_async
    nullptr,                       // tp_repr
    nullptr,                       // tp_as_number
    nullptr,                       // tp_as_sequence
    nullptr,                       // tp_as_mapping
    nullptr,                       // tp_hash
    nullptr,                       // tp_call
    nullptr,                       // tp_str
    nullptr,                       // tp_getattro
    nullptr,                       // tp_setattro
    nullptr,                       // tp_as_buffer
    Py_TPFLAGS_DEFAULT,            // tp_flags
    "Internal Arrow batch wrapper implementing __arrow_c_array__ protocol",
    nullptr,                       // tp_traverse
    nullptr,                       // tp_clear
    nullptr,                       // tp_richcompare
    0,                             // tp_weaklistoffset
    nullptr,                       // tp_iter
    nullptr,                       // tp_iternext
    ArrowBatchCapsule_methods,     // tp_methods
    nullptr,                       // tp_members
    ArrowBatchCapsule_getsetters,  // tp_getset
};
135

136
#endif                                     // DFTRACER_UTILS_ENABLE_ARROW
137

138
// Stops the producer feeding a memory-view batch iterator: raises the
// cancelled flag, closes the channel if there is one, and blocks until the
// task future (if valid) has completed. Makes no Python API calls.
static void cancel_and_wait_batch_state(MemoryViewBatchIteratorState *bs) {
    bs->cancelled.store(true, std::memory_order_release);

    if (bs->channel) {
        bs->channel->close();
    }

    auto &producer = bs->task_future;
    if (!producer.valid()) {
        return;
    }
    producer.wait();
}
142✔
143

144
// Stops the producer feeding a JSON-dict iterator: raises the cancelled flag,
// closes the channel if there is one, and blocks until the task future (if
// valid) has completed. Makes no Python API calls.
static void cancel_and_wait_json_dict_state(JsonDictIteratorState *js) {
    js->cancelled.store(true, std::memory_order_release);

    if (js->channel) {
        js->channel->close();
    }

    auto &producer = js->task_future;
    if (!producer.valid()) {
        return;
    }
    producer.wait();
}
14✔
149

150
// tp_dealloc for TraceReaderIterator.
//
// For every producer state the iterator holds (Arrow, JSON-dict, memory-view
// batch) this cancels the producer, closes its channel and waits for its task
// to finish before destroying the state. The waits run with the GIL released.
// It then drops the batches the iterator is still holding and frees the
// object through the type's tp_free.
static void TraceReaderIterator_dealloc(TraceReaderIteratorObject *self) {
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (self->arrow_state) {
        self->arrow_state->cancelled.store(true, std::memory_order_release);
        if (self->arrow_state->channel) self->arrow_state->channel->close();
        Py_BEGIN_ALLOW_THREADS
        if (self->arrow_state->task_future.valid()) {
            self->arrow_state->task_future.wait();
        }
        Py_END_ALLOW_THREADS
        self->arrow_state.reset();
    }
#endif
    if (self->json_dict_state) {
        Py_BEGIN_ALLOW_THREADS
        cancel_and_wait_json_dict_state(self->json_dict_state.get());
        Py_END_ALLOW_THREADS
        self->json_dict_state.reset();
    }
    // Release the iterator's share of the JSON batch it was walking. The
    // iterator only drops this pointer itself once a batch is fully consumed,
    // and tp_free runs no C++ destructors, so an iterator destroyed mid-batch
    // would otherwise keep the batch alive forever.
    self->json_dict_current_batch.reset();
    if (self->batch_state) {
        Py_BEGIN_ALLOW_THREADS
        cancel_and_wait_batch_state(self->batch_state.get());
        Py_END_ALLOW_THREADS
        self->batch_state.reset();
    }
    Py_XDECREF(self->current_batch);
    self->current_batch = NULL;
    Py_TYPE(self)->tp_free((PyObject *)self);
}
210✔
175

176
// tp_iter: the object is its own iterator, so return a new reference to it.
static PyObject *TraceReaderIterator_iter(TraceReaderIteratorObject *self) {
    PyObject *as_object = (PyObject *)self;
    Py_INCREF(as_object);
    return as_object;
}
180

181
static PyObject *TraceReaderIterator_next(TraceReaderIteratorObject *self) {
3,072✔
182
    if (self->mode == IteratorMode::JSON_DICT) {
3,072✔
183
        while (true) {
262✔
184
            if (self->json_dict_current_batch) {
524✔
185
                auto &events = self->json_dict_current_batch->events;
510✔
186
                Py_ssize_t n = static_cast<Py_ssize_t>(events.size());
510✔
187
                if (self->json_dict_index < n) {
510✔
188
                    JsonDictValueObject *obj =
240✔
189
                        (JsonDictValueObject *)JsonDictValueType.tp_alloc(
480!
190
                            &JsonDictValueType, 0);
191
                    if (!obj) return NULL;
727✔
192
                    new (&obj->batch) std::shared_ptr<JsonDictBatch>(
720✔
193
                        self->json_dict_current_batch);
480✔
194
                    obj->event_index =
480✔
195
                        static_cast<std::size_t>(self->json_dict_index);
480✔
196
                    obj->is_args = false;
480✔
197
                    self->json_dict_index++;
480✔
198
                    return (PyObject *)obj;
480✔
199
                }
200
                self->json_dict_current_batch.reset();
30✔
201
                self->json_dict_index = 0;
30✔
202
            }
15✔
203

204
            auto *js = self->json_dict_state.get();
44✔
205
            std::optional<JsonDictBatch> batch;
44✔
206
            Py_BEGIN_ALLOW_THREADS batch = js->channel->blocking_receive();
44!
207
            Py_END_ALLOW_THREADS
44!
208

209
                if (!batch.has_value()) {
44✔
210
                std::lock_guard<std::mutex> lock(js->error_mtx);
14!
211
                if (js->error) {
14✔
212
                    try {
NEW
213
                        std::rethrow_exception(js->error);
×
NEW
214
                    } catch (const std::exception &e) {
×
NEW
215
                        PyErr_SetString(PyExc_RuntimeError, e.what());
×
NEW
216
                        return NULL;
×
NEW
217
                    } catch (...) {
×
NEW
218
                        PyErr_SetString(PyExc_RuntimeError,
×
219
                                        "Unknown error in json dict iterator");
NEW
220
                        return NULL;
×
NEW
221
                    }
×
222
                }
223
                return NULL;
14✔
224
            }
14✔
225

226
            auto dequeued_bytes = dftracer::utils::python::byte_size(*batch);
30!
227
            js->bytes_in_queue.fetch_sub(dequeued_bytes,
30✔
228
                                         std::memory_order_acq_rel);
229
            self->json_dict_current_batch =
15✔
230
                std::make_shared<JsonDictBatch>(std::move(*batch));
30!
231
            self->json_dict_index = 0;
30✔
232
        }
59✔
233
    }
234

235
#ifdef DFTRACER_UTILS_ENABLE_ARROW
236
    if (self->mode == IteratorMode::ARROW) {
2,578✔
237
        auto *astate = self->arrow_state.get();
142✔
238
        std::optional<ArrowExportResult> batch;
142✔
239
        Py_BEGIN_ALLOW_THREADS batch = astate->channel->blocking_receive();
142!
240
        Py_END_ALLOW_THREADS
142!
241

242
            if (!batch.has_value()) {
142✔
243
            std::lock_guard<std::mutex> lock(astate->error_mtx);
50!
244
            if (astate->error) {
50✔
245
                try {
246
                    std::rethrow_exception(astate->error);
×
247
                } catch (const std::exception &e) {
×
248
                    PyErr_SetString(PyExc_RuntimeError, e.what());
×
249
                    return NULL;
×
250
                } catch (...) {
×
251
                    PyErr_SetString(PyExc_RuntimeError,
×
252
                                    "Unknown error in Arrow iterator");
253
                    return NULL;
×
254
                }
×
255
            }
256
            return NULL;
50✔
257
        }
50✔
258

259
        auto dequeued_bytes = dftracer::utils::python::byte_size(*batch);
92!
260
        astate->bytes_in_queue.fetch_sub(dequeued_bytes,
92✔
261
                                         std::memory_order_acq_rel);
262

263
        ArrowBatchCapsuleObject *obj =
46✔
264
            (ArrowBatchCapsuleObject *)ArrowBatchCapsuleType.tp_alloc(
92!
265
                &ArrowBatchCapsuleType, 0);
266
        if (!obj) return NULL;
92!
267
        obj->result = new ArrowExportResult(std::move(*batch));
92!
268
        return (PyObject *)obj;
92✔
269
    }
142✔
270
#endif
271

272
    using namespace dftracer::utils::python;
273
    while (true) {
1,470✔
274
        if (self->current_batch) {
2,940✔
275
            auto *batch_obj = (MemoryViewBatchObject *)self->current_batch;
2,802✔
276
            Py_ssize_t n =
1,401✔
277
                static_cast<Py_ssize_t>(batch_obj->data->num_entries());
2,802✔
278
            if (self->batch_index < n) {
2,802✔
279
                PyObject *mv =
1,151✔
280
                    MemoryViewBatch_item(batch_obj, self->batch_index);
2,302!
281
                self->batch_index++;
2,302✔
282
                return mv;
2,369✔
283
            }
284
            Py_DECREF(self->current_batch);
500✔
285
            self->current_batch = NULL;
500✔
286
            self->batch_index = 0;
500✔
287
        }
250✔
288

289
        auto *bs = self->batch_state.get();
638✔
290
        std::optional<MemoryViewBatchData> batch_data;
638✔
291
        Py_BEGIN_ALLOW_THREADS batch_data = bs->channel->blocking_receive();
638!
292
        Py_END_ALLOW_THREADS
638!
293

294
            if (!batch_data.has_value()) {
638✔
295
            std::lock_guard<std::mutex> lock(bs->error_mtx);
134!
296
            if (bs->error) {
134✔
297
                try {
298
                    std::rethrow_exception(bs->error);
3!
299
                } catch (const std::exception &e) {
2!
300
                    PyErr_SetString(PyExc_RuntimeError, e.what());
2✔
301
                    return NULL;
2✔
302
                } catch (...) {
2!
303
                    PyErr_SetString(PyExc_RuntimeError,
×
304
                                    "Unknown error in batch iterator");
305
                    return NULL;
×
306
                }
1!
307
            }
308
            return NULL;
132✔
309
        }
134✔
310

311
        auto dequeued_bytes = dftracer::utils::python::byte_size(*batch_data);
504!
312
        bs->bytes_in_queue.fetch_sub(dequeued_bytes, std::memory_order_acq_rel);
504✔
313

314
        auto *obj = (MemoryViewBatchObject *)MemoryViewBatchType.tp_alloc(
504!
315
            &MemoryViewBatchType, 0);
316
        if (!obj) return NULL;
504✔
317
        obj->data = new MemoryViewBatchData(std::move(*batch_data));
504!
318
        self->current_batch = (PyObject *)obj;
504✔
319
        self->batch_index = 0;
504✔
320
    }
890✔
321
}
1,537✔
322

323
// Static type object for dftracer_utils_ext.TraceReaderIterator.
// Populates only the deallocator, flags, docstring and the iterator protocol
// slots (tp_iter / tp_iternext); tp_new is left empty.
PyTypeObject TraceReaderIteratorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext.TraceReaderIterator",                   // tp_name
    sizeof(TraceReaderIteratorObject),                          // tp_basicsize
    0,                                                          // tp_itemsize
    reinterpret_cast<destructor>(TraceReaderIterator_dealloc),  // tp_dealloc
    0,                   // tp_vectorcall_offset
    nullptr,             // tp_getattr
    nullptr,             // tp_setattr
    nullptr,             // tp_as_async
    nullptr,             // tp_repr
    nullptr,             // tp_as_number
    nullptr,             // tp_as_sequence
    nullptr,             // tp_as_mapping
    nullptr,             // tp_hash
    nullptr,             // tp_call
    nullptr,             // tp_str
    nullptr,             // tp_getattro
    nullptr,             // tp_setattro
    nullptr,             // tp_as_buffer
    Py_TPFLAGS_DEFAULT,  // tp_flags
    "Lazy iterator over TraceReader lines or raw chunks",
    nullptr,             // tp_traverse
    nullptr,             // tp_clear
    nullptr,             // tp_richcompare
    0,                   // tp_weaklistoffset
    reinterpret_cast<getiterfunc>(TraceReaderIterator_iter),   // tp_iter
    reinterpret_cast<iternextfunc>(TraceReaderIterator_next),  // tp_iternext
    nullptr,             // tp_methods
    nullptr,             // tp_members
    nullptr,             // tp_getset
    nullptr,             // tp_base
    nullptr,             // tp_dict
    nullptr,             // tp_descr_get
    nullptr,             // tp_descr_set
    0,                   // tp_dictoffset
    nullptr,             // tp_init
    nullptr,             // tp_alloc
    nullptr,             // tp_new
};
362

363
// Readies the iterator type(s) and registers them on module `m`.
//
// Adds "TraceReaderIterator" and, when Arrow support is compiled in,
// "_ArrowBatchCapsule".
//
// `m` is a borrowed reference: no failure path releases it, so the caller
// that owns the module stays responsible for disposing of it.
//
// Returns 0 on success, -1 on failure.
int init_trace_reader_iterator(PyObject *m) {
    if (PyType_Ready(&TraceReaderIteratorType) < 0) return -1;

    // PyModule_AddObject steals this reference only on success, so it has to
    // be given back by hand when the call fails.
    Py_INCREF(&TraceReaderIteratorType);
    if (PyModule_AddObject(m, "TraceReaderIterator",
                           (PyObject *)&TraceReaderIteratorType) < 0) {
        Py_DECREF(&TraceReaderIteratorType);
        return -1;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (PyType_Ready(&ArrowBatchCapsuleType) < 0) return -1;
    Py_INCREF(&ArrowBatchCapsuleType);
    if (PyModule_AddObject(m, "_ArrowBatchCapsule",
                           (PyObject *)&ArrowBatchCapsuleType) < 0) {
        Py_DECREF(&ArrowBatchCapsuleType);
        return -1;
    }
#endif

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc