• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26043728131

18 May 2026 03:37PM UTC coverage: 51.706% (-0.4%) from 52.076%
26043728131

push

github

hariharan-devarajan
feat(perf): performance improvements for parallel reading, indexing, and aggregation

Indexer
- Streaming parse-and-emit worker pipeline with bounded memory usage
- Concurrent SST artifact ingestion with staging support
- Gzip member slicing for parallel indexing
- Lazy decoding for compressed value counts
- Bypass DOM wrapper for indexer hot path (simdjson on_demand)
- Decoupled write workers from parse workers
- --rebuild-summaries flag and optimized root summary rebuild

Aggregator / MPI
- Task-based DAG execution for aggregator pipeline
- Shared staging for multi-node artifact relocation
- Per-node thread scaling to avoid oversubscription
- Unified distributed aggregation tracking, removed manifest consolidation
- Deterministic aggregation and intra-file parallelism

Trace reader / query
- Compiled predicate evaluation for AND-of-EQ queries
- Uniform-match shortcut for AND-of-EQ queries
- Line-range support for work items and checkpoint processing
- Optimized chunk pruning and checkpoint handling

Replay
- Pipelined replay with coroutines and channels
- JsonParser-based trace processing
- Optimized string handling and i/o buffering

Organize / writer / dft
- Parallel slice creation and merging in organize visitor
- Inline indexer in organize
- Gzip member tracking in writer
- Coroutine-based event dispatcher with extracted parse logic
- Batch flushing in organize visitor

Arrow / call_tree
- Optimized arrow conversion
- Arrow IPC support and improved save/load in call_tree

Build / infrastructure
- zlib-ng option, system simdjson fallback
- cgroup v1/v2 memory limit detection
- Auto-computed per-file memory estimates and batch sizes
- CI: perf branch trigger, formatting

Docs
- Rewritten indexer and trace reader API references

35907 of 90345 branches covered (39.74%)

Branch coverage included in aggregate %.

16869 of 21880 new or added lines in 137 files covered. (77.1%)

273 existing lines in 39 files now uncovered.

32021 of 41028 relevant lines covered (78.05%)

13164.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.01
/src/dftracer/utils/python/index_database.cpp
1
#include <dftracer/utils/python/index_database.h>
#include <dftracer/utils/python/sst_distribution.h>
#include <dftracer/utils/utilities/indexer/index_database.h>
#include <dftracer/utils/utilities/indexer/index_database_sst_writer_context.h>

#include <limits>
#include <memory>
#include <new>
#include <string>
#include <unordered_set>
#include <vector>

using dftracer::utils::utilities::indexer::IndexDatabase;
using dftracer::utils::utilities::indexer::SstArtifactRegistry;
13

NEW
14
/// tp_dealloc for IndexDatabase.
///
/// Runs the destructor of the placement-constructed `db` member (dropping
/// this object's reference to the database), then returns the object's
/// storage through the type's tp_free slot.
static void IndexDatabase_dealloc(IndexDatabaseObject *self) {
    using DbHandle = std::shared_ptr<IndexDatabase>;
    // The member was built with placement new in IndexDatabase_new, so its
    // destructor has to be invoked by hand before the memory is released.
    self->db.~DbHandle();
    PyTypeObject *const owner_type = Py_TYPE(self);
    owner_type->tp_free(reinterpret_cast<PyObject *>(self));
}
×
18

NEW
19
/// tp_new for IndexDatabase.
///
/// Allocates the object via the type's tp_alloc and placement-constructs an
/// empty `db` handle. IndexDatabase_dealloc can then always destroy it, even
/// when __init__ never ran. Returns NULL if allocation fails.
static PyObject *IndexDatabase_new(PyTypeObject *type, PyObject * /*args*/,
                                   PyObject * /*kwds*/) {
    PyObject *raw = type->tp_alloc(type, 0);
    if (raw == NULL) {
        return NULL;
    }
    auto *obj = reinterpret_cast<IndexDatabaseObject *>(raw);
    // tp_alloc hands back raw storage; the shared_ptr's constructor has not
    // run yet, so construct it in place before anything assigns to it.
    ::new (static_cast<void *>(&obj->db)) std::shared_ptr<IndexDatabase>();
    return raw;
}
26

NEW
27
static int IndexDatabase_init(IndexDatabaseObject *self, PyObject *args,
×
28
                              PyObject *kwds) {
29
    static const char *kwlist[] = {"index_path", NULL};
30
    const char *index_path;
NEW
31
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", (char **)kwlist,
×
32
                                     &index_path)) {
NEW
33
        return -1;
×
34
    }
35
    try {
NEW
36
        self->db = std::make_shared<IndexDatabase>(index_path);
×
NEW
37
    } catch (const std::exception &e) {
×
NEW
38
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
NEW
39
        return -1;
×
NEW
40
    }
×
NEW
41
    return 0;
×
42
}
43

NEW
44
/// init_schema() -> None
///
/// Calls IndexDatabase::init_schema() with the GIL released.
/// Raises RuntimeError if the handle is uninitialised or the C++ call throws.
static PyObject *IndexDatabase_init_schema(IndexDatabaseObject *self,
                                           PyObject * /*ignored*/) {
    // Take a strong reference while the GIL is held so that a concurrent
    // __init__ on another thread cannot destroy the database under us.
    const auto db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    std::string error;
    bool failed = false;
    // Exceptions are caught *inside* the allow-threads region. Letting one
    // unwind past Py_END_ALLOW_THREADS would skip reacquiring the GIL, and
    // the Python error would then be set without holding it.
    Py_BEGIN_ALLOW_THREADS
    try {
        db->init_schema();
    } catch (const std::exception &e) {
        error = e.what();
        failed = true;
    } catch (...) {
        error = "unknown C++ exception";
        failed = true;
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
59

NEW
60
/// register_files(paths, build_manifest=False) -> list[int]
///
/// Converts `paths` (any sequence/iterable of str) to UTF-8 strings and
/// calls IndexDatabase::register_files with the GIL released. Returns the
/// resulting ids as a Python list of int.
///
/// Raises TypeError/UnicodeError from the conversion, RuntimeError if the
/// handle is uninitialised or the C++ call throws.
static PyObject *IndexDatabase_register_files(IndexDatabaseObject *self,
                                              PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"paths", "build_manifest", NULL};
    PyObject *paths_obj = NULL;
    int build_manifest = 0;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|p",
                                     const_cast<char **>(kwlist), &paths_obj,
                                     &build_manifest)) {
        return NULL;
    }

    // Strong local reference, taken with the GIL held (see init_schema).
    const auto db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    PyObject *seq = PySequence_Fast(paths_obj, "paths must be a sequence");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<std::string> paths;
    paths.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
        if (!s) {
            Py_DECREF(seq);
            return NULL;
        }
        paths.emplace_back(s);
    }
    Py_DECREF(seq);

    std::vector<int> ids;
    std::string error;
    bool failed = false;
    // Catch inside the allow-threads region so the GIL is always restored
    // before any Python error state is touched.
    Py_BEGIN_ALLOW_THREADS
    try {
        ids = db->register_files(paths, build_manifest != 0);
    } catch (const std::exception &e) {
        error = e.what();
        failed = true;
    } catch (...) {
        error = "unknown C++ exception";
        failed = true;
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }

    PyObject *out = PyList_New(static_cast<Py_ssize_t>(ids.size()));
    if (!out) return NULL;
    for (std::size_t i = 0; i < ids.size(); ++i) {
        PyObject *id = PyLong_FromLong(ids[i]);
        if (!id) {
            // Unfilled slots are NULL; list deallocation tolerates them.
            Py_DECREF(out);
            return NULL;
        }
        PyList_SET_ITEM(out, static_cast<Py_ssize_t>(i), id);
    }
    return out;
}
×
102

NEW
103
/// reserve_file_id_range(count) -> int
///
/// Calls IndexDatabase::reserve_file_id_range(count) with the GIL released
/// and returns its result (the first reserved file_id) as a Python int.
///
/// Raises ValueError if `count` is negative, RuntimeError if the handle is
/// uninitialised or the C++ call throws.
static PyObject *IndexDatabase_reserve_file_id_range(IndexDatabaseObject *self,
                                                     PyObject *args) {
    Py_ssize_t count = 0;
    if (!PyArg_ParseTuple(args, "n", &count)) return NULL;
    if (count < 0) {
        PyErr_SetString(PyExc_ValueError, "count must be >= 0");
        return NULL;
    }

    // Strong local reference, taken with the GIL held.
    const auto db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    int first = 0;
    std::string error;
    bool failed = false;
    // Catch inside the allow-threads region so the GIL is always restored
    // before any Python error state is touched.
    Py_BEGIN_ALLOW_THREADS
    try {
        first = db->reserve_file_id_range(static_cast<std::size_t>(count));
    } catch (const std::exception &e) {
        error = e.what();
        failed = true;
    } catch (...) {
        error = "unknown C++ exception";
        failed = true;
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    return PyLong_FromLong(first);
}
122

NEW
123
/// bulk_ingest(registry, skip_cfs=None) -> None
///
/// Resolves `registry` to its underlying SstArtifactRegistry with
/// sst_artifact_registry_get. When `skip_cfs` is given and not None, it is
/// converted to a set of strings. Then calls IndexDatabase::bulk_ingest with
/// the GIL released.
///
/// Raises TypeError if `registry` is not an SstArtifactRegistry, and
/// RuntimeError if the handle is uninitialised or the C++ call throws.
static PyObject *IndexDatabase_bulk_ingest(IndexDatabaseObject *self,
                                           PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"registry", "skip_cfs", NULL};
    PyObject *registry_obj = NULL;
    PyObject *skip_cfs_obj = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|O",
                                     const_cast<char **>(kwlist),
                                     &registry_obj, &skip_cfs_obj)) {
        return NULL;
    }

    SstArtifactRegistry *registry = sst_artifact_registry_get(registry_obj);
    if (!registry) {
        PyErr_SetString(PyExc_TypeError,
                        "expected an SstArtifactRegistry instance");
        return NULL;
    }

    // Strong local reference, taken with the GIL held.
    const auto db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    std::unordered_set<std::string> skip_cfs;
    if (skip_cfs_obj && skip_cfs_obj != Py_None) {
        PyObject *seq =
            PySequence_Fast(skip_cfs_obj, "skip_cfs must be an iterable");
        if (!seq) return NULL;
        const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
        for (Py_ssize_t i = 0; i < n; ++i) {
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
            if (!s) {
                Py_DECREF(seq);
                return NULL;
            }
            skip_cfs.emplace(s);
        }
        Py_DECREF(seq);
    }

    std::string error;
    bool failed = false;
    // Catch inside the allow-threads region so the GIL is always restored
    // before any Python error state is touched.
    Py_BEGIN_ALLOW_THREADS
    try {
        db->bulk_ingest(*registry, skip_cfs);
    } catch (const std::exception &e) {
        error = e.what();
        failed = true;
    } catch (...) {
        error = "unknown C++ exception";
        failed = true;
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
×
167

NEW
168
/// write_agg_file_markers(file_ids) -> None
///
/// Converts `file_ids` (any sequence/iterable of int) to C ints and calls
/// IndexDatabase::write_agg_file_markers with the GIL released.
///
/// Raises:
/// - TypeError for non-int entries.
/// - OverflowError for ids outside the C `int` range, instead of silently
///   truncating them.
/// - RuntimeError if the handle is uninitialised or the C++ call throws.
static PyObject *IndexDatabase_write_agg_file_markers(IndexDatabaseObject *self,
                                                      PyObject *args) {
    PyObject *ids_obj = NULL;
    if (!PyArg_ParseTuple(args, "O", &ids_obj)) return NULL;

    // Strong local reference, taken with the GIL held.
    const auto db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    PyObject *seq = PySequence_Fast(ids_obj, "file_ids must be an iterable");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<int> file_ids;
    file_ids.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        const long v = PyLong_AsLong(PySequence_Fast_GET_ITEM(seq, i));
        if (v == -1 && PyErr_Occurred()) {
            Py_DECREF(seq);
            return NULL;
        }
        // `long` may be wider than `int`; a narrowing cast would write a
        // marker for the wrong file id.
        if (v < std::numeric_limits<int>::min() ||
            v > std::numeric_limits<int>::max()) {
            Py_DECREF(seq);
            PyErr_SetString(PyExc_OverflowError,
                            "file_id does not fit in a C int");
            return NULL;
        }
        file_ids.push_back(static_cast<int>(v));
    }
    Py_DECREF(seq);

    std::string error;
    bool failed = false;
    // Catch inside the allow-threads region so the GIL is always restored
    // before any Python error state is touched.
    Py_BEGIN_ALLOW_THREADS
    try {
        db->write_agg_file_markers(file_ids);
    } catch (const std::exception &e) {
        error = e.what();
        failed = true;
    } catch (...) {
        error = "unknown C++ exception";
        failed = true;
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
×
198

NEW
199
/// write_agg_global_config(time_interval_us, config_hash=0) -> None
///
/// Calls IndexDatabase::write_agg_global_config with the GIL released.
/// NOTE: the `K` and `I` format units perform no overflow checking, so
/// out-of-range Python ints wrap rather than raising.
///
/// Raises RuntimeError if the handle is uninitialised or the C++ call throws.
static PyObject *IndexDatabase_write_agg_global_config(
    IndexDatabaseObject *self, PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"time_interval_us", "config_hash", NULL};
    unsigned long long time_interval_us = 0;
    unsigned int config_hash = 0;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "K|I",
                                     const_cast<char **>(kwlist),
                                     &time_interval_us, &config_hash)) {
        return NULL;
    }

    // Strong local reference, taken with the GIL held.
    const auto db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    std::string error;
    bool failed = false;
    // Catch inside the allow-threads region so the GIL is always restored
    // before any Python error state is touched.
    Py_BEGIN_ALLOW_THREADS
    try {
        db->write_agg_global_config(
            static_cast<std::uint64_t>(time_interval_us),
            static_cast<std::uint32_t>(config_hash));
    } catch (const std::exception &e) {
        error = e.what();
        failed = true;
    } catch (...) {
        error = "unknown C++ exception";
        failed = true;
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
219

NEW
220
/// write_aggregation_tracker(blobs) -> None
///
/// Collects the non-empty `bytes` entries of `blobs`; None entries and empty
/// byte strings are skipped. Then calls
/// IndexDatabase::write_aggregation_tracker with the GIL released.
///
/// Raises TypeError for entries that are neither bytes nor None, and
/// RuntimeError if the handle is uninitialised or the C++ call throws.
static PyObject *IndexDatabase_write_aggregation_tracker(
    IndexDatabaseObject *self, PyObject *args) {
    PyObject *blobs_obj = NULL;
    if (!PyArg_ParseTuple(args, "O", &blobs_obj)) return NULL;

    // Strong local reference, taken with the GIL held.
    const auto db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    PyObject *seq = PySequence_Fast(blobs_obj, "blobs must be an iterable");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<std::string> blobs;
    blobs.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        PyObject *item = PySequence_Fast_GET_ITEM(seq, i);
        if (item == Py_None) continue;
        if (!PyBytes_Check(item)) {
            Py_DECREF(seq);
            PyErr_SetString(PyExc_TypeError,
                            "blobs entries must be bytes or None");
            return NULL;
        }
        char *buf = nullptr;
        Py_ssize_t len = 0;
        // Passing a length pointer means embedded NUL bytes are accepted.
        if (PyBytes_AsStringAndSize(item, &buf, &len) < 0) {
            Py_DECREF(seq);
            return NULL;
        }
        // Copy the bytes now: `buf` points into `item`, which is only
        // guaranteed alive while `seq` is held.
        if (len > 0) blobs.emplace_back(buf, static_cast<std::size_t>(len));
    }
    Py_DECREF(seq);

    std::string error;
    bool failed = false;
    // Catch inside the allow-threads region so the GIL is always restored
    // before any Python error state is touched.
    Py_BEGIN_ALLOW_THREADS
    try {
        db->write_aggregation_tracker(blobs);
    } catch (const std::exception &e) {
        error = e.what();
        failed = true;
    } catch (...) {
        error = "unknown C++ exception";
        failed = true;
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
×
257

NEW
258
/// rebuild_root_summaries() -> None
///
/// Calls IndexDatabase::rebuild_root_summaries() with the GIL released.
/// Raises RuntimeError if the handle is uninitialised or the C++ call throws.
static PyObject *IndexDatabase_rebuild_root_summaries(IndexDatabaseObject *self,
                                                      PyObject * /*ignored*/) {
    // Strong local reference, taken with the GIL held.
    const auto db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    std::string error;
    bool failed = false;
    // Catch inside the allow-threads region so the GIL is always restored
    // before any Python error state is touched.
    Py_BEGIN_ALLOW_THREADS
    try {
        db->rebuild_root_summaries();
    } catch (const std::exception &e) {
        error = e.what();
        failed = true;
    } catch (...) {
        error = "unknown C++ exception";
        failed = true;
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
269

270
// Method table for the Python IndexDatabase type. Every handler is stored
// in the PyCFunction slot; the METH_* flags tell CPython which signature
// (no-args, positional, or positional+keywords) it really has.
static PyMethodDef IndexDatabase_methods[] = {
    {"init_schema", reinterpret_cast<PyCFunction>(IndexDatabase_init_schema),
     METH_NOARGS, "Idempotently initialise the schema version key."},
    {"register_files",
     reinterpret_cast<PyCFunction>(IndexDatabase_register_files),
     METH_VARARGS | METH_KEYWORDS,
     "register_files(paths, build_manifest=False) -> list[int]\n"
     "Register each path in the DEFAULT-CF file registry and return the "
     "assigned file_ids. Idempotent for files with matching hash."},
    {"reserve_file_id_range",
     reinterpret_cast<PyCFunction>(IndexDatabase_reserve_file_id_range),
     METH_VARARGS,
     "reserve_file_id_range(count) -> int\n"
     "Atomically reserve `count` contiguous file_ids, return the first."},
    {"bulk_ingest", reinterpret_cast<PyCFunction>(IndexDatabase_bulk_ingest),
     METH_VARARGS | METH_KEYWORDS,
     "bulk_ingest(registry, skip_cfs=None) -> None\n"
     "Ingest all SSTs collected in the SstArtifactRegistry.\n"
     "skip_cfs is an optional iterable of CF names whose SSTs are left "
     "outside the unified DB (used by distributed builds to keep "
     "AGGREGATION/SYSTEM_METRICS SSTs addressable by manifest)."},
    {"rebuild_root_summaries",
     reinterpret_cast<PyCFunction>(IndexDatabase_rebuild_root_summaries),
     METH_NOARGS,
     "Recompute ROOT_* summary column families from per-file CFs."},
    {"write_agg_global_config",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_agg_global_config),
     METH_VARARGS | METH_KEYWORDS,
     "write_agg_global_config(time_interval_us, config_hash=0) -> None\n"
     "Write the AGG_GLOBAL_CONFIG_KEY marker into the AGGREGATION CF. "
     "Required for `iter_arrow_dfanalyzer_all` on distributed builds "
     "(which never materialise the key via worker SSTs) or "
     "post-consolidate indices."},
    {"write_agg_file_markers",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_agg_file_markers),
     METH_VARARGS,
     "write_agg_file_markers(file_ids) -> None\n"
     "Write per-file aggregation completion markers (\\xFF\\xFF + file_id) "
     "into the AGGREGATION CF. Required after distributed_index otherwise "
     "`ensure_indexed()` concludes aggregation is incomplete and re-runs "
     "the entire build."},
    {"write_aggregation_tracker",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_aggregation_tracker),
     METH_VARARGS,
     "write_aggregation_tracker(blobs) -> None\n"
     "Merge a list of serialized AssociationTracker bytes and write the "
     "result to the AGGREGATION CF under the `__tracker__` key."},
    // Sentinel terminating the table.
    {nullptr, nullptr, 0, nullptr}};
313

314
// Static type object for dftracer_utils_ext.IndexDatabase.
// C++17 has no designated initialisers, so slots are filled positionally;
// each entry is labelled with the PyTypeObject field it occupies. Fields
// after tp_new are omitted and therefore zero-initialised.
PyTypeObject IndexDatabaseType = {
    PyVarObject_HEAD_INIT(nullptr, 0)
    "dftracer_utils_ext.IndexDatabase",                   /* tp_name */
    sizeof(IndexDatabaseObject),                          /* tp_basicsize */
    0,                                                    /* tp_itemsize */
    reinterpret_cast<destructor>(IndexDatabase_dealloc),  /* tp_dealloc */
    0,       /* tp_vectorcall_offset (tp_print before Python 3.8) */
    nullptr, /* tp_getattr */
    nullptr, /* tp_setattr */
    nullptr, /* tp_as_async */
    nullptr, /* tp_repr */
    nullptr, /* tp_as_number */
    nullptr, /* tp_as_sequence */
    nullptr, /* tp_as_mapping */
    nullptr, /* tp_hash */
    nullptr, /* tp_call */
    nullptr, /* tp_str */
    nullptr, /* tp_getattro */
    nullptr, /* tp_setattro */
    nullptr, /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT,                        /* tp_flags */
    "Handle to a .dftindex RocksDB store.",    /* tp_doc */
    nullptr, /* tp_traverse */
    nullptr, /* tp_clear */
    nullptr, /* tp_richcompare */
    0,       /* tp_weaklistoffset */
    nullptr, /* tp_iter */
    nullptr, /* tp_iternext */
    IndexDatabase_methods, /* tp_methods */
    nullptr, /* tp_members */
    nullptr, /* tp_getset */
    nullptr, /* tp_base */
    nullptr, /* tp_dict */
    nullptr, /* tp_descr_get */
    nullptr, /* tp_descr_set */
    0,       /* tp_dictoffset */
    reinterpret_cast<initproc>(IndexDatabase_init), /* tp_init */
    nullptr,                                        /* tp_alloc */
    IndexDatabase_new,                              /* tp_new */
};
353

354
/// Readies IndexDatabaseType and publishes it on module `m` as
/// "IndexDatabase". Returns 0 on success and -1 (with a Python error set)
/// on failure.
int init_index_database(PyObject *m) {
    if (PyType_Ready(&IndexDatabaseType) < 0) {
        return -1;
    }
    PyObject *type_obj = reinterpret_cast<PyObject *>(&IndexDatabaseType);
    // PyModule_AddObject steals the reference only when it succeeds, so take
    // one up front and give it back ourselves on failure.
    Py_INCREF(type_obj);
    const int rc = PyModule_AddObject(m, "IndexDatabase", type_obj);
    if (rc < 0) {
        Py_DECREF(type_obj);
        return -1;
    }
    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc