• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28521653886

01 Jul 2026 01:36PM UTC coverage: 50.92% (-1.4%) from 52.278%
28521653886

Pull #83

github

web-flow
Merge 9bdedb1e9 into 2efed6649
Pull Request #83: refactor and improve code QoL

31893 of 80049 branches covered (39.84%)

Branch coverage included in aggregate %.

789 of 1613 new or added lines in 87 files covered. (48.92%)

5007 existing lines in 181 files now uncovered.

32812 of 47024 relevant lines covered (69.78%)

9905.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.02
/src/dftracer/utils/python/index_database.cpp
1
#include <dftracer/utils/python/index_database.h>
#include <dftracer/utils/python/py_errors.h>
#include <dftracer/utils/python/py_type_helpers.h>
#include <dftracer/utils/python/sst_distribution.h>
#include <dftracer/utils/utilities/indexer/index_database.h>
#include <dftracer/utils/utilities/indexer/index_database_sst_writer_context.h>

#include <cstdint>
#include <exception>
#include <limits>
#include <memory>
#include <new>
#include <string>
#include <unordered_set>
#include <vector>

using dftracer::utils::utilities::indexer::IndexDatabase;
using dftracer::utils::utilities::indexer::SstArtifactRegistry;
15

16
/// tp_dealloc: tears down the C++ database handle, then returns the object's
/// storage through the type's tp_free slot.
static void IndexDatabase_dealloc(IndexDatabaseObject *self) {
    // `db` was placement-constructed in IndexDatabase_new, so its destructor
    // has to be invoked by hand before the raw storage is released.
    using DbHandle = std::shared_ptr<IndexDatabase>;
    self->db.~DbHandle();

    PyTypeObject *const tp = Py_TYPE(self);
    tp->tp_free(reinterpret_cast<PyObject *>(self));
}
×
20

21
/// tp_new: allocates the Python object and constructs an empty (null)
/// database handle. The handle is populated later by IndexDatabase_init.
static PyObject *IndexDatabase_new(PyTypeObject *type, PyObject * /*args*/,
                                   PyObject * /*kwds*/) {
    PyObject *obj = type->tp_alloc(type, 0);
    if (obj == NULL) {
        return NULL;
    }
    // tp_alloc hands back C-level storage; the C++ shared_ptr member must be
    // constructed in place before it can be assigned to or destroyed.
    auto *self = reinterpret_cast<IndexDatabaseObject *>(obj);
    ::new (static_cast<void *>(&self->db)) std::shared_ptr<IndexDatabase>();
    return obj;
}
×
28

29
static int IndexDatabase_init(IndexDatabaseObject *self, PyObject *args,
×
30
                              PyObject *kwds) {
31
    static const char *kwlist[] = {"index_path", NULL};
32
    const char *index_path;
33
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", (char **)kwlist,
×
34
                                     &index_path)) {
35
        return -1;
×
36
    }
37
    try {
38
        self->db = std::make_shared<IndexDatabase>(index_path);
×
39
    } catch (const std::exception &e) {
×
NEW
40
        set_typed_py_error(e);
×
41
        return -1;
×
42
    }
×
43
    return 0;
×
UNCOV
44
}
×
45

46
/// init_schema() -> None
///
/// Calls IndexDatabase::init_schema() with the GIL released.
/// Raises RuntimeError if the object was never initialised; C++ exceptions
/// from the database are translated into Python exceptions.
static PyObject *IndexDatabase_init_schema(IndexDatabaseObject *self,
                                           PyObject * /*ignored*/) {
    // Owning local copy: keeps the database alive even if another thread
    // re-runs __init__ (which replaces self->db) while the GIL is released.
    const std::shared_ptr<IndexDatabase> db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }
    try {
        // RAII replacement for Py_BEGIN/END_ALLOW_THREADS: the macros skip
        // PyEval_RestoreThread when the call throws, leaving the handlers
        // below to touch Python state without the GIL. This destructor
        // re-acquires the GIL on every exit path, including unwinding.
        struct ReleaseGil {
            PyThreadState *saved = PyEval_SaveThread();
            ~ReleaseGil() { PyEval_RestoreThread(saved); }
        } nogil;
        db->init_schema();
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "IndexDatabase: unknown C++ exception");
        return NULL;
    }
    Py_RETURN_NONE;
}
×
61

62
/// register_files(paths, build_manifest=False) -> list[int]
///
/// Converts `paths` (a sequence or other iterable of str) to UTF-8 strings,
/// calls IndexDatabase::register_files() with the GIL released, and returns
/// the ids it produced as a list of int.
///
/// Raises RuntimeError if the object was never initialised. Argument
/// conversion errors propagate unchanged, and C++ exceptions from the
/// database are translated into Python exceptions.
static PyObject *IndexDatabase_register_files(IndexDatabaseObject *self,
                                              PyObject *args, PyObject *kwds) {
    // Owning local copy: keeps the database alive even if another thread
    // re-runs __init__ (which replaces self->db) while the GIL is released.
    const std::shared_ptr<IndexDatabase> db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    static const char *kwlist[] = {"paths", "build_manifest", NULL};
    PyObject *paths_obj = NULL;
    int build_manifest = 0;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|p", (char **)kwlist,
                                     &paths_obj, &build_manifest)) {
        return NULL;
    }

    PyObject *seq = PySequence_Fast(paths_obj, "paths must be a sequence");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<std::string> paths;
    paths.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
        if (!s) {
            Py_DECREF(seq);
            return NULL;
        }
        paths.emplace_back(s);
    }
    Py_DECREF(seq);

    std::vector<int> ids;
    try {
        // RAII GIL release: unlike Py_BEGIN/END_ALLOW_THREADS, the GIL is
        // re-acquired even when register_files() throws, so the handlers
        // below run with the GIL held.
        struct ReleaseGil {
            PyThreadState *saved = PyEval_SaveThread();
            ~ReleaseGil() { PyEval_RestoreThread(saved); }
        } nogil;
        ids = db->register_files(paths, build_manifest != 0);
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "IndexDatabase: unknown C++ exception");
        return NULL;
    }

    PyObject *out = PyList_New(static_cast<Py_ssize_t>(ids.size()));
    if (!out) return NULL;
    for (std::size_t i = 0; i < ids.size(); ++i) {
        PyObject *id = PyLong_FromLong(ids[i]);
        if (!id) {
            // PyList_New leaves unfilled slots NULL; list deallocation
            // tolerates them, so the partial list can be released safely.
            Py_DECREF(out);
            return NULL;
        }
        PyList_SET_ITEM(out, static_cast<Py_ssize_t>(i), id);
    }
    return out;
}
×
104

105
/// reserve_file_id_range(count) -> int
///
/// Calls IndexDatabase::reserve_file_id_range(count) with the GIL released
/// and returns the int it produces.
/// Raises RuntimeError if the object was never initialised and ValueError if
/// `count` is negative. C++ exceptions from the database are translated into
/// Python exceptions.
static PyObject *IndexDatabase_reserve_file_id_range(IndexDatabaseObject *self,
                                                     PyObject *args) {
    // Owning local copy: keeps the database alive even if another thread
    // re-runs __init__ (which replaces self->db) while the GIL is released.
    const std::shared_ptr<IndexDatabase> db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    Py_ssize_t count = 0;
    if (!PyArg_ParseTuple(args, "n", &count)) return NULL;
    if (count < 0) {
        PyErr_SetString(PyExc_ValueError, "count must be >= 0");
        return NULL;
    }

    int first = 0;
    try {
        // RAII GIL release: re-acquires the GIL even if the call throws.
        struct ReleaseGil {
            PyThreadState *saved = PyEval_SaveThread();
            ~ReleaseGil() { PyEval_RestoreThread(saved); }
        } nogil;
        first = db->reserve_file_id_range(static_cast<std::size_t>(count));
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "IndexDatabase: unknown C++ exception");
        return NULL;
    }
    return PyLong_FromLong(first);
}
×
124

125
/// bulk_ingest(registry, skip_cfs=None) -> None
///
/// Passes the SstArtifactRegistry wrapped by `registry` and the set of
/// column-family names in `skip_cfs` (None or an iterable of str) to
/// IndexDatabase::bulk_ingest(), with the GIL released.
/// Raises RuntimeError if the object was never initialised and TypeError if
/// `registry` is not an SstArtifactRegistry. C++ exceptions from the database
/// are translated into Python exceptions.
static PyObject *IndexDatabase_bulk_ingest(IndexDatabaseObject *self,
                                           PyObject *args, PyObject *kwds) {
    // Owning local copy: keeps the database alive even if another thread
    // re-runs __init__ (which replaces self->db) while the GIL is released.
    const std::shared_ptr<IndexDatabase> db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    static const char *kwlist[] = {"registry", "skip_cfs", NULL};
    PyObject *registry_obj = NULL;
    PyObject *skip_cfs_obj = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|O", (char **)kwlist,
                                     &registry_obj, &skip_cfs_obj)) {
        return NULL;
    }

    // NOTE(review): assumes the returned pointer is owned by registry_obj,
    // which the args tuple keeps alive for the duration of this call.
    SstArtifactRegistry *registry = sst_artifact_registry_get(registry_obj);
    if (!registry) {
        PyErr_SetString(PyExc_TypeError,
                        "expected an SstArtifactRegistry instance");
        return NULL;
    }

    std::unordered_set<std::string> skip_cfs;
    if (skip_cfs_obj && skip_cfs_obj != Py_None) {
        PyObject *seq =
            PySequence_Fast(skip_cfs_obj, "skip_cfs must be an iterable");
        if (!seq) return NULL;
        const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
        for (Py_ssize_t i = 0; i < n; ++i) {
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
            if (!s) {
                Py_DECREF(seq);
                return NULL;
            }
            skip_cfs.emplace(s);
        }
        Py_DECREF(seq);
    }

    try {
        // RAII GIL release: re-acquires the GIL even if the call throws.
        struct ReleaseGil {
            PyThreadState *saved = PyEval_SaveThread();
            ~ReleaseGil() { PyEval_RestoreThread(saved); }
        } nogil;
        db->bulk_ingest(*registry, skip_cfs);
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "IndexDatabase: unknown C++ exception");
        return NULL;
    }
    Py_RETURN_NONE;
}
×
169

170
/// write_agg_file_markers(file_ids) -> None
///
/// Converts `file_ids` (an iterable of int) to C ints and calls
/// IndexDatabase::write_agg_file_markers() with the GIL released.
/// Raises RuntimeError if the object was never initialised, and
/// OverflowError if an id does not fit in a C int. Previously such ids were
/// silently truncated, so markers were written for the wrong file. C++
/// exceptions from the database are translated into Python exceptions.
static PyObject *IndexDatabase_write_agg_file_markers(IndexDatabaseObject *self,
                                                      PyObject *args) {
    // Owning local copy: keeps the database alive even if another thread
    // re-runs __init__ (which replaces self->db) while the GIL is released.
    const std::shared_ptr<IndexDatabase> db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    PyObject *ids_obj = NULL;
    if (!PyArg_ParseTuple(args, "O", &ids_obj)) return NULL;

    PyObject *seq = PySequence_Fast(ids_obj, "file_ids must be an iterable");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<int> file_ids;
    file_ids.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        const long v = PyLong_AsLong(PySequence_Fast_GET_ITEM(seq, i));
        if (v == -1 && PyErr_Occurred()) {
            Py_DECREF(seq);
            return NULL;
        }
        if (v < std::numeric_limits<int>::min() ||
            v > std::numeric_limits<int>::max()) {
            Py_DECREF(seq);
            PyErr_Format(PyExc_OverflowError,
                         "file_id %ld does not fit in a C int", v);
            return NULL;
        }
        file_ids.push_back(static_cast<int>(v));
    }
    Py_DECREF(seq);

    try {
        // RAII GIL release: re-acquires the GIL even if the call throws.
        struct ReleaseGil {
            PyThreadState *saved = PyEval_SaveThread();
            ~ReleaseGil() { PyEval_RestoreThread(saved); }
        } nogil;
        db->write_agg_file_markers(file_ids);
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "IndexDatabase: unknown C++ exception");
        return NULL;
    }
    Py_RETURN_NONE;
}
×
200

201
/// write_agg_global_config(time_interval_us, config_hash=0) -> None
///
/// Calls IndexDatabase::write_agg_global_config() with the GIL released.
/// Raises RuntimeError if the object was never initialised; C++ exceptions
/// from the database are translated into Python exceptions.
/// NOTE(review): the "K"/"I" formats convert without overflow checking, so
/// negative or oversized ints wrap silently — confirm that is intended.
static PyObject *IndexDatabase_write_agg_global_config(
    IndexDatabaseObject *self, PyObject *args, PyObject *kwds) {
    // Owning local copy: keeps the database alive even if another thread
    // re-runs __init__ (which replaces self->db) while the GIL is released.
    const std::shared_ptr<IndexDatabase> db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    static const char *kwlist[] = {"time_interval_us", "config_hash", NULL};
    unsigned long long time_interval_us = 0;
    unsigned int config_hash = 0;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "K|I", (char **)kwlist,
                                     &time_interval_us, &config_hash)) {
        return NULL;
    }
    try {
        // RAII GIL release: re-acquires the GIL even if the call throws.
        struct ReleaseGil {
            PyThreadState *saved = PyEval_SaveThread();
            ~ReleaseGil() { PyEval_RestoreThread(saved); }
        } nogil;
        db->write_agg_global_config(
            static_cast<std::uint64_t>(time_interval_us),
            static_cast<std::uint32_t>(config_hash));
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "IndexDatabase: unknown C++ exception");
        return NULL;
    }
    Py_RETURN_NONE;
}
×
221

222
/// write_aggregation_tracker(blobs) -> None
///
/// Collects the non-empty bytes entries of `blobs` (None entries are skipped)
/// and passes them to IndexDatabase::write_aggregation_tracker() with the GIL
/// released.
/// Raises RuntimeError if the object was never initialised and TypeError for
/// entries that are neither bytes nor None. C++ exceptions from the database
/// are translated into Python exceptions.
static PyObject *IndexDatabase_write_aggregation_tracker(
    IndexDatabaseObject *self, PyObject *args) {
    // Owning local copy: keeps the database alive even if another thread
    // re-runs __init__ (which replaces self->db) while the GIL is released.
    const std::shared_ptr<IndexDatabase> db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    PyObject *blobs_obj = NULL;
    if (!PyArg_ParseTuple(args, "O", &blobs_obj)) return NULL;
    PyObject *seq = PySequence_Fast(blobs_obj, "blobs must be an iterable");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<std::string> blobs;
    blobs.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        PyObject *item = PySequence_Fast_GET_ITEM(seq, i);
        if (item == Py_None) continue;
        if (!PyBytes_Check(item)) {
            Py_DECREF(seq);
            PyErr_SetString(PyExc_TypeError,
                            "blobs entries must be bytes or None");
            return NULL;
        }
        char *buf = nullptr;
        Py_ssize_t len = 0;
        if (PyBytes_AsStringAndSize(item, &buf, &len) < 0) {
            Py_DECREF(seq);
            return NULL;
        }
        // Empty byte strings are dropped rather than forwarded.
        if (len > 0) blobs.emplace_back(buf, static_cast<std::size_t>(len));
    }
    Py_DECREF(seq);

    try {
        // RAII GIL release: re-acquires the GIL even if the call throws.
        struct ReleaseGil {
            PyThreadState *saved = PyEval_SaveThread();
            ~ReleaseGil() { PyEval_RestoreThread(saved); }
        } nogil;
        db->write_aggregation_tracker(blobs);
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "IndexDatabase: unknown C++ exception");
        return NULL;
    }
    Py_RETURN_NONE;
}
×
259

260
/// rebuild_root_summaries() -> None
///
/// Calls IndexDatabase::rebuild_root_summaries() with the GIL released.
/// Raises RuntimeError if the object was never initialised (previously this
/// dereferenced a null handle). C++ exceptions from the database are
/// translated into Python exceptions.
static PyObject *IndexDatabase_rebuild_root_summaries(IndexDatabaseObject *self,
                                                      PyObject * /*ignored*/) {
    // Owning local copy: keeps the database alive even if another thread
    // re-runs __init__ (which replaces self->db) while the GIL is released.
    const std::shared_ptr<IndexDatabase> db = self->db;
    if (!db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }
    try {
        // RAII GIL release: re-acquires the GIL even if the call throws.
        struct ReleaseGil {
            PyThreadState *saved = PyEval_SaveThread();
            ~ReleaseGil() { PyEval_RestoreThread(saved); }
        } nogil;
        db->rebuild_root_summaries();
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "IndexDatabase: unknown C++ exception");
        return NULL;
    }
    Py_RETURN_NONE;
}
×
271

272
// Python-visible methods of IndexDatabase. Each entry holds the method name,
// the C implementation (cast to the generic PyCFunction signature that
// PyMethodDef stores), its calling-convention flags, and the docstring.
// The table ends with an all-null sentinel entry.
static PyMethodDef IndexDatabase_methods[] = {
    {"init_schema",
     reinterpret_cast<PyCFunction>(IndexDatabase_init_schema),
     METH_NOARGS,
     "Idempotently initialise the schema version key."},
    {"register_files",
     reinterpret_cast<PyCFunction>(IndexDatabase_register_files),
     METH_VARARGS | METH_KEYWORDS,
     "register_files(paths, build_manifest=False) -> list[int]\n"
     "Register each path in the DEFAULT-CF file registry and return the "
     "assigned file_ids. Idempotent for files with matching hash."},
    {"reserve_file_id_range",
     reinterpret_cast<PyCFunction>(IndexDatabase_reserve_file_id_range),
     METH_VARARGS,
     "reserve_file_id_range(count) -> int\n"
     "Atomically reserve `count` contiguous file_ids, return the first."},
    {"bulk_ingest",
     reinterpret_cast<PyCFunction>(IndexDatabase_bulk_ingest),
     METH_VARARGS | METH_KEYWORDS,
     "bulk_ingest(registry, skip_cfs=None) -> None\n"
     "Ingest all SSTs collected in the SstArtifactRegistry.\n"
     "skip_cfs is an optional iterable of CF names whose SSTs are left "
     "outside the unified DB (used by distributed builds to keep "
     "AGGREGATION/SYSTEM_METRICS SSTs addressable by manifest)."},
    {"rebuild_root_summaries",
     reinterpret_cast<PyCFunction>(IndexDatabase_rebuild_root_summaries),
     METH_NOARGS,
     "Recompute ROOT_* summary column families from per-file CFs."},
    {"write_agg_global_config",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_agg_global_config),
     METH_VARARGS | METH_KEYWORDS,
     "write_agg_global_config(time_interval_us, config_hash=0) -> None\n"
     "Write the AGG_GLOBAL_CONFIG_KEY marker into the AGGREGATION CF. "
     "Required for `iter_arrow_dfanalyzer_all` on distributed builds "
     "(which never materialise the key via worker SSTs) or "
     "post-consolidate indices."},
    {"write_agg_file_markers",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_agg_file_markers),
     METH_VARARGS,
     "write_agg_file_markers(file_ids) -> None\n"
     "Write per-file aggregation completion markers (\\xFF\\xFF + file_id) "
     "into the AGGREGATION CF. Required after distributed_index otherwise "
     "`ensure_indexed()` concludes aggregation is incomplete and re-runs "
     "the entire build."},
    {"write_aggregation_tracker",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_aggregation_tracker),
     METH_VARARGS,
     "write_aggregation_tracker(blobs) -> None\n"
     "Merge a list of serialized AssociationTracker bytes and write the "
     "result to the AGGREGATION CF under the `__tracker__` key."},
    {nullptr, nullptr, 0, nullptr}};
315

316
// Static type object for dftracer_utils_ext.IndexDatabase. Slots are given
// positionally in PyTypeObject declaration order; each is labelled below.
// Unset pointer slots are nullptr and unset integer slots are 0, so
// PyType_Ready fills in inherited defaults (tp_alloc, and the tp_free that
// IndexDatabase_dealloc calls).
PyTypeObject IndexDatabaseType = {
    PyVarObject_HEAD_INIT(nullptr, 0)
    "dftracer_utils_ext.IndexDatabase",                   // tp_name
    sizeof(IndexDatabaseObject),                          // tp_basicsize
    0,                                                    // tp_itemsize
    reinterpret_cast<destructor>(IndexDatabase_dealloc),  // tp_dealloc
    0,        // tp_vectorcall_offset (tp_print before Python 3.8)
    nullptr,  // tp_getattr
    nullptr,  // tp_setattr
    nullptr,  // tp_as_async
    nullptr,  // tp_repr
    nullptr,  // tp_as_number
    nullptr,  // tp_as_sequence
    nullptr,  // tp_as_mapping
    nullptr,  // tp_hash
    nullptr,  // tp_call
    nullptr,  // tp_str
    nullptr,  // tp_getattro
    nullptr,  // tp_setattro
    nullptr,  // tp_as_buffer
    Py_TPFLAGS_DEFAULT,                      // tp_flags
    "Handle to a .dftindex RocksDB store.",  // tp_doc
    nullptr,  // tp_traverse
    nullptr,  // tp_clear
    nullptr,  // tp_richcompare
    0,        // tp_weaklistoffset
    nullptr,  // tp_iter
    nullptr,  // tp_iternext
    IndexDatabase_methods,  // tp_methods
    nullptr,  // tp_members
    nullptr,  // tp_getset
    nullptr,  // tp_base
    nullptr,  // tp_dict
    nullptr,  // tp_descr_get
    nullptr,  // tp_descr_set
    0,        // tp_dictoffset
    reinterpret_cast<initproc>(IndexDatabase_init),  // tp_init
    nullptr,            // tp_alloc
    IndexDatabase_new,  // tp_new
};
355

356
/// Registers IndexDatabaseType on module `m` under the name "IndexDatabase".
/// @return 0 on success, -1 if register_type reports failure (presumably
///         with a Python exception already set — see register_type).
int init_index_database(PyObject *m) {
    const int status = register_type(m, &IndexDatabaseType, "IndexDatabase");
    return status < 0 ? -1 : 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc