• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27052412546

06 Jun 2026 04:20AM UTC coverage: 50.862% (+1.0%) from 49.905%
27052412546

Pull #73

github

web-flow
Merge 734572730 into 88a3c8457
Pull Request #73: add portable dependencies wheel support

31801 of 79859 branches covered (39.82%)

Branch coverage included in aggregate %.

32491 of 46545 relevant lines covered (69.81%)

9947.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.61
/src/dftracer/utils/python/index_database.cpp
1
#include <dftracer/utils/python/index_database.h>
2
#include <dftracer/utils/python/sst_distribution.h>
3
#include <dftracer/utils/utilities/indexer/index_database.h>
4
#include <dftracer/utils/utilities/indexer/index_database_sst_writer_context.h>
5

6
#include <new>
7
#include <string>
8
#include <unordered_set>
9
#include <vector>
10

11
using dftracer::utils::utilities::indexer::IndexDatabase;
12
using dftracer::utils::utilities::indexer::SstArtifactRegistry;
13

14
// tp_dealloc handler: destroys the C++ handle stored in the Python object and
// then returns the object's memory through the type's tp_free.
static void IndexDatabase_dealloc(IndexDatabaseObject *self) {
    // `db` lives inside storage obtained from tp_alloc, so its destructor is
    // invoked explicitly instead of through `delete`.
    using DbHandle = std::shared_ptr<IndexDatabase>;
    self->db.~DbHandle();

    PyTypeObject *const type = Py_TYPE(self);
    type->tp_free(reinterpret_cast<PyObject *>(self));
}
×
18

19
// tp_new handler: allocates the Python object and constructs an empty
// shared_ptr in its `db` member. Returns NULL if allocation fails.
static PyObject *IndexDatabase_new(PyTypeObject *type, PyObject * /*args*/,
                                   PyObject * /*kwds*/) {
    PyObject *raw = type->tp_alloc(type, 0);
    if (raw == nullptr) {
        return nullptr;
    }

    // tp_alloc only hands back raw storage; the C++ member has to be
    // constructed in place before anything may assign to it.
    auto *obj = reinterpret_cast<IndexDatabaseObject *>(raw);
    ::new (static_cast<void *>(&obj->db)) std::shared_ptr<IndexDatabase>();
    return raw;
}
×
26

27
static int IndexDatabase_init(IndexDatabaseObject *self, PyObject *args,
×
28
                              PyObject *kwds) {
29
    static const char *kwlist[] = {"index_path", NULL};
30
    const char *index_path;
31
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", (char **)kwlist,
×
32
                                     &index_path)) {
33
        return -1;
×
34
    }
35
    try {
36
        self->db = std::make_shared<IndexDatabase>(index_path);
×
37
    } catch (const std::exception &e) {
×
38
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
39
        return -1;
×
40
    }
×
41
    return 0;
×
42
}
×
43

44
// Python method init_schema() -> None.
//
// Calls IndexDatabase::init_schema() with the GIL released. Raises
// RuntimeError when the object has no database handle or when the C++ call
// throws.
static PyObject *IndexDatabase_init_schema(IndexDatabaseObject *self,
                                           PyObject * /*ignored*/) {
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    // The try/catch must sit INSIDE the GIL-released region: if an exception
    // escaped past Py_END_ALLOW_THREADS the thread state would never be
    // restored and the error would be raised without holding the GIL.
    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    try {
        self->db->init_schema();
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
×
59

60
// Python method register_files(paths, build_manifest=False) -> list[int].
//
// Converts `paths` (any sequence/iterable of str) to UTF-8 std::strings, calls
// IndexDatabase::register_files() with the GIL released, and returns the
// resulting ids as a new Python list.
//
// Raises RuntimeError when the object has no database handle or the C++ call
// throws; propagates the Python error when `paths` is not iterable or an
// element is not a str.
static PyObject *IndexDatabase_register_files(IndexDatabaseObject *self,
                                              PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"paths", "build_manifest", NULL};
    PyObject *paths_obj = NULL;
    int build_manifest = 0;

    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|p", (char **)kwlist,
                                     &paths_obj, &build_manifest)) {
        return NULL;
    }

    // Copy every path into C++-owned storage while the GIL is still held; the
    // UTF-8 buffers belong to the Python str objects.
    PyObject *seq = PySequence_Fast(paths_obj, "paths must be a sequence");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<std::string> paths;
    paths.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        PyObject *item = PySequence_Fast_GET_ITEM(seq, i);  // borrowed
        const char *s = PyUnicode_AsUTF8(item);
        if (!s) {
            Py_DECREF(seq);
            return NULL;
        }
        paths.emplace_back(s);
    }
    Py_DECREF(seq);

    // The try/catch must sit INSIDE the GIL-released region: if an exception
    // escaped past Py_END_ALLOW_THREADS the thread state would never be
    // restored and the error would be raised without holding the GIL.
    std::vector<int> ids;
    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    try {
        ids = self->db->register_files(paths, build_manifest != 0);
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }

    const Py_ssize_t count = static_cast<Py_ssize_t>(ids.size());
    PyObject *out = PyList_New(count);
    if (!out) return NULL;
    for (Py_ssize_t i = 0; i < count; ++i) {
        // PyLong_FromLong can fail; never store a NULL slot in a list that is
        // handed back to Python.
        PyObject *id = PyLong_FromLong(ids[static_cast<std::size_t>(i)]);
        if (!id) {
            Py_DECREF(out);
            return NULL;
        }
        PyList_SET_ITEM(out, i, id);  // steals the reference to `id`
    }
    return out;
}
×
102

103
// Python method reserve_file_id_range(count) -> int.
//
// Calls IndexDatabase::reserve_file_id_range(count) with the GIL released and
// returns its result as a Python int.
//
// Raises ValueError for a negative count, and RuntimeError when the object
// has no database handle or the C++ call throws.
static PyObject *IndexDatabase_reserve_file_id_range(IndexDatabaseObject *self,
                                                     PyObject *args) {
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    Py_ssize_t count = 0;
    if (!PyArg_ParseTuple(args, "n", &count)) return NULL;
    if (count < 0) {
        PyErr_SetString(PyExc_ValueError, "count must be >= 0");
        return NULL;
    }

    // The try/catch must sit INSIDE the GIL-released region: if an exception
    // escaped past Py_END_ALLOW_THREADS the thread state would never be
    // restored and the error would be raised without holding the GIL.
    int first = 0;
    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    try {
        first = self->db->reserve_file_id_range(static_cast<std::size_t>(count));
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    return PyLong_FromLong(first);
}
×
122

123
// Python method bulk_ingest(registry, skip_cfs=None) -> None.
//
// Resolves `registry` to an SstArtifactRegistry, converts the optional
// `skip_cfs` iterable of str into a set of std::strings, and calls
// IndexDatabase::bulk_ingest() with the GIL released.
//
// Raises TypeError when `registry` is not an SstArtifactRegistry, and
// RuntimeError when the object has no database handle or the C++ call throws.
static PyObject *IndexDatabase_bulk_ingest(IndexDatabaseObject *self,
                                           PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"registry", "skip_cfs", NULL};
    PyObject *registry_obj = NULL;
    PyObject *skip_cfs_obj = NULL;

    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|O", (char **)kwlist,
                                     &registry_obj, &skip_cfs_obj)) {
        return NULL;
    }

    SstArtifactRegistry *registry = sst_artifact_registry_get(registry_obj);
    if (!registry) {
        PyErr_SetString(PyExc_TypeError,
                        "expected an SstArtifactRegistry instance");
        return NULL;
    }

    // Both an omitted argument and an explicit None mean "skip nothing".
    std::unordered_set<std::string> skip_cfs;
    if (skip_cfs_obj && skip_cfs_obj != Py_None) {
        PyObject *seq =
            PySequence_Fast(skip_cfs_obj, "skip_cfs must be an iterable");
        if (!seq) return NULL;
        const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
        for (Py_ssize_t i = 0; i < n; ++i) {
            PyObject *item = PySequence_Fast_GET_ITEM(seq, i);  // borrowed
            const char *s = PyUnicode_AsUTF8(item);
            if (!s) {
                Py_DECREF(seq);
                return NULL;
            }
            skip_cfs.emplace(s);
        }
        Py_DECREF(seq);
    }

    // The try/catch must sit INSIDE the GIL-released region: if an exception
    // escaped past Py_END_ALLOW_THREADS the thread state would never be
    // restored and the error would be raised without holding the GIL.
    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    try {
        self->db->bulk_ingest(*registry, skip_cfs);
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
×
167

168
// Python method write_agg_file_markers(file_ids) -> None.
//
// Converts `file_ids` (any sequence/iterable of int) to a std::vector<int> and
// calls IndexDatabase::write_agg_file_markers() with the GIL released.
//
// Raises OverflowError when an id does not fit in a C int, and RuntimeError
// when the object has no database handle or the C++ call throws; propagates
// the Python error when an element is not an integer.
static PyObject *IndexDatabase_write_agg_file_markers(IndexDatabaseObject *self,
                                                      PyObject *args) {
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    PyObject *ids_obj = NULL;
    if (!PyArg_ParseTuple(args, "O", &ids_obj)) return NULL;

    PyObject *seq = PySequence_Fast(ids_obj, "file_ids must be an iterable");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<int> file_ids;
    file_ids.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        PyObject *item = PySequence_Fast_GET_ITEM(seq, i);  // borrowed
        const long v = PyLong_AsLong(item);
        if (v == -1 && PyErr_Occurred()) {
            Py_DECREF(seq);
            return NULL;
        }
        // Reject values that would be silently truncated by the narrowing
        // conversion: the round trip through int must reproduce the value.
        const int id = static_cast<int>(v);
        if (static_cast<long>(id) != v) {
            Py_DECREF(seq);
            PyErr_SetString(PyExc_OverflowError,
                            "file_id out of range for C int");
            return NULL;
        }
        file_ids.push_back(id);
    }
    Py_DECREF(seq);

    // The try/catch must sit INSIDE the GIL-released region: if an exception
    // escaped past Py_END_ALLOW_THREADS the thread state would never be
    // restored and the error would be raised without holding the GIL.
    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    try {
        self->db->write_agg_file_markers(file_ids);
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
×
198

199
// Python method write_agg_global_config(time_interval_us, config_hash=0)
// -> None.
//
// Forwards both integers to IndexDatabase::write_agg_global_config() with the
// GIL released. The "K" and "I" argument formats convert without overflow
// checking, so out-of-range Python ints are wrapped rather than rejected.
//
// Raises RuntimeError when the object has no database handle or the C++ call
// throws.
static PyObject *IndexDatabase_write_agg_global_config(
    IndexDatabaseObject *self, PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"time_interval_us", "config_hash", NULL};
    unsigned long long time_interval_us = 0;
    unsigned int config_hash = 0;

    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "K|I", (char **)kwlist,
                                     &time_interval_us, &config_hash)) {
        return NULL;
    }

    // The try/catch must sit INSIDE the GIL-released region: if an exception
    // escaped past Py_END_ALLOW_THREADS the thread state would never be
    // restored and the error would be raised without holding the GIL.
    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    try {
        self->db->write_agg_global_config(
            static_cast<std::uint64_t>(time_interval_us),
            static_cast<std::uint32_t>(config_hash));
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
×
219

220
// Python method write_aggregation_tracker(blobs) -> None.
//
// Copies every non-empty bytes entry of `blobs` into a std::string (None
// entries and empty bytes are skipped) and calls
// IndexDatabase::write_aggregation_tracker() with the GIL released.
//
// Raises TypeError when an entry is neither bytes nor None, and RuntimeError
// when the object has no database handle or the C++ call throws.
static PyObject *IndexDatabase_write_aggregation_tracker(
    IndexDatabaseObject *self, PyObject *args) {
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    PyObject *blobs_obj = NULL;
    if (!PyArg_ParseTuple(args, "O", &blobs_obj)) return NULL;

    PyObject *seq = PySequence_Fast(blobs_obj, "blobs must be an iterable");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<std::string> blobs;
    blobs.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        PyObject *item = PySequence_Fast_GET_ITEM(seq, i);  // borrowed
        if (item == Py_None) continue;
        if (!PyBytes_Check(item)) {
            Py_DECREF(seq);
            PyErr_SetString(PyExc_TypeError,
                            "blobs entries must be bytes or None");
            return NULL;
        }
        char *buf = nullptr;
        Py_ssize_t len = 0;
        if (PyBytes_AsStringAndSize(item, &buf, &len) < 0) {
            Py_DECREF(seq);
            return NULL;
        }
        // Copy with an explicit length: the payload is binary and may contain
        // embedded NUL bytes.
        if (len > 0) blobs.emplace_back(buf, static_cast<std::size_t>(len));
    }
    Py_DECREF(seq);

    // The try/catch must sit INSIDE the GIL-released region: if an exception
    // escaped past Py_END_ALLOW_THREADS the thread state would never be
    // restored and the error would be raised without holding the GIL.
    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    try {
        self->db->write_aggregation_tracker(blobs);
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
×
257

258
// Python method rebuild_root_summaries() -> None.
//
// Calls IndexDatabase::rebuild_root_summaries() with the GIL released. Raises
// RuntimeError when the object has no database handle or when the C++ call
// throws.
static PyObject *IndexDatabase_rebuild_root_summaries(IndexDatabaseObject *self,
                                                      PyObject * /*ignored*/) {
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    // The try/catch must sit INSIDE the GIL-released region: if an exception
    // escaped past Py_END_ALLOW_THREADS the thread state would never be
    // restored and the error would be raised without holding the GIL.
    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    try {
        self->db->rebuild_root_summaries();
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
×
269

270
// Method table for the IndexDatabase Python type. Each row is
// {python name, C entry point, calling convention flags, docstring}; the
// all-null row terminates the table.
static PyMethodDef IndexDatabase_methods[] = {
    {"init_schema",
     reinterpret_cast<PyCFunction>(IndexDatabase_init_schema),
     METH_NOARGS,
     "Idempotently initialise the schema version key."},

    {"register_files",
     reinterpret_cast<PyCFunction>(IndexDatabase_register_files),
     METH_VARARGS | METH_KEYWORDS,
     "register_files(paths, build_manifest=False) -> list[int]\n"
     "Register each path in the DEFAULT-CF file registry and return the "
     "assigned file_ids. Idempotent for files with matching hash."},

    {"reserve_file_id_range",
     reinterpret_cast<PyCFunction>(IndexDatabase_reserve_file_id_range),
     METH_VARARGS,
     "reserve_file_id_range(count) -> int\n"
     "Atomically reserve `count` contiguous file_ids, return the first."},

    {"bulk_ingest",
     reinterpret_cast<PyCFunction>(IndexDatabase_bulk_ingest),
     METH_VARARGS | METH_KEYWORDS,
     "bulk_ingest(registry, skip_cfs=None) -> None\n"
     "Ingest all SSTs collected in the SstArtifactRegistry.\n"
     "skip_cfs is an optional iterable of CF names whose SSTs are left "
     "outside the unified DB (used by distributed builds to keep "
     "AGGREGATION/SYSTEM_METRICS SSTs addressable by manifest)."},

    {"rebuild_root_summaries",
     reinterpret_cast<PyCFunction>(IndexDatabase_rebuild_root_summaries),
     METH_NOARGS,
     "Recompute ROOT_* summary column families from per-file CFs."},

    {"write_agg_global_config",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_agg_global_config),
     METH_VARARGS | METH_KEYWORDS,
     "write_agg_global_config(time_interval_us, config_hash=0) -> None\n"
     "Write the AGG_GLOBAL_CONFIG_KEY marker into the AGGREGATION CF. "
     "Required for `iter_arrow_dfanalyzer_all` on distributed builds "
     "(which never materialise the key via worker SSTs) or "
     "post-consolidate indices."},

    {"write_agg_file_markers",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_agg_file_markers),
     METH_VARARGS,
     "write_agg_file_markers(file_ids) -> None\n"
     "Write per-file aggregation completion markers (\\xFF\\xFF + file_id) "
     "into the AGGREGATION CF. Required after distributed_index otherwise "
     "`ensure_indexed()` concludes aggregation is incomplete and re-runs "
     "the entire build."},

    {"write_aggregation_tracker",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_aggregation_tracker),
     METH_VARARGS,
     "write_aggregation_tracker(blobs) -> None\n"
     "Merge a list of serialized AssociationTracker bytes and write the "
     "result to the AGGREGATION CF under the `__tracker__` key."},

    // Sentinel row.
    {nullptr, nullptr, 0, nullptr},
};
313

314
// Static type object for dftracer_utils_ext.IndexDatabase.
//
// The initialiser is positional, so the order of the entries must not change.
// NOTE(review): the slot names in the trailing comments follow the CPython
// 3.8+ PyTypeObject layout -- confirm against the Python versions this
// extension is built for.
PyTypeObject IndexDatabaseType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext.IndexDatabase",                  // tp_name
    sizeof(IndexDatabaseObject),                         // tp_basicsize
    0,                                                   // tp_itemsize
    reinterpret_cast<destructor>(IndexDatabase_dealloc), // tp_dealloc
    0,  // tp_vectorcall_offset (tp_print before Python 3.8)
    0,  // tp_getattr
    0,  // tp_setattr
    0,  // tp_as_async
    0,  // tp_repr
    0,  // tp_as_number
    0,  // tp_as_sequence
    0,  // tp_as_mapping
    0,  // tp_hash
    0,  // tp_call
    0,  // tp_str
    0,  // tp_getattro
    0,  // tp_setattro
    0,  // tp_as_buffer
    Py_TPFLAGS_DEFAULT,                                  // tp_flags
    "Handle to a .dftindex RocksDB store.",              // tp_doc
    0,  // tp_traverse
    0,  // tp_clear
    0,  // tp_richcompare
    0,  // tp_weaklistoffset
    0,  // tp_iter
    0,  // tp_iternext
    IndexDatabase_methods,                               // tp_methods
    0,  // tp_members
    0,  // tp_getset
    0,  // tp_base
    0,  // tp_dict
    0,  // tp_descr_get
    0,  // tp_descr_set
    0,  // tp_dictoffset
    reinterpret_cast<initproc>(IndexDatabase_init),      // tp_init
    0,  // tp_alloc
    IndexDatabase_new,                                   // tp_new
};
353

354
// Finalises IndexDatabaseType and publishes it on module `m` under the name
// "IndexDatabase". Returns 0 on success and -1 on failure.
int init_index_database(PyObject *m) {
    PyTypeObject *const type = &IndexDatabaseType;
    if (PyType_Ready(type) < 0) {
        return -1;
    }

    // PyModule_AddObject takes over the reference only when it succeeds, so
    // the extra reference is released by hand on the failure path.
    PyObject *const type_obj = reinterpret_cast<PyObject *>(type);
    Py_INCREF(type_obj);
    const int rc = PyModule_AddObject(m, "IndexDatabase", type_obj);
    if (rc < 0) {
        Py_DECREF(type_obj);
        return -1;
    }
    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc