• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.88
/src/dftracer/utils/python/batch_indexer.cpp
1
#include <dftracer/utils/core/common/constants.h>
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/core/common/hash_combine.h>
4
#include <dftracer/utils/core/common/string_intern.h>
5
#include <dftracer/utils/core/coro/task.h>
6
#include <dftracer/utils/core/coro/when_all.h>
7
#include <dftracer/utils/core/rocksdb/db_manager.h>
8
#include <dftracer/utils/core/runtime.h>
9
#include <dftracer/utils/core/tasks/coro_scope.h>
10
#include <dftracer/utils/python/batch_indexer.h>
11
#include <dftracer/utils/python/indexer.h>
12
#include <dftracer/utils/python/py_dict_helpers.h>
13
#include <dftracer/utils/python/py_list_helpers.h>
14
#include <dftracer/utils/python/py_runtime_mixin.h>
15
#include <dftracer/utils/python/py_type_helpers.h>
16
#include <dftracer/utils/python/runtime.h>
17
#include <dftracer/utils/utilities/common/query/query.h>
18
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_config.h>
19
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.h>
20
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_types.h>
21
#include <dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h>
22
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics.h>
23
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.h>
24
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
25
#include <dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.h>
26
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
27
#include <dftracer/utils/utilities/indexer/index_database.h>
28

29
#ifdef DFTRACER_UTILS_ENABLE_ARROW
30
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
31
#endif
32

33
#include <algorithm>
34
#include <chrono>
35
#include <cstdio>
36
#include <optional>
37
#include <string>
38
#include <unordered_map>
39
#include <unordered_set>
40
#include <vector>
41

42
using dftracer::utils::CoroScope;
43
using dftracer::utils::Runtime;
44
using dftracer::utils::coro::CoroTask;
45
using namespace dftracer::utils::utilities::composites::dft::indexing;
46
using namespace dftracer::utils::utilities::composites::dft::aggregators;
47

48
// ---------------------------------------------------------------------------
49
// BatchIndexer - directory-level indexer with resolve/build pattern
50
// ---------------------------------------------------------------------------
51

52
// tp_dealloc for the batch Indexer type: release every owned Python
// reference, then hand the memory back through the type's tp_free slot.
static void Indexer_dealloc(IndexerObject* self) {
    // Py_CLEAR tolerates NULL slots and nulls each one after releasing it;
    // the release order matches the order the fields are declared/used.
    Py_CLEAR(self->runtime_obj);
    Py_CLEAR(self->directory);
    Py_CLEAR(self->files);
    Py_CLEAR(self->index_dir);
    Py_CLEAR(self->group_keys);
    Py_CLEAR(self->custom_metric_fields);

    PyTypeObject* const tp = Py_TYPE(self);
    tp->tp_free((PyObject*)self);
}
61

62
// tp_new for the batch Indexer type: allocate the object and seed every
// field with its default value. Argument parsing happens in Indexer_init.
// Returns NULL (with the allocator's exception set) if allocation fails.
static PyObject* Indexer_new(PyTypeObject* type, PyObject* args,
                             PyObject* kwds) {
    (void)args;
    (void)kwds;

    IndexerObject* obj = (IndexerObject*)type->tp_alloc(type, 0);
    if (obj == nullptr) {
        return nullptr;
    }

    // Owned Python references start out empty.
    obj->runtime_obj = nullptr;
    obj->directory = nullptr;
    obj->files = nullptr;
    obj->index_dir = nullptr;
    obj->group_keys = nullptr;
    obj->custom_metric_fields = nullptr;

    // Scalar options mirror the keyword defaults used by Indexer_init.
    obj->require_checkpoint = 1;
    obj->require_bloom = 1;
    obj->require_manifest = 1;
    obj->require_aggregation = 0;
    obj->time_interval_ms = 5000.0;
    obj->compute_percentiles = 0;
    obj->checkpoint_size =
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE;
    obj->parallelism = 0;
    obj->force_rebuild = 0;

    return (PyObject*)obj;
}
85

86
static int Indexer_init(IndexerObject* self, PyObject* args, PyObject* kwds) {
102✔
87
    static const char* kwlist[] = {"directory",
88
                                   "files",
89
                                   "index_dir",
90
                                   "require_checkpoint",
91
                                   "require_bloom",
92
                                   "require_manifest",
93
                                   "require_aggregation",
94
                                   "time_interval_ms",
95
                                   "group_keys",
96
                                   "custom_metric_fields",
97
                                   "compute_percentiles",
98
                                   "checkpoint_size",
99
                                   "parallelism",
100
                                   "force_rebuild",
101
                                   "runtime",
102
                                   nullptr};
103

104
    const char* directory = "";
102✔
105
    PyObject* files_obj = Py_None;
102✔
106
    const char* index_dir = "";
102✔
107
    int require_checkpoint = 1;
102✔
108
    int require_bloom = 1;
102✔
109
    int require_manifest = 1;
102✔
110
    int require_aggregation = 0;
102✔
111
    double time_interval_ms = 5000.0;
102✔
112
    PyObject* group_keys_obj = Py_None;
102✔
113
    PyObject* custom_metrics_obj = Py_None;
102✔
114
    int compute_percentiles = 0;
102✔
115
    Py_ssize_t checkpoint_size = static_cast<Py_ssize_t>(
102✔
116
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE);
117
    Py_ssize_t parallelism = 0;
102✔
118
    int force_rebuild = 0;
102✔
119
    PyObject* runtime_arg = nullptr;
102✔
120

121
    if (!PyArg_ParseTupleAndKeywords(
102!
122
            args, kwds, "|sOsppppdOOpnnpO", (char**)kwlist, &directory,
51✔
123
            &files_obj, &index_dir, &require_checkpoint, &require_bloom,
124
            &require_manifest, &require_aggregation, &time_interval_ms,
125
            &group_keys_obj, &custom_metrics_obj, &compute_percentiles,
126
            &checkpoint_size, &parallelism, &force_rebuild, &runtime_arg)) {
127
        return -1;
×
128
    }
129

130
    // Validate: at least one of directory or files must be provided
131
    bool has_directory = directory && directory[0] != '\0';
102✔
132
    bool has_files = files_obj && files_obj != Py_None &&
158✔
133
                     PyList_Check(files_obj) && PyList_Size(files_obj) > 0;
158!
134

135
    if (!has_directory && !has_files) {
102✔
136
        PyErr_SetString(PyExc_ValueError,
2!
137
                        "At least one of 'directory' or 'files' must be "
138
                        "provided");
139
        return -1;
2✔
140
    }
141

142
    // Store runtime
143
    if (runtime_arg && runtime_arg != Py_None) {
100!
144
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
145
            Py_INCREF(runtime_arg);
×
146
            self->runtime_obj = runtime_arg;
×
147
        } else {
148
            PyObject* native = PyObject_GetAttrString(runtime_arg, "_native");
×
149
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
150
                self->runtime_obj = native;
×
151
            } else {
152
                Py_XDECREF(native);
×
153
                PyErr_SetString(PyExc_TypeError,
×
154
                                "runtime must be a Runtime instance or None");
155
                return -1;
×
156
            }
157
        }
158
    }
159

160
    self->directory = PyUnicode_FromString(directory);
100!
161
    self->index_dir = PyUnicode_FromString(index_dir);
100!
162
    self->require_checkpoint = require_checkpoint;
100✔
163
    self->require_bloom = require_bloom;
100✔
164
    self->require_manifest = require_manifest;
100✔
165
    self->require_aggregation = require_aggregation;
100✔
166
    self->time_interval_ms = time_interval_ms;
100✔
167
    self->compute_percentiles = compute_percentiles;
100✔
168
    self->checkpoint_size = static_cast<std::size_t>(checkpoint_size);
100✔
169
    self->parallelism = static_cast<std::size_t>(parallelism);
100✔
170
    self->force_rebuild = force_rebuild;
100✔
171

172
    // Store files list
173
    if (has_files) {
100✔
174
        Py_INCREF(files_obj);
56!
175
        self->files = files_obj;
56✔
176
    } else {
28✔
177
        self->files = nullptr;
44✔
178
    }
179

180
    // Store group_keys
181
    if (group_keys_obj && group_keys_obj != Py_None) {
100!
182
        Py_INCREF(group_keys_obj);
×
183
        self->group_keys = group_keys_obj;
×
184
    } else {
185
        self->group_keys = nullptr;
100✔
186
    }
187

188
    // Store custom_metric_fields
189
    if (custom_metrics_obj && custom_metrics_obj != Py_None) {
100!
190
        Py_INCREF(custom_metrics_obj);
×
191
        self->custom_metric_fields = custom_metrics_obj;
×
192
    } else {
193
        self->custom_metric_fields = nullptr;
100✔
194
    }
195

196
    return 0;
100✔
197
}
51✔
198

199
// Select the Runtime this indexer schedules work on: the process default
// when no `runtime` was bound at construction, otherwise the bound one.
static Runtime* get_batch_indexer_runtime(IndexerObject* self) {
    if (self->runtime_obj == nullptr) {
        return get_default_runtime();
    }
    RuntimeObject* bound = (RuntimeObject*)self->runtime_obj;
    return bound->runtime.get();
}
205

206
static std::optional<AggregationConfig> build_aggregation_config(
272✔
207
    IndexerObject* self) {
208
    if (!self->require_aggregation) {
272✔
209
        return std::nullopt;
152✔
210
    }
211

212
    AggregationConfig config;
120!
213
    config.time_interval_us =
120✔
214
        static_cast<std::uint64_t>(self->time_interval_ms * 1000.0);
120✔
215

216
    if (self->group_keys && PyList_Check(self->group_keys)) {
120!
217
        Py_ssize_t n = PyList_Size(self->group_keys);
×
218
        for (Py_ssize_t i = 0; i < n; i++) {
×
219
            const char* s =
220
                PyUnicode_AsUTF8(PyList_GetItem(self->group_keys, i));
×
221
            if (s) config.extra_group_keys.emplace_back(s);
×
222
        }
223
    }
224
    if (self->custom_metric_fields &&
120!
225
        PyList_Check(self->custom_metric_fields)) {
×
226
        Py_ssize_t n = PyList_Size(self->custom_metric_fields);
×
227
        for (Py_ssize_t i = 0; i < n; i++) {
×
228
            const char* s =
229
                PyUnicode_AsUTF8(PyList_GetItem(self->custom_metric_fields, i));
×
230
            if (s) config.custom_metric_fields.emplace_back(s);
×
231
        }
232
    }
233

234
    config.compute_percentiles = self->compute_percentiles != 0;
120✔
235
    return config;
120!
236
}
196✔
237

238
// ---------------------------------------------------------------------------
239
// resolve() - check what exists vs needs building
240
// ---------------------------------------------------------------------------
241

242
// resolve(): compare what exists in the index with what the configured
// tiers require, without building anything.
//
// Returns a new dict with keys:
//   total_files, index_path, aggregation_interval_us, needs_rebuild,
//   ready      - list of file paths the resolver reports as cached,
//   needs_work - de-duplicated union of every needs_* list, in first-seen
//                order (checkpoint, bloom, manifest, aggregation).
// Returns NULL with a Python exception set on failure.
static PyObject* Indexer_resolve(IndexerObject* self,
                                 PyObject* Py_UNUSED(ignored)) {
    // directory/index_dir stay NULL until __init__ succeeds, and
    // PyUnicode_AsUTF8(NULL) would dereference a null pointer.
    if (!self->directory || !self->index_dir) {
        PyErr_SetString(PyExc_RuntimeError, "Indexer is not initialized");
        return nullptr;
    }
    const char* directory = PyUnicode_AsUTF8(self->directory);
    if (!directory) return nullptr;
    const char* index_dir = PyUnicode_AsUTF8(self->index_dir);
    if (!index_dir) return nullptr;

    ResolverInput input;
    input.directory = directory;
    input.index_dir = index_dir;
    input.require_checkpoints = self->require_checkpoint;
    input.require_bloom = self->require_bloom;
    input.require_manifest = self->require_manifest;
    input.require_aggregation = self->require_aggregation;
    input.aggregation_config = build_aggregation_config(self);

    // Add files if provided; non-str entries are skipped, and their pending
    // TypeError is cleared (returning a value with an exception set is a
    // SystemError in CPython).
    if (self->files && PyList_Check(self->files)) {
        const Py_ssize_t n = PyList_Size(self->files);
        for (Py_ssize_t i = 0; i < n; i++) {
            const char* s = PyUnicode_AsUTF8(PyList_GetItem(self->files, i));
            if (s) {
                input.files.emplace_back(s);
            } else {
                PyErr_Clear();
            }
        }
    }

    ResolverResult result;

    if (!run_blocking([&] {
            Runtime* rt = get_batch_indexer_runtime(self);
            rt->submit(run_coro_scope(
                           rt->executor(),
                           [](CoroScope& scope, ResolverInput in,
                              ResolverResult* out) -> CoroTask<void> {
                               IndexResolverUtility resolver;
                               // scope.spawn(utility, input) auto-binds context
                               // for utilities with the NeedsContext tag
                               *out = co_await scope.spawn(resolver,
                                                           std::move(in));
                           },
                           std::move(input), &result),
                       "batch-indexer-resolve")
                .get();
        })) {
        return nullptr;
    }

    // Build a new list of str from `paths`; NULL (exception set) on failure.
    auto to_py_list = [](const std::vector<std::string>& paths) -> PyObject* {
        PyObject* list = PyList_New(static_cast<Py_ssize_t>(paths.size()));
        if (!list) return nullptr;
        for (std::size_t i = 0; i < paths.size(); ++i) {
            PyObject* item = PyUnicode_FromString(paths[i].c_str());
            if (!item) {
                Py_DECREF(list);
                return nullptr;
            }
            PyList_SET_ITEM(list, static_cast<Py_ssize_t>(i), item);
        }
        return list;
    };

    // Build result dict
    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    dict_set_steal(dict, "total_files",
                   PyLong_FromSize_t(result.all_files.size()));
    dict_set_steal(dict, "index_path",
                   PyUnicode_FromString(result.index_path.c_str()));
    dict_set_steal(dict, "aggregation_interval_us",
                   PyLong_FromUnsignedLongLong(result.stored_time_interval_us));
    dict_set_steal(dict, "needs_rebuild",
                   PyBool_FromLong(result.needs_augmentation));

    // Ready files
    std::vector<std::string> ready_paths;
    ready_paths.reserve(result.cached.size());
    for (const auto& item : result.cached) {
        ready_paths.push_back(item.file_path);
    }
    PyObject* ready_list = to_py_list(ready_paths);
    if (!ready_list) {
        Py_DECREF(dict);
        return nullptr;
    }
    // dict_set_steal consumes the list reference; PyDict_SetItemString
    // would not, leaking one list per call.
    dict_set_steal(dict, "ready", ready_list);

    // Needs-work files: union of all needs_* lists, de-duplicated with a
    // hash set instead of a linear scan per item.
    std::vector<std::string> needs_work;
    std::unordered_set<std::string> seen;
    auto add_unique = [&](const auto& items) {
        for (const auto& item : items) {
            if (seen.insert(item.file_path).second) {
                needs_work.push_back(item.file_path);
            }
        }
    };
    add_unique(result.needs_checkpoint);
    add_unique(result.needs_bloom);
    add_unique(result.needs_manifest);
    add_unique(result.needs_aggregation);

    PyObject* needs_list = to_py_list(needs_work);
    if (!needs_list) {
        Py_DECREF(dict);
        return nullptr;
    }
    dict_set_steal(dict, "needs_work", needs_list);

    return dict;
}
353

354
// ---------------------------------------------------------------------------
355
// build() - build missing index tiers
356
// ---------------------------------------------------------------------------
357

358
// build(): run resolve_and_build_index for this indexer's configuration,
// building whichever index tiers are missing (or all of them when
// force_rebuild is set). Blocks until the build finishes.
// Returns None, or NULL with a Python exception set on failure.
static PyObject* Indexer_build(IndexerObject* self,
                               PyObject* Py_UNUSED(ignored)) {
    // directory/index_dir stay NULL until __init__ succeeds, and
    // PyUnicode_AsUTF8(NULL) would dereference a null pointer.
    if (!self->directory || !self->index_dir) {
        PyErr_SetString(PyExc_RuntimeError, "Indexer is not initialized");
        return nullptr;
    }
    const char* directory = PyUnicode_AsUTF8(self->directory);
    if (!directory) return nullptr;
    const char* index_dir = PyUnicode_AsUTF8(self->index_dir);
    if (!index_dir) return nullptr;

    ResolveAndBuildInput input;
    input.directory = directory;
    input.index_dir = index_dir;
    input.require_checkpoints = self->require_checkpoint;
    input.require_bloom = self->require_bloom;
    input.require_manifest = self->require_manifest;
    input.require_aggregation = self->require_aggregation;
    input.aggregation_config = build_aggregation_config(self);
    input.checkpoint_size = self->checkpoint_size;
    input.parallelism = self->parallelism;
    input.force_rebuild = self->force_rebuild;

    // Add files if provided; non-str entries are skipped, and their pending
    // TypeError is cleared so it cannot leak out with a successful result.
    if (self->files && PyList_Check(self->files)) {
        const Py_ssize_t n = PyList_Size(self->files);
        for (Py_ssize_t i = 0; i < n; i++) {
            const char* s = PyUnicode_AsUTF8(PyList_GetItem(self->files, i));
            if (s) {
                input.files.emplace_back(s);
            } else {
                PyErr_Clear();
            }
        }
    }

    if (!run_blocking([&] {
            Runtime* rt = get_batch_indexer_runtime(self);
            rt->submit(run_coro_scope(
                           rt->executor(),
                           [](CoroScope& scope,
                              ResolveAndBuildInput in) -> CoroTask<void> {
                               co_await resolve_and_build_index(&scope,
                                                                std::move(in));
                           },
                           std::move(input)),
                       "batch-indexer-build")
                .get();
        })) {
        return nullptr;
    }

    Py_RETURN_NONE;
}
402

403
// ---------------------------------------------------------------------------
404
// ensure_indexed() - resolve + build if needed
405
// ---------------------------------------------------------------------------
406

407
// ensure_indexed(): resolve(); if any file still needs work, or the stored
// aggregation interval differs from the requested one (needs_rebuild), run
// build() and then resolve() again. Returns the final status dict as a new
// reference, or NULL with an exception set.
static PyObject* Indexer_ensure_indexed(IndexerObject* self,
                                        PyObject* Py_UNUSED(ignored)) {
    PyObject* status = Indexer_resolve(self, nullptr);
    if (status == nullptr) {
        return nullptr;
    }

    // Borrowed references owned by `status`; only read while it is alive.
    PyObject* const pending = PyDict_GetItemString(status, "needs_work");
    PyObject* const stale = PyDict_GetItemString(status, "needs_rebuild");
    const bool has_pending_files =
        pending != nullptr && PyList_Size(pending) > 0;
    const bool interval_changed = stale != nullptr && PyObject_IsTrue(stale);

    if (!has_pending_files && !interval_changed) {
        return status;
    }

    Py_DECREF(status);

    PyObject* const built = Indexer_build(self, nullptr);
    if (built == nullptr) {
        return nullptr;
    }
    Py_DECREF(built);

    // Report the post-build state.
    return Indexer_resolve(self, nullptr);
}
433

434
// ---------------------------------------------------------------------------
435
// get_checkpoint_indexer() - get a single-file checkpoint indexer
436
// ---------------------------------------------------------------------------
437

438
// get_checkpoint_indexer(file_path): create a single-file CheckpointIndexer
// for `file_path`. Its index location is derived from this indexer's
// index_dir; it shares this indexer's checkpoint_size and runtime, and has
// bloom/manifest building disabled.
// Returns a new reference, or NULL with a Python exception set.
static PyObject* Indexer_get_checkpoint_indexer(IndexerObject* self,
                                                PyObject* args) {
    const char* file_path = nullptr;
    if (!PyArg_ParseTuple(args, "s", &file_path)) {
        return nullptr;
    }

    // index_dir stays NULL until __init__ succeeds; PyUnicode_AsUTF8(NULL)
    // would dereference a null pointer.
    if (!self->index_dir) {
        PyErr_SetString(PyExc_RuntimeError, "Indexer is not initialized");
        return nullptr;
    }
    const char* index_dir = PyUnicode_AsUTF8(self->index_dir);
    if (!index_dir) {
        return nullptr;
    }

    // Determine index path using BatchIndexer's index_dir setting
    std::string index_path = dftracer::utils::utilities::composites::dft::
        internal::determine_index_path(file_path, index_dir);

    CheckpointIndexerObject* indexer =
        (CheckpointIndexerObject*)CheckpointIndexerType.tp_alloc(
            &CheckpointIndexerType, 0);
    if (!indexer) {
        return nullptr;
    }

    indexer->handle = nullptr;
    indexer->checkpoint_size = self->checkpoint_size;
    indexer->build_bloom = 0;
    indexer->build_manifest = 0;

    // Share runtime reference (NULL means "use the default runtime").
    Py_XINCREF(self->runtime_obj);
    indexer->runtime_obj = self->runtime_obj;

    indexer->gz_path = PyUnicode_FromString(file_path);
    indexer->index_path = PyUnicode_FromString(index_path.c_str());
    if (!indexer->gz_path || !indexer->index_path) {
        // NOTE(review): relies on the CheckpointIndexer dealloc tolerating
        // NULL fields, as the original failure path below already does for
        // a NULL handle.
        Py_DECREF((PyObject*)indexer);
        return nullptr;
    }

    // Create the native handle
    indexer->handle = dft_indexer_create(file_path, index_path.c_str(),
                                         self->checkpoint_size, 0);
    if (!indexer->handle) {
        Py_DECREF((PyObject*)indexer);
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to create checkpoint indexer");
        return nullptr;
    }

    return (PyObject*)indexer;
}
485

486
// Run resolve() and extract its "index_path" entry as a std::string.
// Returns nullopt with a Python exception set when resolve() fails or the
// path is missing/empty ("No index path available").
static std::optional<std::string> resolve_index_path(IndexerObject* self) {
    PyObject* status = Indexer_resolve(self, nullptr);
    if (status == nullptr) {
        return std::nullopt;
    }

    std::optional<std::string> resolved;
    // `entry` and `text` are borrowed from `status`: copy before releasing.
    PyObject* entry = PyDict_GetItemString(status, "index_path");
    const char* text = (entry != nullptr) ? PyUnicode_AsUTF8(entry) : nullptr;
    if (text != nullptr && *text != '\0') {
        resolved.emplace(text);
    }
    Py_DECREF(status);

    if (!resolved) {
        PyErr_SetString(PyExc_RuntimeError, "No index path available");
    }
    return resolved;
}
500

501
// get_hash_table(type): read one hash table from the resolved index
// database ('file', 'host', 'string' or 'proc') and return it as a new
// dict mapping each hash string to its name string.
// Raises ValueError for an unknown `type`; NULL with an exception set on
// any other failure.
static PyObject* Indexer_get_hash_table(IndexerObject* self, PyObject* args) {
    const char* type_str = nullptr;
    if (!PyArg_ParseTuple(args, "s", &type_str)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;
    using HashType = IndexDatabase::HashType;

    HashType type;
    if (std::strcmp(type_str, "file") == 0) {
        type = HashType::FILE;
    } else if (std::strcmp(type_str, "host") == 0) {
        type = HashType::HOST;
    } else if (std::strcmp(type_str, "string") == 0) {
        type = HashType::STRING;
    } else if (std::strcmp(type_str, "proc") == 0) {
        type = HashType::PROC;
    } else {
        PyErr_SetString(PyExc_ValueError,
                        "type must be 'file', 'host', 'string', or 'proc'");
        return nullptr;
    }

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, std::string> hash_map;
    if (!run_blocking_r(
            [&] {
                IndexDatabase db(index_path,
                                 dftracer::utils::rocksdb::RocksDatabase::
                                     OpenMode::ReadOnly);
                return db.query_hash_table(type);
            },
            hash_map)) {
        return nullptr;
    }

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [hash, name] : hash_map) {
        PyObject* key = PyUnicode_FromStringAndSize(
            hash.data(), static_cast<Py_ssize_t>(hash.size()));
        PyObject* val =
            key ? PyUnicode_FromStringAndSize(
                      name.data(), static_cast<Py_ssize_t>(name.size()))
                : nullptr;
        // Any NULL above (e.g. invalid UTF-8) or a failed insert aborts.
        const int rc = val ? PyDict_SetItem(dict, key, val) : -1;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (rc < 0) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
554

555
// query_file_pids(file_id): return a new set of the pid values the resolved
// index database reports for `file_id`.
// Returns NULL with a Python exception set on failure.
static PyObject* Indexer_query_file_pids(IndexerObject* self, PyObject* args) {
    int file_id;
    if (!PyArg_ParseTuple(args, "i", &file_id)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_set<std::uint64_t> pids;
    if (!run_blocking_r(
            [&] {
                IndexDatabase db(index_path,
                                 dftracer::utils::rocksdb::RocksDatabase::
                                     OpenMode::ReadOnly);
                return db.query_file_pids(file_id);
            },
            pids)) {
        return nullptr;
    }

    PyObject* set = PySet_New(nullptr);
    if (!set) return nullptr;

    for (auto pid : pids) {
        PyObject* val = PyLong_FromUnsignedLongLong(pid);
        const int rc = val ? PySet_Add(set, val) : -1;
        Py_XDECREF(val);
        if (rc < 0) {
            Py_DECREF(set);
            return nullptr;
        }
    }

    return set;
}
590

591
// query_all_file_pids(): return a new dict mapping each file id (int) in the
// resolved index database to the set of pid values recorded for it.
// Returns NULL with a Python exception set on failure.
static PyObject* Indexer_query_all_file_pids(IndexerObject* self,
                                             PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;
    if (!run_blocking_r(
            [&] {
                IndexDatabase db(index_path,
                                 dftracer::utils::rocksdb::RocksDatabase::
                                     OpenMode::ReadOnly);
                return db.query_all_file_pids();
            },
            all_pids)) {
        return nullptr;
    }

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [file_id, pids] : all_pids) {
        PyObject* key = PyLong_FromLong(file_id);
        // PySet_New may fail; the original passed NULL straight to PySet_Add.
        PyObject* set = key ? PySet_New(nullptr) : nullptr;
        int rc = set ? 0 : -1;
        for (auto pid : pids) {
            if (rc < 0) break;
            PyObject* val = PyLong_FromUnsignedLongLong(pid);
            rc = val ? PySet_Add(set, val) : -1;
            Py_XDECREF(val);
        }
        if (rc == 0) {
            rc = PyDict_SetItem(dict, key, set);
        }
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (rc < 0) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
629

630
// query_file_info(): return a new 2-tuple (id_to_path, pid_dict) where
//   id_to_path maps each file id to its logical name joined onto the
//              canonicalised directory containing the index, and
//   pid_dict   maps each file id to the set of pid values recorded for it.
// Returns NULL with a Python exception set on failure (OSError if the
// index path cannot be canonicalised).
static PyObject* Indexer_query_file_info(IndexerObject* self,
                                         PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, int> file_ids;
    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;

    if (!run_blocking([&] {
            IndexDatabase db(
                index_path,
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
            file_ids = db.query_all_file_info_ids();
            all_pids = db.query_all_file_pids();
        })) {
        return nullptr;
    }

    // The throwing overload would let a filesystem_error escape through the
    // C API boundary; report the failure as a Python OSError instead.
    std::error_code ec;
    const auto canonical = fs::weakly_canonical(fs::path(index_path), ec);
    if (ec) {
        PyErr_Format(PyExc_OSError,
                     "Failed to canonicalize index path '%s': %s",
                     index_path.c_str(), ec.message().c_str());
        return nullptr;
    }
    const auto data_dir = canonical.parent_path();

    PyObject* id_to_path = PyDict_New();
    if (!id_to_path) return nullptr;
    for (const auto& [logical_name, fid] : file_ids) {
        const std::string resolved = (data_dir / logical_name).string();
        PyObject* key = PyLong_FromLong(fid);
        PyObject* val =
            key ? PyUnicode_FromStringAndSize(
                      resolved.data(), static_cast<Py_ssize_t>(resolved.size()))
                : nullptr;
        const int rc = val ? PyDict_SetItem(id_to_path, key, val) : -1;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (rc < 0) {
            Py_DECREF(id_to_path);
            return nullptr;
        }
    }

    PyObject* pid_dict = PyDict_New();
    if (!pid_dict) {
        Py_DECREF(id_to_path);
        return nullptr;
    }
    for (const auto& [file_id, pids] : all_pids) {
        PyObject* key = PyLong_FromLong(file_id);
        PyObject* set = key ? PySet_New(nullptr) : nullptr;
        int rc = set ? 0 : -1;
        for (auto pid : pids) {
            if (rc < 0) break;
            PyObject* val = PyLong_FromUnsignedLongLong(pid);
            rc = val ? PySet_Add(set, val) : -1;
            Py_XDECREF(val);
        }
        if (rc == 0) {
            rc = PyDict_SetItem(pid_dict, key, set);
        }
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (rc < 0) {
            Py_DECREF(pid_dict);
            Py_DECREF(id_to_path);
            return nullptr;
        }
    }

    // PyTuple_Pack takes its own references; NULL propagates on failure.
    PyObject* result = PyTuple_Pack(2, id_to_path, pid_dict);
    Py_DECREF(id_to_path);
    Py_DECREF(pid_dict);
    return result;
}
688

689
#ifdef DFTRACER_UTILS_ENABLE_ARROW
690
#include <dftracer/utils/python/trace_reader_iterator.h>
691
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
692
#include <dftracer/utils/utilities/composites/dft/dfanalyzer/dfanalyzer_scan.h>
693

694
// Wrap an Arrow export result in a freshly allocated ArrowBatchCapsule
// object. The result is moved into a heap copy owned by the capsule.
// Returns NULL (allocator exception set) if the Python object cannot be
// allocated.
static PyObject* create_arrow_batch_capsule(
    dftracer::utils::utilities::common::arrow::ArrowExportResult&& result) {
    using dftracer::utils::utilities::common::arrow::ArrowExportResult;

    PyObject* raw = ArrowBatchCapsuleType.tp_alloc(&ArrowBatchCapsuleType, 0);
    if (raw == nullptr) {
        return nullptr;
    }

    ArrowBatchCapsuleObject* capsule = (ArrowBatchCapsuleObject*)raw;
    capsule->result = new ArrowExportResult(std::move(result));
    return raw;
}
704

705
namespace {
706

707
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
708

709
namespace dfanalyzer = dftracer::utils::utilities::composites::dft::dfanalyzer;
710
using dfanalyzer::AggScanInput;
711
using dfanalyzer::AggScanOutput;
712
using dfanalyzer::DfanalyzerScanInput;
713
using dfanalyzer::DfanalyzerScanOutput;
714
using dfanalyzer::GroupByConfig;
715
using dfanalyzer::open_agg_db;
716
using dfanalyzer::parse_group_by_name;
717
using dfanalyzer::scan_aggregation_shard_range;
718
using dfanalyzer::scan_dfanalyzer_shards;
719
using dfanalyzer::scan_system_metrics_buffer;
720

721
static bool parse_agg_type_str(const char* type_str, AggMapType& out) {
4✔
722
    if (strcmp(type_str, "events") == 0) {
4✔
723
        out = AggMapType::EVENT;
4✔
724
        return true;
4✔
725
    }
726
    if (strcmp(type_str, "profiles") == 0) {
×
727
        out = AggMapType::PROFILE;
×
728
        return true;
×
729
    }
730
    if (strcmp(type_str, "system") == 0) {
×
731
        out = AggMapType::SYSTEM;
×
732
        return true;
×
733
    }
734
    PyErr_SetString(PyExc_ValueError,
×
735
                    "type must be 'events', 'profiles', or 'system'");
736
    return false;
×
737
}
2✔
738

739
static std::optional<dftracer::utils::utilities::common::query::Query>
740
parse_query_arg(const char* query_str) {
26✔
741
    if (!query_str || query_str[0] == '\0') return std::nullopt;
26!
742
    auto result = dftracer::utils::utilities::common::query::Query::from_string(
9✔
743
        query_str);
18!
744
    if (!result) {
18✔
745
        PyErr_SetString(PyExc_ValueError, result.error().message.c_str());
2!
746
        return std::nullopt;
2✔
747
    }
748
    return std::move(*result);
16!
749
}
22✔
750

751
constexpr std::uint16_t DFT_NUM_SHARDS = 4096;
752

753
template <typename Output, typename ScanFn>
754
void parallel_shard_scan_range(Runtime* rt, std::uint16_t outer_begin,
24✔
755
                               std::uint16_t outer_end, ScanFn&& scan_fn,
756
                               std::vector<Output>& outputs) {
757
    if (outer_end <= outer_begin) return;
24!
758
    const std::size_t span = static_cast<std::size_t>(outer_end - outer_begin);
24✔
759
    const std::size_t num_tasks = std::min<std::size_t>(rt->threads(), span);
24!
760
    const std::size_t shards_per_task = (span + num_tasks - 1) / num_tasks;
24✔
761
    rt->submit(run_coro_scope(
48!
762
                   rt->executor(),
12✔
763
                   [&](CoroScope& scope) -> CoroTask<void> {
196!
764
                       std::vector<dftracer::utils::coro::SpawnFuture<Output>>
12✔
765
                           futures;
12✔
766
                       futures.reserve(num_tasks);
12!
767
                       for (std::size_t t = 0; t < num_tasks; ++t) {
48!
768
                           auto shard_begin = static_cast<std::uint16_t>(
72✔
769
                               outer_begin + t * shards_per_task);
36✔
770
                           auto shard_end =
36✔
771
                               static_cast<std::uint16_t>(std::min<std::size_t>(
36!
772
                                   outer_begin + (t + 1) * shards_per_task,
36✔
773
                                   outer_end));
36✔
774
                           futures.push_back(
36!
775
                               scope.spawn([&scan_fn, shard_begin, shard_end](
311!
776
                                               CoroScope&) -> CoroTask<Output> {
40!
777
                                   co_return scan_fn(shard_begin, shard_end);
36!
778
                               }));
779
                       }
36✔
780
                       outputs.reserve(num_tasks);
12!
781
                       for (auto& f : futures) {
148!
782
                           outputs.push_back(co_await f);
111!
783
                       }
36!
784
                   }),
112!
785
               "parallel-shard-scan-range")
12!
786
        .get();
24!
787
}
12✔
788

789
template <typename Output, typename ScanFn>
790
void parallel_shard_scan(Runtime* rt, ScanFn&& scan_fn,
24✔
791
                         std::vector<Output>& outputs) {
792
    parallel_shard_scan_range<Output>(rt, 0, DFT_NUM_SHARDS,
36✔
793
                                      std::forward<ScanFn>(scan_fn), outputs);
12✔
794
}
24✔
795

796
static void append_results_to_list(PyObject* list,
64✔
797
                                   std::vector<ArrowExportResult>& results) {
798
    for (auto& r : results) {
141✔
799
        PyObject* capsule = create_arrow_batch_capsule(std::move(r));
77!
800
        if (capsule) {
77✔
801
            PyList_Append(list, capsule);
77!
802
            Py_DECREF(capsule);
33✔
803
        }
33✔
804
    }
805
}
64✔
806

807
}  // namespace
808

809
static PyObject* Indexer_iter_aggregation(IndexerObject* self, PyObject* args,
×
810
                                          PyObject* kwds) {
811
    static const char* kwlist[] = {"type", "batch_size", nullptr};
812
    const char* type_str = "events";
×
813
    Py_ssize_t batch_size = 10000;
×
814

815
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|sn", (char**)kwlist,
×
816
                                     &type_str, &batch_size)) {
817
        return nullptr;
×
818
    }
819

820
    AggMapType target_type;
821
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;
×
822

823
    AggregationBatchType batch_type;
824
    if (target_type == AggMapType::EVENT)
×
825
        batch_type = AggregationBatchType::EVENT;
×
826
    else if (target_type == AggMapType::PROFILE)
×
827
        batch_type = AggregationBatchType::PROFILE;
×
828
    else
829
        batch_type = AggregationBatchType::SYSTEM;
×
830

831
    auto idx_opt = resolve_index_path(self);
×
832
    if (!idx_opt) return nullptr;
×
833
    std::string index_path = std::move(*idx_opt);
×
834

835
    PyObject* batch_list = PyList_New(0);
×
836
    if (!batch_list) return nullptr;
×
837

838
    std::string error_msg;
×
839
    std::vector<dftracer::utils::utilities::common::arrow::ArrowExportResult>
840
        results;
×
841

842
    Py_BEGIN_ALLOW_THREADS try {
×
843
        auto handle = open_agg_db(index_path, error_msg);
×
844
        if (handle) {
×
845
            Runtime* rt = get_batch_indexer_runtime(self);
×
846
            std::vector<AggScanOutput> outputs;
×
847
            parallel_shard_scan<AggScanOutput>(
×
848
                rt,
849
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
×
850
                    AggScanInput input;
851
                    input.agg = handle->agg.get();
×
852
                    input.target_type = target_type;
×
853
                    input.batch_type = batch_type;
×
854
                    input.batch_size = batch_size;
×
855
                    input.shard_begin = shard_begin;
×
856
                    input.shard_end = shard_end;
×
857
                    return scan_aggregation_shard_range(input);
×
858
                },
859
                outputs);
860

861
            for (auto& out : outputs) {
×
862
                for (auto& r : out.results) {
×
863
                    results.push_back(std::move(r));
×
864
                }
865
            }
866
        }
×
867
    } catch (const std::exception& e) {
×
868
        error_msg = e.what();
×
869
    }
×
870
    Py_END_ALLOW_THREADS
×
871

872
        if (!error_msg.empty()) {
×
873
        Py_DECREF(batch_list);
×
874
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
875
        return nullptr;
×
876
    }
877

878
    append_results_to_list(batch_list, results);
×
879

880
    PyObject* iter = PyObject_GetIter(batch_list);
×
881
    Py_DECREF(batch_list);
×
882
    return iter;
×
883
}
×
884

885
static PyObject* Indexer_iter_arrow_dfanalyzer(IndexerObject* self,
4✔
886
                                               PyObject* args, PyObject* kwds) {
887
    static const char* kwlist[] = {
888
        "type",  "batch_size", "time_granularity", "time_resolution",
889
        "query", nullptr};
890
    const char* type_str = "events";
4✔
891
    Py_ssize_t batch_size = 10000;
4✔
892
    double time_granularity = 1.0;
4✔
893
    double time_resolution = 1000000.0;
4✔
894
    const char* query_str = nullptr;
4✔
895

896
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|snddz", (char**)kwlist,
4!
897
                                     &type_str, &batch_size, &time_granularity,
898
                                     &time_resolution, &query_str)) {
899
        return nullptr;
×
900
    }
901

902
    AggMapType target_type;
903
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;
4!
904

905
    auto query_opt = parse_query_arg(query_str);
4!
906
    if (!query_opt && PyErr_Occurred()) return nullptr;
4!
907

908
    auto idx_opt = resolve_index_path(self);
4!
909
    if (!idx_opt) return nullptr;
4✔
910
    std::string index_path = std::move(*idx_opt);
4✔
911

912
    PyObject* batch_list = PyList_New(0);
4!
913
    if (!batch_list) return nullptr;
4✔
914

915
    std::string error_msg;
4✔
916
    std::vector<ArrowExportResult> results;
4✔
917

918
    Py_BEGIN_ALLOW_THREADS try {
4!
919
        auto handle = open_agg_db(index_path, error_msg);
4!
920
        if (handle) {
4✔
921
            dftracer::utils::utilities::indexer::IndexDatabase idx_db(
2!
922
                index_path,
923
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
2!
924
            auto file_hashes =
925
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
2!
926
                                            IndexDatabase::HashType::FILE);
2!
927
            auto host_hashes =
928
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
2!
929
                                            IndexDatabase::HashType::HOST);
2!
930

931
            auto time_bounds = handle->agg->query_time_bounds();
4!
932
            std::uint64_t time_origin =
4✔
933
                time_bounds.valid ? time_bounds.min_time_bucket : 0;
4!
934

935
            DfanalyzerContext ctx;
4✔
936
            ctx.file_hashes = &file_hashes;
4✔
937
            ctx.host_hashes = &host_hashes;
4✔
938
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
4!
939
            ctx.time_origin = time_origin;
4✔
940
            ctx.time_resolution = time_resolution;
4✔
941
            ctx.time_granularity = time_granularity;
4✔
942

943
            Runtime* rt = get_batch_indexer_runtime(self);
4!
944
            std::vector<DfanalyzerScanOutput> outputs;
4✔
945
            parallel_shard_scan<DfanalyzerScanOutput>(
4!
946
                rt,
2✔
947
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
15✔
948
                    DfanalyzerScanInput input;
13✔
949
                    input.agg = handle->agg.get();
13✔
950
                    input.ctx = &ctx;
13✔
951
                    input.type_filter = target_type;
13✔
952
                    input.batch_size = batch_size;
12✔
953
                    input.shard_begin = shard_begin;
12✔
954
                    input.shard_end = shard_end;
12✔
955
                    return scan_dfanalyzer_shards(input);
20!
956
                },
957
                outputs);
958

959
            for (auto& out : outputs) {
18✔
960
                for (auto& r : out.events) results.push_back(std::move(r));
28✔
961
                for (auto& r : out.profiles) results.push_back(std::move(r));
14!
962
                for (auto& r : out.system) results.push_back(std::move(r));
14!
963
            }
964
        }
4✔
965
    } catch (const std::exception& e) {
4!
966
        error_msg = e.what();
×
967
    }
×
968
    Py_END_ALLOW_THREADS
4!
969

970
        if (!error_msg.empty()) {
4!
971
        Py_DECREF(batch_list);
×
972
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
973
        return nullptr;
×
974
    }
975

976
    append_results_to_list(batch_list, results);
4!
977

978
    PyObject* iter = PyObject_GetIter(batch_list);
4!
979
    Py_DECREF(batch_list);
2!
980
    return iter;
4✔
981
}
4✔
982

983
static bool parse_group_by_arg(PyObject* obj, GroupByConfig& out) {
20✔
984
    if (!obj || obj == Py_None) return true;
20!
985
    if (!PySequence_Check(obj)) {
×
986
        PyErr_SetString(PyExc_TypeError,
×
987
                        "group_by must be a sequence of strings or None");
988
        return false;
×
989
    }
990
    Py_ssize_t n = PySequence_Length(obj);
×
991
    for (Py_ssize_t i = 0; i < n; ++i) {
×
992
        PyObject* item = PySequence_GetItem(obj, i);
×
993
        if (!item) return false;
×
994
        if (!PyUnicode_Check(item)) {
×
995
            Py_DECREF(item);
996
            PyErr_SetString(PyExc_TypeError,
×
997
                            "group_by entries must be strings");
998
            return false;
×
999
        }
1000
        Py_ssize_t sz = 0;
×
1001
        const char* s = PyUnicode_AsUTF8AndSize(item, &sz);
×
1002
        if (!s) {
×
1003
            Py_DECREF(item);
1004
            return false;
×
1005
        }
1006
        std::string_view sv(s, static_cast<std::size_t>(sz));
×
1007
        auto field = parse_group_by_name(sv);
×
1008
        if (!field) {
×
1009
            std::string msg = "unsupported group_by field: ";
×
1010
            msg.append(sv);
×
1011
            Py_DECREF(item);
×
1012
            PyErr_SetString(PyExc_ValueError, msg.c_str());
×
1013
            return false;
×
1014
        }
×
1015
        if (!(out.mask & *field)) {
×
1016
            out.mask |= *field;
×
1017
            out.order.push_back(*field);
×
1018
            out.names.emplace_back(sv);
×
1019
        }
1020
        Py_DECREF(item);
1021
    }
1022
    return true;
×
1023
}
10✔
1024

1025
static PyObject* Indexer_iter_arrow_dfanalyzer_all(IndexerObject* self,
22✔
1026
                                                   PyObject* args,
1027
                                                   PyObject* kwds) {
1028
    static const char* kwlist[] = {"batch_size",      "time_granularity",
1029
                                   "time_resolution", "query",
1030
                                   "group_by",        nullptr};
1031
    Py_ssize_t batch_size = 10000;
22✔
1032
    double time_granularity = 1.0;
22✔
1033
    double time_resolution = 1000000.0;
22✔
1034
    const char* query_str = nullptr;
22✔
1035
    PyObject* group_by_obj = nullptr;
22✔
1036

1037
    if (!PyArg_ParseTupleAndKeywords(
22!
1038
            args, kwds, "|nddzO", (char**)kwlist, &batch_size,
11✔
1039
            &time_granularity, &time_resolution, &query_str, &group_by_obj)) {
1040
        return nullptr;
×
1041
    }
1042

1043
    auto query_opt = parse_query_arg(query_str);
22!
1044
    if (!query_opt && PyErr_Occurred()) return nullptr;
22!
1045

1046
    GroupByConfig group_by_cfg;
20✔
1047
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
20!
1048
    const GroupByConfig* group_by_ptr =
20✔
1049
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;
20!
1050

1051
    auto idx_opt = resolve_index_path(self);
20!
1052
    if (!idx_opt) return nullptr;
20✔
1053
    std::string index_path = std::move(*idx_opt);
20✔
1054

1055
    PyObject* result_dict = PyDict_New();
20!
1056
    if (!result_dict) return nullptr;
20✔
1057

1058
    PyObject* events_list = PyList_New(0);
20!
1059
    PyObject* profiles_list = PyList_New(0);
20!
1060
    PyObject* system_list = PyList_New(0);
20!
1061
    if (!events_list || !profiles_list || !system_list) {
20!
1062
        Py_XDECREF(events_list);
×
1063
        Py_XDECREF(profiles_list);
×
1064
        Py_XDECREF(system_list);
×
1065
        Py_DECREF(result_dict);
×
1066
        return nullptr;
×
1067
    }
1068

1069
    std::string error_msg;
20✔
1070
    std::vector<ArrowExportResult> events_results, profiles_results,
20✔
1071
        system_results;
20✔
1072

1073
    Py_BEGIN_ALLOW_THREADS try {
20!
1074
        auto handle = open_agg_db(index_path, error_msg);
20!
1075
        if (handle) {
20✔
1076
            dftracer::utils::utilities::indexer::IndexDatabase idx_db(
10!
1077
                index_path,
1078
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
10!
1079
            auto file_hashes =
1080
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
10!
1081
                                            IndexDatabase::HashType::FILE);
10!
1082
            auto host_hashes =
1083
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
10!
1084
                                            IndexDatabase::HashType::HOST);
10!
1085

1086
            auto time_bounds = handle->agg->query_time_bounds();
20!
1087
            std::uint64_t time_origin =
20✔
1088
                time_bounds.valid ? time_bounds.min_time_bucket : 0;
20!
1089

1090
            DfanalyzerContext ctx;
20✔
1091
            ctx.file_hashes = &file_hashes;
20✔
1092
            ctx.host_hashes = &host_hashes;
20✔
1093
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
20✔
1094
            ctx.time_origin = time_origin;
20✔
1095
            ctx.time_resolution = time_resolution;
20✔
1096
            ctx.time_granularity = time_granularity;
20✔
1097

1098
            Runtime* rt = get_batch_indexer_runtime(self);
20!
1099
            std::vector<DfanalyzerScanOutput> outputs;
20✔
1100
            parallel_shard_scan<DfanalyzerScanOutput>(
20!
1101
                rt,
10✔
1102
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
75✔
1103
                    DfanalyzerScanInput input;
65✔
1104
                    input.agg = handle->agg.get();
64✔
1105
                    input.ctx = &ctx;
64✔
1106
                    input.type_filter = std::nullopt;
64✔
1107
                    input.batch_size = batch_size;
64✔
1108
                    input.shard_begin = shard_begin;
64✔
1109
                    input.shard_end = shard_end;
64✔
1110
                    input.group_by = group_by_ptr;
64✔
1111
                    return scan_dfanalyzer_shards(input);
104!
1112
                },
1113
                outputs);
1114

1115
            for (auto& out : outputs) {
90✔
1116
                for (auto& r : out.events)
133✔
1117
                    events_results.push_back(std::move(r));
63!
1118
                for (auto& r : out.profiles)
70✔
1119
                    profiles_results.push_back(std::move(r));
×
1120
                for (auto& r : out.system)
70✔
1121
                    system_results.push_back(std::move(r));
×
1122
            }
1123

1124
            auto sys_buf =
1125
                scan_system_metrics_buffer(handle->agg.get(), &ctx, batch_size);
20!
1126
            for (auto& r : sys_buf) system_results.push_back(std::move(r));
20!
1127
        }
20✔
1128
    } catch (const std::exception& e) {
20!
1129
        error_msg = e.what();
×
1130
    }
×
1131
    Py_END_ALLOW_THREADS
20!
1132

1133
        if (!error_msg.empty()) {
20!
1134
        Py_DECREF(events_list);
×
1135
        Py_DECREF(profiles_list);
×
1136
        Py_DECREF(system_list);
×
1137
        Py_DECREF(result_dict);
×
1138
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
1139
        return nullptr;
×
1140
    }
1141

1142
    append_results_to_list(events_list, events_results);
20!
1143
    append_results_to_list(profiles_list, profiles_results);
20!
1144
    append_results_to_list(system_list, system_results);
20!
1145

1146
    PyDict_SetItemString(result_dict, "events", events_list);
20!
1147
    PyDict_SetItemString(result_dict, "profiles", profiles_list);
20!
1148
    PyDict_SetItemString(result_dict, "system", system_list);
20!
1149
    Py_DECREF(events_list);
10!
1150
    Py_DECREF(profiles_list);
10!
1151
    Py_DECREF(system_list);
10!
1152

1153
    return result_dict;
20✔
1154
}
22✔
1155

1156
// ---------------------------------------------------------------------------
1157
// scan_aggregation_manifest — module-level entry point for analyze_trace.
1158
//
1159
// Each Dask worker calls this with its slice of the agg manifest
1160
// (agg_ssts + sys_ssts) and optionally a [shard_begin, shard_end) range.
1161
// The function opens a scratch IndexDatabase at `scratch_dir`, ingests the
1162
// SSTs into its AGGREGATION/SYSTEM_METRICS CFs (nearly free when SSTs live
1163
// on the same filesystem as `scratch_dir` — RocksDB hard-links them), then
1164
// runs the same parallel shard scan that `iter_arrow_dfanalyzer_all` uses.
1165
//
1166
// AGG_GLOBAL_CONFIG_KEY is not written by worker SSTs, so we construct the
1167
// EventAggregator with config_hash=0 directly instead of going through
1168
// `open_agg_db` (which requires the config key). The config hash is used
1169
// by the aggregator only for write-time validation, not for reads.
1170
//
1171
// The scratch DB is NOT cleaned up here — the Python caller owns
1172
// `scratch_dir` lifetime and should remove it after gathering results.
1173
// ---------------------------------------------------------------------------
1174

1175
static bool collect_string_string_dict(
×
1176
    PyObject* obj, const char* name,
1177
    std::unordered_map<std::string, std::string>& out) {
1178
    if (!obj || obj == Py_None) return true;
×
1179
    if (!PyDict_Check(obj)) {
×
1180
        PyErr_Format(PyExc_TypeError, "%s must be a dict[str, str] or None",
×
1181
                     name);
1182
        return false;
×
1183
    }
1184
    PyObject *k, *v;
1185
    Py_ssize_t pos = 0;
×
1186
    while (PyDict_Next(obj, &pos, &k, &v)) {
×
1187
        if (!PyUnicode_Check(k) || !PyUnicode_Check(v)) {
×
1188
            PyErr_Format(PyExc_TypeError, "%s must map str -> str", name);
×
1189
            return false;
×
1190
        }
1191
        const char* ks = PyUnicode_AsUTF8(k);
×
1192
        const char* vs = PyUnicode_AsUTF8(v);
×
1193
        if (!ks || !vs) return false;
×
1194
        out.emplace(ks, vs);
×
1195
    }
1196
    return true;
×
1197
}
1198

1199
static PyObject* scan_aggregation_manifest_fn(PyObject* /*self*/,
×
1200
                                              PyObject* args, PyObject* kwds) {
1201
    static const char* kwlist[] = {
1202
        "agg_ssts",        "sys_ssts",    "scratch_dir",
1203
        "meta_index_path", "batch_size",  "time_granularity",
1204
        "time_resolution", "query",       "group_by",
1205
        "shard_begin",     "shard_end",   "runtime",
1206
        "file_hashes",     "host_hashes", nullptr};
1207

1208
    PyObject* agg_ssts_obj = nullptr;
×
1209
    PyObject* sys_ssts_obj = nullptr;
×
1210
    const char* scratch_dir = nullptr;
×
1211
    const char* meta_index_path = nullptr;
×
1212
    Py_ssize_t batch_size = 10000;
×
1213
    double time_granularity = 1.0;
×
1214
    double time_resolution = 1000000.0;
×
1215
    const char* query_str = nullptr;
×
1216
    PyObject* group_by_obj = nullptr;
×
1217
    int shard_begin_i = 0;
×
1218
    int shard_end_i = DFT_NUM_SHARDS;
×
1219
    PyObject* runtime_obj = nullptr;
×
1220
    PyObject* file_hashes_obj = nullptr;
×
1221
    PyObject* host_hashes_obj = nullptr;
×
1222

1223
    if (!PyArg_ParseTupleAndKeywords(
×
1224
            args, kwds, "OOss|nddzOiiOOO", (char**)kwlist, &agg_ssts_obj,
1225
            &sys_ssts_obj, &scratch_dir, &meta_index_path, &batch_size,
1226
            &time_granularity, &time_resolution, &query_str, &group_by_obj,
1227
            &shard_begin_i, &shard_end_i, &runtime_obj, &file_hashes_obj,
1228
            &host_hashes_obj)) {
1229
        return nullptr;
×
1230
    }
1231

1232
    if (shard_begin_i < 0 || shard_end_i > DFT_NUM_SHARDS ||
×
1233
        shard_begin_i >= shard_end_i) {
×
1234
        PyErr_Format(PyExc_ValueError,
×
1235
                     "shard range [%d, %d) invalid (must be within [0, %d))",
1236
                     shard_begin_i, shard_end_i, (int)DFT_NUM_SHARDS);
1237
        return nullptr;
×
1238
    }
1239

1240
    std::vector<std::string> agg_ssts;
×
1241
    std::vector<std::string> sys_ssts;
×
1242
    if (!parse_str_list(agg_ssts_obj, "agg_ssts", agg_ssts)) return nullptr;
×
1243
    if (!parse_str_list(sys_ssts_obj, "sys_ssts", sys_ssts)) return nullptr;
×
1244

1245
    std::unordered_map<std::string, std::string> preloaded_file_hashes;
×
1246
    std::unordered_map<std::string, std::string> preloaded_host_hashes;
×
1247
    const bool hashes_preloaded =
×
1248
        (file_hashes_obj && file_hashes_obj != Py_None) ||
×
1249
        (host_hashes_obj && host_hashes_obj != Py_None);
×
1250
    if (!collect_string_string_dict(file_hashes_obj, "file_hashes",
×
1251
                                    preloaded_file_hashes))
1252
        return nullptr;
×
1253
    if (!collect_string_string_dict(host_hashes_obj, "host_hashes",
×
1254
                                    preloaded_host_hashes))
1255
        return nullptr;
×
1256

1257
    auto query_opt = parse_query_arg(query_str);
×
1258
    if (!query_opt && PyErr_Occurred()) return nullptr;
×
1259

1260
    GroupByConfig group_by_cfg;
×
1261
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
×
1262
    const GroupByConfig* group_by_ptr =
×
1263
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;
×
1264

1265
    Runtime* rt = nullptr;
×
1266
    if (runtime_obj && runtime_obj != Py_None) {
×
1267
        if (!PyObject_TypeCheck(runtime_obj, &RuntimeType)) {
×
1268
            PyErr_SetString(PyExc_TypeError,
×
1269
                            "runtime must be a Runtime instance or None");
1270
            return nullptr;
×
1271
        }
1272
        rt = ((RuntimeObject*)runtime_obj)->runtime.get();
×
1273
    } else {
1274
        rt = get_default_runtime();
×
1275
    }
1276

1277
    PyObject* result_dict = PyDict_New();
×
1278
    if (!result_dict) return nullptr;
×
1279
    PyObject* events_list = PyList_New(0);
×
1280
    PyObject* profiles_list = PyList_New(0);
×
1281
    PyObject* system_list = PyList_New(0);
×
1282
    if (!events_list || !profiles_list || !system_list) {
×
1283
        Py_XDECREF(events_list);
×
1284
        Py_XDECREF(profiles_list);
×
1285
        Py_XDECREF(system_list);
×
1286
        Py_DECREF(result_dict);
×
1287
        return nullptr;
×
1288
    }
1289

1290
    std::string error_msg;
×
1291
    std::vector<ArrowExportResult> events_results, profiles_results,
×
1292
        system_results;
×
1293
    std::string scratch_index_path = std::string(scratch_dir) + "/.dftindex";
×
1294
    std::string meta_index_path_str(meta_index_path);
×
1295

1296
    Py_BEGIN_ALLOW_THREADS try {
×
1297
        namespace rcf = dftracer::utils::rocksdb::cf;
1298
        using clock = std::chrono::steady_clock;
1299
        auto ms = [](clock::time_point a, clock::time_point b) -> long long {
×
1300
            return std::chrono::duration_cast<std::chrono::milliseconds>(b - a)
×
1301
                .count();
×
1302
        };
1303

1304
        auto t_start = clock::now();
×
1305
        dftracer::utils::utilities::indexer::IndexDatabase scratch_db(
×
1306
            scratch_index_path);
×
1307
        auto t_scratch_open = clock::now();
×
1308

1309
        auto raw_db = scratch_db.db();
×
1310
        for (const auto& p : agg_ssts) {
×
1311
            auto st = raw_db->ingest_external_files(rcf::AGGREGATION, {p},
×
1312
                                                    /*ingest_behind=*/false);
×
1313
            if (!st.ok()) {
×
1314
                error_msg =
1315
                    "ingest AGGREGATION sst '" + p + "': " + st.ToString();
×
1316
                break;
×
1317
            }
1318
        }
×
1319
        if (error_msg.empty()) {
×
1320
            for (const auto& p : sys_ssts) {
×
1321
                auto st = raw_db->ingest_external_files(
×
1322
                    rcf::SYSTEM_METRICS, {p}, /*ingest_behind=*/false);
×
1323
                if (!st.ok()) {
×
1324
                    error_msg = "ingest SYSTEM_METRICS sst '" + p +
×
1325
                                "': " + st.ToString();
×
1326
                    break;
×
1327
                }
1328
            }
×
1329
        }
1330
        auto t_ingest = clock::now();
×
1331

1332
        if (error_msg.empty()) {
×
1333
            auto agg =
1334
                std::make_unique<EventAggregator>(raw_db, /*cfg_hash=*/0);
×
1335

1336
            // If the caller passed pre-loaded hash tables, skip opening
1337
            // the meta DB on lustre. When many dask workers run
1338
            // scan_aggregation_manifest in parallel, loading the hash
1339
            // tables N times from the same file is significant lustre
1340
            // metadata pressure; loading once on the coordinator and
1341
            // passing them in eliminates the redundant reads.
1342
            std::unordered_map<std::string, std::string> loaded_file_hashes;
×
1343
            std::unordered_map<std::string, std::string> loaded_host_hashes;
×
1344
            std::unique_ptr<dftracer::utils::utilities::indexer::IndexDatabase>
1345
                meta_db;
×
1346
            if (!hashes_preloaded) {
×
1347
                meta_db = std::make_unique<
×
1348
                    dftracer::utils::utilities::indexer::IndexDatabase>(
1349
                    meta_index_path_str, dftracer::utils::rocksdb::
1350
                                             RocksDatabase::OpenMode::ReadOnly);
×
1351
                loaded_file_hashes = meta_db->query_hash_table(
×
1352
                    dftracer::utils::utilities::indexer::IndexDatabase::
1353
                        HashType::FILE);
1354
                loaded_host_hashes = meta_db->query_hash_table(
×
1355
                    dftracer::utils::utilities::indexer::IndexDatabase::
1356
                        HashType::HOST);
1357
            }
1358
            const auto& file_hashes =
×
1359
                hashes_preloaded ? preloaded_file_hashes : loaded_file_hashes;
×
1360
            const auto& host_hashes =
×
1361
                hashes_preloaded ? preloaded_host_hashes : loaded_host_hashes;
×
1362
            auto t_hash_tables = clock::now();
×
1363

1364
            auto time_bounds = agg->query_time_bounds();
×
1365
            std::uint64_t time_origin =
×
1366
                time_bounds.valid ? time_bounds.min_time_bucket : 0;
×
1367

1368
            DfanalyzerContext ctx;
×
1369
            ctx.file_hashes = &file_hashes;
×
1370
            ctx.host_hashes = &host_hashes;
×
1371
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
×
1372
            ctx.time_origin = time_origin;
×
1373
            ctx.time_resolution = time_resolution;
×
1374
            ctx.time_granularity = time_granularity;
×
1375

1376
            std::vector<DfanalyzerScanOutput> outputs;
×
1377
            parallel_shard_scan_range<DfanalyzerScanOutput>(
×
1378
                rt, static_cast<std::uint16_t>(shard_begin_i),
1379
                static_cast<std::uint16_t>(shard_end_i),
1380
                [&](std::uint16_t sb, std::uint16_t se) {
×
1381
                    DfanalyzerScanInput input;
×
1382
                    input.agg = agg.get();
×
1383
                    input.ctx = &ctx;
×
1384
                    input.type_filter = std::nullopt;
×
1385
                    input.batch_size = batch_size;
×
1386
                    input.shard_begin = sb;
×
1387
                    input.shard_end = se;
×
1388
                    input.group_by = group_by_ptr;
×
1389
                    return scan_dfanalyzer_shards(input);
×
1390
                },
1391
                outputs);
1392
            auto t_scan = clock::now();
×
1393

1394
            for (auto& out : outputs) {
×
1395
                for (auto& r : out.events)
×
1396
                    events_results.push_back(std::move(r));
×
1397
                for (auto& r : out.profiles)
×
1398
                    profiles_results.push_back(std::move(r));
×
1399
                for (auto& r : out.system)
×
1400
                    system_results.push_back(std::move(r));
×
1401
            }
1402

1403
            std::fprintf(
×
1404
                stderr,
1405
                "[scan_aggregation_manifest] n_agg=%zu n_sys=%zu "
1406
                "scratch_open=%lldms ingest=%lldms hash_tables=%lldms "
1407
                "scan=%lldms\n",
1408
                agg_ssts.size(), sys_ssts.size(), ms(t_start, t_scratch_open),
×
1409
                ms(t_scratch_open, t_ingest), ms(t_ingest, t_hash_tables),
×
1410
                ms(t_hash_tables, t_scan));
×
1411
            std::fflush(stderr);
×
1412
        }
×
1413
    } catch (const std::exception& e) {
×
1414
        error_msg = e.what();
×
1415
    }
×
1416
    Py_END_ALLOW_THREADS
×
1417

1418
        if (!error_msg.empty()) {
×
1419
        Py_DECREF(events_list);
×
1420
        Py_DECREF(profiles_list);
×
1421
        Py_DECREF(system_list);
×
1422
        Py_DECREF(result_dict);
×
1423
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
1424
        return nullptr;
×
1425
    }
1426

1427
    append_results_to_list(events_list, events_results);
×
1428
    append_results_to_list(profiles_list, profiles_results);
×
1429
    append_results_to_list(system_list, system_results);
×
1430

1431
    PyDict_SetItemString(result_dict, "events", events_list);
×
1432
    PyDict_SetItemString(result_dict, "profiles", profiles_list);
×
1433
    PyDict_SetItemString(result_dict, "system", system_list);
×
1434
    Py_DECREF(events_list);
×
1435
    Py_DECREF(profiles_list);
×
1436
    Py_DECREF(system_list);
×
1437

1438
    return result_dict;
×
1439
}
×
1440

1441
// Module-level functions exposed only in Arrow-enabled builds.
//
// The docstring opens with a text signature terminated by "--\n\n" so that
// help() / inspect.signature() can show the parameter list separately from
// the prose description.
static PyMethodDef BatchIndexerModuleMethods[] = {
    {
        "scan_aggregation_manifest",
        (PyCFunction)scan_aggregation_manifest_fn,
        METH_VARARGS | METH_KEYWORDS,
        "scan_aggregation_manifest(agg_ssts, sys_ssts, scratch_dir, "
        "meta_index_path, "
        "batch_size=10000, time_granularity=1.0, time_resolution=1e6, "
        "query=None, group_by=None, "
        "shard_begin=0, shard_end=4096, "
        "runtime=None) -> dict\n"
        "--\n\n"
        "Scan a worker's slice of the distributed aggregation manifest.\n\n"
        "Ingests agg_ssts + sys_ssts into a scratch IndexDatabase at scratch_dir "
        "(caller owns the directory lifecycle) and runs the dfanalyzer "
        "aggregation scan over [shard_begin, shard_end). meta_index_path is the "
        "unified .dftindex used to resolve file / host hashes. Returns the same "
        "dict shape as Indexer.iter_arrow_dfanalyzer_all.",
    },
    // All-zero entry terminates the table for PyModule_AddFunctions.
    {nullptr, nullptr, 0, nullptr}};
1457
#endif
1458

1459
// Method table for the Indexer type (installed through tp_methods).
//
// Each docstring starts with a CPython text signature ending in "--\n\n".
// Signatures follow the Argument Clinic convention:
//   * "$self" names the receiver, so inspect.signature() reports it on the
//     unbound method and strips it from the bound one.
//   * "/" marks parameters as positional-only. METH_NOARGS and plain
//     METH_VARARGS functions cannot accept keyword arguments.
// The Arrow-backed iterators are compiled in only when
// DFTRACER_UTILS_ENABLE_ARROW is defined.
static PyMethodDef Indexer_methods[] = {
    {"get_checkpoint_indexer", (PyCFunction)Indexer_get_checkpoint_indexer,
     METH_VARARGS,
     "get_checkpoint_indexer($self, file_path, /)\n"
     "--\n\n"
     "Get a checkpoint indexer for a specific file.\n\n"
     "Args:\n"
     "    file_path: Path to the trace file (.pfw/.pfw.gz)\n\n"
     "Returns:\n"
     "    Indexer instance for checkpoint-level operations.\n"},
    {"resolve", (PyCFunction)Indexer_resolve, METH_NOARGS,
     "resolve($self, /)\n"
     "--\n\n"
     "Check what files exist vs need indexing.\n\n"
     "Returns:\n"
     "    dict with 'total_files', 'ready', 'needs_work', 'index_path'\n"},
    {"build", (PyCFunction)Indexer_build, METH_NOARGS,
     "build($self, /)\n"
     "--\n\n"
     "Build all missing index tiers based on require_* flags.\n"},
    {"ensure_indexed", (PyCFunction)Indexer_ensure_indexed, METH_NOARGS,
     "ensure_indexed($self, /)\n"
     "--\n\n"
     "Resolve and build if needed.\n\n"
     "Returns:\n"
     "    dict with index status after building.\n"},
    {"get_hash_table", (PyCFunction)Indexer_get_hash_table, METH_VARARGS,
     "get_hash_table($self, type, /)\n"
     "--\n\n"
     "Query hash table mappings.\n\n"
     "Args:\n"
     "    type: 'file', 'host', 'string', or 'proc'\n\n"
     "Returns:\n"
     "    dict mapping hash values to resolved names.\n"},
    {"query_file_pids", (PyCFunction)Indexer_query_file_pids, METH_VARARGS,
     "query_file_pids($self, file_id, /)\n"
     "--\n\n"
     "Query PIDs observed in a specific file.\n\n"
     "Args:\n"
     "    file_id: Integer file ID from index.\n\n"
     "Returns:\n"
     "    set of PIDs.\n"},
    {"query_all_file_pids", (PyCFunction)Indexer_query_all_file_pids,
     METH_NOARGS,
     "query_all_file_pids($self, /)\n"
     "--\n\n"
     "Query PIDs for all indexed files.\n\n"
     "Returns:\n"
     "    dict mapping file_id to set of PIDs.\n"},
    {"query_file_info", (PyCFunction)Indexer_query_file_info, METH_NOARGS,
     "query_file_info($self, /)\n"
     "--\n\n"
     "Query file ID to path mapping and per-file PIDs in one call.\n\n"
     "Returns:\n"
     "    tuple of (dict[int, str], dict[int, set[int]]).\n"},
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    {"iter_aggregation", (PyCFunction)Indexer_iter_aggregation,
     METH_VARARGS | METH_KEYWORDS,
     "iter_aggregation($self, type='events', batch_size=10000)\n"
     "--\n\n"
     "Iterate over aggregation data as Arrow batches.\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    {"iter_arrow_dfanalyzer", (PyCFunction)Indexer_iter_arrow_dfanalyzer,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer($self, type='events', batch_size=10000, "
     "time_granularity=1.0, time_resolution=1e6, query=None)\n"
     "--\n\n"
     "Iterate over aggregation data as dfanalyzer-compatible Arrow batches.\n\n"
     "Output schema matches dfanalyzer expectations with resolved hashes,\n"
     "normalized time_range, and computed columns (proc_name, io_cat).\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string (e.g., \"pid == 1234\")\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    {"iter_arrow_dfanalyzer_all",
     (PyCFunction)Indexer_iter_arrow_dfanalyzer_all,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer_all($self, batch_size=10000, "
     "time_granularity=1.0, time_resolution=1e6, query=None, "
     "group_by=None)\n"
     "--\n\n"
     "Iterate over all aggregation types in a single scan.\n\n"
     "Returns a dict with 'events', 'profiles', 'system' keys, each containing\n"
     "a list of Arrow batch capsules. This is ~3x faster than calling\n"
     "iter_arrow_dfanalyzer separately for each type.\n\n"
     "When group_by is provided, the scan collapses dimensions during "
     "aggregation\n"
     "and emits a reduced schema containing only the requested columns plus\n"
     "aggregated metrics (count, time, size, time_sq, size_sq, time_min,\n"
     "time_max, size_min, size_max, time_call_min, time_call_max, "
     "size_call_min,\n"
     "size_call_max, time_start, time_end). Supported group_by columns: cat,\n"
     "func_name, pid, tid, file_hash, host_hash, file_name, host_name, "
     "proc_name,\n"
     "io_cat, acc_pat, time_range.\n\n"
     "Args:\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string\n"
     "    group_by: Optional list of columns to group by; enables coarse\n"
     "        in-scan aggregation (default None = full granularity)\n\n"
     "Returns:\n"
     "    dict with 'events', 'profiles', 'system' lists of Arrow capsules.\n"},
#endif
    // All-zero sentinel terminates the method table.
    {nullptr, nullptr, 0, nullptr}};
1574

1575
// Indexer exposes no computed attributes; the table holds only the all-zero
// terminating entry (name, get, set, doc, closure).
static PyGetSetDef Indexer_getsetters[] = {
    {nullptr, nullptr, nullptr, nullptr, nullptr}};
1576

1577
// Static type object for dftracer_utils_ext.Indexer. init_indexer registers
// it on the module as "Indexer".
//
// PyTypeObject is filled positionally, so every entry is tagged with the
// slot it occupies. Keep the order in sync with CPython's struct layout.
//
// tp_doc opens with a text signature. CPython only recognizes it when the
// doc begins with the last dotted component of tp_name ("Indexer")
// immediately followed by "(", and the signature ends with ")\n--\n\n".
// With any other prefix, the signature is ignored: __text_signature__
// stays None and the raw "--" marker shows up in __doc__ / help().
PyTypeObject IndexerType = {
    PyVarObject_HEAD_INIT(nullptr, 0)          // ob_base
    "dftracer_utils_ext.Indexer",              // tp_name
    sizeof(IndexerObject),                     // tp_basicsize
    0,                                         // tp_itemsize
    (destructor)Indexer_dealloc,               // tp_dealloc
    0,                                         // tp_vectorcall_offset / tp_print
    0,                                         // tp_getattr
    0,                                         // tp_setattr
    0,                                         // tp_as_async
    0,                                         // tp_repr
    0,                                         // tp_as_number
    0,                                         // tp_as_sequence
    0,                                         // tp_as_mapping
    0,                                         // tp_hash
    0,                                         // tp_call
    0,                                         // tp_str
    0,                                         // tp_getattro
    0,                                         // tp_setattro
    0,                                         // tp_as_buffer
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,  // tp_flags
    // tp_doc
    "Indexer(directory='', files=None, index_dir='',\n"
    "        require_checkpoint=True, require_bloom=True,\n"
    "        require_manifest=True, require_aggregation=False,\n"
    "        time_interval_ms=5000.0, group_keys=None,\n"
    "        custom_metric_fields=None, compute_percentiles=False,\n"
    "        parallelism=0, force_rebuild=False, runtime=None)\n"
    "--\n\n"
    "Indexer with tiered index building.\n\n"
    "At least one of 'directory' or 'files' must be provided.\n"
    "- directory: scan for .pfw/.pfw.gz files\n"
    "- files: list of specific file paths\n\n"
    "Supports:\n"
    "- Tier 1: Checkpoints (require_checkpoint)\n"
    "- Tier 2: Bloom filters (require_bloom), Manifests (require_manifest)\n"
    "- Tier 3: Aggregation (require_aggregation + config params)\n",
    0,                        // tp_traverse
    0,                        // tp_clear
    0,                        // tp_richcompare
    0,                        // tp_weaklistoffset
    0,                        // tp_iter
    0,                        // tp_iternext
    Indexer_methods,          // tp_methods
    0,                        // tp_members
    Indexer_getsetters,       // tp_getset
    0,                        // tp_base
    0,                        // tp_dict
    0,                        // tp_descr_get
    0,                        // tp_descr_set
    0,                        // tp_dictoffset
    (initproc)Indexer_init,   // tp_init
    0,                        // tp_alloc
    Indexer_new,              // tp_new
};
1630

1631
// Registers the Indexer type on module `m` and, in builds with
// DFTRACER_UTILS_ENABLE_ARROW, adds the module-level functions from
// BatchIndexerModuleMethods.
//
// Returns 0 on success, or -1 as soon as a registration step reports a
// negative status. PyModule_AddFunctions sets a Python exception on failure.
// NOTE(review): register_type is assumed to do the same; confirm in its
// definition.
int init_indexer(PyObject* m) {
    int status = register_type(m, &IndexerType, "Indexer");

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // Attempt the second step only if the type was registered cleanly.
    if (status >= 0) {
        status = PyModule_AddFunctions(m, BatchIndexerModuleMethods);
    }
#endif

    return (status < 0) ? -1 : 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc