• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28521653886

01 Jul 2026 01:36PM UTC coverage: 50.92% (-1.4%) from 52.278%
28521653886

Pull #83

github

web-flow
Merge 9bdedb1e9 into 2efed6649
Pull Request #83: refactor and improve code QoL

31893 of 80049 branches covered (39.84%)

Branch coverage included in aggregate %.

789 of 1613 new or added lines in 87 files covered. (48.92%)

5007 existing lines in 181 files now uncovered.

32812 of 47024 relevant lines covered (69.78%)

9905.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.34
/src/dftracer/utils/python/sst_distribution.cpp
1
#include <dftracer/utils/core/common/constants.h>
#include <dftracer/utils/core/runtime.h>
#include <dftracer/utils/core/tasks/coro_scope.h>
#include <dftracer/utils/python/py_errors.h>
#include <dftracer/utils/python/py_type_helpers.h>
#include <dftracer/utils/python/runtime.h>
#include <dftracer/utils/python/sst_distribution.h>
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_config.h>
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_key.h>
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.h>
#include <dftracer/utils/utilities/composites/dft/aggregators/association_tracker.h>
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
#include <dftracer/utils/utilities/indexer/file_partition.h>
#include <dftracer/utils/utilities/indexer/index_batch_sink.h>
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
#include <dftracer/utils/utilities/indexer/index_database_sst_writer_context.h>
#include <dftracer/utils/utilities/indexer/internal/common/gzip_member_scanner.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <exception>
#include <memory>
#include <mutex>
#include <new>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_set>
#include <vector>
32

33
using dftracer::utils::Runtime;
34
using dftracer::utils::utilities::filesystem::FileEntry;
35
using dftracer::utils::utilities::filesystem::PatternDirectoryScannerUtility;
36
using dftracer::utils::utilities::filesystem::
37
    PatternDirectoryScannerUtilityInput;
38
using dftracer::utils::utilities::indexer::IndexBatchBuilderUtility;
39
using dftracer::utils::utilities::indexer::IndexBatchSink;
40
using dftracer::utils::utilities::indexer::IndexBuildBatchConfig;
41
using dftracer::utils::utilities::indexer::IndexBuildBatchResult;
42
using dftracer::utils::utilities::indexer::IndexDatabaseSstWriterContext;
43
using dftracer::utils::utilities::indexer::plan_lpt_partition;
44
using dftracer::utils::utilities::indexer::SstArtifactRegistry;
45
using dftracer::utils::utilities::indexer::internal::
46
    enumerate_gzip_member_candidates;
47
using dftracer::utils::utilities::indexer::internal::GzipMember;
48

49
// ---------------------------------------------------------------------------
50
// SstArtifactRegistry type
51
// ---------------------------------------------------------------------------
52

53
typedef struct {
54
    PyObject_HEAD std::shared_ptr<SstArtifactRegistry> registry;
55
} SstArtifactRegistryObject;
56

57
/// tp_dealloc for SstArtifactRegistry: end the lifetime of the
/// placement-constructed `registry` member (dropping this object's
/// reference to the shared registry), then return the memory to the
/// type's allocator.
static void SstArtifactRegistry_dealloc(SstArtifactRegistryObject *self) {
    using RegistryPtr = std::shared_ptr<SstArtifactRegistry>;
    self->registry.~RegistryPtr();

    PyObject *as_object = reinterpret_cast<PyObject *>(self);
    PyTypeObject *tp = Py_TYPE(as_object);
    tp->tp_free(as_object);
}
×
61

62
/// tp_new for SstArtifactRegistry: allocate the Python object and give it
/// a fresh, empty SstArtifactRegistry.
///
/// Returns a new reference, or NULL with a Python exception set. A C++
/// exception from allocating the registry (e.g. std::bad_alloc) is
/// translated into a Python error instead of escaping the C callback.
static PyObject *SstArtifactRegistry_new(PyTypeObject *type,
                                         PyObject * /*args*/,
                                         PyObject * /*kwds*/) {
    auto *self =
        reinterpret_cast<SstArtifactRegistryObject *>(type->tp_alloc(type, 0));
    if (!self) return NULL;

    // Construct an empty shared_ptr first (noexcept) so the member is always
    // a live object by the time tp_dealloc could run, even if the
    // allocation below fails and we have to drop `self`.
    new (&self->registry) std::shared_ptr<SstArtifactRegistry>();
    try {
        self->registry = std::make_shared<SstArtifactRegistry>();
    } catch (const std::bad_alloc &) {
        Py_DECREF(reinterpret_cast<PyObject *>(self));
        return PyErr_NoMemory();
    } catch (const std::exception &e) {
        Py_DECREF(reinterpret_cast<PyObject *>(self));
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        Py_DECREF(reinterpret_cast<PyObject *>(self));
        PyErr_SetString(PyExc_RuntimeError,
                        "failed to create SstArtifactRegistry");
        return NULL;
    }
    return reinterpret_cast<PyObject *>(self);
}
×
71

72
namespace {
73

74
// Field names in the Artifacts dict returned by build_sst_batch and
75
// consumed by SstArtifactRegistry.append. Must match the field names on
76
// IndexDatabaseSstWriterContext::Artifacts.
77
constexpr const char *ARTIFACT_FIELDS[] = {
78
    "metadata_sst",        "checkpoints_sst",         "manifest_sst",
79
    "chunk_bloom_sst",     "file_bloom_sst",          "chunk_stats_sst",
80
    "chunk_dim_stats_sst", "dimensions_sst",          "file_scalar_stats_sst",
81
    "file_cat_counts_sst", "file_pid_tid_counts_sst", "file_name_counts_sst",
82
    "name_dictionary_sst", "name_file_postings_sst",  "name_chunk_postings_sst",
83
    "hash_tables_sst",     "aggregation_sst",         "system_metrics_sst",
84
};
85

86
/// Map a slot name to the matching Artifacts member. Kept in one place so
87
/// that adding a new CF requires updating only `ARTIFACT_FIELDS` plus
88
/// `dispatch_*` below.
89
std::optional<std::string> *artifacts_slot(
×
90
    IndexDatabaseSstWriterContext::Artifacts &a, std::string_view name) {
91
    if (name == "metadata_sst") return &a.metadata_sst;
×
92
    if (name == "checkpoints_sst") return &a.checkpoints_sst;
×
93
    if (name == "manifest_sst") return &a.manifest_sst;
×
94
    if (name == "chunk_bloom_sst") return &a.chunk_bloom_sst;
×
95
    if (name == "file_bloom_sst") return &a.file_bloom_sst;
×
96
    if (name == "chunk_stats_sst") return &a.chunk_stats_sst;
×
97
    if (name == "chunk_dim_stats_sst") return &a.chunk_dim_stats_sst;
×
98
    if (name == "dimensions_sst") return &a.dimensions_sst;
×
99
    if (name == "file_scalar_stats_sst") return &a.file_scalar_stats_sst;
×
100
    if (name == "file_cat_counts_sst") return &a.file_cat_counts_sst;
×
101
    if (name == "file_pid_tid_counts_sst") return &a.file_pid_tid_counts_sst;
×
102
    if (name == "file_name_counts_sst") return &a.file_name_counts_sst;
×
103
    if (name == "name_dictionary_sst") return &a.name_dictionary_sst;
×
104
    if (name == "name_file_postings_sst") return &a.name_file_postings_sst;
×
105
    if (name == "name_chunk_postings_sst") return &a.name_chunk_postings_sst;
×
106
    if (name == "hash_tables_sst") return &a.hash_tables_sst;
×
107
    if (name == "aggregation_sst") return &a.aggregation_sst;
×
108
    if (name == "system_metrics_sst") return &a.system_metrics_sst;
×
109
    return nullptr;
×
UNCOV
110
}
×
111

112
/// Convert a Python artifacts dict to the C++ Artifacts struct. Missing,
113
/// None, or empty-string entries become nullopt. Returns false on type
114
/// errors (exception set).
115
bool artifacts_from_dict(PyObject *dict,
×
116
                         IndexDatabaseSstWriterContext::Artifacts *out) {
117
    if (!PyDict_Check(dict)) {
×
118
        PyErr_SetString(PyExc_TypeError, "artifacts must be a dict");
×
119
        return false;
×
120
    }
121
    for (const char *field : ARTIFACT_FIELDS) {
×
122
        PyObject *val = PyDict_GetItemString(dict, field);  // borrowed
×
123
        if (!val || val == Py_None) continue;
×
124
        if (!PyUnicode_Check(val)) {
×
125
            PyErr_Format(PyExc_TypeError, "artifacts['%s'] must be str or None",
×
UNCOV
126
                         field);
×
127
            return false;
×
128
        }
129
        const char *s = PyUnicode_AsUTF8(val);
×
130
        if (!s) return false;
×
131
        if (s[0] == '\0') continue;
×
132
        auto *slot = artifacts_slot(*out, field);
×
133
        if (slot) *slot = std::string(s);
×
134
    }
135
    return true;
×
UNCOV
136
}
×
137

138
PyObject *artifacts_to_dict(const IndexDatabaseSstWriterContext::Artifacts &a) {
×
139
    PyObject *dict = PyDict_New();
×
140
    if (!dict) return NULL;
×
141
    auto set_field = [&](const char *name,
×
142
                         const std::optional<std::string> &slot) -> bool {
143
        PyObject *v = slot.has_value() ? PyUnicode_FromString(slot->c_str())
×
144
                                       : (Py_INCREF(Py_None), Py_None);
×
145
        if (!v) return false;
×
146
        int rc = PyDict_SetItemString(dict, name, v);
×
UNCOV
147
        Py_DECREF(v);
×
148
        return rc == 0;
×
149
    };
×
150
    if (!set_field("metadata_sst", a.metadata_sst) ||
×
151
        !set_field("checkpoints_sst", a.checkpoints_sst) ||
×
152
        !set_field("manifest_sst", a.manifest_sst) ||
×
153
        !set_field("chunk_bloom_sst", a.chunk_bloom_sst) ||
×
154
        !set_field("file_bloom_sst", a.file_bloom_sst) ||
×
155
        !set_field("chunk_stats_sst", a.chunk_stats_sst) ||
×
156
        !set_field("chunk_dim_stats_sst", a.chunk_dim_stats_sst) ||
×
157
        !set_field("dimensions_sst", a.dimensions_sst) ||
×
158
        !set_field("file_scalar_stats_sst", a.file_scalar_stats_sst) ||
×
159
        !set_field("file_cat_counts_sst", a.file_cat_counts_sst) ||
×
160
        !set_field("file_pid_tid_counts_sst", a.file_pid_tid_counts_sst) ||
×
161
        !set_field("file_name_counts_sst", a.file_name_counts_sst) ||
×
162
        !set_field("name_dictionary_sst", a.name_dictionary_sst) ||
×
163
        !set_field("name_file_postings_sst", a.name_file_postings_sst) ||
×
164
        !set_field("name_chunk_postings_sst", a.name_chunk_postings_sst) ||
×
165
        !set_field("hash_tables_sst", a.hash_tables_sst) ||
×
166
        !set_field("aggregation_sst", a.aggregation_sst) ||
×
167
        !set_field("system_metrics_sst", a.system_metrics_sst)) {
×
168
        Py_DECREF(dict);
×
169
        return NULL;
×
170
    }
171
    return dict;
×
UNCOV
172
}
×
173

174
}  // namespace
175

176
/// SstArtifactRegistry.append(artifacts_dict) -> None
///
/// Parse one per-batch artifacts dict and add it to the shared registry.
/// Returns None, or NULL with a Python exception set. Dict validation
/// errors come from artifacts_from_dict(); C++ exceptions raised while
/// building or appending the Artifacts (e.g. allocation failures) are
/// converted to Python errors rather than escaping the C callback.
static PyObject *SstArtifactRegistry_append(SstArtifactRegistryObject *self,
                                            PyObject *args) {
    PyObject *dict = nullptr;
    if (!PyArg_ParseTuple(args, "O", &dict)) return NULL;
    try {
        IndexDatabaseSstWriterContext::Artifacts artifacts;
        if (!artifacts_from_dict(dict, &artifacts)) return NULL;
        self->registry->append(std::move(artifacts));
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "SstArtifactRegistry.append failed with an unknown "
                        "C++ exception");
        return NULL;
    }
    Py_RETURN_NONE;
}
×
185

186
// Method table for SstArtifactRegistry; terminated by an all-null sentinel.
static PyMethodDef SstArtifactRegistry_methods[] = {
    {
        "append",
        reinterpret_cast<PyCFunction>(SstArtifactRegistry_append),
        METH_VARARGS,
        "append(artifacts_dict) -> None\n"
        "Add a per-batch Artifacts dict (as returned by build_sst_batch or "
        "IndexDatabaseSstWriterContext.commit) to the registry.",
    },
    {nullptr, nullptr, 0, nullptr},  // sentinel
};
192

193
// Static type object for dftracer_utils_ext.SstArtifactRegistry.
// Positional initializer: entries follow the PyTypeObject slot order, each
// labelled with its slot name. Slots after tp_new are zero-initialized.
static PyTypeObject SstArtifactRegistryType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext.SstArtifactRegistry",  // tp_name
    sizeof(SstArtifactRegistryObject),          // tp_basicsize
    0,                                          // tp_itemsize
    reinterpret_cast<destructor>(SstArtifactRegistry_dealloc),  // tp_dealloc
    0,                                          // tp_vectorcall_offset
    nullptr,                                    // tp_getattr
    nullptr,                                    // tp_setattr
    nullptr,                                    // tp_as_async
    nullptr,                                    // tp_repr
    nullptr,                                    // tp_as_number
    nullptr,                                    // tp_as_sequence
    nullptr,                                    // tp_as_mapping
    nullptr,                                    // tp_hash
    nullptr,                                    // tp_call
    nullptr,                                    // tp_str
    nullptr,                                    // tp_getattro
    nullptr,                                    // tp_setattro
    nullptr,                                    // tp_as_buffer
    Py_TPFLAGS_DEFAULT,                         // tp_flags
    "Thread-safe collector for SST artifact paths.",  // tp_doc
    nullptr,                                    // tp_traverse
    nullptr,                                    // tp_clear
    nullptr,                                    // tp_richcompare
    0,                                          // tp_weaklistoffset
    nullptr,                                    // tp_iter
    nullptr,                                    // tp_iternext
    SstArtifactRegistry_methods,                // tp_methods
    nullptr,                                    // tp_members
    nullptr,                                    // tp_getset
    nullptr,                                    // tp_base
    nullptr,                                    // tp_dict
    nullptr,                                    // tp_descr_get
    nullptr,                                    // tp_descr_set
    0,                                          // tp_dictoffset
    nullptr,                                    // tp_init
    nullptr,                                    // tp_alloc
    SstArtifactRegistry_new,                    // tp_new
};
232

233
/// Borrow the native SstArtifactRegistry behind a Python object.
///
/// Returns nullptr (without setting a Python exception) when `obj` is not
/// an instance of SstArtifactRegistryType or a subtype of it. The returned
/// pointer is owned by the Python object and is valid only while `obj`
/// is alive.
SstArtifactRegistry *sst_artifact_registry_get(PyObject *obj) {
    const bool is_registry =
        PyObject_TypeCheck(obj, &SstArtifactRegistryType) != 0;
    if (!is_registry) {
        return nullptr;
    }
    auto *wrapper = reinterpret_cast<SstArtifactRegistryObject *>(obj);
    return wrapper->registry.get();
}
×
237

238
// ---------------------------------------------------------------------------
239
// scan_files: parallel directory scan with size info
240
// ---------------------------------------------------------------------------
241

242
/// scan_files(directory, patterns=None, recursive=False, runtime=None)
///
/// Scan `directory` for files matching `patterns` on a Runtime executor and
/// return a list of `(path, size)` tuples. The GIL is released while the
/// scan runs. Returns a new list, or NULL with a Python exception set.
///
/// `runtime` may be a native Runtime, an object exposing one via a
/// `_native` attribute, or None (use the default runtime).
static PyObject *scan_files_fn(PyObject * /*self*/, PyObject *args,
                               PyObject *kwds) {
    static const char *kwlist[] = {"directory", "patterns", "recursive",
                                   "runtime", NULL};
    const char *directory = nullptr;
    PyObject *patterns_obj = NULL;
    int recursive = 0;
    PyObject *runtime_arg = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|OpO",
                                     const_cast<char **>(kwlist), &directory,
                                     &patterns_obj, &recursive,
                                     &runtime_arg)) {
        return NULL;
    }

    std::vector<std::string> patterns;
    if (patterns_obj && patterns_obj != Py_None) {
        PyObject *seq =
            PySequence_Fast(patterns_obj, "patterns must be a sequence");
        if (!seq) return NULL;
        const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
        patterns.reserve(static_cast<std::size_t>(n));
        for (Py_ssize_t i = 0; i < n; ++i) {
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
            if (!s) {
                Py_DECREF(seq);
                return NULL;
            }
            patterns.emplace_back(s);
        }
        Py_DECREF(seq);
    }

    Runtime *rt = nullptr;
    if (runtime_arg && runtime_arg != Py_None) {
        if (!PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
            // Accept wrappers that expose the native Runtime as `_native`.
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
            if (!native || !PyObject_TypeCheck(native, &RuntimeType)) {
                Py_XDECREF(native);
                PyErr_SetString(PyExc_TypeError,
                                "runtime must be a Runtime instance or None");
                return NULL;
            }
            rt = ((RuntimeObject *)native)->runtime.get();
            Py_DECREF(native);
        } else {
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
        }
    } else {
        rt = get_default_runtime();
    }
    // Other callers in this file treat a null runtime as possible; never
    // dereference it.
    if (!rt) {
        if (!PyErr_Occurred()) {
            PyErr_SetString(PyExc_RuntimeError,
                            "scan_files: no runtime available");
        }
        return NULL;
    }

    PatternDirectoryScannerUtilityInput input(directory, patterns,
                                              recursive != 0, true);
    std::vector<FileEntry> entries;

    // C++ exceptions must be caught *inside* the GIL-released region: if
    // one unwound past Py_END_ALLOW_THREADS the GIL would never be
    // reacquired and the Python error would be set without holding it.
    // Capture it here and translate it once the GIL is back.
    std::exception_ptr scan_error;
    Py_BEGIN_ALLOW_THREADS
    try {
        rt->submit(dftracer::utils::run_coro_scope(
                       rt->executor(),
                       [](dftracer::utils::CoroScope &scope,
                          PatternDirectoryScannerUtilityInput in,
                          std::vector<FileEntry> *out)
                           -> dftracer::utils::coro::CoroTask<void> {
                           PatternDirectoryScannerUtility scanner;
                           *out = co_await scope.spawn(scanner, in);
                       },
                       std::move(input), &entries),
                   "scan-files")
            .get();
    } catch (...) {
        scan_error = std::current_exception();
    }
    Py_END_ALLOW_THREADS

    if (scan_error) {
        try {
            std::rethrow_exception(scan_error);
        } catch (const std::exception &e) {
            set_typed_py_error(e);
        } catch (...) {
            PyErr_SetString(PyExc_RuntimeError,
                            "scan_files failed with an unknown C++ exception");
        }
        return NULL;
    }

    PyObject *out = PyList_New(static_cast<Py_ssize_t>(entries.size()));
    if (!out) return NULL;
    for (std::size_t i = 0; i < entries.size(); ++i) {
        PyObject *t = Py_BuildValue("(sn)", entries[i].path.c_str(),
                                    static_cast<Py_ssize_t>(entries[i].size));
        if (!t) {
            Py_DECREF(out);
            return NULL;
        }
        // Steals the reference to `t`.
        PyList_SET_ITEM(out, static_cast<Py_ssize_t>(i), t);
    }
    return out;
}
×
329

330
// ---------------------------------------------------------------------------
331
// plan_lpt_partition: LPT bin-packing of (path, size) pairs
332
// ---------------------------------------------------------------------------
333

334
/// plan_lpt_partition(entries, num_workers) -> list[list[(path, size)]]
///
/// LPT bin-pack a sequence of `(path, size)` tuples into `num_workers`
/// buckets using plan_lpt_partition(). A non-positive `num_workers` is
/// treated as 1. Returns a new list of lists, or NULL with a Python
/// exception set. Raises ValueError for a negative size, which would
/// otherwise wrap to a huge unsigned value and distort the packing.
/// C++ exceptions are translated into Python errors.
static PyObject *plan_lpt_partition_fn(PyObject * /*self*/, PyObject *args) {
    PyObject *entries_obj = nullptr;
    Py_ssize_t num_workers = 0;
    if (!PyArg_ParseTuple(args, "On", &entries_obj, &num_workers)) return NULL;
    if (num_workers <= 0) num_workers = 1;

    PyObject *seq = PySequence_Fast(entries_obj,
                                    "entries must be a sequence of "
                                    "(path, size) tuples");
    if (!seq) return NULL;

    try {
        std::vector<FileEntry> entries;
        const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
        entries.reserve(static_cast<std::size_t>(n));
        for (Py_ssize_t i = 0; i < n; ++i) {
            PyObject *item = PySequence_Fast_GET_ITEM(seq, i);
            const char *path = nullptr;
            Py_ssize_t size = 0;
            if (!PyArg_ParseTuple(item, "sn", &path, &size)) {
                Py_DECREF(seq);
                return NULL;
            }
            if (size < 0) {
                PyErr_Format(PyExc_ValueError,
                             "entries[%zd] has negative size %zd", i, size);
                Py_DECREF(seq);
                return NULL;
            }
            FileEntry fe;
            fe.path = path;  // copy before `seq` (which owns `path`) dies
            fe.size = static_cast<std::size_t>(size);
            fe.is_regular_file = true;
            entries.push_back(std::move(fe));
        }
        Py_DECREF(seq);
        seq = nullptr;

        auto buckets = plan_lpt_partition(
            std::move(entries), static_cast<std::size_t>(num_workers));

        // Nothing below throws C++ exceptions, so Python objects created
        // here cannot leak through the catch handlers.
        PyObject *out = PyList_New(static_cast<Py_ssize_t>(buckets.size()));
        if (!out) return NULL;
        for (std::size_t i = 0; i < buckets.size(); ++i) {
            PyObject *lst =
                PyList_New(static_cast<Py_ssize_t>(buckets[i].size()));
            if (!lst) {
                Py_DECREF(out);
                return NULL;
            }
            for (std::size_t j = 0; j < buckets[i].size(); ++j) {
                PyObject *t = Py_BuildValue(
                    "(sn)", buckets[i][j].path.c_str(),
                    static_cast<Py_ssize_t>(buckets[i][j].size));
                if (!t) {
                    Py_DECREF(lst);
                    Py_DECREF(out);
                    return NULL;
                }
                PyList_SET_ITEM(lst, static_cast<Py_ssize_t>(j), t);
            }
            PyList_SET_ITEM(out, static_cast<Py_ssize_t>(i), lst);
        }
        return out;
    } catch (const std::exception &e) {
        Py_XDECREF(seq);
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        Py_XDECREF(seq);
        PyErr_SetString(PyExc_RuntimeError,
                        "plan_lpt_partition failed with an unknown C++ "
                        "exception");
        return NULL;
    }
}
×
388

389
// ---------------------------------------------------------------------------
390
// build_sst_batch: run the indexer pipeline with an SST sink and return
391
// the merged Artifacts dict.
392
// ---------------------------------------------------------------------------
393

394
static PyObject *build_sst_batch_fn(PyObject * /*self*/, PyObject *args,
×
395
                                    PyObject *kwds) {
396
    static const char *kwlist[] = {"files",
397
                                   "file_ids",
398
                                   "staging_dir",
399
                                   "batch_id",
400
                                   "index_dir",
401
                                   "checkpoint_size",
402
                                   "build_manifest",
403
                                   "force_rebuild",
404
                                   "bloom_dimensions",
405
                                   "parallelism",
406
                                   "flush_every_files",
407
                                   "runtime",
408
                                   "aggregation_config",
409
                                   "file_slices",
410
                                   NULL};
411
    PyObject *files_obj;
412
    PyObject *file_ids_obj;
413
    const char *staging_dir;
414
    const char *batch_id;
415
    const char *index_dir = "";
×
NEW
416
    Py_ssize_t checkpoint_size = static_cast<Py_ssize_t>(
×
417
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE);
418
    int build_manifest = 0;
×
419
    int force_rebuild = 0;
×
420
    PyObject *bloom_dims_obj = NULL;
×
421
    Py_ssize_t parallelism = 0;
×
422
    Py_ssize_t flush_every_files = 0;
×
423
    PyObject *runtime_arg = NULL;
×
424
    PyObject *aggregation_config_obj = NULL;
×
425
    PyObject *file_slices_obj = NULL;
×
426

427
    if (!PyArg_ParseTupleAndKeywords(
×
UNCOV
428
            args, kwds, "OOss|snppOnnOOO", (char **)kwlist, &files_obj,
×
429
            &file_ids_obj, &staging_dir, &batch_id, &index_dir,
430
            &checkpoint_size, &build_manifest, &force_rebuild, &bloom_dims_obj,
431
            &parallelism, &flush_every_files, &runtime_arg,
432
            &aggregation_config_obj, &file_slices_obj)) {
433
        return NULL;
×
434
    }
435

436
    // Unpack files.
437
    std::vector<std::string> files;
×
438
    {
439
        PyObject *seq = PySequence_Fast(files_obj, "files must be a sequence");
×
440
        if (!seq) return NULL;
×
441
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
442
        files.reserve(n);
×
443
        for (Py_ssize_t i = 0; i < n; ++i) {
×
444
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
445
            if (!s) {
×
UNCOV
446
                Py_DECREF(seq);
×
447
                return NULL;
×
448
            }
449
            files.emplace_back(s);
×
UNCOV
450
        }
×
UNCOV
451
        Py_DECREF(seq);
×
452
    }
453
    if (files.empty()) {
×
454
        return PyDict_New();
×
455
    }
456

457
    // Unpack file_ids, parallel to files.
458
    std::vector<int> file_ids;
×
459
    {
UNCOV
460
        PyObject *seq =
×
461
            PySequence_Fast(file_ids_obj, "file_ids must be a sequence");
×
462
        if (!seq) return NULL;
×
463
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
464
        if (static_cast<std::size_t>(n) != files.size()) {
×
UNCOV
465
            Py_DECREF(seq);
×
466
            PyErr_SetString(PyExc_ValueError,
×
467
                            "file_ids must have the same length as files");
468
            return NULL;
×
469
        }
470
        file_ids.reserve(n);
×
471
        for (Py_ssize_t i = 0; i < n; ++i) {
×
472
            long v = PyLong_AsLong(PySequence_Fast_GET_ITEM(seq, i));
×
473
            if (v == -1 && PyErr_Occurred()) {
×
UNCOV
474
                Py_DECREF(seq);
×
475
                return NULL;
×
476
            }
477
            file_ids.push_back(static_cast<int>(v));
×
UNCOV
478
        }
×
UNCOV
479
        Py_DECREF(seq);
×
480
    }
481

482
    // Optional bloom dimensions override.
483
    std::vector<std::string> bloom_dims;
×
484
    if (bloom_dims_obj && bloom_dims_obj != Py_None) {
×
485
        PyObject *seq = PySequence_Fast(bloom_dims_obj,
×
486
                                        "bloom_dimensions must be a sequence");
487
        if (!seq) return NULL;
×
488
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
489
        bloom_dims.reserve(n);
×
490
        for (Py_ssize_t i = 0; i < n; ++i) {
×
491
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
492
            if (!s) {
×
UNCOV
493
                Py_DECREF(seq);
×
494
                return NULL;
×
495
            }
496
            bloom_dims.emplace_back(s);
×
UNCOV
497
        }
×
UNCOV
498
        Py_DECREF(seq);
×
UNCOV
499
    }
×
500

501
    // Resolve Runtime (matching CheckpointIndexer pattern).
502
    Runtime *rt = nullptr;
×
503
    if (runtime_arg && runtime_arg != Py_None) {
×
504
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
505
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
×
UNCOV
506
        } else {
×
507
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
508
            if (!native || !PyObject_TypeCheck(native, &RuntimeType)) {
×
509
                Py_XDECREF(native);
×
510
                PyErr_SetString(PyExc_TypeError,
×
511
                                "runtime must be a Runtime instance or None");
512
                return NULL;
×
513
            }
514
            rt = ((RuntimeObject *)native)->runtime.get();
×
UNCOV
515
            Py_DECREF(native);
×
516
        }
517
    } else {
×
518
        rt = get_default_runtime();
×
519
    }
520

521
    // Build config + sink factory shared state.
522
    struct SharedArtifacts {
523
        std::mutex mu;
524
        std::vector<IndexDatabaseSstWriterContext::Artifacts> list;
525
    };
526
    auto artifacts = std::make_shared<SharedArtifacts>();
×
527
    auto staging = std::string(staging_dir);
×
528
    auto batch = std::string(batch_id);
×
529

530
    // Optional aggregation config, extracted from the Python dataclass.
531
    std::shared_ptr<dftracer::utils::utilities::composites::dft::aggregators::
532
                        AggregationConfig>
533
        agg_config_ptr;
×
534
    if (aggregation_config_obj && aggregation_config_obj != Py_None) {
×
535
        using dftracer::utils::utilities::composites::dft::aggregators::
536
            AggregationConfig;
537
        auto cfg = std::make_shared<AggregationConfig>();
×
538
        auto pull_double = [&](const char *name, double fallback) -> double {
×
539
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
540
            if (!v || v == Py_None) {
×
541
                Py_XDECREF(v);
×
542
                PyErr_Clear();
×
543
                return fallback;
×
544
            }
545
            double out = PyFloat_AsDouble(v);
×
UNCOV
546
            Py_DECREF(v);
×
547
            if (out == -1.0 && PyErr_Occurred()) return fallback;
×
548
            return out;
×
549
        };
×
550
        auto pull_bool = [&](const char *name, bool fallback) -> bool {
×
551
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
552
            if (!v || v == Py_None) {
×
553
                Py_XDECREF(v);
×
554
                PyErr_Clear();
×
555
                return fallback;
×
556
            }
557
            int out = PyObject_IsTrue(v);
×
UNCOV
558
            Py_DECREF(v);
×
559
            return out > 0 ? true : fallback;
×
560
        };
×
561
        auto pull_string_list =
562
            [&](const char *name) -> std::vector<std::string> {
×
563
            std::vector<std::string> out;
×
564
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
565
            if (!v || v == Py_None) {
×
566
                Py_XDECREF(v);
×
567
                PyErr_Clear();
×
568
                return out;
×
569
            }
570
            PyObject *seq = PySequence_Fast(v, "expected list of str");
×
UNCOV
571
            Py_DECREF(v);
×
572
            if (!seq) {
×
573
                PyErr_Clear();
×
574
                return out;
×
575
            }
576
            Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
577
            out.reserve(n);
×
578
            for (Py_ssize_t i = 0; i < n; ++i) {
×
UNCOV
579
                const char *s =
×
580
                    PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
581
                if (s) out.emplace_back(s);
×
UNCOV
582
            }
×
UNCOV
583
            Py_DECREF(seq);
×
584
            return out;
×
585
        };
×
586
        double time_interval_ms = pull_double("time_interval_ms", 5000.0);
×
587
        cfg->time_interval_us =
×
588
            static_cast<std::uint64_t>(time_interval_ms * 1000.0);
×
589
        cfg->compute_percentiles = pull_bool("compute_percentiles", false);
×
590
        cfg->extra_group_keys = pull_string_list("group_keys");
×
591
        cfg->custom_metric_fields = pull_string_list("custom_metric_fields");
×
592
        agg_config_ptr = std::move(cfg);
×
593
    }
×
594

595
    // owned_member_maps must outlive rt->submit: FileSlice::members is raw.
596
    std::vector<std::vector<GzipMember>> owned_member_maps;
×
597
    std::vector<IndexBuildBatchConfig::FileSlice> parsed_slices;
×
598
    if (file_slices_obj && file_slices_obj != Py_None) {
×
UNCOV
599
        PyObject *seq =
×
600
            PySequence_Fast(file_slices_obj, "file_slices must be a sequence");
×
601
        if (!seq) return NULL;
×
602
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
603
        if (static_cast<std::size_t>(n) != files.size()) {
×
UNCOV
604
            Py_DECREF(seq);
×
605
            PyErr_SetString(PyExc_ValueError,
×
606
                            "file_slices must match files length");
607
            return NULL;
×
608
        }
609
        owned_member_maps.resize(n);
×
610
        parsed_slices.resize(n);
×
611
        for (Py_ssize_t i = 0; i < n; ++i) {
×
612
            PyObject *entry = PySequence_Fast_GET_ITEM(seq, i);
×
613
            if (entry == Py_None) {
×
614
                continue;  // leave slice default-constructed (members=null)
×
615
            }
616
            Py_ssize_t mb = 0, me = 0, ckpt_base = 0;
×
617
            int skip_scoped = 0;
×
618
            PyObject *members_obj = nullptr;
×
619
            if (!PyArg_ParseTuple(entry, "nnnpO", &mb, &me, &ckpt_base,
×
620
                                  &skip_scoped, &members_obj)) {
UNCOV
621
                Py_DECREF(seq);
×
622
                return NULL;
×
623
            }
624
            PyObject *mseq = PySequence_Fast(
×
UNCOV
625
                members_obj, "file_slices[i].members must be a sequence");
×
626
            if (!mseq) {
×
UNCOV
627
                Py_DECREF(seq);
×
628
                return NULL;
×
629
            }
630
            Py_ssize_t mn = PySequence_Fast_GET_SIZE(mseq);
×
631
            auto &mv = owned_member_maps[i];
×
632
            mv.resize(mn);
×
633
            for (Py_ssize_t j = 0; j < mn; ++j) {
×
634
                PyObject *m = PySequence_Fast_GET_ITEM(mseq, j);
×
635
                unsigned long long c_offset = 0, c_size = 0;
×
636
                if (!PyArg_ParseTuple(m, "KK", &c_offset, &c_size)) {
×
UNCOV
637
                    Py_DECREF(mseq);
×
UNCOV
638
                    Py_DECREF(seq);
×
639
                    return NULL;
×
640
                }
641
                mv[j].c_offset = static_cast<std::uint64_t>(c_offset);
×
642
                mv[j].c_size = static_cast<std::uint64_t>(c_size);
×
UNCOV
643
            }
×
UNCOV
644
            Py_DECREF(mseq);
×
645
            parsed_slices[i].members = &mv;
×
646
            parsed_slices[i].member_begin = static_cast<std::size_t>(mb);
×
647
            parsed_slices[i].member_end = static_cast<std::size_t>(me);
×
648
            parsed_slices[i].checkpoint_idx_base =
×
649
                static_cast<std::uint64_t>(ckpt_base);
×
650
            parsed_slices[i].skip_file_scoped_writes = skip_scoped != 0;
×
UNCOV
651
        }
×
UNCOV
652
        Py_DECREF(seq);
×
UNCOV
653
    }
×
654

655
    auto batch_config = std::make_shared<IndexBuildBatchConfig>();
×
656
    batch_config->file_paths = std::move(files);
×
657
    batch_config->preassigned_file_ids = std::move(file_ids);
×
658
    if (!parsed_slices.empty()) {
×
659
        batch_config->file_slices = parsed_slices;
×
UNCOV
660
    }
×
661
    batch_config->index_dir = index_dir;
×
662
    batch_config->checkpoint_size = static_cast<std::size_t>(checkpoint_size);
×
663
    batch_config->build_manifest = build_manifest != 0;
×
664
    batch_config->force_rebuild = force_rebuild != 0;
×
665
    batch_config->bloom_dimensions = std::move(bloom_dims);
×
666
    batch_config->parallelism =
×
667
        parallelism > 0 ? static_cast<std::size_t>(parallelism)
×
668
                        : (rt ? std::max<std::size_t>(rt->threads(), 1) : 1);
×
669
    batch_config->flush_every_files =
×
670
        static_cast<std::size_t>(flush_every_files);
×
671
    batch_config->rebuild_root_summaries = false;
×
672

673
    if (agg_config_ptr) {
×
674
        auto agg_staging = staging;
×
675
        auto agg_prefix = batch + "_agg";
×
676
        // Counter keeps per-file SST dirs unique across duplicate file_paths
677
        // when one worker owns multiple slices of the same file.
678
        auto visitor_counter = std::make_shared<std::atomic<std::size_t>>(0);
×
679
        batch_config->dft_visitor_factory =
×
680
            [agg_staging, agg_prefix, agg_config_ptr,
×
UNCOV
681
             visitor_counter](const std::string &file_path)
×
682
            -> std::vector<std::unique_ptr<
683
                dftracer::utils::utilities::composites::dft::DftEventVisitor>> {
684
            using dftracer::utils::utilities::composites::dft::DftEventVisitor;
685
            using dftracer::utils::utilities::composites::dft::aggregators::
686
                AggregationVisitor;
UNCOV
687
            const std::size_t idx =
×
688
                visitor_counter->fetch_add(1, std::memory_order_relaxed);
×
689
            std::string prefix = agg_prefix + "_" + std::to_string(idx);
×
690
            std::vector<std::unique_ptr<DftEventVisitor>> visitors;
×
691
            visitors.push_back(std::make_unique<AggregationVisitor>(
×
692
                agg_staging, prefix, /*config_hash=*/0, *agg_config_ptr,
×
UNCOV
693
                file_path));
×
694
            return visitors;
×
695
        };
×
696
    }
×
697

698
    // Atomic: write phase calls sink_factory from N coroutines concurrently.
699
    auto batch_counter = std::make_shared<std::atomic<std::size_t>>(0);
×
700
    batch_config->sink_factory =
×
701
        [staging, batch, batch_counter]() -> std::unique_ptr<IndexBatchSink> {
×
UNCOV
702
        const std::size_t idx =
×
703
            batch_counter->fetch_add(1, std::memory_order_relaxed);
×
704
        std::string sub_batch = batch + "_" + std::to_string(idx);
×
705
        return std::make_unique<IndexDatabaseSstWriterContext>(staging,
×
706
                                                               sub_batch);
707
    };
×
708
    batch_config->sink_commit = [artifacts](IndexBatchSink &sink) {
×
709
        auto &sst = static_cast<IndexDatabaseSstWriterContext &>(sink);
×
710
        auto batch_artifacts = sst.commit();
×
711
        std::lock_guard<std::mutex> lock(artifacts->mu);
×
712
        if (!batch_artifacts.empty()) {
×
713
            artifacts->list.push_back(std::move(batch_artifacts));
×
UNCOV
714
        }
×
715
    };
×
716

717
    IndexBuildBatchResult result;
×
718
    std::string submit_error;
×
719
    Py_BEGIN_ALLOW_THREADS try {
×
720
        rt->submit(dftracer::utils::run_coro_scope(
×
UNCOV
721
                       rt->executor(),
×
722
                       [](dftracer::utils::CoroScope &scope,
×
723
                          std::shared_ptr<IndexBuildBatchConfig> cfg,
724
                          IndexBuildBatchResult *out)
UNCOV
725
                           -> dftracer::utils::coro::CoroTask<void> {
×
UNCOV
726
                           *out = co_await IndexBatchBuilderUtility::process(
×
UNCOV
727
                               &scope, std::move(cfg));
×
728
                       },
×
UNCOV
729
                       batch_config, &result),
×
UNCOV
730
                   "build-sst-batch")
×
731
            .get();
×
732
    } catch (const std::exception &e) {
×
733
        submit_error = e.what();
×
734
    }
×
735
    Py_END_ALLOW_THREADS if (!submit_error.empty()) {
×
736
        PyErr_SetString(PyExc_RuntimeError, submit_error.c_str());
×
737
        return NULL;
×
738
    }
739

740
    // If any file failed, surface the first error.
741
    if (result.failed > 0) {
×
742
        for (const auto &r : result.results) {
×
743
            if (!r.success) {
×
744
                PyErr_SetString(PyExc_RuntimeError, r.error_message.c_str());
×
745
                return NULL;
×
746
            }
747
        }
UNCOV
748
    }
×
749

750
    // One dict per committed sink + per-file aggregation below.
751
    PyObject *out_list = PyList_New(0);
×
752
    if (!out_list) return NULL;
×
753
    {
754
        std::lock_guard<std::mutex> lock(artifacts->mu);
×
755
        for (const auto &a : artifacts->list) {
×
756
            PyObject *main_dict = artifacts_to_dict(a);
×
757
            if (!main_dict || PyList_Append(out_list, main_dict) < 0) {
×
758
                Py_XDECREF(main_dict);
×
UNCOV
759
                Py_DECREF(out_list);
×
760
                return NULL;
×
761
            }
UNCOV
762
            Py_DECREF(main_dict);
×
763
        }
764
    }
×
765
    // Harvest per-file aggregation SSTs from extra visitors. Each visitor
766
    // holds a vector of Artifacts (one per FLUSH_THRESHOLD flush + the
767
    // file-complete flush) because SstFileWriter requires strictly
768
    // ascending keys per SST and cross-flush merge operands would collide.
769
    //
770
    // `extra_visitors` is indexed per input file, but a single
771
    // AggregationVisitor instance is typically shared across every file in
772
    // the batch (one flush at end-of-batch). Without dedup we would emit
773
    // that visitor's artifact dict N_files times, producing a manifest with
774
    // N copies of the same SST path. Dedup by visitor pointer so each
775
    // unique flush-sequence is emitted exactly once.
776
    using dftracer::utils::utilities::composites::dft::aggregators::
777
        AggregationVisitor;
778
    using dftracer::utils::utilities::composites::dft::aggregators::
779
        AssociationTracker;
780
    std::unordered_set<AggregationVisitor *> seen_visitors;
×
781
    for (auto &file_visitors : result.extra_visitors) {
×
782
        for (auto &visitor : file_visitors) {
×
783
            auto *agg = dynamic_cast<AggregationVisitor *>(visitor.get());
×
784
            if (!agg) continue;
×
785
            if (!seen_visitors.insert(agg).second) continue;
×
786
            for (auto &a : agg->aggregation_artifacts()) {
×
787
                if (a.empty()) continue;
×
788
                PyObject *agg_dict = artifacts_to_dict(a);
×
789
                if (!agg_dict || PyList_Append(out_list, agg_dict) < 0) {
×
790
                    Py_XDECREF(agg_dict);
×
UNCOV
791
                    Py_DECREF(out_list);
×
792
                    return NULL;
×
793
                }
UNCOV
794
                Py_DECREF(agg_dict);
×
795
            }
796
        }
797
    }
798

799
    AssociationTracker combined;
×
800
    bool any_tracker = false;
×
801
    for (auto *agg : seen_visitors) {
×
802
        auto out = agg->take_output();
×
803
        if (out.local_tracker) {
×
804
            combined.merge(*out.local_tracker);
×
805
            any_tracker = true;
×
UNCOV
806
        }
×
807
    }
×
808
    PyObject *tracker_bytes = nullptr;
×
809
    if (any_tracker) {
×
810
        combined.finalize();
×
811
        std::string blob = combined.serialize();
×
812
        tracker_bytes = PyBytes_FromStringAndSize(
×
813
            blob.data(), static_cast<Py_ssize_t>(blob.size()));
×
814
    } else {
×
815
        tracker_bytes = PyBytes_FromStringAndSize(nullptr, 0);
×
816
    }
817
    if (!tracker_bytes) {
×
UNCOV
818
        Py_DECREF(out_list);
×
819
        return NULL;
×
820
    }
821
    PyObject *ret = PyTuple_Pack(2, out_list, tracker_bytes);
×
UNCOV
822
    Py_DECREF(out_list);
×
UNCOV
823
    Py_DECREF(tracker_bytes);
×
824
    return ret;
×
825
}
×
826

827
/// Python: enable_aggregation_deterministic_ids() -> None
///
/// Calls enable_deterministic_ids() on the object returned by
/// aggregators::aggregation_intern(). Takes no arguments and always returns
/// None. (Per the method-table docstring this must run once per worker,
/// before any aggregation work.)
static PyObject *enable_aggregation_deterministic_ids_fn(PyObject * /*self*/,
                                                         PyObject * /*args*/) {
    namespace aggregators =
        dftracer::utils::utilities::composites::dft::aggregators;
    auto &&intern_table = aggregators::aggregation_intern();
    intern_table.enable_deterministic_ids();
    Py_RETURN_NONE;
}
834

835
/// Python: move_artifacts(artifacts, dest_dir) -> dict
///
/// Converts `artifacts` (a dict as produced by artifacts_to_dict) back into
/// IndexDatabaseSstWriterContext::Artifacts and calls move_to(dest_dir) on it
/// with the GIL released. Returns the dict describing the moved artifacts.
///
/// Returns NULL with a Python exception set when argument parsing or
/// artifacts_from_dict fails, or when move_to throws. std::exception
/// subclasses are translated by set_typed_py_error; anything else becomes a
/// RuntimeError.
static PyObject *move_artifacts_fn(PyObject * /*self*/, PyObject *args,
                                   PyObject *kwds) {
    static const char *kwlist[] = {"artifacts", "dest_dir", NULL};
    PyObject *dict = NULL;
    const char *dest_dir = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "Os", (char **)kwlist, &dict,
                                     &dest_dir)) {
        return NULL;
    }
    IndexDatabaseSstWriterContext::Artifacts a;
    if (!artifacts_from_dict(dict, &a)) return NULL;

    IndexDatabaseSstWriterContext::Artifacts moved;

    // Release the GIL explicitly (this is what Py_BEGIN/END_ALLOW_THREADS
    // expand to) so that every exit path re-acquires it *before* touching the
    // Python C API. Wrapping the macros in a try block would let an exception
    // skip PyEval_RestoreThread and then call set_typed_py_error without the
    // GIL.
    PyThreadState *saved_state = PyEval_SaveThread();
    try {
        moved = std::move(a).move_to(dest_dir);
    } catch (const std::exception &e) {
        PyEval_RestoreThread(saved_state);
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        // Never let a C++ exception escape into the interpreter.
        PyEval_RestoreThread(saved_state);
        PyErr_SetString(PyExc_RuntimeError,
                        "move_artifacts failed with an unknown C++ exception");
        return NULL;
    }
    PyEval_RestoreThread(saved_state);

    return artifacts_to_dict(moved);
}
×
856

857
namespace {
858

859
dftracer::utils::coro::CoroTask<void> scan_one_gzip_file(
×
UNCOV
860
    std::string path, std::vector<GzipMember> *out) {
×
UNCOV
861
    out->clear();
×
UNCOV
862
    int fd = ::open(path.c_str(), O_RDONLY);
×
UNCOV
863
    if (fd < 0) co_return;
×
UNCOV
864
    struct stat st;
×
UNCOV
865
    if (::fstat(fd, &st) == 0 && st.st_size >= 18) {
×
UNCOV
866
        co_await enumerate_gzip_member_candidates(
×
UNCOV
867
            fd, static_cast<std::uint64_t>(st.st_size), *out);
×
UNCOV
868
    }
×
UNCOV
869
    ::close(fd);
×
870
}
×
871

872
}  // namespace
873

874
static PyObject *enumerate_gzip_members_fn(PyObject * /*self*/, PyObject *args,
×
875
                                           PyObject *kwds) {
876
    static const char *kwlist[] = {"files", "runtime", NULL};
877
    PyObject *files_obj = NULL;
×
878
    PyObject *runtime_arg = NULL;
×
879
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|O", (char **)kwlist,
×
880
                                     &files_obj, &runtime_arg)) {
881
        return NULL;
×
882
    }
883

884
    std::vector<std::string> files;
×
885
    {
886
        PyObject *seq = PySequence_Fast(files_obj, "files must be a sequence");
×
887
        if (!seq) return NULL;
×
888
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
889
        files.reserve(n);
×
890
        for (Py_ssize_t i = 0; i < n; ++i) {
×
891
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
892
            if (!s) {
×
UNCOV
893
                Py_DECREF(seq);
×
894
                return NULL;
×
895
            }
896
            files.emplace_back(s);
×
UNCOV
897
        }
×
UNCOV
898
        Py_DECREF(seq);
×
899
    }
900

901
    Runtime *rt = nullptr;
×
902
    if (runtime_arg && runtime_arg != Py_None) {
×
903
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
904
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
×
UNCOV
905
        } else {
×
906
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
907
            if (!native || !PyObject_TypeCheck(native, &RuntimeType)) {
×
908
                Py_XDECREF(native);
×
909
                PyErr_SetString(PyExc_TypeError,
×
910
                                "runtime must be a Runtime instance or None");
911
                return NULL;
×
912
            }
913
            rt = ((RuntimeObject *)native)->runtime.get();
×
UNCOV
914
            Py_DECREF(native);
×
915
        }
916
    } else {
×
917
        rt = get_default_runtime();
×
918
    }
919

920
    std::vector<std::vector<GzipMember>> results(files.size());
×
921
    std::string submit_error;
×
922
    Py_BEGIN_ALLOW_THREADS try {
×
923
        rt->submit(
×
924
              dftracer::utils::run_coro_scope(
×
UNCOV
925
                  rt->executor(),
×
926
                  [](dftracer::utils::CoroScope &scope,
×
927
                     const std::vector<std::string> *paths,
928
                     std::vector<std::vector<GzipMember>> *out)
UNCOV
929
                      -> dftracer::utils::coro::CoroTask<void> {
×
UNCOV
930
                      co_await scope.scope(
×
931
                          [paths, out](dftracer::utils::CoroScope &child)
×
UNCOV
932
                              -> dftracer::utils::coro::CoroTask<void> {
×
UNCOV
933
                              for (std::size_t i = 0; i < paths->size(); ++i) {
×
UNCOV
934
                                  const std::string &path = (*paths)[i];
×
UNCOV
935
                                  auto *slot = &(*out)[i];
×
UNCOV
936
                                  child.spawn(
×
937
                                      [path, slot](dftracer::utils::CoroScope &)
×
938
                                          -> dftracer::utils::coro::CoroTask<
UNCOV
939
                                              void> {
×
UNCOV
940
                                          co_await scan_one_gzip_file(path,
×
UNCOV
941
                                                                      slot);
×
942
                                      });
×
UNCOV
943
                              }
×
UNCOV
944
                              co_return;
×
945
                          });
×
UNCOV
946
                      co_return;
×
947
                  },
×
948
                  &files, &results),
UNCOV
949
              "enumerate-gzip-members")
×
950
            .get();
×
951
    } catch (const std::exception &e) {
×
952
        submit_error = e.what();
×
953
    }
×
954
    Py_END_ALLOW_THREADS if (!submit_error.empty()) {
×
955
        PyErr_SetString(PyExc_RuntimeError, submit_error.c_str());
×
956
        return NULL;
×
957
    }
958

959
    PyObject *out_list = PyList_New(static_cast<Py_ssize_t>(results.size()));
×
960
    if (!out_list) return NULL;
×
961
    for (std::size_t i = 0; i < results.size(); ++i) {
×
962
        const auto &mv = results[i];
×
963
        PyObject *inner = PyList_New(static_cast<Py_ssize_t>(mv.size()));
×
964
        if (!inner) {
×
UNCOV
965
            Py_DECREF(out_list);
×
966
            return NULL;
×
967
        }
968
        for (std::size_t j = 0; j < mv.size(); ++j) {
×
UNCOV
969
            PyObject *t =
×
970
                Py_BuildValue("(KK)", (unsigned long long)mv[j].c_offset,
×
971
                              (unsigned long long)mv[j].c_size);
×
972
            if (!t) {
×
UNCOV
973
                Py_DECREF(inner);
×
UNCOV
974
                Py_DECREF(out_list);
×
975
                return NULL;
×
976
            }
977
            PyList_SET_ITEM(inner, j, t);
×
UNCOV
978
        }
×
979
        PyList_SET_ITEM(out_list, i, inner);
×
UNCOV
980
    }
×
981
    return out_list;
×
982
}
×
983

984
// Mirrors build_work_units + lpt_assign_units in dftracer_aggregator_mpi.cpp
985
// so the Dask backend produces identical work distribution to MPI.
986
static PyObject *plan_work_units_fn(PyObject * /*self*/, PyObject *args,
×
987
                                    PyObject *kwds) {
988
    static const char *kwlist[] = {"member_map", "num_workers", "target_c_size",
989
                                   NULL};
990
    PyObject *map_obj = NULL;
×
991
    Py_ssize_t num_workers = 0;
×
992
    unsigned long long target_c_size = 0;
×
993
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "On|K", (char **)kwlist,
×
994
                                     &map_obj, &num_workers, &target_c_size)) {
995
        return NULL;
×
996
    }
997
    if (num_workers <= 0) num_workers = 1;
×
998

999
    std::vector<std::vector<GzipMember>> member_map;
×
1000
    {
UNCOV
1001
        PyObject *seq =
×
1002
            PySequence_Fast(map_obj, "member_map must be a sequence");
×
1003
        if (!seq) return NULL;
×
1004
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
1005
        member_map.resize(n);
×
1006
        for (Py_ssize_t i = 0; i < n; ++i) {
×
1007
            PyObject *inner = PySequence_Fast_GET_ITEM(seq, i);
×
UNCOV
1008
            PyObject *iseq =
×
1009
                PySequence_Fast(inner, "member_map[i] must be a sequence");
×
1010
            if (!iseq) {
×
UNCOV
1011
                Py_DECREF(seq);
×
1012
                return NULL;
×
1013
            }
1014
            Py_ssize_t ni = PySequence_Fast_GET_SIZE(iseq);
×
1015
            member_map[i].resize(ni);
×
1016
            for (Py_ssize_t j = 0; j < ni; ++j) {
×
1017
                PyObject *t = PySequence_Fast_GET_ITEM(iseq, j);
×
1018
                unsigned long long c_offset = 0, c_size = 0;
×
1019
                if (!PyArg_ParseTuple(t, "KK", &c_offset, &c_size)) {
×
UNCOV
1020
                    Py_DECREF(iseq);
×
UNCOV
1021
                    Py_DECREF(seq);
×
1022
                    return NULL;
×
1023
                }
1024
                member_map[i][j].c_offset =
×
UNCOV
1025
                    static_cast<std::uint64_t>(c_offset);
×
1026
                member_map[i][j].c_size = static_cast<std::uint64_t>(c_size);
×
UNCOV
1027
            }
×
UNCOV
1028
            Py_DECREF(iseq);
×
UNCOV
1029
        }
×
UNCOV
1030
        Py_DECREF(seq);
×
1031
    }
1032

1033
    // Fallback: treat empty/non-gzip files as a single whole-file member.
1034
    std::uint64_t total_c = 0;
×
1035
    for (auto &mv : member_map) {
×
1036
        if (mv.empty()) mv.push_back({0, 0});
×
1037
        for (const auto &m : mv) total_c += m.c_size;
×
1038
    }
1039

1040
    if (target_c_size == 0) {
×
1041
        target_c_size =
×
1042
            (total_c + static_cast<std::uint64_t>(num_workers) - 1) /
×
1043
            std::max<std::uint64_t>(static_cast<std::uint64_t>(num_workers), 1);
×
UNCOV
1044
    }
×
1045

1046
    struct Unit {
1047
        std::size_t file_idx;
1048
        std::size_t member_begin;
1049
        std::size_t member_end;
1050
        std::uint64_t c_size;
1051
    };
1052
    std::vector<Unit> units;
×
1053
    for (std::size_t fi = 0; fi < member_map.size(); ++fi) {
×
1054
        const auto &members = member_map[fi];
×
1055
        if (members.empty()) continue;
×
1056
        std::size_t begin = 0;
×
1057
        std::uint64_t accum = 0;
×
1058
        for (std::size_t i = 0; i < members.size(); ++i) {
×
1059
            accum += members[i].c_size;
×
1060
            const bool is_last = (i + 1 == members.size());
×
1061
            if ((target_c_size > 0 && accum >= target_c_size) || is_last) {
×
1062
                units.push_back({fi, begin, i + 1, accum});
×
1063
                begin = i + 1;
×
1064
                accum = 0;
×
UNCOV
1065
            }
×
UNCOV
1066
        }
×
UNCOV
1067
    }
×
1068

1069
    std::vector<std::size_t> order(units.size());
×
1070
    for (std::size_t i = 0; i < order.size(); ++i) order[i] = i;
×
1071
    std::sort(order.begin(), order.end(), [&](std::size_t a, std::size_t b) {
×
1072
        if (units[a].c_size != units[b].c_size)
×
1073
            return units[a].c_size > units[b].c_size;
×
1074
        if (units[a].file_idx != units[b].file_idx)
×
1075
            return units[a].file_idx < units[b].file_idx;
×
1076
        return units[a].member_begin < units[b].member_begin;
×
UNCOV
1077
    });
×
1078
    const std::size_t nw = static_cast<std::size_t>(num_workers);
×
1079
    std::vector<std::uint64_t> loads(nw, 0);
×
1080
    std::vector<std::vector<std::size_t>> per_worker(nw);
×
1081
    for (std::size_t ord : order) {
×
1082
        std::size_t best = 0;
×
1083
        for (std::size_t r = 1; r < nw; ++r)
×
1084
            if (loads[r] < loads[best]) best = r;
×
1085
        per_worker[best].push_back(ord);
×
1086
        loads[best] += std::max<std::uint64_t>(units[ord].c_size, 1);
×
1087
    }
1088

1089
    PyObject *out = PyList_New(static_cast<Py_ssize_t>(nw));
×
1090
    if (!out) return NULL;
×
1091
    for (std::size_t w = 0; w < nw; ++w) {
×
1092
        // Keep per-worker slices sorted by (file_idx, member_begin) for
1093
        // deterministic, file-group-friendly iteration downstream.
1094
        auto &lst = per_worker[w];
×
1095
        std::sort(lst.begin(), lst.end(), [&](std::size_t a, std::size_t b) {
×
1096
            if (units[a].file_idx != units[b].file_idx)
×
1097
                return units[a].file_idx < units[b].file_idx;
×
1098
            return units[a].member_begin < units[b].member_begin;
×
UNCOV
1099
        });
×
1100
        PyObject *inner = PyList_New(static_cast<Py_ssize_t>(lst.size()));
×
1101
        if (!inner) {
×
UNCOV
1102
            Py_DECREF(out);
×
1103
            return NULL;
×
1104
        }
1105
        for (std::size_t k = 0; k < lst.size(); ++k) {
×
1106
            const auto &u = units[lst[k]];
×
1107
            PyObject *t = Py_BuildValue(
×
1108
                "(nnnK)", (Py_ssize_t)u.file_idx, (Py_ssize_t)u.member_begin,
×
1109
                (Py_ssize_t)u.member_end, (unsigned long long)u.c_size);
×
1110
            if (!t) {
×
UNCOV
1111
                Py_DECREF(inner);
×
UNCOV
1112
                Py_DECREF(out);
×
1113
                return NULL;
×
1114
            }
1115
            PyList_SET_ITEM(inner, k, t);
×
UNCOV
1116
        }
×
1117
        PyList_SET_ITEM(out, w, inner);
×
UNCOV
1118
    }
×
1119
    return out;
×
1120
}
×
1121

1122
// ---------------------------------------------------------------------------
1123
// Module registration
1124
// ---------------------------------------------------------------------------
1125

1126
// Python-visible functions of the sst_distribution submodule.
//
// METH_VARARGS | METH_KEYWORDS entries have the three-argument
// (self, args, kwds) signature. They are stored as PyCFunction via an
// intermediate `void (*)(void)` cast. That cast is equivalent to the plain
// C-style cast, but GCC's -Wcast-function-type does not flag it.
// enable_aggregation_deterministic_ids_fn already has the PyCFunction
// signature and needs no cast.
static PyMethodDef SstDistributionMethods[] = {
    {"build_sst_batch",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(build_sst_batch_fn)),
     METH_VARARGS | METH_KEYWORDS,
     "build_sst_batch(files, file_ids, staging_dir, batch_id, ...) "
     "-> (list[dict], bytes)\n"
     "Run the indexer pipeline with an SST sink and return "
     "(artifact_dicts, tracker_blob). The tracker blob is the serialized "
     "merged AssociationTracker from this batch's aggregation visitors "
     "(empty bytes when no aggregation_config was passed)."},

    {"plan_lpt_partition",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(plan_lpt_partition_fn)),
     METH_VARARGS,
     "plan_lpt_partition(entries, num_workers) -> list[list[(path, size)]]\n"
     "Greedy Longest-Processing-Time-first bin-packing of (path, size) "
     "tuples across num_workers buckets. Minimises the maximum per-worker "
     "total size."},

    {"scan_files",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(scan_files_fn)),
     METH_VARARGS | METH_KEYWORDS,
     "scan_files(directory, patterns=None, recursive=False, runtime=None) "
     "-> list[(path, size)]\n"
     "Parallel directory scan returning (path, size) tuples for regular "
     "files matching the patterns."},

    {"enable_aggregation_deterministic_ids",
     enable_aggregation_deterministic_ids_fn,
     METH_NOARGS,
     "enable_aggregation_deterministic_ids() -> None\n"
     "Flip the global aggregation StringIntern into deterministic-id mode "
     "so the same string maps to the same 32-bit id in every worker "
     "process. Call once at worker startup BEFORE any aggregation work."},

    {"move_artifacts",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(move_artifacts_fn)),
     METH_VARARGS | METH_KEYWORDS,
     "move_artifacts(artifacts, dest_dir) -> dict\n"
     "Move every populated SST in `artifacts` (as returned by "
     "`build_sst_batch`) into `dest_dir` via the C++ rename/copy helper, "
     "returning a fresh dict with the new paths. Single GIL release, no "
     "per-file Python shutil.move overhead."},

    {"enumerate_gzip_members",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(enumerate_gzip_members_fn)),
     METH_VARARGS | METH_KEYWORDS,
     "enumerate_gzip_members(files, runtime=None) -> list[list[(c_offset, "
     "c_size)]]\n"
     "Cooperative async scan of gzip member offsets across `files`. "
     "Returns lists of (c_offset, c_size) parallel to `files`; empty for "
     "non-gzip / unreadable files."},

    {"plan_work_units",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(plan_work_units_fn)),
     METH_VARARGS | METH_KEYWORDS,
     "plan_work_units(member_map, num_workers, target_c_size=0) "
     "-> list[list[(file_idx, member_begin, member_end, c_size)]]\n"
     "Deterministic LPT assignment of intra-file gzip-member slices "
     "across workers. Each worker's list contains (file_idx, "
     "member_begin, member_end, c_size) tuples; a file sliced across "
     "multiple workers appears in each owner's list with disjoint "
     "[member_begin, member_end) ranges."},

    // Sentinel terminating the table.
    {NULL, NULL, 0, NULL}};
1175

1176
/// Attaches the sst_distribution API to module `m`.
///
/// First registers the SstArtifactRegistry type, then adds every function in
/// SstDistributionMethods. Stops at the first negative return and reports -1.
/// PyModule_AddFunctions sets a Python exception on failure; register_type
/// presumably does too (verify). Returns 0 on success.
int init_sst_distribution(PyObject *m) {
    const auto type_rc =
        register_type(m, &SstArtifactRegistryType, "SstArtifactRegistry");
    if (type_rc < 0) return -1;

    const auto funcs_rc = PyModule_AddFunctions(m, SstDistributionMethods);
    return funcs_rc < 0 ? -1 : 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc