• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28356348514

29 Jun 2026 07:40AM UTC coverage: 52.174% (-0.1%) from 52.278%
28356348514

Pull #83

github

web-flow
Merge 278203630 into 2efed6649
Pull Request #83: refactor and improve code QoL

37276 of 92891 branches covered (40.13%)

Branch coverage included in aggregate %.

671 of 1173 new or added lines in 58 files covered. (57.2%)

66 existing lines in 30 files now uncovered.

33619 of 42991 relevant lines covered (78.2%)

20387.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.43
/src/dftracer/utils/python/sst_distribution.cpp
1
#include <dftracer/utils/core/common/constants.h>
2
#include <dftracer/utils/core/runtime.h>
3
#include <dftracer/utils/core/tasks/coro_scope.h>
4
#include <dftracer/utils/python/py_type_helpers.h>
5
#include <dftracer/utils/python/runtime.h>
6
#include <dftracer/utils/python/sst_distribution.h>
7
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_config.h>
8
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_key.h>
9
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.h>
10
#include <dftracer/utils/utilities/composites/dft/aggregators/association_tracker.h>
11
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
12
#include <dftracer/utils/utilities/indexer/file_partition.h>
13
#include <dftracer/utils/utilities/indexer/index_batch_sink.h>
14
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
15
#include <dftracer/utils/utilities/indexer/index_database_sst_writer_context.h>
16
#include <dftracer/utils/utilities/indexer/internal/common/gzip_member_scanner.h>
17
#include <fcntl.h>
18
#include <sys/stat.h>
19
#include <unistd.h>
20

21
#include <algorithm>
22
#include <atomic>
23
#include <cstdint>
24
#include <memory>
25
#include <mutex>
26
#include <new>
27
#include <optional>
28
#include <string>
29
#include <unordered_set>
30
#include <vector>
31

32
using dftracer::utils::Runtime;
33
using dftracer::utils::utilities::filesystem::FileEntry;
34
using dftracer::utils::utilities::filesystem::PatternDirectoryScannerUtility;
35
using dftracer::utils::utilities::filesystem::
36
    PatternDirectoryScannerUtilityInput;
37
using dftracer::utils::utilities::indexer::IndexBatchBuilderUtility;
38
using dftracer::utils::utilities::indexer::IndexBatchSink;
39
using dftracer::utils::utilities::indexer::IndexBuildBatchConfig;
40
using dftracer::utils::utilities::indexer::IndexBuildBatchResult;
41
using dftracer::utils::utilities::indexer::IndexDatabaseSstWriterContext;
42
using dftracer::utils::utilities::indexer::plan_lpt_partition;
43
using dftracer::utils::utilities::indexer::SstArtifactRegistry;
44
using dftracer::utils::utilities::indexer::internal::
45
    enumerate_gzip_member_candidates;
46
using dftracer::utils::utilities::indexer::internal::GzipMember;
47

48
// ---------------------------------------------------------------------------
49
// SstArtifactRegistry type
50
// ---------------------------------------------------------------------------
51

52
typedef struct {
53
    PyObject_HEAD std::shared_ptr<SstArtifactRegistry> registry;
54
} SstArtifactRegistryObject;
55

56
/// tp_dealloc for SstArtifactRegistry.
///
/// Runs the C++ destructor of the placement-constructed `registry` member
/// (dropping this object's reference to the registry), then returns the
/// object's memory through the type's tp_free.
static void SstArtifactRegistry_dealloc(SstArtifactRegistryObject *self) {
    std::destroy_at(&self->registry);
    PyTypeObject *const tp = Py_TYPE(self);
    tp->tp_free(reinterpret_cast<PyObject *>(self));
}
×
60

61
/// tp_new for SstArtifactRegistry: allocate the Python object and give it a
/// fresh, empty C++ registry.
///
/// The `registry` member is built in two steps. First an empty shared_ptr is
/// placement-constructed; that cannot throw, so SstArtifactRegistry_dealloc
/// (which always runs the shared_ptr destructor) is safe from this point on.
/// Then the real registry is allocated inside a try block. Any C++ exception
/// is turned into a Python exception and the half-built object is released.
/// Letting the exception unwind through CPython would be undefined behaviour
/// and would leak the object.
static PyObject *SstArtifactRegistry_new(PyTypeObject *type,
                                         PyObject * /*args*/,
                                         PyObject * /*kwds*/) {
    auto *self = (SstArtifactRegistryObject *)type->tp_alloc(type, 0);
    if (!self) return NULL;
    // noexcept: an empty shared_ptr performs no allocation.
    new (&self->registry) std::shared_ptr<SstArtifactRegistry>();
    try {
        self->registry = std::make_shared<SstArtifactRegistry>();
    } catch (const std::bad_alloc &) {
        Py_DECREF((PyObject *)self);
        return PyErr_NoMemory();
    } catch (const std::exception &e) {
        Py_DECREF((PyObject *)self);
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        Py_DECREF((PyObject *)self);
        PyErr_SetString(PyExc_RuntimeError,
                        "failed to create SstArtifactRegistry");
        return NULL;
    }
    return (PyObject *)self;
}
70

71
namespace {
72

73
// Field names in the Artifacts dict returned by build_sst_batch and
74
// consumed by SstArtifactRegistry.append. Must match the field names on
75
// IndexDatabaseSstWriterContext::Artifacts.
76
constexpr const char *ARTIFACT_FIELDS[] = {
77
    "metadata_sst",        "checkpoints_sst",         "manifest_sst",
78
    "chunk_bloom_sst",     "file_bloom_sst",          "chunk_stats_sst",
79
    "chunk_dim_stats_sst", "dimensions_sst",          "file_scalar_stats_sst",
80
    "file_cat_counts_sst", "file_pid_tid_counts_sst", "file_name_counts_sst",
81
    "name_dictionary_sst", "name_file_postings_sst",  "name_chunk_postings_sst",
82
    "hash_tables_sst",     "aggregation_sst",         "system_metrics_sst",
83
};
84

85
/// Map a slot name to the matching Artifacts member. Kept in one place so
86
/// that adding a new CF requires updating only `ARTIFACT_FIELDS` plus
87
/// `dispatch_*` below.
88
std::optional<std::string> *artifacts_slot(
×
89
    IndexDatabaseSstWriterContext::Artifacts &a, std::string_view name) {
90
    if (name == "metadata_sst") return &a.metadata_sst;
×
91
    if (name == "checkpoints_sst") return &a.checkpoints_sst;
×
92
    if (name == "manifest_sst") return &a.manifest_sst;
×
93
    if (name == "chunk_bloom_sst") return &a.chunk_bloom_sst;
×
94
    if (name == "file_bloom_sst") return &a.file_bloom_sst;
×
95
    if (name == "chunk_stats_sst") return &a.chunk_stats_sst;
×
96
    if (name == "chunk_dim_stats_sst") return &a.chunk_dim_stats_sst;
×
97
    if (name == "dimensions_sst") return &a.dimensions_sst;
×
98
    if (name == "file_scalar_stats_sst") return &a.file_scalar_stats_sst;
×
99
    if (name == "file_cat_counts_sst") return &a.file_cat_counts_sst;
×
100
    if (name == "file_pid_tid_counts_sst") return &a.file_pid_tid_counts_sst;
×
101
    if (name == "file_name_counts_sst") return &a.file_name_counts_sst;
×
102
    if (name == "name_dictionary_sst") return &a.name_dictionary_sst;
×
103
    if (name == "name_file_postings_sst") return &a.name_file_postings_sst;
×
104
    if (name == "name_chunk_postings_sst") return &a.name_chunk_postings_sst;
×
105
    if (name == "hash_tables_sst") return &a.hash_tables_sst;
×
106
    if (name == "aggregation_sst") return &a.aggregation_sst;
×
107
    if (name == "system_metrics_sst") return &a.system_metrics_sst;
×
108
    return nullptr;
×
109
}
110

111
/// Convert a Python artifacts dict to the C++ Artifacts struct. Missing,
112
/// None, or empty-string entries become nullopt. Returns false on type
113
/// errors (exception set).
114
bool artifacts_from_dict(PyObject *dict,
×
115
                         IndexDatabaseSstWriterContext::Artifacts *out) {
116
    if (!PyDict_Check(dict)) {
×
117
        PyErr_SetString(PyExc_TypeError, "artifacts must be a dict");
×
118
        return false;
×
119
    }
120
    for (const char *field : ARTIFACT_FIELDS) {
×
121
        PyObject *val = PyDict_GetItemString(dict, field);  // borrowed
×
122
        if (!val || val == Py_None) continue;
×
123
        if (!PyUnicode_Check(val)) {
×
124
            PyErr_Format(PyExc_TypeError, "artifacts['%s'] must be str or None",
×
125
                         field);
126
            return false;
×
127
        }
128
        const char *s = PyUnicode_AsUTF8(val);
×
129
        if (!s) return false;
×
130
        if (s[0] == '\0') continue;
×
131
        auto *slot = artifacts_slot(*out, field);
×
132
        if (slot) *slot = std::string(s);
×
133
    }
134
    return true;
×
135
}
136

137
PyObject *artifacts_to_dict(const IndexDatabaseSstWriterContext::Artifacts &a) {
×
138
    PyObject *dict = PyDict_New();
×
139
    if (!dict) return NULL;
×
140
    auto set_field = [&](const char *name,
×
141
                         const std::optional<std::string> &slot) -> bool {
142
        PyObject *v = slot.has_value() ? PyUnicode_FromString(slot->c_str())
×
143
                                       : (Py_INCREF(Py_None), Py_None);
×
144
        if (!v) return false;
×
145
        int rc = PyDict_SetItemString(dict, name, v);
×
146
        Py_DECREF(v);
147
        return rc == 0;
×
148
    };
×
149
    if (!set_field("metadata_sst", a.metadata_sst) ||
×
150
        !set_field("checkpoints_sst", a.checkpoints_sst) ||
×
151
        !set_field("manifest_sst", a.manifest_sst) ||
×
152
        !set_field("chunk_bloom_sst", a.chunk_bloom_sst) ||
×
153
        !set_field("file_bloom_sst", a.file_bloom_sst) ||
×
154
        !set_field("chunk_stats_sst", a.chunk_stats_sst) ||
×
155
        !set_field("chunk_dim_stats_sst", a.chunk_dim_stats_sst) ||
×
156
        !set_field("dimensions_sst", a.dimensions_sst) ||
×
157
        !set_field("file_scalar_stats_sst", a.file_scalar_stats_sst) ||
×
158
        !set_field("file_cat_counts_sst", a.file_cat_counts_sst) ||
×
159
        !set_field("file_pid_tid_counts_sst", a.file_pid_tid_counts_sst) ||
×
160
        !set_field("file_name_counts_sst", a.file_name_counts_sst) ||
×
161
        !set_field("name_dictionary_sst", a.name_dictionary_sst) ||
×
162
        !set_field("name_file_postings_sst", a.name_file_postings_sst) ||
×
163
        !set_field("name_chunk_postings_sst", a.name_chunk_postings_sst) ||
×
164
        !set_field("hash_tables_sst", a.hash_tables_sst) ||
×
165
        !set_field("aggregation_sst", a.aggregation_sst) ||
×
166
        !set_field("system_metrics_sst", a.system_metrics_sst)) {
×
167
        Py_DECREF(dict);
×
168
        return NULL;
×
169
    }
170
    return dict;
×
171
}
172

173
}  // namespace
174

175
/// SstArtifactRegistry.append(artifacts_dict) -> None
///
/// Parse a per-batch Artifacts dict (see artifacts_from_dict for the
/// accepted shape) and append it to the wrapped registry. C++ exceptions
/// raised while copying the paths or inside `registry->append` are turned
/// into Python exceptions instead of unwinding through the interpreter.
static PyObject *SstArtifactRegistry_append(SstArtifactRegistryObject *self,
                                            PyObject *args) {
    PyObject *dict;
    if (!PyArg_ParseTuple(args, "O", &dict)) return NULL;
    try {
        IndexDatabaseSstWriterContext::Artifacts a;
        if (!artifacts_from_dict(dict, &a)) return NULL;
        self->registry->append(std::move(a));
    } catch (const std::bad_alloc &) {
        return PyErr_NoMemory();
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }
    Py_RETURN_NONE;
}
×
184

185
// Method table for SstArtifactRegistry, terminated by an all-zero sentinel.
static PyMethodDef SstArtifactRegistry_methods[] = {
    {
        "append",
        reinterpret_cast<PyCFunction>(SstArtifactRegistry_append),
        METH_VARARGS,
        "append(artifacts_dict) -> None\n"
        "Add a per-batch Artifacts dict (as returned by build_sst_batch or "
        "IndexDatabaseSstWriterContext.commit) to the registry.",
    },
    {NULL, NULL, 0, NULL},
};
191

192
// Python type object for `dftracer_utils_ext.SstArtifactRegistry`.
// It is initialised positionally, so every value must sit at its
// PyTypeObject slot; the trailing comments name each slot. Slots after
// tp_new are implicitly zero. Py_TPFLAGS_BASETYPE is not set, so Python
// code cannot subclass this type.
static PyTypeObject SstArtifactRegistryType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext.SstArtifactRegistry",                   // tp_name
    sizeof(SstArtifactRegistryObject),                          // tp_basicsize
    0,                                                          // tp_itemsize
    reinterpret_cast<destructor>(SstArtifactRegistry_dealloc),  // tp_dealloc
    0,                                                   // tp_vectorcall_offset
    0,                                                   // tp_getattr
    0,                                                   // tp_setattr
    0,                                                   // tp_as_async
    0,                                                   // tp_repr
    0,                                                   // tp_as_number
    0,                                                   // tp_as_sequence
    0,                                                   // tp_as_mapping
    0,                                                   // tp_hash
    0,                                                   // tp_call
    0,                                                   // tp_str
    0,                                                   // tp_getattro
    0,                                                   // tp_setattro
    0,                                                   // tp_as_buffer
    Py_TPFLAGS_DEFAULT,                                  // tp_flags
    "Thread-safe collector for SST artifact paths.",     // tp_doc
    0,                                                   // tp_traverse
    0,                                                   // tp_clear
    0,                                                   // tp_richcompare
    0,                                                   // tp_weaklistoffset
    0,                                                   // tp_iter
    0,                                                   // tp_iternext
    SstArtifactRegistry_methods,                         // tp_methods
    0,                                                   // tp_members
    0,                                                   // tp_getset
    0,                                                   // tp_base
    0,                                                   // tp_dict
    0,                                                   // tp_descr_get
    0,                                                   // tp_descr_set
    0,                                                   // tp_dictoffset
    0,                                                   // tp_init
    0,                                                   // tp_alloc
    SstArtifactRegistry_new,                             // tp_new
};
231

232
/// Return the C++ registry wrapped by `obj`.
///
/// Returns nullptr, without setting a Python exception, when `obj` is not an
/// SstArtifactRegistry instance. The returned pointer is non-owning. It stays
/// valid at least as long as `obj` keeps its shared_ptr to the registry.
SstArtifactRegistry *sst_artifact_registry_get(PyObject *obj) {
    if (PyObject_TypeCheck(obj, &SstArtifactRegistryType)) {
        auto *wrapper = reinterpret_cast<SstArtifactRegistryObject *>(obj);
        return wrapper->registry.get();
    }
    return nullptr;
}
236

237
// ---------------------------------------------------------------------------
238
// scan_files: parallel directory scan with size info
239
// ---------------------------------------------------------------------------
240

241
/// scan_files(directory, patterns=None, recursive=False, runtime=None)
///     -> list[tuple[str, int]]
///
/// Scan `directory` (recursively when `recursive` is true) with
/// PatternDirectoryScannerUtility, optionally filtered by `patterns`.
/// Returns one `(path, size)` tuple per FileEntry the scanner reports.
/// `runtime` may be a native Runtime, an object exposing one as `_native`,
/// or None for the default runtime.
///
/// The scan runs on the runtime's executor with the GIL released. Any C++
/// exception is caught while the GIL is still released and re-raised as a
/// Python RuntimeError only after the GIL is re-acquired. Setting a Python
/// error, or unwinding past Py_END_ALLOW_THREADS, without the GIL is
/// undefined behaviour.
static PyObject *scan_files_fn(PyObject * /*self*/, PyObject *args,
                               PyObject *kwds) {
    static const char *kwlist[] = {"directory", "patterns", "recursive",
                                   "runtime", NULL};
    const char *directory;
    PyObject *patterns_obj = NULL;
    int recursive = 0;
    PyObject *runtime_arg = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|OpO", (char **)kwlist,
                                     &directory, &patterns_obj, &recursive,
                                     &runtime_arg)) {
        return NULL;
    }

    // Optional pattern filter; every item must be a str.
    std::vector<std::string> patterns;
    if (patterns_obj && patterns_obj != Py_None) {
        PyObject *seq =
            PySequence_Fast(patterns_obj, "patterns must be a sequence");
        if (!seq) return NULL;
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
        patterns.reserve(n);
        for (Py_ssize_t i = 0; i < n; ++i) {
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
            if (!s) {
                Py_DECREF(seq);
                return NULL;
            }
            patterns.emplace_back(s);
        }
        Py_DECREF(seq);
    }

    // Resolve the Runtime. When it comes from `runtime._native`, keep a
    // strong reference to that wrapper until the scan finishes, so the
    // Runtime it owns cannot be released while we run without the GIL.
    Runtime *rt = nullptr;
    PyObject *native = NULL;
    if (runtime_arg && runtime_arg != Py_None) {
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
        } else {
            native = PyObject_GetAttrString(runtime_arg, "_native");
            if (!native || !PyObject_TypeCheck(native, &RuntimeType)) {
                Py_XDECREF(native);
                PyErr_SetString(PyExc_TypeError,
                                "runtime must be a Runtime instance or None");
                return NULL;
            }
            rt = ((RuntimeObject *)native)->runtime.get();
        }
    } else {
        rt = get_default_runtime();
    }
    if (!rt) {
        // Guard the dereference below; keep any error the lookup already set.
        Py_XDECREF(native);
        if (!PyErr_Occurred()) {
            PyErr_SetString(PyExc_RuntimeError, "no runtime available");
        }
        return NULL;
    }

    // NOTE(review): the trailing `true` is presumably the "collect sizes"
    // flag (see the section header) — confirm against
    // PatternDirectoryScannerUtilityInput.
    PatternDirectoryScannerUtilityInput input(directory, patterns,
                                              recursive != 0, true);
    std::vector<FileEntry> entries;
    bool scan_failed = false;
    std::string scan_error;
    Py_BEGIN_ALLOW_THREADS
    try {
        rt->submit(dftracer::utils::run_coro_scope(
                       rt->executor(),
                       [](dftracer::utils::CoroScope &scope,
                          PatternDirectoryScannerUtilityInput in,
                          std::vector<FileEntry> *out)
                           -> dftracer::utils::coro::CoroTask<void> {
                           PatternDirectoryScannerUtility scanner;
                           *out = co_await scope.spawn(scanner, in);
                       },
                       std::move(input), &entries),
                   "scan-files")
            .get();
    } catch (const std::exception &e) {
        scan_failed = true;
        scan_error = e.what();
    } catch (...) {
        scan_failed = true;
        scan_error = "scan_files: unknown error";
    }
    Py_END_ALLOW_THREADS
    Py_XDECREF(native);
    if (scan_failed) {
        PyErr_SetString(PyExc_RuntimeError, scan_error.c_str());
        return NULL;
    }

    PyObject *out = PyList_New(static_cast<Py_ssize_t>(entries.size()));
    if (!out) return NULL;
    for (std::size_t i = 0; i < entries.size(); ++i) {
        PyObject *t = Py_BuildValue("(sn)", entries[i].path.c_str(),
                                    (Py_ssize_t)entries[i].size);
        if (!t) {
            Py_DECREF(out);
            return NULL;
        }
        PyList_SET_ITEM(out, i, t);  // steals the reference to `t`
    }
    return out;
}
×
328

329
// ---------------------------------------------------------------------------
330
// plan_lpt_partition: LPT bin-packing of (path, size) pairs
331
// ---------------------------------------------------------------------------
332

333
/// plan_lpt_partition(entries, num_workers) -> list[list[tuple[str, int]]]
///
/// Bin-pack `(path, size)` pairs into per-worker buckets with
/// plan_lpt_partition and return one list of `(path, size)` tuples per
/// bucket. A `num_workers` of zero or less is treated as 1.
///
/// Input is validated up front:
/// - Each entry must be a tuple; PyArg_ParseTuple would otherwise report an
///   opaque SystemError, so a TypeError is raised instead.
/// - Each size must be non-negative; a negative size would wrap to a huge
///   std::size_t and skew the partition, so a ValueError is raised.
static PyObject *plan_lpt_partition_fn(PyObject * /*self*/, PyObject *args) {
    PyObject *entries_obj;
    Py_ssize_t num_workers;
    if (!PyArg_ParseTuple(args, "On", &entries_obj, &num_workers)) return NULL;
    if (num_workers <= 0) num_workers = 1;

    PyObject *seq = PySequence_Fast(entries_obj,
                                    "entries must be a sequence of "
                                    "(path, size) tuples");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<FileEntry> entries;
    entries.reserve(n);
    for (Py_ssize_t i = 0; i < n; ++i) {
        PyObject *item = PySequence_Fast_GET_ITEM(seq, i);  // borrowed
        if (!PyTuple_Check(item)) {
            PyErr_Format(PyExc_TypeError,
                         "entries[%zd] must be a (path, size) tuple, not %.200s",
                         i, Py_TYPE(item)->tp_name);
            Py_DECREF(seq);
            return NULL;
        }
        const char *path = nullptr;
        Py_ssize_t size = 0;
        if (!PyArg_ParseTuple(item, "sn", &path, &size)) {
            Py_DECREF(seq);
            return NULL;
        }
        if (size < 0) {
            PyErr_Format(PyExc_ValueError,
                         "entries[%zd] has negative size %zd", i, size);
            Py_DECREF(seq);
            return NULL;
        }
        FileEntry fe;
        fe.path = path;  // copy now: `path` borrows from `item`
        fe.size = static_cast<std::size_t>(size);
        fe.is_regular_file = true;
        entries.push_back(std::move(fe));
    }
    Py_DECREF(seq);

    auto buckets = plan_lpt_partition(std::move(entries),
                                      static_cast<std::size_t>(num_workers));

    PyObject *out = PyList_New(static_cast<Py_ssize_t>(buckets.size()));
    if (!out) return NULL;
    for (std::size_t i = 0; i < buckets.size(); ++i) {
        PyObject *lst = PyList_New(static_cast<Py_ssize_t>(buckets[i].size()));
        if (!lst) {
            Py_DECREF(out);
            return NULL;
        }
        for (std::size_t j = 0; j < buckets[i].size(); ++j) {
            PyObject *t = Py_BuildValue("(sn)", buckets[i][j].path.c_str(),
                                        (Py_ssize_t)buckets[i][j].size);
            if (!t) {
                Py_DECREF(lst);
                Py_DECREF(out);
                return NULL;
            }
            PyList_SET_ITEM(lst, j, t);  // steals `t`
        }
        PyList_SET_ITEM(out, i, lst);  // steals `lst`
    }
    return out;
}
×
387

388
// ---------------------------------------------------------------------------
389
// build_sst_batch: run the indexer pipeline with an SST sink and return
390
// the merged Artifacts dict.
391
// ---------------------------------------------------------------------------
392

393
static PyObject *build_sst_batch_fn(PyObject * /*self*/, PyObject *args,
×
394
                                    PyObject *kwds) {
395
    static const char *kwlist[] = {"files",
396
                                   "file_ids",
397
                                   "staging_dir",
398
                                   "batch_id",
399
                                   "index_dir",
400
                                   "checkpoint_size",
401
                                   "build_manifest",
402
                                   "force_rebuild",
403
                                   "bloom_dimensions",
404
                                   "parallelism",
405
                                   "flush_every_files",
406
                                   "runtime",
407
                                   "aggregation_config",
408
                                   "file_slices",
409
                                   NULL};
410
    PyObject *files_obj;
411
    PyObject *file_ids_obj;
412
    const char *staging_dir;
413
    const char *batch_id;
414
    const char *index_dir = "";
×
NEW
415
    Py_ssize_t checkpoint_size = static_cast<Py_ssize_t>(
×
416
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE);
417
    int build_manifest = 0;
×
418
    int force_rebuild = 0;
×
419
    PyObject *bloom_dims_obj = NULL;
×
420
    Py_ssize_t parallelism = 0;
×
421
    Py_ssize_t flush_every_files = 0;
×
422
    PyObject *runtime_arg = NULL;
×
423
    PyObject *aggregation_config_obj = NULL;
×
424
    PyObject *file_slices_obj = NULL;
×
425

426
    if (!PyArg_ParseTupleAndKeywords(
×
427
            args, kwds, "OOss|snppOnnOOO", (char **)kwlist, &files_obj,
428
            &file_ids_obj, &staging_dir, &batch_id, &index_dir,
429
            &checkpoint_size, &build_manifest, &force_rebuild, &bloom_dims_obj,
430
            &parallelism, &flush_every_files, &runtime_arg,
431
            &aggregation_config_obj, &file_slices_obj)) {
432
        return NULL;
×
433
    }
434

435
    // Unpack files.
436
    std::vector<std::string> files;
×
437
    {
438
        PyObject *seq = PySequence_Fast(files_obj, "files must be a sequence");
×
439
        if (!seq) return NULL;
×
440
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
441
        files.reserve(n);
×
442
        for (Py_ssize_t i = 0; i < n; ++i) {
×
443
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
444
            if (!s) {
×
445
                Py_DECREF(seq);
×
446
                return NULL;
×
447
            }
448
            files.emplace_back(s);
×
449
        }
450
        Py_DECREF(seq);
×
451
    }
452
    if (files.empty()) {
×
453
        return PyDict_New();
×
454
    }
455

456
    // Unpack file_ids, parallel to files.
457
    std::vector<int> file_ids;
×
458
    {
459
        PyObject *seq =
460
            PySequence_Fast(file_ids_obj, "file_ids must be a sequence");
×
461
        if (!seq) return NULL;
×
462
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
463
        if (static_cast<std::size_t>(n) != files.size()) {
×
464
            Py_DECREF(seq);
×
465
            PyErr_SetString(PyExc_ValueError,
×
466
                            "file_ids must have the same length as files");
467
            return NULL;
×
468
        }
469
        file_ids.reserve(n);
×
470
        for (Py_ssize_t i = 0; i < n; ++i) {
×
471
            long v = PyLong_AsLong(PySequence_Fast_GET_ITEM(seq, i));
×
472
            if (v == -1 && PyErr_Occurred()) {
×
473
                Py_DECREF(seq);
×
474
                return NULL;
×
475
            }
476
            file_ids.push_back(static_cast<int>(v));
×
477
        }
478
        Py_DECREF(seq);
×
479
    }
480

481
    // Optional bloom dimensions override.
482
    std::vector<std::string> bloom_dims;
×
483
    if (bloom_dims_obj && bloom_dims_obj != Py_None) {
×
484
        PyObject *seq = PySequence_Fast(bloom_dims_obj,
×
485
                                        "bloom_dimensions must be a sequence");
486
        if (!seq) return NULL;
×
487
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
488
        bloom_dims.reserve(n);
×
489
        for (Py_ssize_t i = 0; i < n; ++i) {
×
490
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
491
            if (!s) {
×
492
                Py_DECREF(seq);
×
493
                return NULL;
×
494
            }
495
            bloom_dims.emplace_back(s);
×
496
        }
497
        Py_DECREF(seq);
×
498
    }
499

500
    // Resolve Runtime (matching CheckpointIndexer pattern).
501
    Runtime *rt = nullptr;
×
502
    if (runtime_arg && runtime_arg != Py_None) {
×
503
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
504
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
×
505
        } else {
506
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
507
            if (!native || !PyObject_TypeCheck(native, &RuntimeType)) {
×
508
                Py_XDECREF(native);
×
509
                PyErr_SetString(PyExc_TypeError,
×
510
                                "runtime must be a Runtime instance or None");
511
                return NULL;
×
512
            }
513
            rt = ((RuntimeObject *)native)->runtime.get();
×
514
            Py_DECREF(native);
×
515
        }
516
    } else {
×
517
        rt = get_default_runtime();
×
518
    }
519

520
    // Build config + sink factory shared state.
521
    struct SharedArtifacts {
522
        std::mutex mu;
523
        std::vector<IndexDatabaseSstWriterContext::Artifacts> list;
524
    };
525
    auto artifacts = std::make_shared<SharedArtifacts>();
×
526
    auto staging = std::string(staging_dir);
×
527
    auto batch = std::string(batch_id);
×
528

529
    // Optional aggregation config, extracted from the Python dataclass.
530
    std::shared_ptr<dftracer::utils::utilities::composites::dft::aggregators::
531
                        AggregationConfig>
532
        agg_config_ptr;
×
533
    if (aggregation_config_obj && aggregation_config_obj != Py_None) {
×
534
        using dftracer::utils::utilities::composites::dft::aggregators::
535
            AggregationConfig;
536
        auto cfg = std::make_shared<AggregationConfig>();
×
537
        auto pull_double = [&](const char *name, double fallback) -> double {
×
538
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
539
            if (!v || v == Py_None) {
×
540
                Py_XDECREF(v);
×
541
                PyErr_Clear();
×
542
                return fallback;
×
543
            }
544
            double out = PyFloat_AsDouble(v);
×
545
            Py_DECREF(v);
546
            if (out == -1.0 && PyErr_Occurred()) return fallback;
×
547
            return out;
×
548
        };
×
549
        auto pull_bool = [&](const char *name, bool fallback) -> bool {
×
550
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
551
            if (!v || v == Py_None) {
×
552
                Py_XDECREF(v);
×
553
                PyErr_Clear();
×
554
                return fallback;
×
555
            }
556
            int out = PyObject_IsTrue(v);
×
557
            Py_DECREF(v);
558
            return out > 0 ? true : fallback;
×
559
        };
×
560
        auto pull_string_list =
561
            [&](const char *name) -> std::vector<std::string> {
×
562
            std::vector<std::string> out;
×
563
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
564
            if (!v || v == Py_None) {
×
565
                Py_XDECREF(v);
×
566
                PyErr_Clear();
×
567
                return out;
×
568
            }
569
            PyObject *seq = PySequence_Fast(v, "expected list of str");
×
570
            Py_DECREF(v);
×
571
            if (!seq) {
×
572
                PyErr_Clear();
×
573
                return out;
×
574
            }
575
            Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
576
            out.reserve(n);
×
577
            for (Py_ssize_t i = 0; i < n; ++i) {
×
578
                const char *s =
579
                    PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
580
                if (s) out.emplace_back(s);
×
581
            }
582
            Py_DECREF(seq);
×
583
            return out;
×
584
        };
×
585
        double time_interval_ms = pull_double("time_interval_ms", 5000.0);
×
586
        cfg->time_interval_us =
×
587
            static_cast<std::uint64_t>(time_interval_ms * 1000.0);
×
588
        cfg->compute_percentiles = pull_bool("compute_percentiles", false);
×
589
        cfg->extra_group_keys = pull_string_list("group_keys");
×
590
        cfg->custom_metric_fields = pull_string_list("custom_metric_fields");
×
591
        agg_config_ptr = std::move(cfg);
×
592
    }
×
593

594
    // owned_member_maps must outlive rt->submit: FileSlice::members is raw.
595
    std::vector<std::vector<GzipMember>> owned_member_maps;
×
596
    std::vector<IndexBuildBatchConfig::FileSlice> parsed_slices;
×
597
    if (file_slices_obj && file_slices_obj != Py_None) {
×
598
        PyObject *seq =
599
            PySequence_Fast(file_slices_obj, "file_slices must be a sequence");
×
600
        if (!seq) return NULL;
×
601
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
602
        if (static_cast<std::size_t>(n) != files.size()) {
×
603
            Py_DECREF(seq);
×
604
            PyErr_SetString(PyExc_ValueError,
×
605
                            "file_slices must match files length");
606
            return NULL;
×
607
        }
608
        owned_member_maps.resize(n);
×
609
        parsed_slices.resize(n);
×
610
        for (Py_ssize_t i = 0; i < n; ++i) {
×
611
            PyObject *entry = PySequence_Fast_GET_ITEM(seq, i);
×
612
            if (entry == Py_None) {
×
613
                continue;  // leave slice default-constructed (members=null)
×
614
            }
615
            Py_ssize_t mb = 0, me = 0, ckpt_base = 0;
×
616
            int skip_scoped = 0;
×
617
            PyObject *members_obj = nullptr;
×
618
            if (!PyArg_ParseTuple(entry, "nnnpO", &mb, &me, &ckpt_base,
×
619
                                  &skip_scoped, &members_obj)) {
620
                Py_DECREF(seq);
×
621
                return NULL;
×
622
            }
623
            PyObject *mseq = PySequence_Fast(
×
624
                members_obj, "file_slices[i].members must be a sequence");
625
            if (!mseq) {
×
626
                Py_DECREF(seq);
×
627
                return NULL;
×
628
            }
629
            Py_ssize_t mn = PySequence_Fast_GET_SIZE(mseq);
×
630
            auto &mv = owned_member_maps[i];
×
631
            mv.resize(mn);
×
632
            for (Py_ssize_t j = 0; j < mn; ++j) {
×
633
                PyObject *m = PySequence_Fast_GET_ITEM(mseq, j);
×
634
                unsigned long long c_offset = 0, c_size = 0;
×
635
                if (!PyArg_ParseTuple(m, "KK", &c_offset, &c_size)) {
×
636
                    Py_DECREF(mseq);
×
637
                    Py_DECREF(seq);
×
638
                    return NULL;
×
639
                }
640
                mv[j].c_offset = static_cast<std::uint64_t>(c_offset);
×
641
                mv[j].c_size = static_cast<std::uint64_t>(c_size);
×
642
            }
643
            Py_DECREF(mseq);
×
644
            parsed_slices[i].members = &mv;
×
645
            parsed_slices[i].member_begin = static_cast<std::size_t>(mb);
×
646
            parsed_slices[i].member_end = static_cast<std::size_t>(me);
×
647
            parsed_slices[i].checkpoint_idx_base =
×
648
                static_cast<std::uint64_t>(ckpt_base);
×
649
            parsed_slices[i].skip_file_scoped_writes = skip_scoped != 0;
×
650
        }
651
        Py_DECREF(seq);
×
652
    }
653

654
    auto batch_config = std::make_shared<IndexBuildBatchConfig>();
×
655
    batch_config->file_paths = std::move(files);
×
656
    batch_config->preassigned_file_ids = std::move(file_ids);
×
657
    if (!parsed_slices.empty()) {
×
658
        batch_config->file_slices = parsed_slices;
×
659
    }
660
    batch_config->index_dir = index_dir;
×
661
    batch_config->checkpoint_size = static_cast<std::size_t>(checkpoint_size);
×
662
    batch_config->build_manifest = build_manifest != 0;
×
663
    batch_config->force_rebuild = force_rebuild != 0;
×
664
    batch_config->bloom_dimensions = std::move(bloom_dims);
×
665
    batch_config->parallelism =
×
666
        parallelism > 0 ? static_cast<std::size_t>(parallelism)
×
667
                        : (rt ? std::max<std::size_t>(rt->threads(), 1) : 1);
×
668
    batch_config->flush_every_files =
×
669
        static_cast<std::size_t>(flush_every_files);
×
670
    batch_config->rebuild_root_summaries = false;
×
671

672
    if (agg_config_ptr) {
×
673
        auto agg_staging = staging;
×
674
        auto agg_prefix = batch + "_agg";
×
675
        // Counter keeps per-file SST dirs unique across duplicate file_paths
676
        // when one worker owns multiple slices of the same file.
677
        auto visitor_counter = std::make_shared<std::atomic<std::size_t>>(0);
×
678
        batch_config->dft_visitor_factory =
×
679
            [agg_staging, agg_prefix, agg_config_ptr,
×
680
             visitor_counter](const std::string &file_path)
681
            -> std::vector<std::unique_ptr<
682
                dftracer::utils::utilities::composites::dft::DftEventVisitor>> {
683
            using dftracer::utils::utilities::composites::dft::DftEventVisitor;
684
            using dftracer::utils::utilities::composites::dft::aggregators::
685
                AggregationVisitor;
686
            const std::size_t idx =
687
                visitor_counter->fetch_add(1, std::memory_order_relaxed);
×
688
            std::string prefix = agg_prefix + "_" + std::to_string(idx);
×
689
            std::vector<std::unique_ptr<DftEventVisitor>> visitors;
×
690
            visitors.push_back(std::make_unique<AggregationVisitor>(
×
691
                agg_staging, prefix, /*config_hash=*/0, *agg_config_ptr,
×
692
                file_path));
693
            return visitors;
×
694
        };
×
695
    }
×
696

697
    // Atomic: write phase calls sink_factory from N coroutines concurrently.
698
    auto batch_counter = std::make_shared<std::atomic<std::size_t>>(0);
×
699
    batch_config->sink_factory =
×
700
        [staging, batch, batch_counter]() -> std::unique_ptr<IndexBatchSink> {
×
701
        const std::size_t idx =
702
            batch_counter->fetch_add(1, std::memory_order_relaxed);
×
703
        std::string sub_batch = batch + "_" + std::to_string(idx);
×
704
        return std::make_unique<IndexDatabaseSstWriterContext>(staging,
×
705
                                                               sub_batch);
706
    };
×
707
    batch_config->sink_commit = [artifacts](IndexBatchSink &sink) {
×
708
        auto &sst = static_cast<IndexDatabaseSstWriterContext &>(sink);
×
709
        auto batch_artifacts = sst.commit();
×
710
        std::lock_guard<std::mutex> lock(artifacts->mu);
×
711
        if (!batch_artifacts.empty()) {
×
712
            artifacts->list.push_back(std::move(batch_artifacts));
×
713
        }
714
    };
×
715

716
    IndexBuildBatchResult result;
×
717
    std::string submit_error;
×
718
    Py_BEGIN_ALLOW_THREADS try {
×
719
        rt->submit(dftracer::utils::run_coro_scope(
×
720
                       rt->executor(),
×
721
                       [](dftracer::utils::CoroScope &scope,
×
722
                          std::shared_ptr<IndexBuildBatchConfig> cfg,
723
                          IndexBuildBatchResult *out)
724
                           -> dftracer::utils::coro::CoroTask<void> {
×
725
                           *out = co_await IndexBatchBuilderUtility::process(
×
726
                               &scope, std::move(cfg));
727
                       },
×
728
                       batch_config, &result),
729
                   "build-sst-batch")
×
730
            .get();
×
731
    } catch (const std::exception &e) {
×
732
        submit_error = e.what();
×
733
    }
×
734
    Py_END_ALLOW_THREADS if (!submit_error.empty()) {
×
735
        PyErr_SetString(PyExc_RuntimeError, submit_error.c_str());
×
736
        return NULL;
×
737
    }
738

739
    // If any file failed, surface the first error.
740
    if (result.failed > 0) {
×
741
        for (const auto &r : result.results) {
×
742
            if (!r.success) {
×
743
                PyErr_SetString(PyExc_RuntimeError, r.error_message.c_str());
×
744
                return NULL;
×
745
            }
746
        }
747
    }
748

749
    // One dict per committed sink + per-file aggregation below.
750
    PyObject *out_list = PyList_New(0);
×
751
    if (!out_list) return NULL;
×
752
    {
753
        std::lock_guard<std::mutex> lock(artifacts->mu);
×
754
        for (const auto &a : artifacts->list) {
×
755
            PyObject *main_dict = artifacts_to_dict(a);
×
756
            if (!main_dict || PyList_Append(out_list, main_dict) < 0) {
×
757
                Py_XDECREF(main_dict);
×
758
                Py_DECREF(out_list);
×
759
                return NULL;
×
760
            }
761
            Py_DECREF(main_dict);
×
762
        }
763
    }
×
764
    // Harvest per-file aggregation SSTs from extra visitors. Each visitor
765
    // holds a vector of Artifacts (one per FLUSH_THRESHOLD flush + the
766
    // file-complete flush) because SstFileWriter requires strictly
767
    // ascending keys per SST and cross-flush merge operands would collide.
768
    //
769
    // `extra_visitors` is indexed per input file, but a single
770
    // AggregationVisitor instance is typically shared across every file in
771
    // the batch (one flush at end-of-batch). Without dedup we would emit
772
    // that visitor's artifact dict N_files times, producing a manifest with
773
    // N copies of the same SST path. Dedup by visitor pointer so each
774
    // unique flush-sequence is emitted exactly once.
775
    using dftracer::utils::utilities::composites::dft::aggregators::
776
        AggregationVisitor;
777
    using dftracer::utils::utilities::composites::dft::aggregators::
778
        AssociationTracker;
779
    std::unordered_set<AggregationVisitor *> seen_visitors;
×
780
    for (auto &file_visitors : result.extra_visitors) {
×
781
        for (auto &visitor : file_visitors) {
×
782
            auto *agg = dynamic_cast<AggregationVisitor *>(visitor.get());
×
783
            if (!agg) continue;
×
784
            if (!seen_visitors.insert(agg).second) continue;
×
785
            for (auto &a : agg->aggregation_artifacts()) {
×
786
                if (a.empty()) continue;
×
787
                PyObject *agg_dict = artifacts_to_dict(a);
×
788
                if (!agg_dict || PyList_Append(out_list, agg_dict) < 0) {
×
789
                    Py_XDECREF(agg_dict);
×
790
                    Py_DECREF(out_list);
×
791
                    return NULL;
×
792
                }
793
                Py_DECREF(agg_dict);
×
794
            }
795
        }
796
    }
797

798
    AssociationTracker combined;
×
799
    bool any_tracker = false;
×
800
    for (auto *agg : seen_visitors) {
×
801
        auto out = agg->take_output();
×
802
        if (out.local_tracker) {
×
803
            combined.merge(*out.local_tracker);
×
804
            any_tracker = true;
×
805
        }
806
    }
×
807
    PyObject *tracker_bytes = nullptr;
×
808
    if (any_tracker) {
×
809
        combined.finalize();
×
810
        std::string blob = combined.serialize();
×
811
        tracker_bytes = PyBytes_FromStringAndSize(
×
812
            blob.data(), static_cast<Py_ssize_t>(blob.size()));
×
813
    } else {
×
814
        tracker_bytes = PyBytes_FromStringAndSize(nullptr, 0);
×
815
    }
816
    if (!tracker_bytes) {
×
817
        Py_DECREF(out_list);
×
818
        return NULL;
×
819
    }
820
    PyObject *ret = PyTuple_Pack(2, out_list, tracker_bytes);
×
821
    Py_DECREF(out_list);
×
822
    Py_DECREF(tracker_bytes);
×
823
    return ret;
×
824
}
×
825

826
// Python: enable_aggregation_deterministic_ids() -> None
// Switches the aggregation string intern returned by aggregation_intern()
// into deterministic-id mode. Takes no arguments and always returns None.
static PyObject *enable_aggregation_deterministic_ids_fn(PyObject * /*self*/,
                                                         PyObject * /*args*/) {
    namespace aggregators =
        dftracer::utils::utilities::composites::dft::aggregators;
    aggregators::aggregation_intern().enable_deterministic_ids();
    Py_RETURN_NONE;
}
833

834
// Python: move_artifacts(artifacts, dest_dir) -> dict
//
// Converts the `artifacts` dict back into an
// IndexDatabaseSstWriterContext::Artifacts, moves its files into `dest_dir`
// with the GIL released, and returns artifacts_to_dict() of the moved set.
//
// Raises RuntimeError if the move throws. Exceptions are caught *inside* the
// GIL-released region: letting one escape between Py_BEGIN_ALLOW_THREADS and
// Py_END_ALLOW_THREADS would skip PyEval_RestoreThread, and the subsequent
// PyErr_SetString would then run without the GIL.
static PyObject *move_artifacts_fn(PyObject * /*self*/, PyObject *args,
                                   PyObject *kwds) {
    static const char *kwlist[] = {"artifacts", "dest_dir", NULL};
    PyObject *dict = NULL;
    const char *dest_dir = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "Os", (char **)kwlist, &dict,
                                     &dest_dir)) {
        return NULL;
    }
    IndexDatabaseSstWriterContext::Artifacts a;
    if (!artifacts_from_dict(dict, &a)) return NULL;

    IndexDatabaseSstWriterContext::Artifacts moved;
    bool move_failed = false;
    std::string move_error;
    // dest_dir points into the str object referenced by `args`, which stays
    // alive for the duration of this call, so it is usable without the GIL.
    Py_BEGIN_ALLOW_THREADS
    try {
        moved = std::move(a).move_to(dest_dir);
    } catch (const std::exception &e) {
        move_failed = true;
        move_error = e.what();
    } catch (...) {
        move_failed = true;
        move_error = "move_artifacts: unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (move_failed) {
        PyErr_SetString(PyExc_RuntimeError, move_error.c_str());
        return NULL;
    }
    return artifacts_to_dict(moved);
}
×
855

856
namespace {
857

858
dftracer::utils::coro::CoroTask<void> scan_one_gzip_file(
×
859
    std::string path, std::vector<GzipMember> *out) {
×
860
    out->clear();
861
    int fd = ::open(path.c_str(), O_RDONLY);
×
862
    if (fd < 0) co_return;
×
863
    struct stat st;
864
    if (::fstat(fd, &st) == 0 && st.st_size >= 18) {
×
865
        co_await enumerate_gzip_member_candidates(
×
866
            fd, static_cast<std::uint64_t>(st.st_size), *out);
867
    }
868
    ::close(fd);
×
869
}
×
870

871
}  // namespace
872

873
// Python: enumerate_gzip_members(files, runtime=None)
//             -> list[list[(c_offset, c_size)]]
//
// Copies the paths out of `files` while holding the GIL, then scans every
// file concurrently on the runtime's executor with the GIL released (one
// spawned task per file, each writing only its own result slot). The result
// list is parallel to `files`; a file that could not be scanned yields [].
//
// Raises TypeError for a bad `runtime`, ValueError for a path containing a
// NUL byte, and RuntimeError if no runtime is available or the scan throws.
static PyObject *enumerate_gzip_members_fn(PyObject * /*self*/, PyObject *args,
                                           PyObject *kwds) {
    static const char *kwlist[] = {"files", "runtime", NULL};
    PyObject *files_obj = NULL;
    PyObject *runtime_arg = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|O", (char **)kwlist,
                                     &files_obj, &runtime_arg)) {
        return NULL;
    }

    std::vector<std::string> files;
    {
        PyObject *seq = PySequence_Fast(files_obj, "files must be a sequence");
        if (!seq) return NULL;
        const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
        files.reserve(static_cast<std::size_t>(n));
        for (Py_ssize_t i = 0; i < n; ++i) {
            Py_ssize_t len = 0;
            const char *s =
                PyUnicode_AsUTF8AndSize(PySequence_Fast_GET_ITEM(seq, i), &len);
            if (!s) {
                Py_DECREF(seq);
                return NULL;
            }
            std::string path(s, static_cast<std::size_t>(len));
            // open() would stop at the first NUL and silently scan a
            // different file; reject such paths like os.open does.
            if (path.find('\0') != std::string::npos) {
                Py_DECREF(seq);
                PyErr_SetString(PyExc_ValueError, "embedded null byte");
                return NULL;
            }
            files.push_back(std::move(path));
        }
        Py_DECREF(seq);
    }

    Runtime *rt = nullptr;
    if (runtime_arg && runtime_arg != Py_None) {
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
        } else {
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
            if (!native || !PyObject_TypeCheck(native, &RuntimeType)) {
                Py_XDECREF(native);
                PyErr_SetString(PyExc_TypeError,
                                "runtime must be a Runtime instance or None");
                return NULL;
            }
            rt = ((RuntimeObject *)native)->runtime.get();
            Py_DECREF(native);
        }
    } else {
        rt = get_default_runtime();
    }
    if (!rt) {
        PyErr_SetString(PyExc_RuntimeError,
                        "enumerate_gzip_members: no runtime available");
        return NULL;
    }

    std::vector<std::vector<GzipMember>> results(files.size());
    bool submit_failed = false;
    std::string submit_error;
    // Exceptions must be caught before Py_END_ALLOW_THREADS reacquires the
    // GIL; escaping past it would leave the interpreter without a thread
    // state.
    Py_BEGIN_ALLOW_THREADS
    try {
        rt->submit(
              dftracer::utils::run_coro_scope(
                  rt->executor(),
                  [](dftracer::utils::CoroScope &scope,
                     const std::vector<std::string> *paths,
                     std::vector<std::vector<GzipMember>> *out)
                      -> dftracer::utils::coro::CoroTask<void> {
                      // NOTE(review): this capturing coroutine lambda relies
                      // on scope.scope() keeping the closure alive until the
                      // coroutine completes -- confirm against CoroScope.
                      co_await scope.scope(
                          [paths, out](dftracer::utils::CoroScope &child)
                              -> dftracer::utils::coro::CoroTask<void> {
                              for (std::size_t i = 0; i < paths->size(); ++i) {
                                  const std::string &path = (*paths)[i];
                                  auto *slot = &(*out)[i];
                                  // Not a coroutine itself: the path and slot
                                  // are copied into scan_one_gzip_file's own
                                  // frame, so nothing depends on this closure
                                  // outliving the spawn() call.
                                  child.spawn(
                                      [path, slot](dftracer::utils::CoroScope &)
                                          -> dftracer::utils::coro::CoroTask<
                                              void> {
                                          return scan_one_gzip_file(path, slot);
                                      });
                              }
                              co_return;
                          });
                      co_return;
                  },
                  &files, &results),
              "enumerate-gzip-members")
            .get();
    } catch (const std::exception &e) {
        submit_failed = true;
        submit_error = e.what();
    } catch (...) {
        submit_failed = true;
        submit_error = "enumerate_gzip_members: unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (submit_failed) {
        PyErr_SetString(PyExc_RuntimeError, submit_error.c_str());
        return NULL;
    }

    PyObject *out_list = PyList_New(static_cast<Py_ssize_t>(results.size()));
    if (!out_list) return NULL;
    for (std::size_t i = 0; i < results.size(); ++i) {
        const auto &members = results[i];
        PyObject *inner = PyList_New(static_cast<Py_ssize_t>(members.size()));
        if (!inner) {
            Py_DECREF(out_list);
            return NULL;
        }
        for (std::size_t j = 0; j < members.size(); ++j) {
            PyObject *pair = Py_BuildValue(
                "(KK)", static_cast<unsigned long long>(members[j].c_offset),
                static_cast<unsigned long long>(members[j].c_size));
            if (!pair) {
                Py_DECREF(inner);
                Py_DECREF(out_list);
                return NULL;
            }
            // SET_ITEM steals the reference to `pair`.
            PyList_SET_ITEM(inner, static_cast<Py_ssize_t>(j), pair);
        }
        PyList_SET_ITEM(out_list, static_cast<Py_ssize_t>(i), inner);
    }
    return out_list;
}
×
982

983
// Mirrors build_work_units + lpt_assign_units in dftracer_aggregator_mpi.cpp
984
// so the Dask backend produces identical work distribution to MPI.
985
static PyObject *plan_work_units_fn(PyObject * /*self*/, PyObject *args,
×
986
                                    PyObject *kwds) {
987
    static const char *kwlist[] = {"member_map", "num_workers", "target_c_size",
988
                                   NULL};
989
    PyObject *map_obj = NULL;
×
990
    Py_ssize_t num_workers = 0;
×
991
    unsigned long long target_c_size = 0;
×
992
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "On|K", (char **)kwlist,
×
993
                                     &map_obj, &num_workers, &target_c_size)) {
994
        return NULL;
×
995
    }
996
    if (num_workers <= 0) num_workers = 1;
×
997

998
    std::vector<std::vector<GzipMember>> member_map;
×
999
    {
1000
        PyObject *seq =
1001
            PySequence_Fast(map_obj, "member_map must be a sequence");
×
1002
        if (!seq) return NULL;
×
1003
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
1004
        member_map.resize(n);
×
1005
        for (Py_ssize_t i = 0; i < n; ++i) {
×
1006
            PyObject *inner = PySequence_Fast_GET_ITEM(seq, i);
×
1007
            PyObject *iseq =
1008
                PySequence_Fast(inner, "member_map[i] must be a sequence");
×
1009
            if (!iseq) {
×
1010
                Py_DECREF(seq);
×
1011
                return NULL;
×
1012
            }
1013
            Py_ssize_t ni = PySequence_Fast_GET_SIZE(iseq);
×
1014
            member_map[i].resize(ni);
×
1015
            for (Py_ssize_t j = 0; j < ni; ++j) {
×
1016
                PyObject *t = PySequence_Fast_GET_ITEM(iseq, j);
×
1017
                unsigned long long c_offset = 0, c_size = 0;
×
1018
                if (!PyArg_ParseTuple(t, "KK", &c_offset, &c_size)) {
×
1019
                    Py_DECREF(iseq);
×
1020
                    Py_DECREF(seq);
×
1021
                    return NULL;
×
1022
                }
1023
                member_map[i][j].c_offset =
×
1024
                    static_cast<std::uint64_t>(c_offset);
1025
                member_map[i][j].c_size = static_cast<std::uint64_t>(c_size);
×
1026
            }
1027
            Py_DECREF(iseq);
×
1028
        }
1029
        Py_DECREF(seq);
×
1030
    }
1031

1032
    // Fallback: treat empty/non-gzip files as a single whole-file member.
1033
    std::uint64_t total_c = 0;
×
1034
    for (auto &mv : member_map) {
×
1035
        if (mv.empty()) mv.push_back({0, 0});
×
1036
        for (const auto &m : mv) total_c += m.c_size;
×
1037
    }
1038

1039
    if (target_c_size == 0) {
×
1040
        target_c_size =
×
1041
            (total_c + static_cast<std::uint64_t>(num_workers) - 1) /
×
1042
            std::max<std::uint64_t>(static_cast<std::uint64_t>(num_workers), 1);
×
1043
    }
1044

1045
    struct Unit {
1046
        std::size_t file_idx;
1047
        std::size_t member_begin;
1048
        std::size_t member_end;
1049
        std::uint64_t c_size;
1050
    };
1051
    std::vector<Unit> units;
×
1052
    for (std::size_t fi = 0; fi < member_map.size(); ++fi) {
×
1053
        const auto &members = member_map[fi];
×
1054
        if (members.empty()) continue;
×
1055
        std::size_t begin = 0;
×
1056
        std::uint64_t accum = 0;
×
1057
        for (std::size_t i = 0; i < members.size(); ++i) {
×
1058
            accum += members[i].c_size;
×
1059
            const bool is_last = (i + 1 == members.size());
×
1060
            if ((target_c_size > 0 && accum >= target_c_size) || is_last) {
×
1061
                units.push_back({fi, begin, i + 1, accum});
×
1062
                begin = i + 1;
×
1063
                accum = 0;
×
1064
            }
1065
        }
1066
    }
1067

1068
    std::vector<std::size_t> order(units.size());
×
1069
    for (std::size_t i = 0; i < order.size(); ++i) order[i] = i;
×
1070
    std::sort(order.begin(), order.end(), [&](std::size_t a, std::size_t b) {
×
1071
        if (units[a].c_size != units[b].c_size)
×
1072
            return units[a].c_size > units[b].c_size;
×
1073
        if (units[a].file_idx != units[b].file_idx)
×
1074
            return units[a].file_idx < units[b].file_idx;
×
1075
        return units[a].member_begin < units[b].member_begin;
×
1076
    });
1077
    const std::size_t nw = static_cast<std::size_t>(num_workers);
×
1078
    std::vector<std::uint64_t> loads(nw, 0);
×
1079
    std::vector<std::vector<std::size_t>> per_worker(nw);
×
1080
    for (std::size_t ord : order) {
×
1081
        std::size_t best = 0;
×
1082
        for (std::size_t r = 1; r < nw; ++r)
×
1083
            if (loads[r] < loads[best]) best = r;
×
1084
        per_worker[best].push_back(ord);
×
1085
        loads[best] += std::max<std::uint64_t>(units[ord].c_size, 1);
×
1086
    }
1087

1088
    PyObject *out = PyList_New(static_cast<Py_ssize_t>(nw));
×
1089
    if (!out) return NULL;
×
1090
    for (std::size_t w = 0; w < nw; ++w) {
×
1091
        // Keep per-worker slices sorted by (file_idx, member_begin) for
1092
        // deterministic, file-group-friendly iteration downstream.
1093
        auto &lst = per_worker[w];
×
1094
        std::sort(lst.begin(), lst.end(), [&](std::size_t a, std::size_t b) {
×
1095
            if (units[a].file_idx != units[b].file_idx)
×
1096
                return units[a].file_idx < units[b].file_idx;
×
1097
            return units[a].member_begin < units[b].member_begin;
×
1098
        });
1099
        PyObject *inner = PyList_New(static_cast<Py_ssize_t>(lst.size()));
×
1100
        if (!inner) {
×
1101
            Py_DECREF(out);
×
1102
            return NULL;
×
1103
        }
1104
        for (std::size_t k = 0; k < lst.size(); ++k) {
×
1105
            const auto &u = units[lst[k]];
×
1106
            PyObject *t = Py_BuildValue(
×
1107
                "(nnnK)", (Py_ssize_t)u.file_idx, (Py_ssize_t)u.member_begin,
×
1108
                (Py_ssize_t)u.member_end, (unsigned long long)u.c_size);
×
1109
            if (!t) {
×
1110
                Py_DECREF(inner);
×
1111
                Py_DECREF(out);
×
1112
                return NULL;
×
1113
            }
1114
            PyList_SET_ITEM(inner, k, t);
×
1115
        }
1116
        PyList_SET_ITEM(out, w, inner);
×
1117
    }
1118
    return out;
×
1119
}
×
1120

1121
// ---------------------------------------------------------------------------
1122
// Module registration
1123
// ---------------------------------------------------------------------------
1124

1125
// Method table registered on the extension module by init_sst_distribution().
// Each entry: {python name, C entry point, calling convention, docstring}.
// Entry points taking (self, args, kwds) are cast to PyCFunction as the
// CPython API requires; METH_KEYWORDS tells the interpreter the real arity.
static PyMethodDef SstDistributionMethods[] = {
    {"build_sst_batch", reinterpret_cast<PyCFunction>(build_sst_batch_fn),
     METH_VARARGS | METH_KEYWORDS,
     "build_sst_batch(files, file_ids, staging_dir, batch_id, ...) "
     "-> (list[dict], bytes)\n"
     "Run the indexer pipeline with an SST sink and return "
     "(artifact_dicts, tracker_blob). The tracker blob is the serialized "
     "merged AssociationTracker from this batch's aggregation visitors "
     "(empty bytes when no aggregation_config was passed)."},
    {"plan_lpt_partition",
     reinterpret_cast<PyCFunction>(plan_lpt_partition_fn), METH_VARARGS,
     "plan_lpt_partition(entries, num_workers) -> list[list[(path, size)]]\n"
     "Greedy Longest-Processing-Time-first bin-packing of (path, size) "
     "tuples across num_workers buckets. Minimises the maximum per-worker "
     "total size."},
    {"scan_files", reinterpret_cast<PyCFunction>(scan_files_fn),
     METH_VARARGS | METH_KEYWORDS,
     "scan_files(directory, patterns=None, recursive=False, runtime=None) "
     "-> list[(path, size)]\n"
     "Parallel directory scan returning (path, size) tuples for regular "
     "files matching the patterns."},
    {"enable_aggregation_deterministic_ids",
     reinterpret_cast<PyCFunction>(enable_aggregation_deterministic_ids_fn),
     METH_NOARGS,
     "enable_aggregation_deterministic_ids() -> None\n"
     "Flip the global aggregation StringIntern into deterministic-id mode "
     "so the same string maps to the same 32-bit id in every worker "
     "process. Call once at worker startup BEFORE any aggregation work."},
    {"move_artifacts", reinterpret_cast<PyCFunction>(move_artifacts_fn),
     METH_VARARGS | METH_KEYWORDS,
     "move_artifacts(artifacts, dest_dir) -> dict\n"
     "Move every populated SST in `artifacts` (as returned by "
     "`build_sst_batch`) into `dest_dir` via the C++ rename/copy helper, "
     "returning a fresh dict with the new paths. Single GIL release, no "
     "per-file Python shutil.move overhead."},
    {"enumerate_gzip_members",
     reinterpret_cast<PyCFunction>(enumerate_gzip_members_fn),
     METH_VARARGS | METH_KEYWORDS,
     "enumerate_gzip_members(files, runtime=None) -> list[list[(c_offset, "
     "c_size)]]\n"
     "Cooperative async scan of gzip member offsets across `files`. "
     "Returns lists of (c_offset, c_size) parallel to `files`; empty for "
     "non-gzip / unreadable files."},
    {"plan_work_units", reinterpret_cast<PyCFunction>(plan_work_units_fn),
     METH_VARARGS | METH_KEYWORDS,
     "plan_work_units(member_map, num_workers, target_c_size=0) "
     "-> list[list[(file_idx, member_begin, member_end, c_size)]]\n"
     "Deterministic LPT assignment of intra-file gzip-member slices "
     "across workers. Each worker's list contains (file_idx, "
     "member_begin, member_end, c_size) tuples; a file sliced across "
     "multiple workers appears in each owner's list with disjoint "
     "[member_begin, member_end) ranges."},
    // Sentinel terminating the table.
    {nullptr, nullptr, 0, nullptr}};
1174

1175
// Registers the SstArtifactRegistry type and every function in
// SstDistributionMethods on module `m`.
// Returns 0 on success, -1 (with a Python error set by the failing call)
// otherwise.
int init_sst_distribution(PyObject *m) {
    const bool type_registered =
        register_type(m, &SstArtifactRegistryType, "SstArtifactRegistry") >= 0;
    if (!type_registered) return -1;
    return PyModule_AddFunctions(m, SstDistributionMethods) < 0 ? -1 : 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc