• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26195612357

20 May 2026 11:19PM UTC coverage: 49.859% (-2.3%) from 52.2%
26195612357

push

github

hariharan-devarajan
feat(aggregator): improve system metrics scanning and persistence error handling

16041 of 43831 branches covered (36.6%)

Branch coverage included in aggregate %.

6 of 17 new or added lines in 2 files covered. (35.29%)

1072 existing lines in 104 files now uncovered.

21423 of 31309 relevant lines covered (68.42%)

13054.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.57
/src/dftracer/utils/python/sst_distribution.cpp
1
#include <dftracer/utils/core/runtime.h>
2
#include <dftracer/utils/core/tasks/coro_scope.h>
3
#include <dftracer/utils/python/runtime.h>
4
#include <dftracer/utils/python/sst_distribution.h>
5
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_config.h>
6
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_key.h>
7
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.h>
8
#include <dftracer/utils/utilities/composites/dft/aggregators/association_tracker.h>
9
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
10
#include <dftracer/utils/utilities/indexer/file_partition.h>
11
#include <dftracer/utils/utilities/indexer/index_batch_sink.h>
12
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
13
#include <dftracer/utils/utilities/indexer/index_database_sst_writer_context.h>
14
#include <dftracer/utils/utilities/indexer/internal/common/gzip_member_scanner.h>
15
#include <fcntl.h>
16
#include <sys/stat.h>
17
#include <unistd.h>
18

19
#include <algorithm>
20
#include <atomic>
21
#include <cstdint>
22
#include <memory>
23
#include <mutex>
24
#include <new>
25
#include <optional>
26
#include <string>
27
#include <unordered_set>
28
#include <vector>
29

30
using dftracer::utils::Runtime;
31
using dftracer::utils::utilities::filesystem::FileEntry;
32
using dftracer::utils::utilities::filesystem::PatternDirectoryScannerUtility;
33
using dftracer::utils::utilities::filesystem::
34
    PatternDirectoryScannerUtilityInput;
35
using dftracer::utils::utilities::indexer::IndexBatchBuilderUtility;
36
using dftracer::utils::utilities::indexer::IndexBatchSink;
37
using dftracer::utils::utilities::indexer::IndexBuildBatchConfig;
38
using dftracer::utils::utilities::indexer::IndexBuildBatchResult;
39
using dftracer::utils::utilities::indexer::IndexDatabaseSstWriterContext;
40
using dftracer::utils::utilities::indexer::plan_lpt_partition;
41
using dftracer::utils::utilities::indexer::SstArtifactRegistry;
42
using dftracer::utils::utilities::indexer::internal::
43
    enumerate_gzip_member_candidates;
44
using dftracer::utils::utilities::indexer::internal::GzipMember;
45

46
// ---------------------------------------------------------------------------
47
// SstArtifactRegistry type
48
// ---------------------------------------------------------------------------
49

50
typedef struct {
51
    PyObject_HEAD std::shared_ptr<SstArtifactRegistry> registry;
52
} SstArtifactRegistryObject;
53

54
/// tp_dealloc for SstArtifactRegistry: run the destructor of the
/// placement-constructed `registry` member (releasing this object's share of
/// the registry), then hand the raw storage back through the type's tp_free.
static void SstArtifactRegistry_dealloc(SstArtifactRegistryObject *self) {
    std::destroy_at(&self->registry);
    PyTypeObject *const owner_type = Py_TYPE(self);
    owner_type->tp_free(reinterpret_cast<PyObject *>(self));
}
×
58

59
/// tp_new for SstArtifactRegistry: allocate the Python object and give it a
/// fresh, empty SstArtifactRegistry.
///
/// The shared_ptr member is first placement-constructed empty (noexcept), so
/// the object is always in a state SstArtifactRegistry_dealloc can destroy.
/// The registry itself is then created inside a try block: std::make_shared
/// may throw, and a C++ exception must never unwind into the interpreter.
/// On failure the half-built object is released with Py_DECREF and a Python
/// exception (MemoryError / RuntimeError) is set.
static PyObject *SstArtifactRegistry_new(PyTypeObject *type,
                                         PyObject * /*args*/,
                                         PyObject * /*kwds*/) {
    auto *self =
        reinterpret_cast<SstArtifactRegistryObject *>(type->tp_alloc(type, 0));
    if (!self) return NULL;
    new (&self->registry) std::shared_ptr<SstArtifactRegistry>();
    try {
        self->registry = std::make_shared<SstArtifactRegistry>();
    } catch (const std::bad_alloc &) {
        Py_DECREF(self);
        return PyErr_NoMemory();
    } catch (const std::exception &e) {
        Py_DECREF(self);
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        Py_DECREF(self);
        PyErr_SetString(PyExc_RuntimeError,
                        "SstArtifactRegistry: unknown C++ exception");
        return NULL;
    }
    return reinterpret_cast<PyObject *>(self);
}
68

69
namespace {
70

71
// Field names in the Artifacts dict returned by build_sst_batch and
72
// consumed by SstArtifactRegistry.append. Must match the field names on
73
// IndexDatabaseSstWriterContext::Artifacts.
74
constexpr const char *ARTIFACT_FIELDS[] = {
75
    "metadata_sst",        "checkpoints_sst",         "manifest_sst",
76
    "chunk_bloom_sst",     "file_bloom_sst",          "chunk_stats_sst",
77
    "chunk_dim_stats_sst", "dimensions_sst",          "file_scalar_stats_sst",
78
    "file_cat_counts_sst", "file_pid_tid_counts_sst", "file_name_counts_sst",
79
    "name_dictionary_sst", "name_file_postings_sst",  "name_chunk_postings_sst",
80
    "hash_tables_sst",     "aggregation_sst",         "system_metrics_sst",
81
};
82

83
/// Map a slot name to the matching Artifacts member. Kept in one place so
84
/// that adding a new CF requires updating only `ARTIFACT_FIELDS` plus
85
/// `dispatch_*` below.
86
std::optional<std::string> *artifacts_slot(
×
87
    IndexDatabaseSstWriterContext::Artifacts &a, std::string_view name) {
88
    if (name == "metadata_sst") return &a.metadata_sst;
×
89
    if (name == "checkpoints_sst") return &a.checkpoints_sst;
×
90
    if (name == "manifest_sst") return &a.manifest_sst;
×
91
    if (name == "chunk_bloom_sst") return &a.chunk_bloom_sst;
×
92
    if (name == "file_bloom_sst") return &a.file_bloom_sst;
×
93
    if (name == "chunk_stats_sst") return &a.chunk_stats_sst;
×
94
    if (name == "chunk_dim_stats_sst") return &a.chunk_dim_stats_sst;
×
95
    if (name == "dimensions_sst") return &a.dimensions_sst;
×
96
    if (name == "file_scalar_stats_sst") return &a.file_scalar_stats_sst;
×
97
    if (name == "file_cat_counts_sst") return &a.file_cat_counts_sst;
×
98
    if (name == "file_pid_tid_counts_sst") return &a.file_pid_tid_counts_sst;
×
99
    if (name == "file_name_counts_sst") return &a.file_name_counts_sst;
×
100
    if (name == "name_dictionary_sst") return &a.name_dictionary_sst;
×
101
    if (name == "name_file_postings_sst") return &a.name_file_postings_sst;
×
102
    if (name == "name_chunk_postings_sst") return &a.name_chunk_postings_sst;
×
103
    if (name == "hash_tables_sst") return &a.hash_tables_sst;
×
104
    if (name == "aggregation_sst") return &a.aggregation_sst;
×
105
    if (name == "system_metrics_sst") return &a.system_metrics_sst;
×
106
    return nullptr;
×
107
}
108

109
/// Convert a Python artifacts dict to the C++ Artifacts struct. Missing,
110
/// None, or empty-string entries become nullopt. Returns false on type
111
/// errors (exception set).
112
bool artifacts_from_dict(PyObject *dict,
×
113
                         IndexDatabaseSstWriterContext::Artifacts *out) {
114
    if (!PyDict_Check(dict)) {
×
115
        PyErr_SetString(PyExc_TypeError, "artifacts must be a dict");
×
116
        return false;
×
117
    }
118
    for (const char *field : ARTIFACT_FIELDS) {
×
119
        PyObject *val = PyDict_GetItemString(dict, field);  // borrowed
×
120
        if (!val || val == Py_None) continue;
×
121
        if (!PyUnicode_Check(val)) {
×
122
            PyErr_Format(PyExc_TypeError, "artifacts['%s'] must be str or None",
×
123
                         field);
124
            return false;
×
125
        }
126
        const char *s = PyUnicode_AsUTF8(val);
×
127
        if (!s) return false;
×
128
        if (s[0] == '\0') continue;
×
129
        auto *slot = artifacts_slot(*out, field);
×
130
        if (slot) *slot = std::string(s);
×
131
    }
132
    return true;
×
133
}
134

135
PyObject *artifacts_to_dict(const IndexDatabaseSstWriterContext::Artifacts &a) {
×
136
    PyObject *dict = PyDict_New();
×
137
    if (!dict) return NULL;
×
138
    auto set_field = [&](const char *name,
×
139
                         const std::optional<std::string> &slot) -> bool {
140
        PyObject *v = slot.has_value() ? PyUnicode_FromString(slot->c_str())
×
141
                                       : (Py_INCREF(Py_None), Py_None);
×
142
        if (!v) return false;
×
143
        int rc = PyDict_SetItemString(dict, name, v);
×
144
        Py_DECREF(v);
145
        return rc == 0;
×
146
    };
×
147
    if (!set_field("metadata_sst", a.metadata_sst) ||
×
148
        !set_field("checkpoints_sst", a.checkpoints_sst) ||
×
149
        !set_field("manifest_sst", a.manifest_sst) ||
×
150
        !set_field("chunk_bloom_sst", a.chunk_bloom_sst) ||
×
151
        !set_field("file_bloom_sst", a.file_bloom_sst) ||
×
152
        !set_field("chunk_stats_sst", a.chunk_stats_sst) ||
×
153
        !set_field("chunk_dim_stats_sst", a.chunk_dim_stats_sst) ||
×
154
        !set_field("dimensions_sst", a.dimensions_sst) ||
×
155
        !set_field("file_scalar_stats_sst", a.file_scalar_stats_sst) ||
×
156
        !set_field("file_cat_counts_sst", a.file_cat_counts_sst) ||
×
157
        !set_field("file_pid_tid_counts_sst", a.file_pid_tid_counts_sst) ||
×
158
        !set_field("file_name_counts_sst", a.file_name_counts_sst) ||
×
159
        !set_field("name_dictionary_sst", a.name_dictionary_sst) ||
×
160
        !set_field("name_file_postings_sst", a.name_file_postings_sst) ||
×
161
        !set_field("name_chunk_postings_sst", a.name_chunk_postings_sst) ||
×
162
        !set_field("hash_tables_sst", a.hash_tables_sst) ||
×
163
        !set_field("aggregation_sst", a.aggregation_sst) ||
×
164
        !set_field("system_metrics_sst", a.system_metrics_sst)) {
×
165
        Py_DECREF(dict);
×
166
        return NULL;
×
167
    }
168
    return dict;
×
169
}
170

171
}  // namespace
172

173
/// SstArtifactRegistry.append(artifacts_dict) -> None
///
/// Converts `artifacts_dict` with artifacts_from_dict (which sets TypeError
/// on malformed entries) and appends the result to the wrapped registry.
/// Any C++ exception raised along the way is translated into a Python
/// exception (MemoryError / RuntimeError) instead of unwinding through the
/// interpreter.
static PyObject *SstArtifactRegistry_append(SstArtifactRegistryObject *self,
                                            PyObject *args) {
    PyObject *dict;
    if (!PyArg_ParseTuple(args, "O", &dict)) return NULL;
    try {
        IndexDatabaseSstWriterContext::Artifacts a;
        if (!artifacts_from_dict(dict, &a)) return NULL;
        self->registry->append(std::move(a));
    } catch (const std::bad_alloc &) {
        return PyErr_NoMemory();
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "SstArtifactRegistry.append: unknown C++ exception");
        return NULL;
    }
    Py_RETURN_NONE;
}
×
182

183
// Method table for the SstArtifactRegistry type; the final all-null entry is
// the sentinel CPython uses to find the end of the table.
static PyMethodDef SstArtifactRegistry_methods[] = {
    {
        "append",
        reinterpret_cast<PyCFunction>(SstArtifactRegistry_append),
        METH_VARARGS,
        "append(artifacts_dict) -> None\n"
        "Add a per-batch Artifacts dict (as returned by build_sst_batch or "
        "IndexDatabaseSstWriterContext.commit) to the registry.",
    },
    {nullptr, nullptr, 0, nullptr},
};
189

190
// Static type object for SstArtifactRegistry, initialized positionally in
// PyTypeObject slot order (slot names per the CPython 3.8+ layout). Integer
// slots use 0 and pointer slots use nullptr.
static PyTypeObject SstArtifactRegistryType = {
    PyVarObject_HEAD_INIT(nullptr, 0)
    "dftracer_utils_ext.SstArtifactRegistry",                      // tp_name
    sizeof(SstArtifactRegistryObject),                             // tp_basicsize
    0,                                                             // tp_itemsize
    reinterpret_cast<destructor>(SstArtifactRegistry_dealloc),     // tp_dealloc
    0,              // tp_vectorcall_offset (tp_print before 3.8)
    nullptr,        // tp_getattr
    nullptr,        // tp_setattr
    nullptr,        // tp_as_async
    nullptr,        // tp_repr
    nullptr,        // tp_as_number
    nullptr,        // tp_as_sequence
    nullptr,        // tp_as_mapping
    nullptr,        // tp_hash
    nullptr,        // tp_call
    nullptr,        // tp_str
    nullptr,        // tp_getattro
    nullptr,        // tp_setattro
    nullptr,        // tp_as_buffer
    Py_TPFLAGS_DEFAULT,  // tp_flags (no Py_TPFLAGS_BASETYPE: not subclassable)
    "Thread-safe collector for SST artifact paths.",  // tp_doc
    nullptr,        // tp_traverse
    nullptr,        // tp_clear
    nullptr,        // tp_richcompare
    0,              // tp_weaklistoffset
    nullptr,        // tp_iter
    nullptr,        // tp_iternext
    SstArtifactRegistry_methods,  // tp_methods
    nullptr,        // tp_members
    nullptr,        // tp_getset
    nullptr,        // tp_base
    nullptr,        // tp_dict
    nullptr,        // tp_descr_get
    nullptr,        // tp_descr_set
    0,              // tp_dictoffset
    nullptr,        // tp_init
    // tp_alloc: left null. SstArtifactRegistry_new calls type->tp_alloc, so
    // this presumably relies on PyType_Ready (not visible here) inheriting
    // the default allocator.
    nullptr,
    SstArtifactRegistry_new,  // tp_new
};
229

230
/// Unwrap an SstArtifactRegistry Python object.
///
/// Returns the C++ registry held by `obj` when `obj` passes
/// PyObject_TypeCheck against SstArtifactRegistryType, and nullptr otherwise
/// (no Python exception is set in that case). The pointer is non-owning; it
/// stays valid at least as long as `obj` is alive.
SstArtifactRegistry *sst_artifact_registry_get(PyObject *obj) {
    if (PyObject_TypeCheck(obj, &SstArtifactRegistryType)) {
        auto *wrapper = reinterpret_cast<SstArtifactRegistryObject *>(obj);
        return wrapper->registry.get();
    }
    return nullptr;
}
234

235
// ---------------------------------------------------------------------------
236
// scan_files: parallel directory scan with size info
237
// ---------------------------------------------------------------------------
238

239
/// scan_files(directory, patterns=None, recursive=False, runtime=None)
///
/// Scan `directory` with PatternDirectoryScannerUtility, filtered by
/// `patterns` (a sequence of str, optional) and recursing when `recursive`
/// is true. The scan runs on `runtime` (a Runtime, an object exposing one as
/// `_native`, or None for the default runtime) with the GIL released.
///
/// Returns a list of (path, size) tuples, one per FileEntry produced.
/// Raises TypeError for an invalid `runtime`, and RuntimeError when no
/// runtime is available or the scan throws.
static PyObject *scan_files_fn(PyObject * /*self*/, PyObject *args,
                               PyObject *kwds) {
    static const char *kwlist[] = {"directory", "patterns", "recursive",
                                   "runtime", NULL};
    const char *directory;
    PyObject *patterns_obj = NULL;
    int recursive = 0;
    PyObject *runtime_arg = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|OpO", (char **)kwlist,
                                     &directory, &patterns_obj, &recursive,
                                     &runtime_arg)) {
        return NULL;
    }

    std::vector<std::string> patterns;
    if (patterns_obj && patterns_obj != Py_None) {
        PyObject *seq =
            PySequence_Fast(patterns_obj, "patterns must be a sequence");
        if (!seq) return NULL;
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
        patterns.reserve(n);
        for (Py_ssize_t i = 0; i < n; ++i) {
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
            if (!s) {
                Py_DECREF(seq);
                return NULL;
            }
            patterns.emplace_back(s);
        }
        Py_DECREF(seq);
    }

    PatternDirectoryScannerUtilityInput input(directory, patterns,
                                              recursive != 0, true);

    // Strong reference to the `_native` RuntimeObject (when one is used),
    // held until the scan has finished so the object owning `rt` cannot be
    // released while `rt` is still in use.
    PyObject *native = nullptr;
    Runtime *rt = nullptr;
    if (runtime_arg && runtime_arg != Py_None) {
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
        } else {
            native = PyObject_GetAttrString(runtime_arg, "_native");
            if (!native || !PyObject_TypeCheck(native, &RuntimeType)) {
                Py_XDECREF(native);
                PyErr_SetString(PyExc_TypeError,
                                "runtime must be a Runtime instance or None");
                return NULL;
            }
            rt = ((RuntimeObject *)native)->runtime.get();
        }
    } else {
        rt = get_default_runtime();
    }
    if (!rt) {
        Py_XDECREF(native);
        PyErr_SetString(PyExc_RuntimeError, "scan_files: no Runtime available");
        return NULL;
    }

    std::vector<FileEntry> entries;
    std::string scan_error;
    bool scan_failed = false;
    // The scan runs without the GIL. Every exception must be caught *inside*
    // the BEGIN/END_ALLOW_THREADS pair: letting one escape would skip
    // Py_END_ALLOW_THREADS and then touch the Python API without the GIL.
    Py_BEGIN_ALLOW_THREADS
    try {
        rt->submit(dftracer::utils::run_coro_scope(
                       rt->executor(),
                       [](dftracer::utils::CoroScope &scope,
                          PatternDirectoryScannerUtilityInput in,
                          std::vector<FileEntry> *out)
                           -> dftracer::utils::coro::CoroTask<void> {
                           PatternDirectoryScannerUtility scanner;
                           *out = co_await scope.spawn(scanner, in);
                       },
                       std::move(input), &entries),
                   "scan-files")
            .get();
    } catch (const std::exception &e) {
        scan_failed = true;
        scan_error = e.what();
    } catch (...) {
        scan_failed = true;
        scan_error = "scan_files: unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    // GIL is held again; the runtime is no longer needed.
    Py_XDECREF(native);
    if (scan_failed) {
        PyErr_SetString(PyExc_RuntimeError, scan_error.c_str());
        return NULL;
    }

    PyObject *out = PyList_New(static_cast<Py_ssize_t>(entries.size()));
    if (!out) return NULL;
    for (std::size_t i = 0; i < entries.size(); ++i) {
        PyObject *t = Py_BuildValue("(sn)", entries[i].path.c_str(),
                                    static_cast<Py_ssize_t>(entries[i].size));
        if (!t) {
            Py_DECREF(out);  // unfilled slots are NULL; list dealloc skips them
            return NULL;
        }
        PyList_SET_ITEM(out, static_cast<Py_ssize_t>(i), t);
    }
    return out;
}
×
326

327
// ---------------------------------------------------------------------------
328
// plan_lpt_partition: LPT bin-packing of (path, size) pairs
329
// ---------------------------------------------------------------------------
330

331
/// plan_lpt_partition(entries, num_workers) -> list[list[tuple[str, int]]]
///
/// Converts `entries` (a sequence of (path, size) pairs) into FileEntry
/// records and runs plan_lpt_partition to split them across `num_workers`
/// buckets (values <= 0 are treated as 1). Returns the buckets as lists of
/// (path, size) tuples.
///
/// Each pair may be any 2-item sequence (tuple, list, ...). Raises
/// TypeError for malformed pairs, ValueError for a negative size, and
/// MemoryError / RuntimeError if partitioning throws a C++ exception.
static PyObject *plan_lpt_partition_fn(PyObject * /*self*/, PyObject *args) {
    PyObject *entries_obj;
    Py_ssize_t num_workers;
    if (!PyArg_ParseTuple(args, "On", &entries_obj, &num_workers)) return NULL;
    if (num_workers <= 0) num_workers = 1;

    PyObject *seq = PySequence_Fast(entries_obj,
                                    "entries must be a sequence of "
                                    "(path, size) tuples");
    if (!seq) return NULL;
    const Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
    std::vector<FileEntry> entries;
    entries.reserve(static_cast<std::size_t>(n));
    for (Py_ssize_t i = 0; i < n; ++i) {
        // PyArg_ParseTuple only accepts real tuples (anything else raises a
        // confusing SystemError), so normalize each pair first. For an exact
        // tuple this just returns a new reference to the same object.
        PyObject *pair = PySequence_Tuple(PySequence_Fast_GET_ITEM(seq, i));
        if (!pair) {
            Py_DECREF(seq);
            return NULL;
        }
        const char *path = nullptr;
        Py_ssize_t size = 0;
        const bool parsed = PyArg_ParseTuple(pair, "sn", &path, &size) != 0;
        if (parsed && size < 0) {
            // A negative size would wrap to a huge std::size_t below.
            PyErr_Format(PyExc_ValueError,
                         "entries[%zd]: size must be non-negative, got %zd", i,
                         size);
        }
        if (!parsed || size < 0) {
            Py_DECREF(pair);
            Py_DECREF(seq);
            return NULL;
        }
        FileEntry fe;
        fe.path = path;  // copy while `pair` still references the str
        fe.size = static_cast<std::size_t>(size);
        fe.is_regular_file = true;
        Py_DECREF(pair);
        entries.push_back(std::move(fe));
    }
    Py_DECREF(seq);

    // plan_lpt_partition is C++ and may throw; translate instead of letting
    // the exception unwind into the interpreter. The Python calls below do
    // not throw, so `out` cannot leak through these handlers.
    try {
        auto buckets = plan_lpt_partition(std::move(entries),
                                          static_cast<std::size_t>(num_workers));

        PyObject *out = PyList_New(static_cast<Py_ssize_t>(buckets.size()));
        if (!out) return NULL;
        for (std::size_t i = 0; i < buckets.size(); ++i) {
            auto &bucket = buckets[i];
            PyObject *lst = PyList_New(static_cast<Py_ssize_t>(bucket.size()));
            if (!lst) {
                Py_DECREF(out);
                return NULL;
            }
            for (std::size_t j = 0; j < bucket.size(); ++j) {
                PyObject *t =
                    Py_BuildValue("(sn)", bucket[j].path.c_str(),
                                  static_cast<Py_ssize_t>(bucket[j].size));
                if (!t) {
                    Py_DECREF(lst);
                    Py_DECREF(out);
                    return NULL;
                }
                PyList_SET_ITEM(lst, static_cast<Py_ssize_t>(j), t);
            }
            PyList_SET_ITEM(out, static_cast<Py_ssize_t>(i), lst);
        }
        return out;
    } catch (const std::bad_alloc &) {
        return PyErr_NoMemory();
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }
}
×
385

386
// ---------------------------------------------------------------------------
387
// build_sst_batch: run the indexer pipeline with an SST sink and return
388
// the merged Artifacts dict.
389
// ---------------------------------------------------------------------------
390

391
static PyObject *build_sst_batch_fn(PyObject * /*self*/, PyObject *args,
×
392
                                    PyObject *kwds) {
393
    static const char *kwlist[] = {"files",
394
                                   "file_ids",
395
                                   "staging_dir",
396
                                   "batch_id",
397
                                   "index_dir",
398
                                   "checkpoint_size",
399
                                   "build_manifest",
400
                                   "force_rebuild",
401
                                   "bloom_dimensions",
402
                                   "parallelism",
403
                                   "flush_every_files",
404
                                   "runtime",
405
                                   "aggregation_config",
406
                                   "file_slices",
407
                                   NULL};
408
    PyObject *files_obj;
409
    PyObject *file_ids_obj;
410
    const char *staging_dir;
411
    const char *batch_id;
412
    const char *index_dir = "";
×
413
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;
×
414
    int build_manifest = 0;
×
415
    int force_rebuild = 0;
×
416
    PyObject *bloom_dims_obj = NULL;
×
417
    Py_ssize_t parallelism = 0;
×
418
    Py_ssize_t flush_every_files = 0;
×
419
    PyObject *runtime_arg = NULL;
×
420
    PyObject *aggregation_config_obj = NULL;
×
421
    PyObject *file_slices_obj = NULL;
×
422

423
    if (!PyArg_ParseTupleAndKeywords(
×
424
            args, kwds, "OOss|snppOnnOOO", (char **)kwlist, &files_obj,
425
            &file_ids_obj, &staging_dir, &batch_id, &index_dir,
426
            &checkpoint_size, &build_manifest, &force_rebuild, &bloom_dims_obj,
427
            &parallelism, &flush_every_files, &runtime_arg,
428
            &aggregation_config_obj, &file_slices_obj)) {
429
        return NULL;
×
430
    }
431

432
    // Unpack files.
433
    std::vector<std::string> files;
×
434
    {
435
        PyObject *seq = PySequence_Fast(files_obj, "files must be a sequence");
×
436
        if (!seq) return NULL;
×
437
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
438
        files.reserve(n);
×
439
        for (Py_ssize_t i = 0; i < n; ++i) {
×
440
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
441
            if (!s) {
×
442
                Py_DECREF(seq);
443
                return NULL;
×
444
            }
445
            files.emplace_back(s);
×
446
        }
447
        Py_DECREF(seq);
448
    }
449
    if (files.empty()) {
×
450
        return PyDict_New();
×
451
    }
452

453
    // Unpack file_ids, parallel to files.
454
    std::vector<int> file_ids;
×
455
    {
456
        PyObject *seq =
457
            PySequence_Fast(file_ids_obj, "file_ids must be a sequence");
×
458
        if (!seq) return NULL;
×
459
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
460
        if (static_cast<std::size_t>(n) != files.size()) {
×
461
            Py_DECREF(seq);
462
            PyErr_SetString(PyExc_ValueError,
×
463
                            "file_ids must have the same length as files");
464
            return NULL;
×
465
        }
466
        file_ids.reserve(n);
×
467
        for (Py_ssize_t i = 0; i < n; ++i) {
×
468
            long v = PyLong_AsLong(PySequence_Fast_GET_ITEM(seq, i));
×
469
            if (v == -1 && PyErr_Occurred()) {
×
470
                Py_DECREF(seq);
471
                return NULL;
×
472
            }
473
            file_ids.push_back(static_cast<int>(v));
×
474
        }
475
        Py_DECREF(seq);
476
    }
477

478
    // Optional bloom dimensions override.
479
    std::vector<std::string> bloom_dims;
×
480
    if (bloom_dims_obj && bloom_dims_obj != Py_None) {
×
481
        PyObject *seq = PySequence_Fast(bloom_dims_obj,
×
482
                                        "bloom_dimensions must be a sequence");
483
        if (!seq) return NULL;
×
484
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
485
        bloom_dims.reserve(n);
×
486
        for (Py_ssize_t i = 0; i < n; ++i) {
×
487
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
488
            if (!s) {
×
489
                Py_DECREF(seq);
490
                return NULL;
×
491
            }
492
            bloom_dims.emplace_back(s);
×
493
        }
494
        Py_DECREF(seq);
495
    }
496

497
    // Resolve Runtime (matching CheckpointIndexer pattern).
498
    Runtime *rt = nullptr;
×
499
    if (runtime_arg && runtime_arg != Py_None) {
×
500
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
501
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
×
502
        } else {
503
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
504
            if (!native || !PyObject_TypeCheck(native, &RuntimeType)) {
×
505
                Py_XDECREF(native);
×
506
                PyErr_SetString(PyExc_TypeError,
×
507
                                "runtime must be a Runtime instance or None");
508
                return NULL;
×
509
            }
510
            rt = ((RuntimeObject *)native)->runtime.get();
×
511
            Py_DECREF(native);
512
        }
513
    } else {
×
514
        rt = get_default_runtime();
×
515
    }
516

517
    // Build config + sink factory shared state.
518
    struct SharedArtifacts {
519
        std::mutex mu;
520
        std::vector<IndexDatabaseSstWriterContext::Artifacts> list;
521
    };
522
    auto artifacts = std::make_shared<SharedArtifacts>();
×
523
    auto staging = std::string(staging_dir);
×
524
    auto batch = std::string(batch_id);
×
525

526
    // Optional aggregation config, extracted from the Python dataclass.
527
    std::shared_ptr<dftracer::utils::utilities::composites::dft::aggregators::
528
                        AggregationConfig>
529
        agg_config_ptr;
×
530
    if (aggregation_config_obj && aggregation_config_obj != Py_None) {
×
531
        using dftracer::utils::utilities::composites::dft::aggregators::
532
            AggregationConfig;
533
        auto cfg = std::make_shared<AggregationConfig>();
×
534
        auto pull_double = [&](const char *name, double fallback) -> double {
×
535
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
536
            if (!v || v == Py_None) {
×
537
                Py_XDECREF(v);
×
538
                PyErr_Clear();
×
539
                return fallback;
×
540
            }
541
            double out = PyFloat_AsDouble(v);
×
542
            Py_DECREF(v);
543
            if (out == -1.0 && PyErr_Occurred()) return fallback;
×
544
            return out;
×
545
        };
×
546
        auto pull_bool = [&](const char *name, bool fallback) -> bool {
×
547
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
548
            if (!v || v == Py_None) {
×
549
                Py_XDECREF(v);
×
550
                PyErr_Clear();
×
551
                return fallback;
×
552
            }
553
            int out = PyObject_IsTrue(v);
×
554
            Py_DECREF(v);
555
            return out > 0 ? true : fallback;
×
556
        };
×
557
        auto pull_string_list =
558
            [&](const char *name) -> std::vector<std::string> {
×
559
            std::vector<std::string> out;
×
560
            PyObject *v = PyObject_GetAttrString(aggregation_config_obj, name);
×
561
            if (!v || v == Py_None) {
×
562
                Py_XDECREF(v);
×
563
                PyErr_Clear();
×
564
                return out;
×
565
            }
566
            PyObject *seq = PySequence_Fast(v, "expected list of str");
×
567
            Py_DECREF(v);
568
            if (!seq) {
×
569
                PyErr_Clear();
×
570
                return out;
×
571
            }
572
            Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
573
            out.reserve(n);
×
574
            for (Py_ssize_t i = 0; i < n; ++i) {
×
575
                const char *s =
576
                    PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
×
577
                if (s) out.emplace_back(s);
×
578
            }
579
            Py_DECREF(seq);
580
            return out;
×
581
        };
×
582
        double time_interval_ms = pull_double("time_interval_ms", 5000.0);
×
583
        cfg->time_interval_us =
×
584
            static_cast<std::uint64_t>(time_interval_ms * 1000.0);
×
585
        cfg->compute_percentiles = pull_bool("compute_percentiles", false);
×
586
        cfg->extra_group_keys = pull_string_list("group_keys");
×
587
        cfg->custom_metric_fields = pull_string_list("custom_metric_fields");
×
588
        agg_config_ptr = std::move(cfg);
×
589
    }
×
590

591
    // owned_member_maps must outlive rt->submit: FileSlice::members is raw.
592
    std::vector<std::vector<GzipMember>> owned_member_maps;
×
593
    std::vector<IndexBuildBatchConfig::FileSlice> parsed_slices;
×
594
    if (file_slices_obj && file_slices_obj != Py_None) {
×
595
        PyObject *seq =
596
            PySequence_Fast(file_slices_obj, "file_slices must be a sequence");
×
597
        if (!seq) return NULL;
×
598
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
599
        if (static_cast<std::size_t>(n) != files.size()) {
×
600
            Py_DECREF(seq);
601
            PyErr_SetString(PyExc_ValueError,
×
602
                            "file_slices must match files length");
603
            return NULL;
×
604
        }
605
        owned_member_maps.resize(n);
×
606
        parsed_slices.resize(n);
×
607
        for (Py_ssize_t i = 0; i < n; ++i) {
×
608
            PyObject *entry = PySequence_Fast_GET_ITEM(seq, i);
×
609
            if (entry == Py_None) {
×
610
                continue;  // leave slice default-constructed (members=null)
×
611
            }
612
            Py_ssize_t mb = 0, me = 0, ckpt_base = 0;
×
613
            int skip_scoped = 0;
×
614
            PyObject *members_obj = nullptr;
×
615
            if (!PyArg_ParseTuple(entry, "nnnpO", &mb, &me, &ckpt_base,
×
616
                                  &skip_scoped, &members_obj)) {
617
                Py_DECREF(seq);
618
                return NULL;
×
619
            }
620
            PyObject *mseq = PySequence_Fast(
×
621
                members_obj, "file_slices[i].members must be a sequence");
622
            if (!mseq) {
×
623
                Py_DECREF(seq);
624
                return NULL;
×
625
            }
626
            Py_ssize_t mn = PySequence_Fast_GET_SIZE(mseq);
×
627
            auto &mv = owned_member_maps[i];
×
628
            mv.resize(mn);
×
629
            for (Py_ssize_t j = 0; j < mn; ++j) {
×
630
                PyObject *m = PySequence_Fast_GET_ITEM(mseq, j);
×
631
                unsigned long long c_offset = 0, c_size = 0;
×
632
                if (!PyArg_ParseTuple(m, "KK", &c_offset, &c_size)) {
×
633
                    Py_DECREF(mseq);
634
                    Py_DECREF(seq);
635
                    return NULL;
×
636
                }
637
                mv[j].c_offset = static_cast<std::uint64_t>(c_offset);
×
638
                mv[j].c_size = static_cast<std::uint64_t>(c_size);
×
639
            }
640
            Py_DECREF(mseq);
641
            parsed_slices[i].members = &mv;
×
642
            parsed_slices[i].member_begin = static_cast<std::size_t>(mb);
×
643
            parsed_slices[i].member_end = static_cast<std::size_t>(me);
×
644
            parsed_slices[i].checkpoint_idx_base =
×
645
                static_cast<std::uint64_t>(ckpt_base);
×
646
            parsed_slices[i].skip_file_scoped_writes = skip_scoped != 0;
×
647
        }
648
        Py_DECREF(seq);
649
    }
650

651
    auto batch_config = std::make_shared<IndexBuildBatchConfig>();
×
652
    batch_config->file_paths = std::move(files);
×
653
    batch_config->preassigned_file_ids = std::move(file_ids);
×
654
    if (!parsed_slices.empty()) {
×
655
        batch_config->file_slices = parsed_slices;
×
656
    }
657
    batch_config->index_dir = index_dir;
×
658
    batch_config->checkpoint_size = static_cast<std::size_t>(checkpoint_size);
×
659
    batch_config->build_manifest = build_manifest != 0;
×
660
    batch_config->force_rebuild = force_rebuild != 0;
×
661
    batch_config->bloom_dimensions = std::move(bloom_dims);
×
662
    batch_config->parallelism =
×
663
        parallelism > 0 ? static_cast<std::size_t>(parallelism)
×
664
                        : (rt ? std::max<std::size_t>(rt->threads(), 1) : 1);
×
665
    batch_config->flush_every_files =
×
666
        static_cast<std::size_t>(flush_every_files);
×
667
    batch_config->rebuild_root_summaries = false;
×
668

669
    if (agg_config_ptr) {
×
670
        auto agg_staging = staging;
×
671
        auto agg_prefix = batch + "_agg";
×
672
        // Counter keeps per-file SST dirs unique across duplicate file_paths
673
        // when one worker owns multiple slices of the same file.
674
        auto visitor_counter = std::make_shared<std::atomic<std::size_t>>(0);
×
675
        batch_config->dft_visitor_factory =
×
676
            [agg_staging, agg_prefix, agg_config_ptr,
×
677
             visitor_counter](const std::string &file_path)
678
            -> std::vector<std::unique_ptr<
679
                dftracer::utils::utilities::composites::dft::DftEventVisitor>> {
680
            using dftracer::utils::utilities::composites::dft::DftEventVisitor;
681
            using dftracer::utils::utilities::composites::dft::aggregators::
682
                AggregationVisitor;
683
            const std::size_t idx =
684
                visitor_counter->fetch_add(1, std::memory_order_relaxed);
×
685
            std::string prefix = agg_prefix + "_" + std::to_string(idx);
×
686
            std::vector<std::unique_ptr<DftEventVisitor>> visitors;
×
687
            visitors.push_back(std::make_unique<AggregationVisitor>(
×
688
                agg_staging, prefix, /*config_hash=*/0, *agg_config_ptr,
×
689
                file_path));
690
            return visitors;
×
691
        };
×
692
    }
×
693

694
    // Atomic: write phase calls sink_factory from N coroutines concurrently.
695
    auto batch_counter = std::make_shared<std::atomic<std::size_t>>(0);
×
696
    batch_config->sink_factory =
×
697
        [staging, batch, batch_counter]() -> std::unique_ptr<IndexBatchSink> {
×
698
        const std::size_t idx =
699
            batch_counter->fetch_add(1, std::memory_order_relaxed);
×
700
        std::string sub_batch = batch + "_" + std::to_string(idx);
×
701
        return std::make_unique<IndexDatabaseSstWriterContext>(staging,
×
UNCOV
702
                                                               sub_batch);
×
703
    };
×
704
    batch_config->sink_commit = [artifacts](IndexBatchSink &sink) {
×
705
        auto &sst = static_cast<IndexDatabaseSstWriterContext &>(sink);
×
706
        auto batch_artifacts = sst.commit();
×
707
        std::lock_guard<std::mutex> lock(artifacts->mu);
×
708
        if (!batch_artifacts.empty()) {
×
709
            artifacts->list.push_back(std::move(batch_artifacts));
×
710
        }
711
    };
×
712

713
    IndexBuildBatchResult result;
×
714
    std::string submit_error;
×
715
    Py_BEGIN_ALLOW_THREADS try {
×
716
        rt->submit(dftracer::utils::run_coro_scope(
×
717
                       rt->executor(),
718
                       [](dftracer::utils::CoroScope &scope,
×
719
                          std::shared_ptr<IndexBuildBatchConfig> cfg,
720
                          IndexBuildBatchResult *out)
721
                           -> dftracer::utils::coro::CoroTask<void> {
722
                           *out = co_await IndexBatchBuilderUtility::process(
723
                               &scope, std::move(cfg));
724
                       },
×
725
                       batch_config, &result),
726
                   "build-sst-batch")
727
            .get();
×
728
    } catch (const std::exception &e) {
×
729
        submit_error = e.what();
×
730
    }
×
731
    Py_END_ALLOW_THREADS if (!submit_error.empty()) {
×
732
        PyErr_SetString(PyExc_RuntimeError, submit_error.c_str());
×
733
        return NULL;
×
734
    }
735

736
    // If any file failed, surface the first error.
737
    if (result.failed > 0) {
×
738
        for (const auto &r : result.results) {
×
739
            if (!r.success) {
×
740
                PyErr_SetString(PyExc_RuntimeError, r.error_message.c_str());
×
741
                return NULL;
×
742
            }
743
        }
744
    }
745

746
    // One dict per committed sink + per-file aggregation below.
747
    PyObject *out_list = PyList_New(0);
×
748
    if (!out_list) return NULL;
×
749
    {
750
        std::lock_guard<std::mutex> lock(artifacts->mu);
×
751
        for (const auto &a : artifacts->list) {
×
752
            PyObject *main_dict = artifacts_to_dict(a);
×
753
            if (!main_dict || PyList_Append(out_list, main_dict) < 0) {
×
754
                Py_XDECREF(main_dict);
×
755
                Py_DECREF(out_list);
756
                return NULL;
×
757
            }
758
            Py_DECREF(main_dict);
759
        }
760
    }
×
761
    // Harvest per-file aggregation SSTs from extra visitors. Each visitor
762
    // holds a vector of Artifacts (one per FLUSH_THRESHOLD flush + the
763
    // file-complete flush) because SstFileWriter requires strictly
764
    // ascending keys per SST and cross-flush merge operands would collide.
765
    //
766
    // `extra_visitors` is indexed per input file, but a single
767
    // AggregationVisitor instance is typically shared across every file in
768
    // the batch (one flush at end-of-batch). Without dedup we would emit
769
    // that visitor's artifact dict N_files times, producing a manifest with
770
    // N copies of the same SST path. Dedup by visitor pointer so each
771
    // unique flush-sequence is emitted exactly once.
772
    using dftracer::utils::utilities::composites::dft::aggregators::
773
        AggregationVisitor;
774
    using dftracer::utils::utilities::composites::dft::aggregators::
775
        AssociationTracker;
776
    std::unordered_set<AggregationVisitor *> seen_visitors;
×
777
    for (auto &file_visitors : result.extra_visitors) {
×
778
        for (auto &visitor : file_visitors) {
×
779
            auto *agg = dynamic_cast<AggregationVisitor *>(visitor.get());
×
780
            if (!agg) continue;
×
781
            if (!seen_visitors.insert(agg).second) continue;
×
782
            for (auto &a : agg->aggregation_artifacts()) {
×
783
                if (a.empty()) continue;
×
784
                PyObject *agg_dict = artifacts_to_dict(a);
×
785
                if (!agg_dict || PyList_Append(out_list, agg_dict) < 0) {
×
786
                    Py_XDECREF(agg_dict);
×
787
                    Py_DECREF(out_list);
788
                    return NULL;
×
789
                }
790
                Py_DECREF(agg_dict);
791
            }
792
        }
793
    }
794

795
    AssociationTracker combined;
×
796
    bool any_tracker = false;
×
797
    for (auto *agg : seen_visitors) {
×
798
        auto out = agg->take_output();
×
799
        if (out.local_tracker) {
×
800
            combined.merge(*out.local_tracker);
×
801
            any_tracker = true;
×
802
        }
803
    }
×
804
    PyObject *tracker_bytes = nullptr;
×
805
    if (any_tracker) {
×
806
        combined.finalize();
×
807
        std::string blob = combined.serialize();
×
808
        tracker_bytes = PyBytes_FromStringAndSize(
×
809
            blob.data(), static_cast<Py_ssize_t>(blob.size()));
×
810
    } else {
×
811
        tracker_bytes = PyBytes_FromStringAndSize(nullptr, 0);
×
812
    }
813
    if (!tracker_bytes) {
×
814
        Py_DECREF(out_list);
815
        return NULL;
×
816
    }
817
    PyObject *ret = PyTuple_Pack(2, out_list, tracker_bytes);
×
818
    Py_DECREF(out_list);
819
    Py_DECREF(tracker_bytes);
820
    return ret;
×
821
}
×
822

823
// enable_aggregation_deterministic_ids() -> None
//
// Switches the global aggregation string-intern table into deterministic-id
// mode. Registered as METH_NOARGS, so `args` is always unused; the call
// always returns None.
static PyObject *enable_aggregation_deterministic_ids_fn(PyObject * /*self*/,
                                                         PyObject * /*args*/) {
    namespace agg_ns =
        dftracer::utils::utilities::composites::dft::aggregators;
    agg_ns::aggregation_intern().enable_deterministic_ids();
    Py_RETURN_NONE;
}
830

831
// move_artifacts(artifacts, dest_dir) -> dict
//
// Decodes `artifacts` (a dict accepted by artifacts_from_dict), moves the
// referenced SST files into `dest_dir` with the GIL released, and returns a
// new dict (via artifacts_to_dict) describing the moved files.
//
// Errors: whatever artifacts_from_dict raises for a malformed dict;
// RuntimeError if the move itself throws.
static PyObject *move_artifacts_fn(PyObject * /*self*/, PyObject *args,
                                   PyObject *kwds) {
    static const char *kwlist[] = {"artifacts", "dest_dir", NULL};
    PyObject *dict = NULL;
    const char *dest_dir = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "Os", (char **)kwlist, &dict,
                                     &dest_dir)) {
        return NULL;
    }
    IndexDatabaseSstWriterContext::Artifacts a;
    if (!artifacts_from_dict(dict, &a)) return NULL;

    IndexDatabaseSstWriterContext::Artifacts moved;
    bool move_failed = false;
    std::string move_error;
    // The try/catch must live entirely INSIDE the GIL-released region.
    // Py_END_ALLOW_THREADS is what re-acquires the GIL, so an exception must
    // not skip it. The error is recorded here and raised only once the GIL
    // is held again.
    Py_BEGIN_ALLOW_THREADS
    try {
        moved = std::move(a).move_to(dest_dir);
    } catch (const std::exception &e) {
        move_failed = true;
        move_error = e.what();
    } catch (...) {
        move_failed = true;
        move_error = "move_artifacts: unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (move_failed) {
        PyErr_SetString(PyExc_RuntimeError, move_error.c_str());
        return NULL;
    }
    return artifacts_to_dict(moved);
}
×
852

853
namespace {
854

855
dftracer::utils::coro::CoroTask<void> scan_one_gzip_file(
×
856
    std::string path, std::vector<GzipMember> *out) {
857
    out->clear();
858
    int fd = ::open(path.c_str(), O_RDONLY);
859
    if (fd < 0) co_return;
860
    struct stat st;
861
    if (::fstat(fd, &st) == 0 && st.st_size >= 18) {
862
        co_await enumerate_gzip_member_candidates(
863
            fd, static_cast<std::uint64_t>(st.st_size), *out);
864
    }
865
    ::close(fd);
866
}
×
867

868
}  // namespace
869

870
// enumerate_gzip_members(files, runtime=None)
//     -> list[list[(c_offset, c_size)]]
//
// Scans every path in `files` for gzip member boundaries. It spawns one
// coroutine per file (scan_one_gzip_file) on the given or default Runtime,
// with the GIL released while the scan runs.
//
//   files    sequence of str paths.
//   runtime  a Runtime, an object exposing one as `_native`, or None for the
//            default runtime.
//
// Returns a list parallel to `files`. Each entry is a list of
// (c_offset, c_size) tuples; it is empty when the file could not be opened
// or is too small to hold a gzip member.
// Raises TypeError for a bad `files` element or `runtime` argument, and
// RuntimeError if no runtime is available or the scan throws.
static PyObject *enumerate_gzip_members_fn(PyObject * /*self*/, PyObject *args,
                                           PyObject *kwds) {
    static const char *kwlist[] = {"files", "runtime", NULL};
    PyObject *files_obj = NULL;
    PyObject *runtime_arg = NULL;
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|O", (char **)kwlist,
                                     &files_obj, &runtime_arg)) {
        return NULL;
    }

    // Copy the paths into C++ strings while the GIL is still held.
    std::vector<std::string> files;
    {
        PyObject *seq = PySequence_Fast(files_obj, "files must be a sequence");
        if (!seq) return NULL;
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
        files.reserve(n);
        for (Py_ssize_t i = 0; i < n; ++i) {
            const char *s = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(seq, i));
            if (!s) {
                Py_DECREF(seq);
                return NULL;
            }
            files.emplace_back(s);
        }
        Py_DECREF(seq);
    }

    Runtime *rt = nullptr;
    if (runtime_arg && runtime_arg != Py_None) {
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
            rt = ((RuntimeObject *)runtime_arg)->runtime.get();
        } else {
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
            if (!native || !PyObject_TypeCheck(native, &RuntimeType)) {
                Py_XDECREF(native);
                PyErr_SetString(PyExc_TypeError,
                                "runtime must be a Runtime instance or None");
                return NULL;
            }
            rt = ((RuntimeObject *)native)->runtime.get();
            Py_DECREF(native);
        }
    } else {
        rt = get_default_runtime();
    }
    // Guard before rt->submit below; a null runtime would otherwise be
    // dereferenced with the GIL released.
    if (!rt) {
        PyErr_SetString(PyExc_RuntimeError,
                        "enumerate_gzip_members: no runtime available");
        return NULL;
    }

    std::vector<std::vector<GzipMember>> results(files.size());
    bool submit_failed = false;
    std::string submit_error;
    // Exceptions are caught inside the GIL-released region so that
    // Py_END_ALLOW_THREADS always runs. The Python error is set afterwards.
    // A flag (not an empty-message check) records failure because what()
    // may legitimately be empty.
    Py_BEGIN_ALLOW_THREADS
    try {
        rt->submit(
              dftracer::utils::run_coro_scope(
                  rt->executor(),
                  [](dftracer::utils::CoroScope &scope,
                     const std::vector<std::string> *paths,
                     std::vector<std::vector<GzipMember>> *out)
                      -> dftracer::utils::coro::CoroTask<void> {
                      co_await scope.scope(
                          [paths, out](dftracer::utils::CoroScope &child)
                              -> dftracer::utils::coro::CoroTask<void> {
                              // One child task per file. Each writes only to
                              // its own pre-sized slot in *out.
                              for (std::size_t i = 0; i < paths->size(); ++i) {
                                  const std::string &path = (*paths)[i];
                                  auto *slot = &(*out)[i];
                                  // NOTE(review): these are capturing
                                  // coroutine lambdas; this relies on spawn()
                                  // / scope() keeping the callable alive until
                                  // its task completes. Confirm in CoroScope.
                                  child.spawn(
                                      [path, slot](dftracer::utils::CoroScope &)
                                          -> dftracer::utils::coro::CoroTask<
                                              void> {
                                          co_await scan_one_gzip_file(path,
                                                                      slot);
                                      });
                              }
                              co_return;
                          });
                      co_return;
                  },
                  &files, &results),
              "enumerate-gzip-members")
            .get();
    } catch (const std::exception &e) {
        submit_failed = true;
        submit_error = e.what();
    } catch (...) {
        submit_failed = true;
        submit_error = "enumerate_gzip_members: unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (submit_failed) {
        PyErr_SetString(PyExc_RuntimeError, submit_error.c_str());
        return NULL;
    }

    // Convert results to list[list[(c_offset, c_size)]], parallel to files.
    PyObject *out_list = PyList_New(static_cast<Py_ssize_t>(results.size()));
    if (!out_list) return NULL;
    for (std::size_t i = 0; i < results.size(); ++i) {
        const auto &mv = results[i];
        PyObject *inner = PyList_New(static_cast<Py_ssize_t>(mv.size()));
        if (!inner) {
            Py_DECREF(out_list);
            return NULL;
        }
        for (std::size_t j = 0; j < mv.size(); ++j) {
            PyObject *t =
                Py_BuildValue("(KK)", (unsigned long long)mv[j].c_offset,
                              (unsigned long long)mv[j].c_size);
            if (!t) {
                Py_DECREF(inner);
                Py_DECREF(out_list);
                return NULL;
            }
            PyList_SET_ITEM(inner, j, t);
        }
        PyList_SET_ITEM(out_list, i, inner);
    }
    return out_list;
}
×
979

980
// Mirrors build_work_units + lpt_assign_units in dftracer_aggregator_mpi.cpp
981
// so the Dask backend produces identical work distribution to MPI.
982
static PyObject *plan_work_units_fn(PyObject * /*self*/, PyObject *args,
×
983
                                    PyObject *kwds) {
984
    static const char *kwlist[] = {"member_map", "num_workers", "target_c_size",
985
                                   NULL};
986
    PyObject *map_obj = NULL;
×
987
    Py_ssize_t num_workers = 0;
×
988
    unsigned long long target_c_size = 0;
×
989
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "On|K", (char **)kwlist,
×
990
                                     &map_obj, &num_workers, &target_c_size)) {
991
        return NULL;
×
992
    }
993
    if (num_workers <= 0) num_workers = 1;
×
994

995
    std::vector<std::vector<GzipMember>> member_map;
×
996
    {
997
        PyObject *seq =
998
            PySequence_Fast(map_obj, "member_map must be a sequence");
×
999
        if (!seq) return NULL;
×
1000
        Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
1001
        member_map.resize(n);
×
1002
        for (Py_ssize_t i = 0; i < n; ++i) {
×
1003
            PyObject *inner = PySequence_Fast_GET_ITEM(seq, i);
×
1004
            PyObject *iseq =
1005
                PySequence_Fast(inner, "member_map[i] must be a sequence");
×
1006
            if (!iseq) {
×
1007
                Py_DECREF(seq);
1008
                return NULL;
×
1009
            }
1010
            Py_ssize_t ni = PySequence_Fast_GET_SIZE(iseq);
×
1011
            member_map[i].resize(ni);
×
1012
            for (Py_ssize_t j = 0; j < ni; ++j) {
×
1013
                PyObject *t = PySequence_Fast_GET_ITEM(iseq, j);
×
1014
                unsigned long long c_offset = 0, c_size = 0;
×
1015
                if (!PyArg_ParseTuple(t, "KK", &c_offset, &c_size)) {
×
1016
                    Py_DECREF(iseq);
1017
                    Py_DECREF(seq);
1018
                    return NULL;
×
1019
                }
1020
                member_map[i][j].c_offset =
×
1021
                    static_cast<std::uint64_t>(c_offset);
1022
                member_map[i][j].c_size = static_cast<std::uint64_t>(c_size);
×
1023
            }
1024
            Py_DECREF(iseq);
1025
        }
1026
        Py_DECREF(seq);
1027
    }
1028

1029
    // Fallback: treat empty/non-gzip files as a single whole-file member.
1030
    std::uint64_t total_c = 0;
×
1031
    for (auto &mv : member_map) {
×
1032
        if (mv.empty()) mv.push_back({0, 0});
×
1033
        for (const auto &m : mv) total_c += m.c_size;
×
1034
    }
1035

1036
    if (target_c_size == 0) {
×
1037
        target_c_size =
×
1038
            (total_c + static_cast<std::uint64_t>(num_workers) - 1) /
×
1039
            std::max<std::uint64_t>(static_cast<std::uint64_t>(num_workers), 1);
×
1040
    }
1041

1042
    struct Unit {
1043
        std::size_t file_idx;
1044
        std::size_t member_begin;
1045
        std::size_t member_end;
1046
        std::uint64_t c_size;
1047
    };
1048
    std::vector<Unit> units;
×
1049
    for (std::size_t fi = 0; fi < member_map.size(); ++fi) {
×
1050
        const auto &members = member_map[fi];
×
1051
        if (members.empty()) continue;
×
1052
        std::size_t begin = 0;
×
1053
        std::uint64_t accum = 0;
×
1054
        for (std::size_t i = 0; i < members.size(); ++i) {
×
1055
            accum += members[i].c_size;
×
1056
            const bool is_last = (i + 1 == members.size());
×
1057
            if ((target_c_size > 0 && accum >= target_c_size) || is_last) {
×
1058
                units.push_back({fi, begin, i + 1, accum});
×
1059
                begin = i + 1;
×
1060
                accum = 0;
×
1061
            }
1062
        }
1063
    }
1064

1065
    std::vector<std::size_t> order(units.size());
×
1066
    for (std::size_t i = 0; i < order.size(); ++i) order[i] = i;
×
1067
    std::sort(order.begin(), order.end(), [&](std::size_t a, std::size_t b) {
×
1068
        if (units[a].c_size != units[b].c_size)
×
1069
            return units[a].c_size > units[b].c_size;
×
1070
        if (units[a].file_idx != units[b].file_idx)
×
1071
            return units[a].file_idx < units[b].file_idx;
×
1072
        return units[a].member_begin < units[b].member_begin;
×
1073
    });
1074
    const std::size_t nw = static_cast<std::size_t>(num_workers);
×
1075
    std::vector<std::uint64_t> loads(nw, 0);
×
1076
    std::vector<std::vector<std::size_t>> per_worker(nw);
×
1077
    for (std::size_t ord : order) {
×
1078
        std::size_t best = 0;
×
1079
        for (std::size_t r = 1; r < nw; ++r)
×
1080
            if (loads[r] < loads[best]) best = r;
×
1081
        per_worker[best].push_back(ord);
×
1082
        loads[best] += std::max<std::uint64_t>(units[ord].c_size, 1);
×
1083
    }
1084

1085
    PyObject *out = PyList_New(static_cast<Py_ssize_t>(nw));
×
1086
    if (!out) return NULL;
×
1087
    for (std::size_t w = 0; w < nw; ++w) {
×
1088
        // Keep per-worker slices sorted by (file_idx, member_begin) for
1089
        // deterministic, file-group-friendly iteration downstream.
1090
        auto &lst = per_worker[w];
×
1091
        std::sort(lst.begin(), lst.end(), [&](std::size_t a, std::size_t b) {
×
1092
            if (units[a].file_idx != units[b].file_idx)
×
1093
                return units[a].file_idx < units[b].file_idx;
×
1094
            return units[a].member_begin < units[b].member_begin;
×
1095
        });
1096
        PyObject *inner = PyList_New(static_cast<Py_ssize_t>(lst.size()));
×
1097
        if (!inner) {
×
1098
            Py_DECREF(out);
1099
            return NULL;
×
1100
        }
1101
        for (std::size_t k = 0; k < lst.size(); ++k) {
×
1102
            const auto &u = units[lst[k]];
×
1103
            PyObject *t = Py_BuildValue(
×
1104
                "(nnnK)", (Py_ssize_t)u.file_idx, (Py_ssize_t)u.member_begin,
×
1105
                (Py_ssize_t)u.member_end, (unsigned long long)u.c_size);
×
1106
            if (!t) {
×
1107
                Py_DECREF(inner);
1108
                Py_DECREF(out);
1109
                return NULL;
×
1110
            }
1111
            PyList_SET_ITEM(inner, k, t);
×
1112
        }
1113
        PyList_SET_ITEM(out, w, inner);
×
1114
    }
1115
    return out;
×
1116
}
×
1117

1118
// ---------------------------------------------------------------------------
1119
// Module registration
1120
// ---------------------------------------------------------------------------
1121

1122
// Method table registered on the module by init_sst_distribution().
// The docstrings are user-facing text (help()/__doc__). Functions that take
// keyword arguments have the three-argument (self, args, kwds) signature, so
// they are cast to PyCFunction as the CPython API requires.
static PyMethodDef SstDistributionMethods[] = {
    {"build_sst_batch", reinterpret_cast<PyCFunction>(build_sst_batch_fn),
     METH_VARARGS | METH_KEYWORDS,
     "build_sst_batch(files, file_ids, staging_dir, batch_id, ...) "
     "-> (list[dict], bytes)\n"
     "Run the indexer pipeline with an SST sink and return "
     "(artifact_dicts, tracker_blob). The tracker blob is the serialized "
     "merged AssociationTracker from this batch's aggregation visitors "
     "(empty bytes when no aggregation_config was passed)."},
    {"plan_lpt_partition", reinterpret_cast<PyCFunction>(plan_lpt_partition_fn),
     METH_VARARGS,
     "plan_lpt_partition(entries, num_workers) -> list[list[(path, size)]]\n"
     "Greedy Longest-Processing-Time-first bin-packing of (path, size) "
     "tuples across num_workers buckets. Minimises the maximum per-worker "
     "total size."},
    {"scan_files", reinterpret_cast<PyCFunction>(scan_files_fn),
     METH_VARARGS | METH_KEYWORDS,
     "scan_files(directory, patterns=None, recursive=False, runtime=None) "
     "-> list[(path, size)]\n"
     "Parallel directory scan returning (path, size) tuples for regular "
     "files matching the patterns."},
    {"enable_aggregation_deterministic_ids",
     reinterpret_cast<PyCFunction>(enable_aggregation_deterministic_ids_fn),
     METH_NOARGS,
     "enable_aggregation_deterministic_ids() -> None\n"
     "Flip the global aggregation StringIntern into deterministic-id mode "
     "so the same string maps to the same 32-bit id in every worker "
     "process. Call once at worker startup BEFORE any aggregation work."},
    {"move_artifacts", reinterpret_cast<PyCFunction>(move_artifacts_fn),
     METH_VARARGS | METH_KEYWORDS,
     "move_artifacts(artifacts, dest_dir) -> dict\n"
     "Move every populated SST in `artifacts` (as returned by "
     "`build_sst_batch`) into `dest_dir` via the C++ rename/copy helper, "
     "returning a fresh dict with the new paths. Single GIL release, no "
     "per-file Python shutil.move overhead."},
    {"enumerate_gzip_members",
     reinterpret_cast<PyCFunction>(enumerate_gzip_members_fn),
     METH_VARARGS | METH_KEYWORDS,
     "enumerate_gzip_members(files, runtime=None) -> list[list[(c_offset, "
     "c_size)]]\n"
     "Cooperative async scan of gzip member offsets across `files`. "
     "Returns lists of (c_offset, c_size) parallel to `files`; empty for "
     "non-gzip / unreadable files."},
    {"plan_work_units", reinterpret_cast<PyCFunction>(plan_work_units_fn),
     METH_VARARGS | METH_KEYWORDS,
     "plan_work_units(member_map, num_workers, target_c_size=0) "
     "-> list[list[(file_idx, member_begin, member_end, c_size)]]\n"
     "Deterministic LPT assignment of intra-file gzip-member slices "
     "across workers. Each worker's list contains (file_idx, "
     "member_begin, member_end, c_size) tuples; a file sliced across "
     "multiple workers appears in each owner's list with disjoint "
     "[member_begin, member_end) ranges."},
    // Sentinel entry terminating the table.
    {nullptr, nullptr, 0, nullptr}};
1171

1172
// Registers the SstArtifactRegistry type and the SST-distribution functions
// on module `m`. Returns 0 on success, -1 with a Python error set on failure.
int init_sst_distribution(PyObject *m) {
    PyTypeObject *const registry_type = &SstArtifactRegistryType;
    if (PyType_Ready(registry_type) < 0) {
        return -1;
    }

    // PyModule_AddObject steals the reference only on success. Take one up
    // front and hand it back ourselves if the add fails.
    Py_INCREF(registry_type);
    const int add_status = PyModule_AddObject(
        m, "SstArtifactRegistry", reinterpret_cast<PyObject *>(registry_type));
    if (add_status < 0) {
        Py_DECREF(registry_type);
        return -1;
    }

    return PyModule_AddFunctions(m, SstDistributionMethods) < 0 ? -1 : 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc