• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.74
/src/dftracer/utils/python/index_database.cpp
1
#include <dftracer/utils/python/index_database.h>
2
#include <dftracer/utils/python/py_errors.h>
3
#include <dftracer/utils/python/py_list_helpers.h>
4
#include <dftracer/utils/python/py_runtime_mixin.h>
5
#include <dftracer/utils/python/py_type_helpers.h>
6
#include <dftracer/utils/python/sst_distribution.h>
7
#include <dftracer/utils/utilities/indexer/index_database.h>
8
#include <dftracer/utils/utilities/indexer/index_database_sst_writer_context.h>
9

10
#include <new>
11
#include <string>
12
#include <unordered_set>
13
#include <vector>
14

15
using dftracer::utils::utilities::indexer::IndexDatabase;
16
using dftracer::utils::utilities::indexer::SstArtifactRegistry;
17

18
// tp_dealloc for IndexDatabase.
// The `db` member was constructed with placement-new in IndexDatabase_new,
// so CPython knows nothing about it: run its C++ destructor by hand (dropping
// this object's reference to the database) before handing the raw storage
// back through the type's tp_free slot.
static void IndexDatabase_dealloc(IndexDatabaseObject *self) {
    using DbHandle = std::shared_ptr<IndexDatabase>;
    self->db.~DbHandle();

    PyTypeObject *const tp = Py_TYPE(self);
    tp->tp_free(reinterpret_cast<PyObject *>(self));
}
×
22

23
// tp_new for IndexDatabase.
// Allocates the Python object through the type's tp_alloc slot and then
// placement-constructs the C++ `db` member as an empty shared_ptr, so that
// IndexDatabase_dealloc can always destroy it. Arguments are ignored here;
// the database is opened later by IndexDatabase_init.
// Returns a new reference, or NULL if allocation failed.
static PyObject *IndexDatabase_new(PyTypeObject *type, PyObject * /*args*/,
                                   PyObject * /*kwds*/) {
    PyObject *raw = type->tp_alloc(type, 0);
    if (raw == NULL) {
        return NULL;
    }
    auto *obj = reinterpret_cast<IndexDatabaseObject *>(raw);
    ::new (static_cast<void *>(&obj->db)) std::shared_ptr<IndexDatabase>();
    return raw;
}
30

31
static int IndexDatabase_init(IndexDatabaseObject *self, PyObject *args,
×
32
                              PyObject *kwds) {
33
    static const char *kwlist[] = {"index_path", NULL};
34
    const char *index_path;
35
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", (char **)kwlist,
×
36
                                     &index_path)) {
37
        return -1;
×
38
    }
39
    try {
40
        self->db = std::make_shared<IndexDatabase>(index_path);
×
41
    } catch (const std::exception &e) {
×
42
        set_typed_py_error(e);
×
43
        return -1;
×
44
    }
×
45
    return 0;
×
46
}
47

48
// IndexDatabase.init_schema() -> None
// Delegates to IndexDatabase::init_schema via run_blocking. Raises
// RuntimeError if the object was never initialised (e.g. created through
// __new__ without __init__). If run_blocking reports failure, NULL is
// returned without setting an error here, presumably because run_blocking
// already set one (see py_runtime_mixin.h).
static PyObject *IndexDatabase_init_schema(IndexDatabaseObject *self,
                                           PyObject * /*ignored*/) {
    if (self->db == nullptr) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }
    const bool ok = run_blocking([&] { self->db->init_schema(); });
    if (!ok) {
        return NULL;
    }
    Py_RETURN_NONE;
}
57

58
// IndexDatabase.register_files(paths, build_manifest=False) -> list[int]
// Converts `paths` to a list of strings with parse_str_list, registers them
// through IndexDatabase::register_files (run via run_blocking_r) and returns
// the resulting file ids as a new Python list of ints.
// Raises RuntimeError if the object was never initialised; returns NULL
// with an exception set on any parse, backend or allocation failure.
static PyObject *IndexDatabase_register_files(IndexDatabaseObject *self,
                                              PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"paths", "build_manifest", NULL};
    PyObject *paths_obj = NULL;
    int build_manifest = 0;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|p", (char **)kwlist,
                                     &paths_obj, &build_manifest)) {
        return NULL;
    }
    // Without this guard an instance created via __new__ alone would
    // dereference a null shared_ptr below.
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    std::vector<std::string> paths;
    if (!parse_str_list(paths_obj, "paths", paths)) return NULL;

    std::vector<int> ids;
    if (!run_blocking_r(
            [&] {
                return self->db->register_files(paths, build_manifest != 0);
            },
            ids)) {
        return NULL;
    }

    const Py_ssize_t n = static_cast<Py_ssize_t>(ids.size());
    PyObject *out = PyList_New(n);
    if (!out) return NULL;
    for (Py_ssize_t i = 0; i < n; ++i) {
        PyObject *item = PyLong_FromLong(ids[static_cast<std::size_t>(i)]);
        if (!item) {
            // Unfilled slots are NULL; list deallocation tolerates that.
            Py_DECREF(out);
            return NULL;
        }
        PyList_SET_ITEM(out, i, item);  // steals the reference to `item`
    }
    return out;
}
×
86

87
// IndexDatabase.reserve_file_id_range(count) -> int
// Asks the backend to reserve `count` file ids and returns the first one
// as reported by IndexDatabase::reserve_file_id_range.
// Raises ValueError for a negative count and RuntimeError if the object was
// never initialised; returns NULL with an exception set on backend failure.
static PyObject *IndexDatabase_reserve_file_id_range(IndexDatabaseObject *self,
                                                     PyObject *args) {
    Py_ssize_t count = 0;
    if (!PyArg_ParseTuple(args, "n", &count)) return NULL;
    if (count < 0) {
        PyErr_SetString(PyExc_ValueError, "count must be >= 0");
        return NULL;
    }
    // Guard against instances created via __new__ without __init__.
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }
    int first = 0;
    if (!run_blocking_r(
            [&] {
                return self->db->reserve_file_id_range(
                    static_cast<std::size_t>(count));
            },
            first)) {
        return NULL;
    }
    return PyLong_FromLong(first);
}
106

107
// IndexDatabase.bulk_ingest(registry, skip_cfs=None) -> None
// Ingests the SSTs held by `registry` (an SstArtifactRegistry Python object)
// via IndexDatabase::bulk_ingest. `skip_cfs` is an optional iterable of
// column-family names that is forwarded to the backend as a set.
// Raises RuntimeError if the object was never initialised, TypeError if
// `registry` is not an SstArtifactRegistry or `skip_cfs` is a bare str /
// contains non-str items; returns NULL with an exception set on failure.
static PyObject *IndexDatabase_bulk_ingest(IndexDatabaseObject *self,
                                           PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"registry", "skip_cfs", NULL};
    PyObject *registry_obj = NULL;
    PyObject *skip_cfs_obj = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|O", (char **)kwlist,
                                     &registry_obj, &skip_cfs_obj)) {
        return NULL;
    }
    // Guard against instances created via __new__ without __init__.
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    SstArtifactRegistry *registry = sst_artifact_registry_get(registry_obj);
    if (!registry) {
        PyErr_SetString(PyExc_TypeError,
                        "expected an SstArtifactRegistry instance");
        return NULL;
    }

    std::unordered_set<std::string> skip_cfs;
    if (skip_cfs_obj && skip_cfs_obj != Py_None) {
        // A str is itself iterable (character by character), so
        // skip_cfs="AGGREGATION" would otherwise silently skip one-letter
        // CF names instead of the intended family.
        if (PyUnicode_Check(skip_cfs_obj)) {
            PyErr_SetString(PyExc_TypeError,
                            "skip_cfs must be an iterable of str, not a str");
            return NULL;
        }
        PyObject *seq =
            PySequence_Fast(skip_cfs_obj, "skip_cfs must be an iterable");
        if (!seq) return NULL;
        const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
        for (Py_ssize_t i = 0; i < n; ++i) {
            PyObject *item = PySequence_Fast_GET_ITEM(seq, i);
            Py_ssize_t len = 0;
            // Use the explicit length so names are not cut at an embedded NUL.
            const char *s = PyUnicode_AsUTF8AndSize(item, &len);
            if (!s) {
                Py_DECREF(seq);
                return NULL;
            }
            skip_cfs.emplace(s, static_cast<std::size_t>(len));
        }
        Py_DECREF(seq);
    }

    if (!run_blocking([&] { self->db->bulk_ingest(*registry, skip_cfs); }))
        return NULL;
    Py_RETURN_NONE;
}
×
146

147
// IndexDatabase.write_agg_file_markers(file_ids) -> None
// Converts the iterable `file_ids` to a vector of C ints and passes it to
// IndexDatabase::write_agg_file_markers via run_blocking.
// Raises RuntimeError if the object was never initialised, TypeError for
// non-integer items, and OverflowError for ids that do not fit in a C int;
// returns NULL with an exception set on any failure.
static PyObject *IndexDatabase_write_agg_file_markers(IndexDatabaseObject *self,
                                                      PyObject *args) {
    PyObject *ids_obj = NULL;
    if (!PyArg_ParseTuple(args, "O", &ids_obj)) return NULL;
    // Guard against instances created via __new__ without __init__.
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }

    PyObject *seq = PySequence_Fast(ids_obj, "file_ids must be an iterable");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<int> file_ids;
    file_ids.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        PyObject *item = PySequence_Fast_GET_ITEM(seq, i);
        const long v = PyLong_AsLong(item);
        if (v == -1 && PyErr_Occurred()) {
            Py_DECREF(seq);
            return NULL;
        }
        // Round-trip check: a value outside the int range would otherwise
        // wrap silently and mark a different file as aggregated.
        const int id = static_cast<int>(v);
        if (static_cast<long>(id) != v) {
            Py_DECREF(seq);
            PyErr_SetString(PyExc_OverflowError,
                            "file_id out of range for a C int");
            return NULL;
        }
        file_ids.push_back(id);
    }
    Py_DECREF(seq);

    if (!run_blocking([&] { self->db->write_agg_file_markers(file_ids); }))
        return NULL;
    Py_RETURN_NONE;
}
×
172

173
// IndexDatabase.write_agg_global_config(time_interval_us, config_hash=0)
//     -> None
// Forwards the interval (as uint64) and hash (as uint32) to
// IndexDatabase::write_agg_global_config via run_blocking.
// `time_interval_us` must be a non-negative int that fits in 64 bits:
// negative or oversized values raise OverflowError instead of silently
// wrapping (the previous "K" format unit masked them). `config_hash` keeps
// its "I" conversion, which does not check for overflow.
// Raises RuntimeError if the object was never initialised.
static PyObject *IndexDatabase_write_agg_global_config(
    IndexDatabaseObject *self, PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"time_interval_us", "config_hash", NULL};
    PyObject *interval_obj = NULL;
    unsigned int config_hash = 0;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!|I", (char **)kwlist,
                                     &PyLong_Type, &interval_obj,
                                     &config_hash)) {
        return NULL;
    }
    const unsigned long long time_interval_us =
        PyLong_AsUnsignedLongLong(interval_obj);
    if (time_interval_us == static_cast<unsigned long long>(-1) &&
        PyErr_Occurred()) {
        return NULL;
    }
    // Guard against instances created via __new__ without __init__.
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }
    if (!run_blocking([&] {
            self->db->write_agg_global_config(
                static_cast<std::uint64_t>(time_interval_us),
                static_cast<std::uint32_t>(config_hash));
        })) {
        return NULL;
    }
    Py_RETURN_NONE;
}
191

192
// IndexDatabase.write_aggregation_tracker(blobs) -> None
// Collects the non-empty bytes entries of the iterable `blobs` (None entries
// and empty bytes are skipped) and passes them to
// IndexDatabase::write_aggregation_tracker via run_blocking.
// Raises RuntimeError if the object was never initialised and TypeError for
// entries that are neither bytes nor None.
static PyObject *IndexDatabase_write_aggregation_tracker(
    IndexDatabaseObject *self, PyObject *args) {
    PyObject *blobs_obj = NULL;
    if (!PyArg_ParseTuple(args, "O", &blobs_obj)) return NULL;
    // Guard against instances created via __new__ without __init__.
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }
    PyObject *seq = PySequence_Fast(blobs_obj, "blobs must be an iterable");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<std::string> blobs;
    blobs.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        PyObject *item = PySequence_Fast_GET_ITEM(seq, i);
        if (item == Py_None) continue;
        if (!PyBytes_Check(item)) {
            Py_DECREF(seq);
            PyErr_SetString(PyExc_TypeError,
                            "blobs entries must be bytes or None");
            return NULL;
        }
        // `item` is known to be bytes here, so the unchecked accessors apply.
        const Py_ssize_t len = PyBytes_GET_SIZE(item);
        if (len > 0) {
            blobs.emplace_back(PyBytes_AS_STRING(item),
                               static_cast<std::size_t>(len));
        }
    }
    Py_DECREF(seq);
    if (!run_blocking([&] { self->db->write_aggregation_tracker(blobs); }))
        return NULL;
    Py_RETURN_NONE;
}
×
224

225
// IndexDatabase.rebuild_root_summaries() -> None
// Delegates to IndexDatabase::rebuild_root_summaries via run_blocking.
// Raises RuntimeError if the object was never initialised (previously a
// null shared_ptr was dereferenced); returns NULL with an exception set if
// run_blocking reports failure.
static PyObject *IndexDatabase_rebuild_root_summaries(IndexDatabaseObject *self,
                                                      PyObject * /*ignored*/) {
    if (!self->db) {
        PyErr_SetString(PyExc_RuntimeError, "IndexDatabase not initialised");
        return NULL;
    }
    if (!run_blocking([&] { self->db->rebuild_root_summaries(); })) return NULL;
    Py_RETURN_NONE;
}
230

231
// Python-visible methods of IndexDatabase, in the order they are registered.
// Implementations that take an IndexDatabaseObject* (and, for keyword-taking
// methods, a kwds argument) are cast to PyCFunction as the CPython method
// table requires; the call convention is selected by the flags column.
// The table is terminated by an all-zero sentinel entry.
static PyMethodDef IndexDatabase_methods[] = {
    {"init_schema",
     reinterpret_cast<PyCFunction>(IndexDatabase_init_schema),
     METH_NOARGS,
     "Idempotently initialise the schema version key."},
    {"register_files",
     reinterpret_cast<PyCFunction>(IndexDatabase_register_files),
     METH_VARARGS | METH_KEYWORDS,
     "register_files(paths, build_manifest=False) -> list[int]\n"
     "Register each path in the DEFAULT-CF file registry and return the "
     "assigned file_ids. Idempotent for files with matching hash."},
    {"reserve_file_id_range",
     reinterpret_cast<PyCFunction>(IndexDatabase_reserve_file_id_range),
     METH_VARARGS,
     "reserve_file_id_range(count) -> int\n"
     "Atomically reserve `count` contiguous file_ids, return the first."},
    {"bulk_ingest",
     reinterpret_cast<PyCFunction>(IndexDatabase_bulk_ingest),
     METH_VARARGS | METH_KEYWORDS,
     "bulk_ingest(registry, skip_cfs=None) -> None\n"
     "Ingest all SSTs collected in the SstArtifactRegistry.\n"
     "skip_cfs is an optional iterable of CF names whose SSTs are left "
     "outside the unified DB (used by distributed builds to keep "
     "AGGREGATION/SYSTEM_METRICS SSTs addressable by manifest)."},
    {"rebuild_root_summaries",
     reinterpret_cast<PyCFunction>(IndexDatabase_rebuild_root_summaries),
     METH_NOARGS,
     "Recompute ROOT_* summary column families from per-file CFs."},
    {"write_agg_global_config",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_agg_global_config),
     METH_VARARGS | METH_KEYWORDS,
     "write_agg_global_config(time_interval_us, config_hash=0) -> None\n"
     "Write the AGG_GLOBAL_CONFIG_KEY marker into the AGGREGATION CF. "
     "Required for `iter_arrow_dfanalyzer_all` on distributed builds "
     "(which never materialise the key via worker SSTs) or "
     "post-consolidate indices."},
    {"write_agg_file_markers",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_agg_file_markers),
     METH_VARARGS,
     "write_agg_file_markers(file_ids) -> None\n"
     "Write per-file aggregation completion markers (\\xFF\\xFF + file_id) "
     "into the AGGREGATION CF. Required after distributed_index otherwise "
     "`ensure_indexed()` concludes aggregation is incomplete and re-runs "
     "the entire build."},
    {"write_aggregation_tracker",
     reinterpret_cast<PyCFunction>(IndexDatabase_write_aggregation_tracker),
     METH_VARARGS,
     "write_aggregation_tracker(blobs) -> None\n"
     "Merge a list of serialized AssociationTracker bytes and write the "
     "result to the AGGREGATION CF under the `__tracker__` key."},
    // Sentinel: marks the end of the table for CPython.
    {nullptr, nullptr, 0, nullptr}};
274

275
// Static type object for dftracer_utils_ext.IndexDatabase.
// Slots are initialised positionally in PyTypeObject declaration order (the
// trailing comments name each slot); every slot after tp_new is
// zero-initialised. Registered with the module by init_index_database.
PyTypeObject IndexDatabaseType = {
    PyVarObject_HEAD_INIT(nullptr, 0)
    "dftracer_utils_ext.IndexDatabase",                 // tp_name
    sizeof(IndexDatabaseObject),                        // tp_basicsize
    0,                                                  // tp_itemsize
    reinterpret_cast<destructor>(IndexDatabase_dealloc),  // tp_dealloc
    0,                                                  // tp_vectorcall_offset
    nullptr,                                            // tp_getattr
    nullptr,                                            // tp_setattr
    nullptr,                                            // tp_as_async
    nullptr,                                            // tp_repr
    nullptr,                                            // tp_as_number
    nullptr,                                            // tp_as_sequence
    nullptr,                                            // tp_as_mapping
    nullptr,                                            // tp_hash
    nullptr,                                            // tp_call
    nullptr,                                            // tp_str
    nullptr,                                            // tp_getattro
    nullptr,                                            // tp_setattro
    nullptr,                                            // tp_as_buffer
    Py_TPFLAGS_DEFAULT,                                 // tp_flags
    "Handle to a .dftindex RocksDB store.",             // tp_doc
    nullptr,                                            // tp_traverse
    nullptr,                                            // tp_clear
    nullptr,                                            // tp_richcompare
    0,                                                  // tp_weaklistoffset
    nullptr,                                            // tp_iter
    nullptr,                                            // tp_iternext
    IndexDatabase_methods,                              // tp_methods
    nullptr,                                            // tp_members
    nullptr,                                            // tp_getset
    nullptr,                                            // tp_base
    nullptr,                                            // tp_dict
    nullptr,                                            // tp_descr_get
    nullptr,                                            // tp_descr_set
    0,                                                  // tp_dictoffset
    reinterpret_cast<initproc>(IndexDatabase_init),     // tp_init
    nullptr,                                            // tp_alloc
    IndexDatabase_new,                                  // tp_new
};
314

315
// Adds the IndexDatabase type to module `m` under the name "IndexDatabase"
// via register_type. Returns 0 on success and -1 if registration failed.
int init_index_database(PyObject *m) {
    const auto rc = register_type(m, &IndexDatabaseType, "IndexDatabase");
    return rc < 0 ? -1 : 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc