• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.45
/src/dftracer/utils/python/sst_distribution.cpp
1
#include <dftracer/utils/core/common/constants.h>
2
#include <dftracer/utils/core/runtime.h>
3
#include <dftracer/utils/core/tasks/coro_scope.h>
4
#include <dftracer/utils/python/py_errors.h>
5
#include <dftracer/utils/python/py_runtime_mixin.h>
6
#include <dftracer/utils/python/py_type_helpers.h>
7
#include <dftracer/utils/python/runtime.h>
8
#include <dftracer/utils/python/sst_distribution.h>
9
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_config.h>
10
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_key.h>
11
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.h>
12
#include <dftracer/utils/utilities/composites/dft/aggregators/association_tracker.h>
13
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
14
#include <dftracer/utils/utilities/indexer/file_partition.h>
15
#include <dftracer/utils/utilities/indexer/index_batch_sink.h>
16
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
17
#include <dftracer/utils/utilities/indexer/index_database_sst_writer_context.h>
18
#include <dftracer/utils/utilities/indexer/internal/common/gzip_member_scanner.h>
19
#include <fcntl.h>
20
#include <sys/stat.h>
21
#include <unistd.h>
22

23
#include <algorithm>
24
#include <atomic>
25
#include <cstdint>
26
#include <memory>
27
#include <mutex>
28
#include <new>
29
#include <optional>
30
#include <string>
31
#include <unordered_set>
32
#include <vector>
33

34
using dftracer::utils::Runtime;
35
using dftracer::utils::utilities::filesystem::FileEntry;
36
using dftracer::utils::utilities::filesystem::PatternDirectoryScannerUtility;
37
using dftracer::utils::utilities::filesystem::
38
    PatternDirectoryScannerUtilityInput;
39
using dftracer::utils::utilities::indexer::IndexBatchBuilderUtility;
40
using dftracer::utils::utilities::indexer::IndexBatchSink;
41
using dftracer::utils::utilities::indexer::IndexBuildBatchConfig;
42
using dftracer::utils::utilities::indexer::IndexBuildBatchResult;
43
using dftracer::utils::utilities::indexer::IndexDatabaseSstWriterContext;
44
using dftracer::utils::utilities::indexer::plan_lpt_partition;
45
using dftracer::utils::utilities::indexer::SstArtifactRegistry;
46
using dftracer::utils::utilities::indexer::internal::
47
    enumerate_gzip_member_candidates;
48
using dftracer::utils::utilities::indexer::internal::GzipMember;
49

50
// ---------------------------------------------------------------------------
51
// SstArtifactRegistry type
52
// ---------------------------------------------------------------------------
53

54
typedef struct {
55
    PyObject_HEAD std::shared_ptr<SstArtifactRegistry> registry;
56
} SstArtifactRegistryObject;
57

58
// tp_dealloc: undo the placement-new performed in SstArtifactRegistry_new,
// then hand the raw storage back through the type's own tp_free.
static void SstArtifactRegistry_dealloc(SstArtifactRegistryObject *self) {
    std::destroy_at(&self->registry);
    PyTypeObject *const owner_type = Py_TYPE(self);
    owner_type->tp_free(reinterpret_cast<PyObject *>(self));
}
62

63
/// tp_new for SstArtifactRegistry: allocates the Python object and attaches
/// a fresh, empty C++ SstArtifactRegistry.
///
/// The shared_ptr member is first placement-constructed empty (noexcept), so
/// the object is always in a state SstArtifactRegistry_dealloc can destroy.
/// Only then is the registry allocated; any C++ exception from that step is
/// converted into a Python exception instead of unwinding through CPython's
/// C frames (and leaking the half-built object).
///
/// @return New reference, or NULL with an exception set.
static PyObject *SstArtifactRegistry_new(PyTypeObject *type,
                                         PyObject * /*args*/,
                                         PyObject * /*kwds*/) {
    auto *self = (SstArtifactRegistryObject *)type->tp_alloc(type, 0);
    if (!self) return NULL;

    // Bring the member to life before anything can fail, so dealloc's
    // explicit destructor call always operates on a constructed object.
    new (&self->registry) std::shared_ptr<SstArtifactRegistry>();

    try {
        self->registry = std::make_shared<SstArtifactRegistry>();
    } catch (const std::bad_alloc &) {
        Py_DECREF((PyObject *)self);
        return PyErr_NoMemory();
    } catch (const std::exception &e) {
        Py_DECREF((PyObject *)self);
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        Py_DECREF((PyObject *)self);
        PyErr_SetString(PyExc_RuntimeError,
                        "failed to construct SstArtifactRegistry");
        return NULL;
    }
    return (PyObject *)self;
}
72

73
namespace {
74

75
// Field names in the Artifacts dict returned by build_sst_batch and
76
// consumed by SstArtifactRegistry.append. Must match the field names on
77
// IndexDatabaseSstWriterContext::Artifacts.
78
constexpr const char *ARTIFACT_FIELDS[] = {
79
    "metadata_sst",        "checkpoints_sst",         "manifest_sst",
80
    "chunk_bloom_sst",     "file_bloom_sst",          "chunk_stats_sst",
81
    "chunk_dim_stats_sst", "dimensions_sst",          "file_scalar_stats_sst",
82
    "file_cat_counts_sst", "file_pid_tid_counts_sst", "file_name_counts_sst",
83
    "name_dictionary_sst", "name_file_postings_sst",  "name_chunk_postings_sst",
84
    "hash_tables_sst",     "aggregation_sst",         "system_metrics_sst",
85
};
86

87
/// Map a slot name to the matching Artifacts member. Kept in one place so
88
/// that adding a new CF requires updating only `ARTIFACT_FIELDS` plus
89
/// `dispatch_*` below.
90
std::optional<std::string> *artifacts_slot(
×
91
    IndexDatabaseSstWriterContext::Artifacts &a, std::string_view name) {
92
    if (name == "metadata_sst") return &a.metadata_sst;
×
93
    if (name == "checkpoints_sst") return &a.checkpoints_sst;
×
94
    if (name == "manifest_sst") return &a.manifest_sst;
×
95
    if (name == "chunk_bloom_sst") return &a.chunk_bloom_sst;
×
96
    if (name == "file_bloom_sst") return &a.file_bloom_sst;
×
97
    if (name == "chunk_stats_sst") return &a.chunk_stats_sst;
×
98
    if (name == "chunk_dim_stats_sst") return &a.chunk_dim_stats_sst;
×
99
    if (name == "dimensions_sst") return &a.dimensions_sst;
×
100
    if (name == "file_scalar_stats_sst") return &a.file_scalar_stats_sst;
×
101
    if (name == "file_cat_counts_sst") return &a.file_cat_counts_sst;
×
102
    if (name == "file_pid_tid_counts_sst") return &a.file_pid_tid_counts_sst;
×
103
    if (name == "file_name_counts_sst") return &a.file_name_counts_sst;
×
104
    if (name == "name_dictionary_sst") return &a.name_dictionary_sst;
×
105
    if (name == "name_file_postings_sst") return &a.name_file_postings_sst;
×
106
    if (name == "name_chunk_postings_sst") return &a.name_chunk_postings_sst;
×
107
    if (name == "hash_tables_sst") return &a.hash_tables_sst;
×
108
    if (name == "aggregation_sst") return &a.aggregation_sst;
×
109
    if (name == "system_metrics_sst") return &a.system_metrics_sst;
×
110
    return nullptr;
×
111
}
112

113
/// Convert a Python artifacts dict to the C++ Artifacts struct. Missing,
114
/// None, or empty-string entries become nullopt. Returns false on type
115
/// errors (exception set).
116
bool artifacts_from_dict(PyObject *dict,
×
117
                         IndexDatabaseSstWriterContext::Artifacts *out) {
118
    if (!PyDict_Check(dict)) {
×
119
        PyErr_SetString(PyExc_TypeError, "artifacts must be a dict");
×
120
        return false;
×
121
    }
122
    for (const char *field : ARTIFACT_FIELDS) {
×
123
        PyObject *val = PyDict_GetItemString(dict, field);  // borrowed
×
124
        if (!val || val == Py_None) continue;
×
125
        if (!PyUnicode_Check(val)) {
×
126
            PyErr_Format(PyExc_TypeError, "artifacts['%s'] must be str or None",
×
127
                         field);
128
            return false;
×
129
        }
130
        const char *s = PyUnicode_AsUTF8(val);
×
131
        if (!s) return false;
×
132
        if (s[0] == '\0') continue;
×
133
        auto *slot = artifacts_slot(*out, field);
×
134
        if (slot) *slot = std::string(s);
×
135
    }
136
    return true;
×
137
}
138

139
PyObject *artifacts_to_dict(const IndexDatabaseSstWriterContext::Artifacts &a) {
×
140
    PyObject *dict = PyDict_New();
×
141
    if (!dict) return NULL;
×
142
    auto set_field = [&](const char *name,
×
143
                         const std::optional<std::string> &slot) -> bool {
144
        PyObject *v = slot.has_value() ? PyUnicode_FromString(slot->c_str())
×
145
                                       : (Py_INCREF(Py_None), Py_None);
×
146
        if (!v) return false;
×
147
        int rc = PyDict_SetItemString(dict, name, v);
×
148
        Py_DECREF(v);
149
        return rc == 0;
×
150
    };
×
151
    if (!set_field("metadata_sst", a.metadata_sst) ||
×
152
        !set_field("checkpoints_sst", a.checkpoints_sst) ||
×
153
        !set_field("manifest_sst", a.manifest_sst) ||
×
154
        !set_field("chunk_bloom_sst", a.chunk_bloom_sst) ||
×
155
        !set_field("file_bloom_sst", a.file_bloom_sst) ||
×
156
        !set_field("chunk_stats_sst", a.chunk_stats_sst) ||
×
157
        !set_field("chunk_dim_stats_sst", a.chunk_dim_stats_sst) ||
×
158
        !set_field("dimensions_sst", a.dimensions_sst) ||
×
159
        !set_field("file_scalar_stats_sst", a.file_scalar_stats_sst) ||
×
160
        !set_field("file_cat_counts_sst", a.file_cat_counts_sst) ||
×
161
        !set_field("file_pid_tid_counts_sst", a.file_pid_tid_counts_sst) ||
×
162
        !set_field("file_name_counts_sst", a.file_name_counts_sst) ||
×
163
        !set_field("name_dictionary_sst", a.name_dictionary_sst) ||
×
164
        !set_field("name_file_postings_sst", a.name_file_postings_sst) ||
×
165
        !set_field("name_chunk_postings_sst", a.name_chunk_postings_sst) ||
×
166
        !set_field("hash_tables_sst", a.hash_tables_sst) ||
×
167
        !set_field("aggregation_sst", a.aggregation_sst) ||
×
168
        !set_field("system_metrics_sst", a.system_metrics_sst)) {
×
169
        Py_DECREF(dict);
×
170
        return NULL;
×
171
    }
172
    return dict;
×
173
}
174

175
}  // namespace
176

177
/// SstArtifactRegistry.append(artifacts_dict) -> None
///
/// Parses one Artifacts dict (via artifacts_from_dict) and appends it to the
/// wrapped C++ registry. C++ exceptions thrown while appending are converted
/// to Python exceptions instead of unwinding into the interpreter.
///
/// @return None, or NULL with an exception set.
static PyObject *SstArtifactRegistry_append(SstArtifactRegistryObject *self,
                                            PyObject *args) {
    PyObject *dict;
    if (!PyArg_ParseTuple(args, "O", &dict)) return NULL;
    IndexDatabaseSstWriterContext::Artifacts a;
    if (!artifacts_from_dict(dict, &a)) return NULL;
    try {
        self->registry->append(std::move(a));
    } catch (const std::bad_alloc &) {
        return PyErr_NoMemory();
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }
    Py_RETURN_NONE;
}
186

187
// Python-visible methods of SstArtifactRegistry, terminated by an all-null
// sentinel entry.
static PyMethodDef SstArtifactRegistry_methods[] = {
    {
        "append",
        reinterpret_cast<PyCFunction>(SstArtifactRegistry_append),
        METH_VARARGS,
        "append(artifacts_dict) -> None\n"
        "Add a per-batch Artifacts dict (as returned by build_sst_batch or "
        "IndexDatabaseSstWriterContext.commit) to the registry.",
    },
    {nullptr, nullptr, 0, nullptr},
};
193

194
// Static type object for dftracer_utils_ext.SstArtifactRegistry.
//
// This is a positional initializer: each value fills the PyTypeObject slot
// named in its trailing comment. Slot names follow CPython's PyTypeObject
// field order; verify against the Python headers in use before editing, since
// inserting or dropping a 0 silently shifts every later slot. Besides the
// name and basic size, only tp_dealloc, tp_flags, tp_doc, tp_methods and
// tp_new are set; all other slots are 0. tp_flags is Py_TPFLAGS_DEFAULT only.
static PyTypeObject SstArtifactRegistryType = {
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext.SstArtifactRegistry",  // tp_name
    sizeof(SstArtifactRegistryObject),  // tp_basicsize
    0,  // tp_itemsize
    (destructor)SstArtifactRegistry_dealloc,  // tp_dealloc
    0,  // tp_vectorcall_offset (tp_print on Python < 3.8)
    0,  // tp_getattr
    0,  // tp_setattr
    0,  // tp_as_async
    0,  // tp_repr
    0,  // tp_as_number
    0,  // tp_as_sequence
    0,  // tp_as_mapping
    0,  // tp_hash
    0,  // tp_call
    0,  // tp_str
    0,  // tp_getattro
    0,  // tp_setattro
    0,  // tp_as_buffer
    Py_TPFLAGS_DEFAULT,  // tp_flags
    "Thread-safe collector for SST artifact paths.",  // tp_doc
    0,  // tp_traverse
    0,  // tp_clear
    0,  // tp_richcompare
    0,  // tp_weaklistoffset
    0,  // tp_iter
    0,  // tp_iternext
    SstArtifactRegistry_methods,  // tp_methods
    0,  // tp_members
    0,  // tp_getset
    0,  // tp_base
    0,  // tp_dict
    0,  // tp_descr_get
    0,  // tp_descr_set
    0,  // tp_dictoffset
    0,  // tp_init
    0,  // tp_alloc
    SstArtifactRegistry_new,  // tp_new
};
233

234
/// Unwrap the C++ registry held by a Python SstArtifactRegistry object.
///
/// @param obj Any Python object.
/// @return The wrapped registry (borrowed; owned by `obj`'s shared_ptr
///         member), or nullptr when `obj` fails the SstArtifactRegistryType
///         type check.
SstArtifactRegistry *sst_artifact_registry_get(PyObject *obj) {
    const bool is_registry = PyObject_TypeCheck(obj, &SstArtifactRegistryType);
    if (is_registry) {
        auto *wrapper = reinterpret_cast<SstArtifactRegistryObject *>(obj);
        return wrapper->registry.get();
    }
    return nullptr;
}
238

239
// ---------------------------------------------------------------------------
240
// scan_files: parallel directory scan with size info
241
// ---------------------------------------------------------------------------
242

243
/// scan_files(directory, patterns=None, recursive=False, runtime=None)
///     -> list[tuple[str, int]]
///
/// Runs PatternDirectoryScannerUtility over `directory` on a Runtime. It
/// blocks (via run_blocking) until the scan finishes and returns one
/// (path, size) tuple per FileEntry found.
///
/// `patterns`: None, a sequence of str, or a single str. A single str is
/// treated as one pattern; as a sequence it would otherwise be split into
/// one-character patterns.
/// `runtime`: a native Runtime, an object whose `_native` attribute is one,
/// or None for get_default_runtime().
///
/// @return New list reference, or NULL with an exception set.
static PyObject *scan_files_fn(PyObject * /*self*/, PyObject *args,
                               PyObject *kwds) {
    static const char *kwlist[] = {"directory", "patterns", "recursive",
                                   "runtime", NULL};
    const char *directory;
    PyObject *patterns_obj = NULL;
    int recursive = 0;
    PyObject *runtime_arg = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|OpO", (char **)kwlist,
                                     &directory, &patterns_obj, &recursive,
                                     &runtime_arg)) {
        return NULL;
    }

    std::vector<std::string> patterns;
    if (patterns_obj && patterns_obj != Py_None) {
        if (PyUnicode_Check(patterns_obj)) {
            // A bare str is itself a sequence; do not split it per character.
            const char *s = PyUnicode_AsUTF8(patterns_obj);
            if (!s) return NULL;
            patterns.emplace_back(s);
        } else {
            PyObject *seq =
                PySequence_Fast(patterns_obj, "patterns must be a sequence");
            if (!seq) return NULL;
            const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
            patterns.reserve(static_cast<std::size_t>(n));
            for (Py_ssize_t i = 0; i < n; ++i) {
                PyObject *item = PySequence_Fast_GET_ITEM(seq, i);
                if (!PyUnicode_Check(item)) {
                    Py_DECREF(seq);
                    PyErr_SetString(PyExc_TypeError,
                                    "patterns must contain only str");
                    return NULL;
                }
                const char *s = PyUnicode_AsUTF8(item);
                if (!s) {
                    Py_DECREF(seq);
                    return NULL;
                }
                patterns.emplace_back(s);
            }
            Py_DECREF(seq);
        }
    }

    // Resolve the Runtime. When it comes from a wrapper's `_native`
    // attribute, hold that reference until the scan has completed so the
    // RuntimeObject (and the Runtime it owns) stays alive while `rt` is used.
    std::unique_ptr<PyObject, void (*)(PyObject *)> native_ref(nullptr,
                                                               &Py_DecRef);
    Runtime *rt = nullptr;
    if (runtime_arg && runtime_arg != Py_None) {
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
        } else {
            native_ref.reset(PyObject_GetAttrString(runtime_arg, "_native"));
            if (!native_ref ||
                !PyObject_TypeCheck(native_ref.get(), &RuntimeType)) {
                native_ref.reset();  // release before setting the error
                PyErr_SetString(PyExc_TypeError,
                                "runtime must be a Runtime instance or None");
                return NULL;
            }
            rt = ((RuntimeObject *)native_ref.get())->runtime.get();
        }
    } else {
        rt = get_default_runtime();
    }
    if (!rt) {
        PyErr_SetString(PyExc_RuntimeError, "scan_files: no Runtime available");
        return NULL;
    }

    PatternDirectoryScannerUtilityInput input(directory, patterns,
                                              recursive != 0, true);
    std::vector<FileEntry> entries;
    if (!run_blocking([&] {
            rt->submit(dftracer::utils::run_coro_scope(
                           rt->executor(),
                           [](dftracer::utils::CoroScope &scope,
                              PatternDirectoryScannerUtilityInput in,
                              std::vector<FileEntry> *out)
                               -> dftracer::utils::coro::CoroTask<void> {
                               PatternDirectoryScannerUtility scanner;
                               *out = co_await scope.spawn(scanner, in);
                           },
                           std::move(input), &entries),
                       "scan-files")
                .get();
        })) {
        return NULL;
    }

    PyObject *out = PyList_New(static_cast<Py_ssize_t>(entries.size()));
    if (!out) return NULL;
    for (std::size_t i = 0; i < entries.size(); ++i) {
        PyObject *t = Py_BuildValue("(sn)", entries[i].path.c_str(),
                                    (Py_ssize_t)entries[i].size);
        if (!t) {
            Py_DECREF(out);
            return NULL;
        }
        PyList_SET_ITEM(out, static_cast<Py_ssize_t>(i), t);
    }
    return out;
}
327

328
// ---------------------------------------------------------------------------
329
// plan_lpt_partition: LPT bin-packing of (path, size) pairs
330
// ---------------------------------------------------------------------------
331

332
/// plan_lpt_partition(entries, num_workers) -> list[list[tuple[str, int]]]
///
/// Splits (path, size) entries into buckets with plan_lpt_partition and
/// returns each bucket as a list of (path, size) tuples. A num_workers <= 0
/// is treated as 1.
///
/// Each entry may be a tuple or a list holding exactly (path: str,
/// size: int). Lists are accepted so input round-tripped through JSON, which
/// turns tuples into lists, still works; PyArg_ParseTuple itself rejects
/// non-tuples with a SystemError. A negative size raises ValueError, because
/// casting it to std::size_t would wrap to an enormous value and skew the
/// packing.
///
/// @return New list reference, or NULL with an exception set.
static PyObject *plan_lpt_partition_fn(PyObject * /*self*/, PyObject *args) {
    PyObject *entries_obj;
    Py_ssize_t num_workers;
    if (!PyArg_ParseTuple(args, "On", &entries_obj, &num_workers)) return NULL;
    if (num_workers <= 0) num_workers = 1;

    std::vector<FileEntry> entries;
    PyObject *seq = PySequence_Fast(entries_obj,
                                    "entries must be a sequence of "
                                    "(path, size) tuples");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    entries.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        PyObject *item = PySequence_Fast_GET_ITEM(seq, i);

        // Normalize to an owned tuple for PyArg_ParseTuple.
        PyObject *pair = NULL;
        if (PyTuple_Check(item)) {
            Py_INCREF(item);
            pair = item;
        } else if (PyList_Check(item)) {
            pair = PyList_AsTuple(item);
            if (!pair) {
                Py_DECREF(seq);
                return NULL;
            }
        } else {
            Py_DECREF(seq);
            PyErr_SetString(PyExc_TypeError,
                            "entries must be a sequence of "
                            "(path, size) tuples");
            return NULL;
        }

        const char *path = nullptr;
        Py_ssize_t size = 0;
        if (!PyArg_ParseTuple(pair, "sn", &path, &size)) {
            Py_DECREF(pair);
            Py_DECREF(seq);
            return NULL;
        }
        if (size < 0) {
            Py_DECREF(pair);
            Py_DECREF(seq);
            PyErr_SetString(PyExc_ValueError,
                            "entry sizes must be non-negative");
            return NULL;
        }
        FileEntry fe;
        fe.path = path;  // copy before `pair` (which keeps `path` alive) goes
        fe.size = static_cast<std::size_t>(size);
        fe.is_regular_file = true;
        Py_DECREF(pair);
        entries.push_back(std::move(fe));
    }
    Py_DECREF(seq);

    auto buckets = plan_lpt_partition(std::move(entries),
                                      static_cast<std::size_t>(num_workers));

    PyObject *out = PyList_New(static_cast<Py_ssize_t>(buckets.size()));
    if (!out) return NULL;
    for (std::size_t i = 0; i < buckets.size(); ++i) {
        PyObject *lst = PyList_New(static_cast<Py_ssize_t>(buckets[i].size()));
        if (!lst) {
            Py_DECREF(out);
            return NULL;
        }
        for (std::size_t j = 0; j < buckets[i].size(); ++j) {
            PyObject *t = Py_BuildValue("(sn)", buckets[i][j].path.c_str(),
                                        (Py_ssize_t)buckets[i][j].size);
            if (!t) {
                Py_DECREF(lst);
                Py_DECREF(out);
                return NULL;
            }
            PyList_SET_ITEM(lst, static_cast<Py_ssize_t>(j), t);
        }
        PyList_SET_ITEM(out, static_cast<Py_ssize_t>(i), lst);
    }
    return out;
}
386

387
// ---------------------------------------------------------------------------
388
// build_sst_batch: run the indexer pipeline with an SST sink and return
389
// the merged Artifacts dict.
390
// ---------------------------------------------------------------------------
391

392
static PyObject *build_sst_batch_fn(PyObject * /*self*/, PyObject *args,
×
393
                                    PyObject *kwds) {
394
    static const char *kwlist[] = {"files",
395
                                   "file_ids",
396
                                   "staging_dir",
397
                                   "batch_id",
398
                                   "index_dir",
399
                                   "checkpoint_size",
400
                                   "build_manifest",
401
                                   "force_rebuild",
402
                                   "bloom_dimensions",
403
                                   "parallelism",
404
                                   "flush_every_files",
405
                                   "runtime",
406
                                   "aggregation_config",
407
                                   "file_slices",
408
                                   NULL};
409
    PyObject *files_obj;
410
    PyObject *file_ids_obj;
411
    const char *staging_dir;
412
    const char *batch_id;
413
    const char *index_dir = "";
×
414
    Py_ssize_t checkpoint_size = static_cast<Py_ssize_t>(
×
415
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE);
416
    int build_manifest = 0;
×
417
    int force_rebuild = 0;
×
418
    PyObject *bloom_dims_obj = NULL;
×
419
    Py_ssize_t parallelism = 0;
×
420
    Py_ssize_t flush_every_files = 0;
×
421
    PyObject *runtime_arg = NULL;
×
422
    PyObject *aggregation_config_obj = NULL;
×
423
    PyObject *file_slices_obj = NULL;
×
424

425
    if (!PyArg_ParseTupleAndKeywords(
×
426
            args, kwds, "OOss|snppOnnOOO", (char **)kwlist, &files_obj,
427
            &file_ids_obj, &staging_dir, &batch_id, &index_dir,
428
            &checkpoint_size, &build_manifest, &force_rebuild, &bloom_dims_obj,
429
            &parallelism, &flush_every_files, &runtime_arg,
430
            &aggregation_config_obj, &file_slices_obj)) {
431
        return NULL;
×
432
    }
433

434
    // Unpack files.
435
    std::vector<std::string> files;
×
436
    {
437
        PyObject *seq = PySequence_Fast(files_obj, "files must be a sequence");
×
438
        if (!seq) return NULL;
×
439
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
440
        files.reserve(n);
×
441
        for (Py_ssize_t i = 0; i < n; ++i) {
×
442
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
443
            if (!s) {
×
444
                Py_DECREF(seq);
×
445
                return NULL;
×
446
            }
447
            files.emplace_back(s);
×
448
        }
449
        Py_DECREF(seq);
×
450
    }
451
    if (files.empty()) {
×
452
        return PyDict_New();
×
453
    }
454

455
    // Unpack file_ids, parallel to files.
456
    std::vector<int> file_ids;
×
457
    {
458
        PyObject *seq =
459
            PySequence_Fast(file_ids_obj, "file_ids must be a sequence");
×
460
        if (!seq) return NULL;
×
461
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
462
        if (static_cast<std::size_t>(n) != files.size()) {
×
463
            Py_DECREF(seq);
×
464
            PyErr_SetString(PyExc_ValueError,
×
465
                            "file_ids must have the same length as files");
466
            return NULL;
×
467
        }
468
        file_ids.reserve(n);
×
469
        for (Py_ssize_t i = 0; i < n; ++i) {
×
470
            long v = PyLong_AsLong(PySequence_Fast_GET_ITEM(seq, i));
×
471
            if (v == -1 && PyErr_Occurred()) {
×
472
                Py_DECREF(seq);
×
473
                return NULL;
×
474
            }
475
            file_ids.push_back(static_cast<int>(v));
×
476
        }
477
        Py_DECREF(seq);
×
478
    }
479

480
    // Optional bloom dimensions override.
481
    std::vector<std::string> bloom_dims;
×
482
    if (bloom_dims_obj && bloom_dims_obj != Py_None) {
×
483
        PyObject *seq = PySequence_Fast(bloom_dims_obj,
×
484
                                        "bloom_dimensions must be a sequence");
485
        if (!seq) return NULL;
×
486
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
487
        bloom_dims.reserve(n);
×
488
        for (Py_ssize_t i = 0; i < n; ++i) {
×
489
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
490
            if (!s) {
×
491
                Py_DECREF(seq);
×
492
                return NULL;
×
493
            }
494
            bloom_dims.emplace_back(s);
×
495
        }
496
        Py_DECREF(seq);
×
497
    }
498

499
    // Resolve Runtime (matching CheckpointIndexer pattern).
500
    Runtime *rt = nullptr;
×
501
    if (runtime_arg && runtime_arg != Py_None) {
×
502
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
503
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
×
504
        } else {
505
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
506
            if (!native || !PyObject_TypeCheck(native, &RuntimeType)) {
×
507
                Py_XDECREF(native);
×
508
                PyErr_SetString(PyExc_TypeError,
×
509
                                "runtime must be a Runtime instance or None");
510
                return NULL;
×
511
            }
512
            rt = ((RuntimeObject *)native)->runtime.get();
×
513
            Py_DECREF(native);
×
514
        }
515
    } else {
×
516
        rt = get_default_runtime();
×
517
    }
518

519
    // Build config + sink factory shared state.
520
    struct SharedArtifacts {
521
        std::mutex mu;
522
        std::vector<IndexDatabaseSstWriterContext::Artifacts> list;
523
    };
524
    auto artifacts = std::make_shared<SharedArtifacts>();
×
525
    auto staging = std::string(staging_dir);
×
526
    auto batch = std::string(batch_id);
×
527

528
    // Optional aggregation config, extracted from the Python dataclass.
529
    std::shared_ptr<dftracer::utils::utilities::composites::dft::aggregators::
530
                        AggregationConfig>
531
        agg_config_ptr;
×
532
    if (aggregation_config_obj && aggregation_config_obj != Py_None) {
×
533
        using dftracer::utils::utilities::composites::dft::aggregators::
534
            AggregationConfig;
535
        auto cfg = std::make_shared<AggregationConfig>();
×
536
        auto pull_double = [&](const char *name, double fallback) -> double {
×
537
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
538
            if (!v || v == Py_None) {
×
539
                Py_XDECREF(v);
×
540
                PyErr_Clear();
×
541
                return fallback;
×
542
            }
543
            double out = PyFloat_AsDouble(v);
×
544
            Py_DECREF(v);
545
            if (out == -1.0 && PyErr_Occurred()) return fallback;
×
546
            return out;
×
547
        };
×
548
        auto pull_bool = [&](const char *name, bool fallback) -> bool {
×
549
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
550
            if (!v || v == Py_None) {
×
551
                Py_XDECREF(v);
×
552
                PyErr_Clear();
×
553
                return fallback;
×
554
            }
555
            int out = PyObject_IsTrue(v);
×
556
            Py_DECREF(v);
557
            return out > 0 ? true : fallback;
×
558
        };
×
559
        auto pull_string_list =
560
            [&](const char *name) -> std::vector<std::string> {
×
561
            std::vector<std::string> out;
×
562
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
563
            if (!v || v == Py_None) {
×
564
                Py_XDECREF(v);
×
565
                PyErr_Clear();
×
566
                return out;
×
567
            }
568
            PyObject *seq = PySequence_Fast(v, "expected list of str");
×
569
            Py_DECREF(v);
×
570
            if (!seq) {
×
571
                PyErr_Clear();
×
572
                return out;
×
573
            }
574
            Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
575
            out.reserve(n);
×
576
            for (Py_ssize_t i = 0; i < n; ++i) {
×
577
                const char *s =
578
                    PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
579
                if (s) out.emplace_back(s);
×
580
            }
581
            Py_DECREF(seq);
×
582
            return out;
×
583
        };
×
584
        double time_interval_ms = pull_double("time_interval_ms", 5000.0);
×
585
        cfg->time_interval_us =
×
586
            static_cast<std::uint64_t>(time_interval_ms * 1000.0);
×
587
        cfg->compute_percentiles = pull_bool("compute_percentiles", false);
×
588
        cfg->extra_group_keys = pull_string_list("group_keys");
×
589
        cfg->custom_metric_fields = pull_string_list("custom_metric_fields");
×
590
        agg_config_ptr = std::move(cfg);
×
591
    }
×
592

593
    // owned_member_maps must outlive rt->submit: FileSlice::members is raw.
594
    std::vector<std::vector<GzipMember>> owned_member_maps;
×
595
    std::vector<IndexBuildBatchConfig::FileSlice> parsed_slices;
×
596
    if (file_slices_obj && file_slices_obj != Py_None) {
×
597
        PyObject *seq =
598
            PySequence_Fast(file_slices_obj, "file_slices must be a sequence");
×
599
        if (!seq) return NULL;
×
600
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
601
        if (static_cast<std::size_t>(n) != files.size()) {
×
602
            Py_DECREF(seq);
×
603
            PyErr_SetString(PyExc_ValueError,
×
604
                            "file_slices must match files length");
605
            return NULL;
×
606
        }
607
        owned_member_maps.resize(n);
×
608
        parsed_slices.resize(n);
×
609
        for (Py_ssize_t i = 0; i < n; ++i) {
×
610
            PyObject *entry = PySequence_Fast_GET_ITEM(seq, i);
×
611
            if (entry == Py_None) {
×
612
                continue;  // leave slice default-constructed (members=null)
×
613
            }
614
            Py_ssize_t mb = 0, me = 0, ckpt_base = 0;
×
615
            int skip_scoped = 0;
×
616
            PyObject *members_obj = nullptr;
×
617
            if (!PyArg_ParseTuple(entry, "nnnpO", &mb, &me, &ckpt_base,
×
618
                                  &skip_scoped, &members_obj)) {
619
                Py_DECREF(seq);
×
620
                return NULL;
×
621
            }
622
            PyObject *mseq = PySequence_Fast(
×
623
                members_obj, "file_slices[i].members must be a sequence");
624
            if (!mseq) {
×
625
                Py_DECREF(seq);
×
626
                return NULL;
×
627
            }
628
            Py_ssize_t mn = PySequence_Fast_GET_SIZE(mseq);
×
629
            auto &mv = owned_member_maps[i];
×
630
            mv.resize(mn);
×
631
            for (Py_ssize_t j = 0; j < mn; ++j) {
×
632
                PyObject *m = PySequence_Fast_GET_ITEM(mseq, j);
×
633
                unsigned long long c_offset = 0, c_size = 0;
×
634
                if (!PyArg_ParseTuple(m, "KK", &c_offset, &c_size)) {
×
635
                    Py_DECREF(mseq);
×
636
                    Py_DECREF(seq);
×
637
                    return NULL;
×
638
                }
639
                mv[j].c_offset = static_cast<std::uint64_t>(c_offset);
×
640
                mv[j].c_size = static_cast<std::uint64_t>(c_size);
×
641
            }
642
            Py_DECREF(mseq);
×
643
            parsed_slices[i].members = &mv;
×
644
            parsed_slices[i].member_begin = static_cast<std::size_t>(mb);
×
645
            parsed_slices[i].member_end = static_cast<std::size_t>(me);
×
646
            parsed_slices[i].checkpoint_idx_base =
×
647
                static_cast<std::uint64_t>(ckpt_base);
×
648
            parsed_slices[i].skip_file_scoped_writes = skip_scoped != 0;
×
649
        }
650
        Py_DECREF(seq);
×
651
    }
652

653
    auto batch_config = std::make_shared<IndexBuildBatchConfig>();
×
654
    batch_config->file_paths = std::move(files);
×
655
    batch_config->preassigned_file_ids = std::move(file_ids);
×
656
    if (!parsed_slices.empty()) {
×
657
        batch_config->file_slices = parsed_slices;
×
658
    }
659
    batch_config->index_dir = index_dir;
×
660
    batch_config->checkpoint_size = static_cast<std::size_t>(checkpoint_size);
×
661
    batch_config->build_manifest = build_manifest != 0;
×
662
    batch_config->force_rebuild = force_rebuild != 0;
×
663
    batch_config->bloom_dimensions = std::move(bloom_dims);
×
664
    batch_config->parallelism =
×
665
        parallelism > 0 ? static_cast<std::size_t>(parallelism)
×
666
                        : (rt ? std::max<std::size_t>(rt->threads(), 1) : 1);
×
667
    batch_config->flush_every_files =
×
668
        static_cast<std::size_t>(flush_every_files);
×
669
    batch_config->rebuild_root_summaries = false;
×
670

671
    if (agg_config_ptr) {
×
672
        auto agg_staging = staging;
×
673
        auto agg_prefix = batch + "_agg";
×
674
        // Counter keeps per-file SST dirs unique across duplicate file_paths
675
        // when one worker owns multiple slices of the same file.
676
        auto visitor_counter = std::make_shared<std::atomic<std::size_t>>(0);
×
677
        batch_config->dft_visitor_factory =
×
678
            [agg_staging, agg_prefix, agg_config_ptr,
×
679
             visitor_counter](const std::string &file_path)
680
            -> std::vector<std::unique_ptr<
681
                dftracer::utils::utilities::composites::dft::DftEventVisitor>> {
682
            using dftracer::utils::utilities::composites::dft::DftEventVisitor;
683
            using dftracer::utils::utilities::composites::dft::aggregators::
684
                AggregationVisitor;
685
            const std::size_t idx =
686
                visitor_counter->fetch_add(1, std::memory_order_relaxed);
×
687
            std::string prefix = agg_prefix + "_" + std::to_string(idx);
×
688
            std::vector<std::unique_ptr<DftEventVisitor>> visitors;
×
689
            visitors.push_back(std::make_unique<AggregationVisitor>(
×
690
                agg_staging, prefix, /*config_hash=*/0, *agg_config_ptr,
×
691
                file_path));
692
            return visitors;
×
693
        };
×
694
    }
×
695

696
    // Atomic: write phase calls sink_factory from N coroutines concurrently.
697
    auto batch_counter = std::make_shared<std::atomic<std::size_t>>(0);
×
698
    batch_config->sink_factory =
×
699
        [staging, batch, batch_counter]() -> std::unique_ptr<IndexBatchSink> {
×
700
        const std::size_t idx =
701
            batch_counter->fetch_add(1, std::memory_order_relaxed);
×
702
        std::string sub_batch = batch + "_" + std::to_string(idx);
×
703
        return std::make_unique<IndexDatabaseSstWriterContext>(staging,
×
704
                                                               sub_batch);
705
    };
×
706
    batch_config->sink_commit = [artifacts](IndexBatchSink &sink) {
×
707
        auto &sst = static_cast<IndexDatabaseSstWriterContext &>(sink);
×
708
        auto batch_artifacts = sst.commit();
×
709
        std::lock_guard<std::mutex> lock(artifacts->mu);
×
710
        if (!batch_artifacts.empty()) {
×
711
            artifacts->list.push_back(std::move(batch_artifacts));
×
712
        }
713
    };
×
714

715
    IndexBuildBatchResult result;
×
716
    if (!run_blocking([&] {
×
717
            rt->submit(dftracer::utils::run_coro_scope(
×
718
                           rt->executor(),
719
                           [](dftracer::utils::CoroScope &scope,
×
720
                              std::shared_ptr<IndexBuildBatchConfig> cfg,
721
                              IndexBuildBatchResult *out)
722
                               -> dftracer::utils::coro::CoroTask<void> {
×
723
                               *out =
×
724
                                   co_await IndexBatchBuilderUtility::process(
×
725
                                       &scope, std::move(cfg));
726
                           },
×
727
                           batch_config, &result),
×
728
                       "build-sst-batch")
×
729
                .get();
×
730
        })) {
×
731
        return NULL;
×
732
    }
733

734
    // If any file failed, surface the first error.
735
    if (result.failed > 0) {
×
736
        for (const auto &r : result.results) {
×
737
            if (!r.success) {
×
738
                PyErr_SetString(PyExc_RuntimeError, r.error_message.c_str());
×
739
                return NULL;
×
740
            }
741
        }
742
    }
743

744
    // One dict per committed sink + per-file aggregation below.
745
    PyObject *out_list = PyList_New(0);
×
746
    if (!out_list) return NULL;
×
747
    {
748
        std::lock_guard<std::mutex> lock(artifacts->mu);
×
749
        for (const auto &a : artifacts->list) {
×
750
            PyObject *main_dict = artifacts_to_dict(a);
×
751
            if (!main_dict || PyList_Append(out_list, main_dict) < 0) {
×
752
                Py_XDECREF(main_dict);
×
753
                Py_DECREF(out_list);
×
754
                return NULL;
×
755
            }
756
            Py_DECREF(main_dict);
×
757
        }
758
    }
×
759
    // Harvest per-file aggregation SSTs from extra visitors. Each visitor
760
    // holds a vector of Artifacts (one per FLUSH_THRESHOLD flush + the
761
    // file-complete flush) because SstFileWriter requires strictly
762
    // ascending keys per SST and cross-flush merge operands would collide.
763
    //
764
    // `extra_visitors` is indexed per input file, but a single
765
    // AggregationVisitor instance is typically shared across every file in
766
    // the batch (one flush at end-of-batch). Without dedup we would emit
767
    // that visitor's artifact dict N_files times, producing a manifest with
768
    // N copies of the same SST path. Dedup by visitor pointer so each
769
    // unique flush-sequence is emitted exactly once.
770
    using dftracer::utils::utilities::composites::dft::aggregators::
771
        AggregationVisitor;
772
    using dftracer::utils::utilities::composites::dft::aggregators::
773
        AssociationTracker;
774
    std::unordered_set<AggregationVisitor *> seen_visitors;
×
775
    for (auto &file_visitors : result.extra_visitors) {
×
776
        for (auto &visitor : file_visitors) {
×
777
            auto *agg = dynamic_cast<AggregationVisitor *>(visitor.get());
×
778
            if (!agg) continue;
×
779
            if (!seen_visitors.insert(agg).second) continue;
×
780
            for (auto &a : agg->aggregation_artifacts()) {
×
781
                if (a.empty()) continue;
×
782
                PyObject *agg_dict = artifacts_to_dict(a);
×
783
                if (!agg_dict || PyList_Append(out_list, agg_dict) < 0) {
×
784
                    Py_XDECREF(agg_dict);
×
785
                    Py_DECREF(out_list);
×
786
                    return NULL;
×
787
                }
788
                Py_DECREF(agg_dict);
×
789
            }
790
        }
791
    }
792

793
    AssociationTracker combined;
×
794
    bool any_tracker = false;
×
795
    for (auto *agg : seen_visitors) {
×
796
        auto out = agg->take_output();
×
797
        if (out.local_tracker) {
×
798
            combined.merge(*out.local_tracker);
×
799
            any_tracker = true;
×
800
        }
801
    }
×
802
    PyObject *tracker_bytes = nullptr;
×
803
    if (any_tracker) {
×
804
        combined.finalize();
×
805
        std::string blob = combined.serialize();
×
806
        tracker_bytes = PyBytes_FromStringAndSize(
×
807
            blob.data(), static_cast<Py_ssize_t>(blob.size()));
×
808
    } else {
×
809
        tracker_bytes = PyBytes_FromStringAndSize(nullptr, 0);
×
810
    }
811
    if (!tracker_bytes) {
×
812
        Py_DECREF(out_list);
×
813
        return NULL;
×
814
    }
815
    PyObject *ret = PyTuple_Pack(2, out_list, tracker_bytes);
×
816
    Py_DECREF(out_list);
×
817
    Py_DECREF(tracker_bytes);
×
818
    return ret;
×
819
}
×
820

821
static PyObject *enable_aggregation_deterministic_ids_fn(PyObject * /*self*/,
×
822
                                                         PyObject * /*args*/) {
823
    dftracer::utils::utilities::composites::dft::aggregators::
824
        aggregation_intern()
825
            .enable_deterministic_ids();
×
826
    Py_RETURN_NONE;
×
827
}
828

829
static PyObject *move_artifacts_fn(PyObject * /*self*/, PyObject *args,
×
830
                                   PyObject *kwds) {
831
    static const char *kwlist[] = {"artifacts", "dest_dir", NULL};
832
    PyObject *dict = NULL;
×
833
    const char *dest_dir = NULL;
×
834
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "Os", (char **)kwlist, &dict,
×
835
                                     &dest_dir)) {
836
        return NULL;
×
837
    }
838
    IndexDatabaseSstWriterContext::Artifacts a;
×
839
    if (!artifacts_from_dict(dict, &a)) return NULL;
×
840
    IndexDatabaseSstWriterContext::Artifacts moved;
×
841
    if (!run_blocking_r([&] { return std::move(a).move_to(dest_dir); }, moved))
×
842
        return NULL;
×
843
    return artifacts_to_dict(moved);
×
844
}
×
845

846
namespace {
847

848
dftracer::utils::coro::CoroTask<void> scan_one_gzip_file(
×
849
    std::string path, std::vector<GzipMember> *out) {
×
850
    out->clear();
851
    int fd = ::open(path.c_str(), O_RDONLY);
×
852
    if (fd < 0) co_return;
×
853
    struct stat st;
854
    if (::fstat(fd, &st) == 0 && st.st_size >= 18) {
×
855
        co_await enumerate_gzip_member_candidates(
×
856
            fd, static_cast<std::uint64_t>(st.st_size), *out);
857
    }
858
    ::close(fd);
×
859
}
×
860

861
}  // namespace
862

863
static PyObject *enumerate_gzip_members_fn(PyObject * /*self*/, PyObject *args,
×
864
                                           PyObject *kwds) {
865
    static const char *kwlist[] = {"files", "runtime", NULL};
866
    PyObject *files_obj = NULL;
×
867
    PyObject *runtime_arg = NULL;
×
868
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|O", (char **)kwlist,
×
869
                                     &files_obj, &runtime_arg)) {
870
        return NULL;
×
871
    }
872

873
    std::vector<std::string> files;
×
874
    {
875
        PyObject *seq = PySequence_Fast(files_obj, "files must be a sequence");
×
876
        if (!seq) return NULL;
×
877
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
878
        files.reserve(n);
×
879
        for (Py_ssize_t i = 0; i < n; ++i) {
×
880
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
881
            if (!s) {
×
882
                Py_DECREF(seq);
×
883
                return NULL;
×
884
            }
885
            files.emplace_back(s);
×
886
        }
887
        Py_DECREF(seq);
×
888
    }
889

890
    Runtime *rt = nullptr;
×
891
    if (runtime_arg && runtime_arg != Py_None) {
×
892
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
893
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
×
894
        } else {
895
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
896
            if (!native || !PyObject_TypeCheck(native, &RuntimeType)) {
×
897
                Py_XDECREF(native);
×
898
                PyErr_SetString(PyExc_TypeError,
×
899
                                "runtime must be a Runtime instance or None");
900
                return NULL;
×
901
            }
902
            rt = ((RuntimeObject *)native)->runtime.get();
×
903
            Py_DECREF(native);
×
904
        }
905
    } else {
×
906
        rt = get_default_runtime();
×
907
    }
908

909
    std::vector<std::vector<GzipMember>> results(files.size());
×
910
    if (!run_blocking([&] {
×
911
            rt->submit(
×
912
                  dftracer::utils::run_coro_scope(
×
913
                      rt->executor(),
914
                      [](dftracer::utils::CoroScope &scope,
×
915
                         const std::vector<std::string> *paths,
916
                         std::vector<std::vector<GzipMember>> *out)
917
                          -> dftracer::utils::coro::CoroTask<void> {
×
918
                          co_await scope.scope(
×
919
                              [paths, out](dftracer::utils::CoroScope &child)
×
920
                                  -> dftracer::utils::coro::CoroTask<void> {
×
921
                                  for (std::size_t i = 0; i < paths->size();
×
922
                                       ++i) {
923
                                      const std::string &path = (*paths)[i];
924
                                      auto *slot = &(*out)[i];
925
                                      child.spawn(
×
926
                                          [path,
×
927
                                           slot](dftracer::utils::CoroScope &)
928
                                              -> dftracer::utils::coro::CoroTask<
929
                                                  void> {
×
930
                                              co_await scan_one_gzip_file(path,
×
931
                                                                          slot);
932
                                          });
×
933
                                  }
934
                                  co_return;
935
                              });
×
936
                          co_return;
937
                      },
×
938
                      &files, &results),
×
939
                  "enumerate-gzip-members")
×
940
                .get();
×
941
        })) {
×
942
        return NULL;
×
943
    }
944

945
    PyObject *out_list = PyList_New(static_cast<Py_ssize_t>(results.size()));
×
946
    if (!out_list) return NULL;
×
947
    for (std::size_t i = 0; i < results.size(); ++i) {
×
948
        const auto &mv = results[i];
×
949
        PyObject *inner = PyList_New(static_cast<Py_ssize_t>(mv.size()));
×
950
        if (!inner) {
×
951
            Py_DECREF(out_list);
×
952
            return NULL;
×
953
        }
954
        for (std::size_t j = 0; j < mv.size(); ++j) {
×
955
            PyObject *t =
956
                Py_BuildValue("(KK)", (unsigned long long)mv[j].c_offset,
×
957
                              (unsigned long long)mv[j].c_size);
×
958
            if (!t) {
×
959
                Py_DECREF(inner);
×
960
                Py_DECREF(out_list);
×
961
                return NULL;
×
962
            }
963
            PyList_SET_ITEM(inner, j, t);
×
964
        }
965
        PyList_SET_ITEM(out_list, i, inner);
×
966
    }
967
    return out_list;
×
968
}
×
969

970
// Mirrors build_work_units + lpt_assign_units in dftracer_aggregator_mpi.cpp
971
// so the Dask backend produces identical work distribution to MPI.
972
static PyObject *plan_work_units_fn(PyObject * /*self*/, PyObject *args,
×
973
                                    PyObject *kwds) {
974
    static const char *kwlist[] = {"member_map", "num_workers", "target_c_size",
975
                                   NULL};
976
    PyObject *map_obj = NULL;
×
977
    Py_ssize_t num_workers = 0;
×
978
    unsigned long long target_c_size = 0;
×
979
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "On|K", (char **)kwlist,
×
980
                                     &map_obj, &num_workers, &target_c_size)) {
981
        return NULL;
×
982
    }
983
    if (num_workers <= 0) num_workers = 1;
×
984

985
    std::vector<std::vector<GzipMember>> member_map;
×
986
    {
987
        PyObject *seq =
988
            PySequence_Fast(map_obj, "member_map must be a sequence");
×
989
        if (!seq) return NULL;
×
990
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
991
        member_map.resize(n);
×
992
        for (Py_ssize_t i = 0; i < n; ++i) {
×
993
            PyObject *inner = PySequence_Fast_GET_ITEM(seq, i);
×
994
            PyObject *iseq =
995
                PySequence_Fast(inner, "member_map[i] must be a sequence");
×
996
            if (!iseq) {
×
997
                Py_DECREF(seq);
×
998
                return NULL;
×
999
            }
1000
            Py_ssize_t ni = PySequence_Fast_GET_SIZE(iseq);
×
1001
            member_map[i].resize(ni);
×
1002
            for (Py_ssize_t j = 0; j < ni; ++j) {
×
1003
                PyObject *t = PySequence_Fast_GET_ITEM(iseq, j);
×
1004
                unsigned long long c_offset = 0, c_size = 0;
×
1005
                if (!PyArg_ParseTuple(t, "KK", &c_offset, &c_size)) {
×
1006
                    Py_DECREF(iseq);
×
1007
                    Py_DECREF(seq);
×
1008
                    return NULL;
×
1009
                }
1010
                member_map[i][j].c_offset =
×
1011
                    static_cast<std::uint64_t>(c_offset);
1012
                member_map[i][j].c_size = static_cast<std::uint64_t>(c_size);
×
1013
            }
1014
            Py_DECREF(iseq);
×
1015
        }
1016
        Py_DECREF(seq);
×
1017
    }
1018

1019
    // Fallback: treat empty/non-gzip files as a single whole-file member.
1020
    std::uint64_t total_c = 0;
×
1021
    for (auto &mv : member_map) {
×
1022
        if (mv.empty()) mv.push_back({0, 0});
×
1023
        for (const auto &m : mv) total_c += m.c_size;
×
1024
    }
1025

1026
    if (target_c_size == 0) {
×
1027
        target_c_size =
×
1028
            (total_c + static_cast<std::uint64_t>(num_workers) - 1) /
×
1029
            std::max<std::uint64_t>(static_cast<std::uint64_t>(num_workers), 1);
×
1030
    }
1031

1032
    struct Unit {
1033
        std::size_t file_idx;
1034
        std::size_t member_begin;
1035
        std::size_t member_end;
1036
        std::uint64_t c_size;
1037
    };
1038
    std::vector<Unit> units;
×
1039
    for (std::size_t fi = 0; fi < member_map.size(); ++fi) {
×
1040
        const auto &members = member_map[fi];
×
1041
        if (members.empty()) continue;
×
1042
        std::size_t begin = 0;
×
1043
        std::uint64_t accum = 0;
×
1044
        for (std::size_t i = 0; i < members.size(); ++i) {
×
1045
            accum += members[i].c_size;
×
1046
            const bool is_last = (i + 1 == members.size());
×
1047
            if ((target_c_size > 0 && accum >= target_c_size) || is_last) {
×
1048
                units.push_back({fi, begin, i + 1, accum});
×
1049
                begin = i + 1;
×
1050
                accum = 0;
×
1051
            }
1052
        }
1053
    }
1054

1055
    std::vector<std::size_t> order(units.size());
×
1056
    for (std::size_t i = 0; i < order.size(); ++i) order[i] = i;
×
1057
    std::sort(order.begin(), order.end(), [&](std::size_t a, std::size_t b) {
×
1058
        if (units[a].c_size != units[b].c_size)
×
1059
            return units[a].c_size > units[b].c_size;
×
1060
        if (units[a].file_idx != units[b].file_idx)
×
1061
            return units[a].file_idx < units[b].file_idx;
×
1062
        return units[a].member_begin < units[b].member_begin;
×
1063
    });
1064
    const std::size_t nw = static_cast<std::size_t>(num_workers);
×
1065
    std::vector<std::uint64_t> loads(nw, 0);
×
1066
    std::vector<std::vector<std::size_t>> per_worker(nw);
×
1067
    for (std::size_t ord : order) {
×
1068
        std::size_t best = 0;
×
1069
        for (std::size_t r = 1; r < nw; ++r)
×
1070
            if (loads[r] < loads[best]) best = r;
×
1071
        per_worker[best].push_back(ord);
×
1072
        loads[best] += std::max<std::uint64_t>(units[ord].c_size, 1);
×
1073
    }
1074

1075
    PyObject *out = PyList_New(static_cast<Py_ssize_t>(nw));
×
1076
    if (!out) return NULL;
×
1077
    for (std::size_t w = 0; w < nw; ++w) {
×
1078
        // Keep per-worker slices sorted by (file_idx, member_begin) for
1079
        // deterministic, file-group-friendly iteration downstream.
1080
        auto &lst = per_worker[w];
×
1081
        std::sort(lst.begin(), lst.end(), [&](std::size_t a, std::size_t b) {
×
1082
            if (units[a].file_idx != units[b].file_idx)
×
1083
                return units[a].file_idx < units[b].file_idx;
×
1084
            return units[a].member_begin < units[b].member_begin;
×
1085
        });
1086
        PyObject *inner = PyList_New(static_cast<Py_ssize_t>(lst.size()));
×
1087
        if (!inner) {
×
1088
            Py_DECREF(out);
×
1089
            return NULL;
×
1090
        }
1091
        for (std::size_t k = 0; k < lst.size(); ++k) {
×
1092
            const auto &u = units[lst[k]];
×
1093
            PyObject *t = Py_BuildValue(
×
1094
                "(nnnK)", (Py_ssize_t)u.file_idx, (Py_ssize_t)u.member_begin,
×
1095
                (Py_ssize_t)u.member_end, (unsigned long long)u.c_size);
×
1096
            if (!t) {
×
1097
                Py_DECREF(inner);
×
1098
                Py_DECREF(out);
×
1099
                return NULL;
×
1100
            }
1101
            PyList_SET_ITEM(inner, k, t);
×
1102
        }
1103
        PyList_SET_ITEM(out, w, inner);
×
1104
    }
1105
    return out;
×
1106
}
×
1107

1108
// ---------------------------------------------------------------------------
1109
// Module registration
1110
// ---------------------------------------------------------------------------
1111

1112
static PyMethodDef SstDistributionMethods[] = {
1113
    {"build_sst_batch", (PyCFunction)build_sst_batch_fn,
1114
     METH_VARARGS | METH_KEYWORDS,
1115
     "build_sst_batch(files, file_ids, staging_dir, batch_id, ...) "
1116
     "-> (list[dict], bytes)\n"
1117
     "Run the indexer pipeline with an SST sink and return "
1118
     "(artifact_dicts, tracker_blob). The tracker blob is the serialized "
1119
     "merged AssociationTracker from this batch's aggregation visitors "
1120
     "(empty bytes when no aggregation_config was passed)."},
1121
    {"plan_lpt_partition", (PyCFunction)plan_lpt_partition_fn, METH_VARARGS,
1122
     "plan_lpt_partition(entries, num_workers) -> list[list[(path, size)]]\n"
1123
     "Greedy Longest-Processing-Time-first bin-packing of (path, size) "
1124
     "tuples across num_workers buckets. Minimises the maximum per-worker "
1125
     "total size."},
1126
    {"scan_files", (PyCFunction)scan_files_fn, METH_VARARGS | METH_KEYWORDS,
1127
     "scan_files(directory, patterns=None, recursive=False, runtime=None) "
1128
     "-> list[(path, size)]\n"
1129
     "Parallel directory scan returning (path, size) tuples for regular "
1130
     "files matching the patterns."},
1131
    {"enable_aggregation_deterministic_ids",
1132
     (PyCFunction)enable_aggregation_deterministic_ids_fn, METH_NOARGS,
1133
     "enable_aggregation_deterministic_ids() -> None\n"
1134
     "Flip the global aggregation StringIntern into deterministic-id mode "
1135
     "so the same string maps to the same 32-bit id in every worker "
1136
     "process. Call once at worker startup BEFORE any aggregation work."},
1137
    {"move_artifacts", (PyCFunction)move_artifacts_fn,
1138
     METH_VARARGS | METH_KEYWORDS,
1139
     "move_artifacts(artifacts, dest_dir) -> dict\n"
1140
     "Move every populated SST in `artifacts` (as returned by "
1141
     "`build_sst_batch`) into `dest_dir` via the C++ rename/copy helper, "
1142
     "returning a fresh dict with the new paths. Single GIL release, no "
1143
     "per-file Python shutil.move overhead."},
1144
    {"enumerate_gzip_members", (PyCFunction)enumerate_gzip_members_fn,
1145
     METH_VARARGS | METH_KEYWORDS,
1146
     "enumerate_gzip_members(files, runtime=None) -> list[list[(c_offset, "
1147
     "c_size)]]\n"
1148
     "Cooperative async scan of gzip member offsets across `files`. "
1149
     "Returns lists of (c_offset, c_size) parallel to `files`; empty for "
1150
     "non-gzip / unreadable files."},
1151
    {"plan_work_units", (PyCFunction)plan_work_units_fn,
1152
     METH_VARARGS | METH_KEYWORDS,
1153
     "plan_work_units(member_map, num_workers, target_c_size=0) "
1154
     "-> list[list[(file_idx, member_begin, member_end, c_size)]]\n"
1155
     "Deterministic LPT assignment of intra-file gzip-member slices "
1156
     "across workers. Each worker's list contains (file_idx, "
1157
     "member_begin, member_end, c_size) tuples; a file sliced across "
1158
     "multiple workers appears in each owner's list with disjoint "
1159
     "[member_begin, member_end) ranges."},
1160
    {NULL, NULL, 0, NULL}};
1161

1162
int init_sst_distribution(PyObject *m) {
2✔
1163
    if (register_type(m, &SstArtifactRegistryType, "SstArtifactRegistry") < 0)
2✔
1164
        return -1;
×
1165
    if (PyModule_AddFunctions(m, SstDistributionMethods) < 0) return -1;
2✔
1166
    return 0;
2✔
1167
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc