• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28356348514

29 Jun 2026 07:40AM UTC coverage: 52.174% (-0.1%) from 52.278%
28356348514

Pull #83

github

web-flow
Merge 278203630 into 2efed6649
Pull Request #83: refactor and improve code QoL

37276 of 92891 branches covered (40.13%)

Branch coverage included in aggregate %.

671 of 1173 new or added lines in 58 files covered. (57.2%)

66 existing lines in 30 files now uncovered.

33619 of 42991 relevant lines covered (78.2%)

20387.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.29
/src/dftracer/utils/python/batch_indexer.cpp
1
#include <dftracer/utils/core/common/constants.h>
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/core/common/hash_combine.h>
4
#include <dftracer/utils/core/common/string_intern.h>
5
#include <dftracer/utils/core/coro/task.h>
6
#include <dftracer/utils/core/coro/when_all.h>
7
#include <dftracer/utils/core/rocksdb/db_manager.h>
8
#include <dftracer/utils/core/runtime.h>
9
#include <dftracer/utils/core/tasks/coro_scope.h>
10
#include <dftracer/utils/python/batch_indexer.h>
11
#include <dftracer/utils/python/indexer.h>
12
#include <dftracer/utils/python/py_dict_helpers.h>
13
#include <dftracer/utils/python/py_type_helpers.h>
14
#include <dftracer/utils/python/runtime.h>
15
#include <dftracer/utils/utilities/common/query/query.h>
16
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_config.h>
17
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.h>
18
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_types.h>
19
#include <dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h>
20
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics.h>
21
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.h>
22
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
23
#include <dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.h>
24
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
25
#include <dftracer/utils/utilities/indexer/index_database.h>
26

27
#ifdef DFTRACER_UTILS_ENABLE_ARROW
28
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
29
#endif
30

31
#include <algorithm>
32
#include <chrono>
33
#include <cstdio>
34
#include <limits>
35
#include <optional>
36
#include <string>
37
#include <unordered_map>
38
#include <unordered_set>
39
#include <vector>
40

41
using dftracer::utils::CoroScope;
42
using dftracer::utils::Runtime;
43
using dftracer::utils::coro::CoroTask;
44
using namespace dftracer::utils::utilities::composites::dft::indexing;
45
using namespace dftracer::utils::utilities::composites::dft::aggregators;
46

47
// ---------------------------------------------------------------------------
48
// BatchIndexer - directory-level indexer with resolve/build pattern
49
// ---------------------------------------------------------------------------
50

51
// tp_dealloc for the batch Indexer: drop every Python reference the object
// owns (any of them may be NULL, e.g. when __init__ never ran or failed) and
// hand the memory back to the type's allocator.
static void Indexer_dealloc(IndexerObject* self) {
    PyObject* owned_refs[] = {self->runtime_obj, self->directory,
                              self->files,       self->index_dir,
                              self->group_keys,  self->custom_metric_fields};
    for (PyObject* ref : owned_refs) {
        Py_XDECREF(ref);
    }

    PyObject* as_object = reinterpret_cast<PyObject*>(self);
    Py_TYPE(as_object)->tp_free(as_object);
}
60

61
// tp_new for the batch Indexer: allocate the instance and seed every field
// with the same defaults Indexer_init uses when a keyword is omitted.
// Returns a new reference, or NULL if tp_alloc fails.
static PyObject* Indexer_new(PyTypeObject* type, PyObject* args,
                             PyObject* kwds) {
    auto* obj = reinterpret_cast<IndexerObject*>(type->tp_alloc(type, 0));
    if (obj == nullptr) {
        return nullptr;
    }

    // Owned Python references; filled in by __init__.
    obj->runtime_obj = nullptr;
    obj->directory = nullptr;
    obj->files = nullptr;
    obj->index_dir = nullptr;

    // Which index tiers are required.
    obj->require_checkpoint = 1;
    obj->require_bloom = 1;
    obj->require_manifest = 1;
    obj->require_aggregation = 0;

    // Aggregation options.
    obj->time_interval_ms = 5000.0;
    obj->group_keys = nullptr;
    obj->custom_metric_fields = nullptr;
    obj->compute_percentiles = 0;

    // Build options.
    obj->checkpoint_size =
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE;
    obj->parallelism = 0;
    obj->force_rebuild = 0;

    return reinterpret_cast<PyObject*>(obj);
}
84

85
// tp_init for the directory-level (batch) Indexer.
//
// Keyword arguments, all optional (positional order matches kwlist):
//   directory (str), files (list[str]), index_dir (str),
//   require_checkpoint / require_bloom / require_manifest /
//   require_aggregation (bool), time_interval_ms (float),
//   group_keys / custom_metric_fields (list or None),
//   compute_percentiles (bool), checkpoint_size (int >= 0),
//   parallelism (int >= 0), force_rebuild (bool),
//   runtime (a Runtime, an object whose `_native` attribute is a Runtime, or
//   None for the default runtime).
//
// A non-empty `directory` or a non-empty `files` list is required.
//
// All arguments are validated before the object is modified. References held
// from a previous __init__ call are released only after the new ones are
// installed, so re-running __init__ neither leaks nor leaves a half-updated
// object. Omitting `runtime` on a re-init reverts to the default runtime.
//
// Returns 0 on success, or -1 with an exception set: ValueError for invalid
// values, TypeError for an invalid runtime.
static int Indexer_init(IndexerObject* self, PyObject* args, PyObject* kwds) {
    static const char* kwlist[] = {"directory",
                                   "files",
                                   "index_dir",
                                   "require_checkpoint",
                                   "require_bloom",
                                   "require_manifest",
                                   "require_aggregation",
                                   "time_interval_ms",
                                   "group_keys",
                                   "custom_metric_fields",
                                   "compute_percentiles",
                                   "checkpoint_size",
                                   "parallelism",
                                   "force_rebuild",
                                   "runtime",
                                   nullptr};

    const char* directory = "";
    PyObject* files_obj = Py_None;
    const char* index_dir = "";
    int require_checkpoint = 1;
    int require_bloom = 1;
    int require_manifest = 1;
    int require_aggregation = 0;
    double time_interval_ms = 5000.0;
    PyObject* group_keys_obj = Py_None;
    PyObject* custom_metrics_obj = Py_None;
    int compute_percentiles = 0;
    Py_ssize_t checkpoint_size = static_cast<Py_ssize_t>(
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE);
    Py_ssize_t parallelism = 0;
    int force_rebuild = 0;
    PyObject* runtime_arg = nullptr;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|sOsppppdOOpnnpO", (char**)kwlist, &directory,
            &files_obj, &index_dir, &require_checkpoint, &require_bloom,
            &require_manifest, &require_aggregation, &time_interval_ms,
            &group_keys_obj, &custom_metrics_obj, &compute_percentiles,
            &checkpoint_size, &parallelism, &force_rebuild, &runtime_arg)) {
        return -1;
    }

    // At least one of directory or files must be provided.
    const bool has_directory = directory && directory[0] != '\0';
    const bool has_files = files_obj && files_obj != Py_None &&
                           PyList_Check(files_obj) &&
                           PyList_Size(files_obj) > 0;
    if (!has_directory && !has_files) {
        PyErr_SetString(PyExc_ValueError,
                        "At least one of 'directory' or 'files' must be "
                        "provided");
        return -1;
    }

    // Negative counts would silently wrap to huge values once stored as
    // std::size_t.
    if (checkpoint_size < 0) {
        PyErr_SetString(PyExc_ValueError, "checkpoint_size must be >= 0");
        return -1;
    }
    if (parallelism < 0) {
        PyErr_SetString(PyExc_ValueError, "parallelism must be >= 0");
        return -1;
    }
    // Written so that NaN fails too: a negative or NaN interval cannot be
    // converted to an unsigned microsecond count.
    if (!(time_interval_ms >= 0.0)) {
        PyErr_SetString(PyExc_ValueError,
                        "time_interval_ms must be a non-negative number");
        return -1;
    }

    // Resolve the runtime into an owned reference (nullptr = default).
    PyObject* new_runtime = nullptr;
    if (runtime_arg && runtime_arg != Py_None) {
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
            Py_INCREF(runtime_arg);
            new_runtime = runtime_arg;
        } else {
            // Otherwise accept an object whose `_native` attribute is a
            // Runtime; GetAttr already returns a new reference.
            PyObject* native = PyObject_GetAttrString(runtime_arg, "_native");
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
                new_runtime = native;
            } else {
                Py_XDECREF(native);
                PyErr_SetString(PyExc_TypeError,
                                "runtime must be a Runtime instance or None");
                return -1;
            }
        }
    }

    PyObject* new_directory = PyUnicode_FromString(directory);
    PyObject* new_index_dir =
        new_directory ? PyUnicode_FromString(index_dir) : nullptr;
    if (!new_directory || !new_index_dir) {
        Py_XDECREF(new_directory);
        Py_XDECREF(new_index_dir);
        Py_XDECREF(new_runtime);
        return -1;
    }

    // Optional containers are kept only when meaningful; None -> nullptr.
    PyObject* new_files = has_files ? files_obj : nullptr;
    PyObject* new_group_keys =
        (group_keys_obj && group_keys_obj != Py_None) ? group_keys_obj
                                                      : nullptr;
    PyObject* new_custom_metrics =
        (custom_metrics_obj && custom_metrics_obj != Py_None)
            ? custom_metrics_obj
            : nullptr;
    Py_XINCREF(new_files);
    Py_XINCREF(new_group_keys);
    Py_XINCREF(new_custom_metrics);

    // Install the new references first and release the previous ones last,
    // so anything triggered by the release never observes a dangling field.
    PyObject* previous[] = {self->runtime_obj, self->directory,
                            self->index_dir,   self->files,
                            self->group_keys,  self->custom_metric_fields};
    self->runtime_obj = new_runtime;
    self->directory = new_directory;
    self->index_dir = new_index_dir;
    self->files = new_files;
    self->group_keys = new_group_keys;
    self->custom_metric_fields = new_custom_metrics;

    self->require_checkpoint = require_checkpoint;
    self->require_bloom = require_bloom;
    self->require_manifest = require_manifest;
    self->require_aggregation = require_aggregation;
    self->time_interval_ms = time_interval_ms;
    self->compute_percentiles = compute_percentiles;
    self->checkpoint_size = static_cast<std::size_t>(checkpoint_size);
    self->parallelism = static_cast<std::size_t>(parallelism);
    self->force_rebuild = force_rebuild;

    for (PyObject* old : previous) {
        Py_XDECREF(old);
    }
    return 0;
}
197

198
// Pick the runtime that runs this indexer's async work. If a Runtime was
// passed to __init__, its native runtime is used; otherwise the process-wide
// default runtime is used.
static Runtime* get_batch_indexer_runtime(IndexerObject* self) {
    if (self->runtime_obj == nullptr) {
        return get_default_runtime();
    }
    auto* wrapper = reinterpret_cast<RuntimeObject*>(self->runtime_obj);
    return wrapper->runtime.get();
}
204

205
static std::optional<AggregationConfig> build_aggregation_config(
272✔
206
    IndexerObject* self) {
207
    if (!self->require_aggregation) {
272✔
208
        return std::nullopt;
152✔
209
    }
210

211
    AggregationConfig config;
120!
212
    config.time_interval_us =
120✔
213
        static_cast<std::uint64_t>(self->time_interval_ms * 1000.0);
120✔
214

215
    if (self->group_keys && PyList_Check(self->group_keys)) {
120!
216
        Py_ssize_t n = PyList_Size(self->group_keys);
×
217
        for (Py_ssize_t i = 0; i < n; i++) {
×
218
            const char* s =
219
                PyUnicode_AsUTF8(PyList_GetItem(self->group_keys, i));
×
220
            if (s) config.extra_group_keys.emplace_back(s);
×
221
        }
222
    }
223
    if (self->custom_metric_fields &&
120!
224
        PyList_Check(self->custom_metric_fields)) {
×
225
        Py_ssize_t n = PyList_Size(self->custom_metric_fields);
×
226
        for (Py_ssize_t i = 0; i < n; i++) {
×
227
            const char* s =
228
                PyUnicode_AsUTF8(PyList_GetItem(self->custom_metric_fields, i));
×
229
            if (s) config.custom_metric_fields.emplace_back(s);
×
230
        }
231
    }
232

233
    config.compute_percentiles = self->compute_percentiles != 0;
120✔
234
    return config;
120!
235
}
196✔
236

237
// ---------------------------------------------------------------------------
238
// resolve() - check what exists vs needs building
239
// ---------------------------------------------------------------------------
240

241
// resolve(): report which index tiers exist and which files still need work,
// without building anything.
//
// Returns a new dict:
//   "total_files"             int       - size of result.all_files
//   "index_path"              str       - index location from the resolver
//   "aggregation_interval_us" int       - result.stored_time_interval_us
//   "needs_rebuild"           bool      - result.needs_augmentation
//   "ready"                   list[str] - paths of result.cached entries
//   "needs_work"              list[str] - de-duplicated union of the
//                                         needs_checkpoint / needs_bloom /
//                                         needs_manifest / needs_aggregation
//                                         paths, in first-seen order
// On failure returns NULL with an exception set: RuntimeError if the indexer
// was never initialized or the resolver fails, TypeError for a non-str entry
// in `files`.
static PyObject* Indexer_resolve(IndexerObject* self,
                                 PyObject* Py_UNUSED(ignored)) {
    // __init__ may not have run (e.g. a subclass that skips
    // super().__init__()); PyUnicode_AsUTF8(NULL) would crash.
    if (!self->directory || !self->index_dir) {
        PyErr_SetString(PyExc_RuntimeError, "Indexer is not initialized");
        return nullptr;
    }
    const char* directory = PyUnicode_AsUTF8(self->directory);
    const char* index_dir = PyUnicode_AsUTF8(self->index_dir);
    if (!directory || !index_dir) return nullptr;

    ResolverInput input;
    input.directory = directory;
    input.index_dir = index_dir;
    input.require_checkpoints = self->require_checkpoint;
    input.require_bloom = self->require_bloom;
    input.require_manifest = self->require_manifest;
    input.require_aggregation = self->require_aggregation;
    input.aggregation_config = build_aggregation_config(self);
    // Never run the resolver (or return a result) with a Python error pending.
    if (PyErr_Occurred()) return nullptr;

    // Add explicit files, if provided.
    if (self->files && PyList_Check(self->files)) {
        const Py_ssize_t n = PyList_Size(self->files);
        for (Py_ssize_t i = 0; i < n; i++) {
            PyObject* item = PyList_GetItem(self->files, i);
            const char* s = PyUnicode_AsUTF8(item);
            if (!s) {
                // Skipping here used to leave the error indicator set, so the
                // call surfaced later as a SystemError.
                if (!PyUnicode_Check(item)) {
                    PyErr_Format(PyExc_TypeError,
                                 "files[%zd] must be str, not %.200s", i,
                                 Py_TYPE(item)->tp_name);
                }
                return nullptr;
            }
            input.files.emplace_back(s);
        }
    }

    ResolverResult result;
    std::string error_msg;
    bool failed = false;

    // Run the resolver on the runtime without the GIL. Every exception must
    // be caught inside this region: one escaping would skip re-acquiring the
    // GIL and unwind through CPython frames.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime* rt = get_batch_indexer_runtime(self);
        rt->submit(run_coro_scope(
                       rt->executor(),
                       [](CoroScope& scope, ResolverInput in,
                          ResolverResult* out) -> CoroTask<void> {
                           IndexResolverUtility resolver;
                           // scope.spawn(utility, input) auto-binds the
                           // context for utilities with the NeedsContext tag.
                           *out = co_await scope.spawn(resolver, std::move(in));
                       },
                       std::move(input), &result),
                   "batch-indexer-resolve")
            .get();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
    }
    Py_END_ALLOW_THREADS

    // Use a flag rather than error_msg.empty(): an exception with an empty
    // what() must still be reported.
    if (failed) {
        PyErr_SetString(PyExc_RuntimeError,
                        error_msg.empty() ? "index resolution failed"
                                          : error_msg.c_str());
        return nullptr;
    }

    // Union of every needs_* list, de-duplicated in first-seen order.
    std::vector<std::string> needs_work;
    std::unordered_set<std::string> seen;
    auto add_unique = [&needs_work, &seen](const auto& items) {
        for (const auto& item : items) {
            if (seen.insert(item.file_path).second) {
                needs_work.push_back(item.file_path);
            }
        }
    };
    add_unique(result.needs_checkpoint);
    add_unique(result.needs_bloom);
    add_unique(result.needs_manifest);
    add_unique(result.needs_aggregation);

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    // Store `value` under `key`, taking ownership of `value` (NULL when its
    // construction failed). PyDict_SetItemString does not steal, so the
    // local reference is always dropped here.
    auto put = [dict](const char* key, PyObject* value) -> bool {
        if (!value) return false;
        const int rc = PyDict_SetItemString(dict, key, value);
        Py_DECREF(value);
        return rc == 0;
    };
    // Build a new list[str] from `count` paths produced by `path_at(i)`.
    auto make_path_list = [](std::size_t count, auto&& path_at) -> PyObject* {
        PyObject* list = PyList_New(static_cast<Py_ssize_t>(count));
        if (!list) return nullptr;
        for (std::size_t i = 0; i < count; ++i) {
            PyObject* item = PyUnicode_FromString(path_at(i).c_str());
            if (!item) {
                Py_DECREF(list);
                return nullptr;
            }
            PyList_SET_ITEM(list, static_cast<Py_ssize_t>(i), item);
        }
        return list;
    };

    // && short-circuits, so nothing further is allocated after a failure.
    const bool ok =
        put("total_files", PyLong_FromSize_t(result.all_files.size())) &&
        put("index_path", PyUnicode_FromString(result.index_path.c_str())) &&
        put("aggregation_interval_us",
            PyLong_FromUnsignedLongLong(result.stored_time_interval_us)) &&
        put("needs_rebuild", PyBool_FromLong(result.needs_augmentation)) &&
        put("ready", make_path_list(result.cached.size(),
                                    [&](std::size_t i) -> const auto& {
                                        return result.cached[i].file_path;
                                    })) &&
        put("needs_work",
            make_path_list(needs_work.size(),
                           [&](std::size_t i) -> const std::string& {
                               return needs_work[i];
                           }));
    if (!ok) {
        Py_DECREF(dict);
        return nullptr;
    }
    return dict;
}
358

359
// ---------------------------------------------------------------------------
360
// build() - build missing index tiers
361
// ---------------------------------------------------------------------------
362

363
// build(): run resolve_and_build_index on the indexer's runtime, using the
// options captured by __init__ (required tiers, aggregation config,
// checkpoint_size, parallelism, force_rebuild and the directory/files).
//
// Returns None on success, or NULL with an exception set: RuntimeError if the
// indexer was never initialized or the build fails, TypeError for a non-str
// entry in `files`.
static PyObject* Indexer_build(IndexerObject* self,
                               PyObject* Py_UNUSED(ignored)) {
    // __init__ may not have run (e.g. a subclass that skips
    // super().__init__()); PyUnicode_AsUTF8(NULL) would crash.
    if (!self->directory || !self->index_dir) {
        PyErr_SetString(PyExc_RuntimeError, "Indexer is not initialized");
        return nullptr;
    }
    const char* directory = PyUnicode_AsUTF8(self->directory);
    const char* index_dir = PyUnicode_AsUTF8(self->index_dir);
    if (!directory || !index_dir) return nullptr;

    ResolveAndBuildInput input;
    input.directory = directory;
    input.index_dir = index_dir;
    input.require_checkpoints = self->require_checkpoint;
    input.require_bloom = self->require_bloom;
    input.require_manifest = self->require_manifest;
    input.require_aggregation = self->require_aggregation;
    input.aggregation_config = build_aggregation_config(self);
    // Never start a build (or return None) with a Python error pending.
    if (PyErr_Occurred()) return nullptr;
    input.checkpoint_size = self->checkpoint_size;
    input.parallelism = self->parallelism;
    input.force_rebuild = self->force_rebuild;

    // Add explicit files, if provided.
    if (self->files && PyList_Check(self->files)) {
        const Py_ssize_t n = PyList_Size(self->files);
        for (Py_ssize_t i = 0; i < n; i++) {
            PyObject* item = PyList_GetItem(self->files, i);
            const char* s = PyUnicode_AsUTF8(item);
            if (!s) {
                // Skipping here used to leave the error indicator set, so the
                // call surfaced later as a SystemError.
                if (!PyUnicode_Check(item)) {
                    PyErr_Format(PyExc_TypeError,
                                 "files[%zd] must be str, not %.200s", i,
                                 Py_TYPE(item)->tp_name);
                }
                return nullptr;
            }
            input.files.emplace_back(s);
        }
    }

    std::string error_msg;
    bool failed = false;

    // Build without the GIL. Every exception must be caught inside this
    // region: one escaping would skip re-acquiring the GIL and unwind through
    // CPython frames.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime* rt = get_batch_indexer_runtime(self);
        rt->submit(run_coro_scope(
                       rt->executor(),
                       [](CoroScope& scope,
                          ResolveAndBuildInput in) -> CoroTask<void> {
                           co_await resolve_and_build_index(&scope,
                                                            std::move(in));
                       },
                       std::move(input)),
                   "batch-indexer-build")
            .get();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
    }
    Py_END_ALLOW_THREADS

    // Use a flag rather than error_msg.empty(): an exception with an empty
    // what() must still be reported.
    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.empty()
                                                ? "index build failed"
                                                : error_msg.c_str());
        return nullptr;
    }

    Py_RETURN_NONE;
}
415

416
// ---------------------------------------------------------------------------
417
// ensure_indexed() - resolve + build if needed
418
// ---------------------------------------------------------------------------
419

420
// ensure_indexed(): resolve the index state and, if any file still needs
// work or "needs_rebuild" is set (the stored aggregation interval differs
// from the requested one), run build() and resolve again.
//
// Returns the most recent resolve() dict (new reference), or NULL with an
// exception set if any step fails.
static PyObject* Indexer_ensure_indexed(IndexerObject* self,
                                        PyObject* Py_UNUSED(ignored)) {
    PyObject* status = Indexer_resolve(self, nullptr);
    if (status == nullptr) {
        return nullptr;
    }

    // Borrowed references into `status`.
    PyObject* pending_files = PyDict_GetItemString(status, "needs_work");
    PyObject* rebuild_flag = PyDict_GetItemString(status, "needs_rebuild");
    const bool has_pending_files =
        pending_files != nullptr && PyList_Size(pending_files) > 0;
    const bool must_rebuild =
        rebuild_flag != nullptr && PyObject_IsTrue(rebuild_flag);

    if (!has_pending_files && !must_rebuild) {
        return status;  // nothing to do
    }

    Py_DECREF(status);

    PyObject* build_result = Indexer_build(self, nullptr);
    if (build_result == nullptr) {
        return nullptr;
    }
    Py_DECREF(build_result);

    // Report the post-build state.
    return Indexer_resolve(self, nullptr);
}
446

447
// ---------------------------------------------------------------------------
448
// get_checkpoint_indexer() - get a single-file checkpoint indexer
449
// ---------------------------------------------------------------------------
450

451
// get_checkpoint_indexer(file_path): create a single-file CheckpointIndexer
// for `file_path`.
//
// The index path comes from determine_index_path() applied to this batch
// indexer's index_dir. The child gets this indexer's checkpoint_size, bloom
// and manifest building disabled, and the same runtime reference (if any).
// Returns a new reference, or NULL with an exception set (RuntimeError if the
// native handle cannot be created).
static PyObject* Indexer_get_checkpoint_indexer(IndexerObject* self,
                                                PyObject* args) {
    const char* gz_file = nullptr;
    if (!PyArg_ParseTuple(args, "s", &gz_file)) {
        return nullptr;
    }

    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    const char* configured_dir = PyUnicode_AsUTF8(self->index_dir);
    std::string index_path = dft_internal::determine_index_path(
        gz_file, configured_dir != nullptr ? configured_dir : "");

    auto* child = reinterpret_cast<CheckpointIndexerObject*>(
        CheckpointIndexerType.tp_alloc(&CheckpointIndexerType, 0));
    if (child == nullptr) {
        return nullptr;
    }

    child->handle = nullptr;
    child->gz_path = PyUnicode_FromString(gz_file);
    child->index_path = PyUnicode_FromString(index_path.c_str());
    child->checkpoint_size = self->checkpoint_size;
    child->build_bloom = 0;
    child->build_manifest = 0;

    // Share this indexer's runtime (NULL means the default runtime).
    child->runtime_obj = self->runtime_obj;
    Py_XINCREF(child->runtime_obj);

    child->handle = dft_indexer_create(gz_file, index_path.c_str(),
                                       self->checkpoint_size, 0);
    if (child->handle == nullptr) {
        Py_DECREF(reinterpret_cast<PyObject*>(child));
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to create checkpoint indexer");
        return nullptr;
    }

    return reinterpret_cast<PyObject*>(child);
}
498

499
// Run resolve() and return the "index_path" entry it reports.
//
// Returns std::nullopt with a Python exception set if resolve() fails or the
// reported path is missing or empty (RuntimeError "No index path
// available"). Must be called with the GIL held.
static std::optional<std::string> resolve_index_path(IndexerObject* self) {
    PyObject* status = Indexer_resolve(self, nullptr);
    if (status == nullptr) {
        return std::nullopt;
    }

    std::optional<std::string> index_path;
    PyObject* path_obj = PyDict_GetItemString(status, "index_path");
    const char* raw =
        (path_obj != nullptr) ? PyUnicode_AsUTF8(path_obj) : nullptr;
    if (raw != nullptr && *raw != '\0') {
        // Copy before releasing `status`, which owns the UTF-8 buffer.
        index_path.emplace(raw);
    }
    Py_DECREF(status);

    if (!index_path) {
        PyErr_SetString(PyExc_RuntimeError, "No index path available");
    }
    return index_path;
}
513

514
// get_hash_table(type) -> dict[str, str]
//
// Resolves the index, opens its database read-only, and returns the hash
// table selected by `type` ("file", "host", "string" or "proc") as a dict
// built from the (hash, name) pairs the database returns.
//
// Returns a new dict, or NULL with an exception set: ValueError for an
// unknown type, RuntimeError if no index path is available or the database
// query fails.
static PyObject* Indexer_get_hash_table(IndexerObject* self, PyObject* args) {
    const char* type_str = nullptr;
    if (!PyArg_ParseTuple(args, "s", &type_str)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;
    using HashType = IndexDatabase::HashType;

    HashType type;
    if (std::strcmp(type_str, "file") == 0) {
        type = HashType::FILE;
    } else if (std::strcmp(type_str, "host") == 0) {
        type = HashType::HOST;
    } else if (std::strcmp(type_str, "string") == 0) {
        type = HashType::STRING;
    } else if (std::strcmp(type_str, "proc") == 0) {
        type = HashType::PROC;
    } else {
        PyErr_SetString(PyExc_ValueError,
                        "type must be 'file', 'host', 'string', or 'proc'");
        return nullptr;
    }

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, std::string> hash_map;
    std::string error_msg;
    bool failed = false;

    // Query without the GIL. Every exception must be caught inside this
    // region: one escaping would skip re-acquiring the GIL.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        hash_map = db.query_hash_table(type);
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
    }
    Py_END_ALLOW_THREADS

    // Use a flag rather than error_msg.empty(): an exception with an empty
    // what() must still be reported.
    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.empty()
                                                ? "hash table query failed"
                                                : error_msg.c_str());
        return nullptr;
    }

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [hash, name] : hash_map) {
        PyObject* key = PyUnicode_FromStringAndSize(
            hash.data(), static_cast<Py_ssize_t>(hash.size()));
        PyObject* val =
            key ? PyUnicode_FromStringAndSize(
                      name.data(), static_cast<Py_ssize_t>(name.size()))
                : nullptr;
        // Never hand NULL to PyDict_SetItem (e.g. on invalid UTF-8).
        const int rc = (key && val) ? PyDict_SetItem(dict, key, val) : -1;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (rc < 0) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
573

574
// query_file_pids(file_id) -> set[int]
//
// Resolves the index, opens its database read-only, and returns the values
// IndexDatabase::query_file_pids(file_id) reports for that file id.
//
// Returns a new set, or NULL with an exception set (RuntimeError if no index
// path is available or the query fails).
static PyObject* Indexer_query_file_pids(IndexerObject* self, PyObject* args) {
    int file_id;
    if (!PyArg_ParseTuple(args, "i", &file_id)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_set<std::uint64_t> pids;
    std::string error_msg;
    bool failed = false;

    // Query without the GIL. Every exception must be caught inside this
    // region: one escaping would skip re-acquiring the GIL.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        pids = db.query_file_pids(file_id);
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
    }
    Py_END_ALLOW_THREADS

    // Use a flag rather than error_msg.empty(): an exception with an empty
    // what() must still be reported.
    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.empty()
                                                ? "file pid query failed"
                                                : error_msg.c_str());
        return nullptr;
    }

    PyObject* set = PySet_New(nullptr);
    if (!set) return nullptr;

    for (auto pid : pids) {
        PyObject* val = PyLong_FromUnsignedLongLong(pid);
        const int rc = val ? PySet_Add(set, val) : -1;
        Py_XDECREF(val);
        if (rc < 0) {
            Py_DECREF(set);
            return nullptr;
        }
    }

    return set;
}
615

616
// query_all_file_pids() -> dict[int, set[int]]
//
// Resolves the index, opens its database read-only, and returns the mapping
// IndexDatabase::query_all_file_pids() reports (file id -> set of pids).
//
// Returns a new dict, or NULL with an exception set (RuntimeError if no index
// path is available or the query fails).
static PyObject* Indexer_query_all_file_pids(IndexerObject* self,
                                             PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;
    std::string error_msg;
    bool failed = false;

    // Query without the GIL. Every exception must be caught inside this
    // region: one escaping would skip re-acquiring the GIL.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        all_pids = db.query_all_file_pids();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
    }
    Py_END_ALLOW_THREADS

    // Use a flag rather than error_msg.empty(): an exception with an empty
    // what() must still be reported.
    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.empty()
                                                ? "file pid query failed"
                                                : error_msg.c_str());
        return nullptr;
    }

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [file_id, pids] : all_pids) {
        PyObject* key = PyLong_FromLong(file_id);
        PyObject* set = key ? PySet_New(nullptr) : nullptr;
        bool ok = set != nullptr;
        for (auto it = pids.begin(); ok && it != pids.end(); ++it) {
            PyObject* val = PyLong_FromUnsignedLongLong(*it);
            ok = val != nullptr && PySet_Add(set, val) == 0;
            Py_XDECREF(val);
        }
        ok = ok && PyDict_SetItem(dict, key, set) == 0;
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (!ok) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
660

661
// query_file_info() -> (dict[int, str], dict[int, set[int]])
//
// Resolves the index, opens its database read-only, and returns a tuple:
//   [0] file id -> path, where each logical name from
//       query_all_file_info_ids() is joined onto the parent directory of
//       weakly_canonical(index_path);
//   [1] file id -> set of pids, from query_all_file_pids().
//
// Returns a new tuple, or NULL with an exception set (RuntimeError if no
// index path is available, the query fails, or the index path cannot be
// canonicalized).
static PyObject* Indexer_query_file_info(IndexerObject* self,
                                         PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, int> file_ids;
    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;
    fs::path data_dir;
    std::string error_msg;
    bool failed = false;

    // Query without the GIL. Every exception must be caught inside this
    // region: one escaping would skip re-acquiring the GIL.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        file_ids = db.query_all_file_info_ids();
        all_pids = db.query_all_file_pids();
        // weakly_canonical can throw (filesystem_error). It used to run
        // outside any try block, which let the exception unwind into CPython.
        data_dir = fs::weakly_canonical(fs::path(index_path)).parent_path();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
    }
    Py_END_ALLOW_THREADS

    // Use a flag rather than error_msg.empty(): an exception with an empty
    // what() must still be reported.
    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.empty()
                                                ? "file info query failed"
                                                : error_msg.c_str());
        return nullptr;
    }

    PyObject* id_to_path = PyDict_New();
    if (!id_to_path) return nullptr;
    for (const auto& [logical_name, fid] : file_ids) {
        const std::string resolved = (data_dir / logical_name).string();
        PyObject* key = PyLong_FromLong(fid);
        PyObject* val =
            key ? PyUnicode_FromStringAndSize(
                      resolved.data(), static_cast<Py_ssize_t>(resolved.size()))
                : nullptr;
        const int rc =
            (key && val) ? PyDict_SetItem(id_to_path, key, val) : -1;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (rc < 0) {
            Py_DECREF(id_to_path);
            return nullptr;
        }
    }

    PyObject* pid_dict = PyDict_New();
    if (!pid_dict) {
        Py_DECREF(id_to_path);
        return nullptr;
    }
    for (const auto& [file_id, pids] : all_pids) {
        PyObject* key = PyLong_FromLong(file_id);
        PyObject* set = key ? PySet_New(nullptr) : nullptr;
        bool ok = set != nullptr;
        for (auto it = pids.begin(); ok && it != pids.end(); ++it) {
            PyObject* val = PyLong_FromUnsignedLongLong(*it);
            ok = val != nullptr && PySet_Add(set, val) == 0;
            Py_XDECREF(val);
        }
        ok = ok && PyDict_SetItem(pid_dict, key, set) == 0;
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (!ok) {
            Py_DECREF(id_to_path);
            Py_DECREF(pid_dict);
            return nullptr;
        }
    }

    // PyTuple_Pack takes its own references (NULL on failure).
    PyObject* result = PyTuple_Pack(2, id_to_path, pid_dict);
    Py_DECREF(id_to_path);
    Py_DECREF(pid_dict);
    return result;
}
726

727
#ifdef DFTRACER_UTILS_ENABLE_ARROW
728
#include <dftracer/utils/python/trace_reader_iterator.h>
729
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
730

731
// Wrap an ArrowExportResult in a new ArrowBatchCapsule Python object,
// moving the result into a heap allocation stored in obj->result.
// NOTE(review): presumably the capsule's dealloc deletes obj->result;
// confirm.
//
// Returns a new reference, or NULL with an exception set. The heap
// allocation and move happen inside a try block, so a C++ exception neither
// leaks the freshly allocated Python object nor propagates into CPython.
static PyObject* create_arrow_batch_capsule(
    dftracer::utils::utilities::common::arrow::ArrowExportResult&& result) {
    using dftracer::utils::utilities::common::arrow::ArrowExportResult;

    auto* obj = reinterpret_cast<ArrowBatchCapsuleObject*>(
        ArrowBatchCapsuleType.tp_alloc(&ArrowBatchCapsuleType, 0));
    if (!obj) return nullptr;

    try {
        obj->result = new ArrowExportResult(std::move(result));
    } catch (const std::exception& e) {
        // tp_alloc zero-filled the object, so obj->result is still NULL here.
        Py_DECREF(reinterpret_cast<PyObject*>(obj));
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return nullptr;
    } catch (...) {
        Py_DECREF(reinterpret_cast<PyObject*>(obj));
        PyErr_SetString(PyExc_RuntimeError,
                        "failed to wrap Arrow export result");
        return nullptr;
    }
    return reinterpret_cast<PyObject*>(obj);
}
741

742
namespace {
743

744
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
745
using dftracer::utils::utilities::common::arrow::ColumnSpec;
746
using dftracer::utils::utilities::common::arrow::ColumnType;
747
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
748

749
static bool parse_agg_type_str(const char* type_str, AggMapType& out) {
4✔
750
    if (strcmp(type_str, "events") == 0) {
4✔
751
        out = AggMapType::EVENT;
4✔
752
        return true;
4✔
753
    }
754
    if (strcmp(type_str, "profiles") == 0) {
×
755
        out = AggMapType::PROFILE;
×
756
        return true;
×
757
    }
758
    if (strcmp(type_str, "system") == 0) {
×
759
        out = AggMapType::SYSTEM;
×
760
        return true;
×
761
    }
762
    PyErr_SetString(PyExc_ValueError,
×
763
                    "type must be 'events', 'profiles', or 'system'");
764
    return false;
×
765
}
2✔
766

767
struct AggDbHandle {
768
    std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db;
769
    std::unique_ptr<EventAggregator> agg;
770
};
771

772
static std::unique_ptr<AggDbHandle> open_agg_db(const std::string& index_path,
24✔
773
                                                std::string& error_msg) {
774
    std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db;
24✔
775
    try {
776
        db = EventAggregator::open_with_merge_operator(index_path);
24!
777
    } catch (...) {
12✔
778
        auto& mgr = dftracer::utils::rocksdb::RocksDBManager::instance();
×
779
        mgr.reset(index_path);
×
780
        db = mgr.get_or_open(
×
781
            index_path,
782
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
783
        if (db && db->is_open()) {
×
784
            load_intern_dictionary(*db);
×
785
        }
786
    }
×
787
    if (!db || !db->is_open()) {
24!
788
        error_msg = "Failed to open aggregation database";
×
789
        return nullptr;
×
790
    }
791
    std::string config_val;
24✔
792
    auto key = std::string_view(AGG_GLOBAL_CONFIG_KEY,
24✔
793
                                sizeof(AGG_GLOBAL_CONFIG_KEY) - 1);
794
    if (!db->get(key, &config_val, dftracer::utils::rocksdb::cf::AGGREGATION)
48!
795
             .ok()) {
24!
796
        error_msg = "No aggregation config found - was aggregation enabled?";
×
797
        return nullptr;
×
798
    }
799
    auto cfg = deserialize_agg_global_config(config_val);
24!
800
    auto handle = std::make_unique<AggDbHandle>();
24!
801
    handle->db = db;
24✔
802
    handle->agg = std::make_unique<EventAggregator>(db, cfg.config_hash);
24!
803
    return handle;
24✔
804
}
36!
805

806
static std::optional<dftracer::utils::utilities::common::query::Query>
807
parse_query_arg(const char* query_str) {
26✔
808
    if (!query_str || query_str[0] == '\0') return std::nullopt;
26!
809
    auto result = dftracer::utils::utilities::common::query::Query::from_string(
9✔
810
        query_str);
18!
811
    if (!result) {
18✔
812
        PyErr_SetString(PyExc_ValueError, result.error().message.c_str());
2!
813
        return std::nullopt;
2✔
814
    }
815
    return std::move(*result);
16!
816
}
22✔
817

818
// Number of key shards covered by a full scan: parallel_shard_scan walks the
// range [0, DFT_NUM_SHARDS). Presumably matches the sharding used when the
// aggregation keys were written — confirm before changing.
constexpr std::uint16_t DFT_NUM_SHARDS = 4096;
819

820
template <typename Output, typename ScanFn>
821
void parallel_shard_scan_range(Runtime* rt, std::uint16_t outer_begin,
24✔
822
                               std::uint16_t outer_end, ScanFn&& scan_fn,
823
                               std::vector<Output>& outputs) {
824
    if (outer_end <= outer_begin) return;
24!
825
    const std::size_t span = static_cast<std::size_t>(outer_end - outer_begin);
24✔
826
    const std::size_t num_tasks = std::min<std::size_t>(rt->threads(), span);
24!
827
    const std::size_t shards_per_task = (span + num_tasks - 1) / num_tasks;
24✔
828
    rt->submit(run_coro_scope(
48!
829
                   rt->executor(),
12✔
830
                   [&](CoroScope& scope) -> CoroTask<void> {
176!
831
                       std::vector<dftracer::utils::coro::SpawnFuture<Output>>
12✔
832
                           futures;
12✔
833
                       futures.reserve(num_tasks);
12!
834
                       for (std::size_t t = 0; t < num_tasks; ++t) {
48!
835
                           auto shard_begin = static_cast<std::uint16_t>(
72✔
836
                               outer_begin + t * shards_per_task);
36✔
837
                           auto shard_end =
36✔
838
                               static_cast<std::uint16_t>(std::min<std::size_t>(
36!
839
                                   outer_begin + (t + 1) * shards_per_task,
36✔
840
                                   outer_end));
36✔
841
                           futures.push_back(
36!
842
                               scope.spawn([&scan_fn, shard_begin, shard_end](
312!
843
                                               CoroScope&) -> CoroTask<Output> {
37!
844
                                   co_return scan_fn(shard_begin, shard_end);
36!
845
                               }));
846
                       }
36✔
847
                       outputs.reserve(num_tasks);
12!
848
                       for (auto& f : futures) {
128!
849
                           outputs.push_back(co_await f);
96!
850
                       }
36!
851
                   }),
92!
852
               "parallel-shard-scan-range")
12!
853
        .get();
24!
854
}
12✔
855

856
template <typename Output, typename ScanFn>
857
void parallel_shard_scan(Runtime* rt, ScanFn&& scan_fn,
24✔
858
                         std::vector<Output>& outputs) {
859
    parallel_shard_scan_range<Output>(rt, 0, DFT_NUM_SHARDS,
36✔
860
                                      std::forward<ScanFn>(scan_fn), outputs);
12✔
861
}
24✔
862

863
static void append_results_to_list(PyObject* list,
64✔
864
                                   std::vector<ArrowExportResult>& results) {
865
    for (auto& r : results) {
141✔
866
        PyObject* capsule = create_arrow_batch_capsule(std::move(r));
77!
867
        if (capsule) {
77✔
868
            PyList_Append(list, capsule);
77!
869
            Py_DECREF(capsule);
33✔
870
        }
33✔
871
    }
872
}
64✔
873

874
struct AggScanInput {
875
    const EventAggregator* agg;
876
    AggMapType target_type;
877
    AggregationBatchType batch_type;
878
    Py_ssize_t batch_size;
879
    std::uint16_t shard_begin;
880
    std::uint16_t shard_end;
881
};
882

883
struct AggScanOutput {
884
    std::vector<ArrowExportResult> results;
885
};
886

887
// Scan the given shard range of the aggregation store and turn every entry
// whose key parses and whose map type equals `input.target_type` into one
// Arrow row. A batch is sealed every `input.batch_size` rows, plus a final
// partial batch. Entries whose key or value fails to parse are skipped.
// Only batches whose export is valid are kept.
AggScanOutput scan_aggregation_shard_range(AggScanInput input) {
    // Column order must match the append sequence inside the scan callback.
    static const std::vector<ColumnSpec> schema = {
        {"batch_type", ColumnType::INT64},
        {"cat", ColumnType::DICT_STRING},
        {"name", ColumnType::DICT_STRING},
        {"pid", ColumnType::UINT64},
        {"tid", ColumnType::UINT64},
        {"hhash", ColumnType::DICT_STRING},
        {"fhash", ColumnType::DICT_STRING},
        {"time_bucket", ColumnType::UINT64},
        {"count", ColumnType::UINT64},
        {"dur_total", ColumnType::UINT64},
        {"dur_min", ColumnType::UINT64},
        {"dur_max", ColumnType::UINT64},
        {"dur_mean", ColumnType::DOUBLE},
        {"dur_std", ColumnType::DOUBLE},
        {"size_total", ColumnType::UINT64},
        {"size_min", ColumnType::UINT64},
        {"size_max", ColumnType::UINT64},
        {"size_mean", ColumnType::DOUBLE},
        {"size_std", ColumnType::DOUBLE},
        {"ts", ColumnType::UINT64},
        {"te", ColumnType::UINT64},
    };

    AggScanOutput output;
    const auto rows_per_batch = static_cast<std::size_t>(input.batch_size);

    RecordBatchBuilder builder;
    builder.declare_schema(schema);
    builder.reserve(rows_per_batch);

    std::size_t buffered = 0;

    input.agg->scan_shard_range_raw(
        input.shard_begin, input.shard_end,
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
            AggKeyView key;
            if (!parse_agg_key_view(key_bytes, key) ||
                key.map_type != input.target_type) {
                return true;  // not a row for this scan; keep going
            }

            AggMetricsFullView m;
            if (!parse_agg_value_full_view(val_bytes, m)) return true;

            std::size_t col = 0;
            builder.append_int64(col++,
                                 static_cast<std::int64_t>(input.batch_type));
            builder.append_dict_string(col++, key.cat);
            builder.append_dict_string(col++, key.name);
            builder.append_uint64(col++, key.pid);
            builder.append_uint64(col++, key.tid);
            builder.append_dict_string(col++, key.hhash);
            builder.append_dict_string(col++, key.fhash);
            builder.append_uint64(col++, key.time_bucket);
            builder.append_uint64(col++, m.count);
            builder.append_uint64(col++, m.dur_total);
            // Minimums are only meaningful once something was counted.
            builder.append_uint64(col++, m.count > 0 ? m.dur_min : 0);
            builder.append_uint64(col++, m.dur_max);
            builder.append_double(col++, m.dur_mean);
            builder.append_double(col++, m.dur_stddev());
            builder.append_uint64(col++, m.size_total);
            builder.append_uint64(col++, m.count > 0 ? m.size_min : 0);
            builder.append_uint64(col++, m.size_max);
            builder.append_double(col++, m.size_mean);
            builder.append_double(col++, m.size_stddev());
            builder.append_uint64(col++, m.ts);
            builder.append_uint64(col++, m.te);
            builder.end_row();

            if (static_cast<Py_ssize_t>(++buffered) >= input.batch_size) {
                auto batch = builder.finish();
                if (batch.valid()) {
                    output.results.push_back(std::move(batch));
                }
                builder.reset(true);
                builder.reserve(rows_per_batch);
                buffered = 0;
            }
            return true;
        });

    if (buffered > 0) {
        auto tail = builder.finish();
        if (tail.valid()) {
            output.results.push_back(std::move(tail));
        }
    }

    return output;
}
967

968
// I/O category codes. append_fine_row writes these into the dfanalyzer
// "io_cat" column as integers, so the explicit values are part of the output
// and must not be renumbered.
enum class IOCategory : std::int8_t {
    READ = 1,
    WRITE = 2,
    METADATA = 3,
    PCTL = 4,
    IPC = 5,
    OTHER = 6,
    SYNC = 7,  // numbered after OTHER; keep as-is for output compatibility
};
977

978
// Classify a traced function name into an IOCategory. Names not listed below
// map to IOCategory::OTHER. PCTL and IPC are never produced here.
// Each name appears in exactly one group, so a single table lookup matches
// the first-match semantics of a sequential comparison chain.
inline IOCategory get_io_category(std::string_view func_name) {
    static const std::unordered_map<std::string_view, IOCategory>
        kCategoryByName = [] {
            std::unordered_map<std::string_view, IOCategory> table;
            const auto add = [&table](
                                 IOCategory category,
                                 std::initializer_list<std::string_view> names) {
                for (std::string_view n : names) table.emplace(n, category);
            };
            add(IOCategory::READ, {"read", "pread", "readv", "preadv", "fread"});
            add(IOCategory::WRITE,
                {"write", "pwrite", "writev", "pwritev", "fwrite"});
            add(IOCategory::SYNC, {"fsync", "fdatasync", "msync", "sync"});
            add(IOCategory::METADATA,
                {"open",      "open64",   "close",      "fopen",
                 "fopen64",   "fclose",   "stat",       "fstat",
                 "lstat",     "fstatat",  "__xstat",    "__xstat64",
                 "__lxstat",  "__lxstat64", "__fxstat", "__fxstat64",
                 "access",    "lseek",    "lseek64",    "fseek",
                 "ftell",     "seek",     "fcntl",      "ftruncate",
                 "mkdir",     "rmdir",    "unlink",     "remove",
                 "rename",    "link",     "readlink",   "opendir",
                 "closedir",  "readdir"});
            return table;
        }();

    const auto hit = kCategoryByName.find(func_name);
    return hit != kCategoryByName.end() ? hit->second : IOCategory::OTHER;
}
1011

1012
// Write the decimal digits of `val` to `buf` (no sign, no terminating NUL)
// and return a pointer one past the last digit written. `buf` must have room
// for up to 20 characters (the length of UINT64_MAX).
inline char* fast_itoa(std::uint64_t val, char* buf) {
    // Count digits up front so the number can be filled right-to-left in place.
    std::size_t digits = 1;
    for (std::uint64_t rest = val / 10; rest != 0; rest /= 10) {
        ++digits;
    }
    char* const end = buf + digits;
    char* cursor = end;
    do {
        *--cursor = static_cast<char>('0' + val % 10);
        val /= 10;
    } while (val != 0);
    return end;
}
1021

1022
class HashResolver {
1023
   public:
1024
    HashResolver(
153✔
1025
        const std::unordered_map<std::string, std::string>* file_hashes,
1026
        const std::unordered_map<std::string, std::string>* host_hashes)
1027
        : file_hashes_(file_hashes), host_hashes_(host_hashes) {
118✔
1028
        if (file_hashes_) {
83✔
1029
            for (const auto& [hash, name] : *file_hashes_) {
217!
1030
                auto hash_sv = intern_.intern(hash);
132!
1031
                auto name_sv = intern_.intern(name);
132!
1032
                file_map_[hash_sv] = name_sv;
133!
1033
            }
1034
        }
36✔
1035
        if (host_hashes_) {
85✔
1036
            for (const auto& [hash, name] : *host_hashes_) {
216!
1037
                auto hash_sv = intern_.intern(hash);
133!
1038
                auto name_sv = intern_.intern(name);
132!
1039
                host_map_[hash_sv] = name_sv;
133!
1040
            }
1041
        }
36✔
1042
    }
121✔
1043

1044
    // Unresolved hashes resolve to empty (not the hash itself): the
1045
    // dfanalyzer side treats empty file_name/host_name as missing (NA).
1046
    std::string_view resolve_file(std::string_view hash) {
754✔
1047
        if (hash.empty()) return hash;
754!
1048
        auto it = file_map_.find(intern_.intern(hash));
746!
1049
        return it != file_map_.end() ? it->second : std::string_view{};
1,113!
1050
    }
378✔
1051

1052
    std::string_view resolve_host(std::string_view hash) {
746✔
1053
        if (hash.empty()) return hash;
746!
1054
        auto it = host_map_.find(intern_.intern(hash));
743!
1055
        return it != host_map_.end() ? it->second : std::string_view{};
1,120!
1056
    }
378✔
1057

1058
    std::string_view intern(std::string_view sv) { return intern_.intern(sv); }
757✔
1059

1060
   private:
1061
    const std::unordered_map<std::string, std::string>* file_hashes_;
1062
    const std::unordered_map<std::string, std::string>* host_hashes_;
1063
    dftracer::utils::StringIntern intern_;
1064
    std::unordered_map<std::string_view, std::string_view> file_map_;
1065
    std::unordered_map<std::string_view, std::string_view> host_map_;
1066
};
1067

1068
// Identifies a thread of a process on a host: (host hash, pid, tid).
struct ProcKey {
    std::string_view hhash;
    std::uint64_t pid;
    std::uint64_t tid;

    bool operator==(const ProcKey& o) const {
        // Integer fields first; the string comparison is the costly one.
        if (pid != o.pid || tid != o.tid) return false;
        return hhash == o.hhash;
    }
};
1076

1077
struct ProcKeyHash {
1078
    std::size_t operator()(const ProcKey& k) const {
1,021✔
1079
        return std::hash<std::string_view>{}(k.hhash) ^
1,989✔
1080
               (std::hash<std::uint64_t>{}(k.pid) << 1) ^
1,509✔
1081
               (std::hash<std::uint64_t>{}(k.tid) << 2);
1,026✔
1082
    }
1083
};
1084

1085
static const std::vector<ColumnSpec> DFANALYZER_SCHEMA = {
1!
1086
    {"cat", ColumnType::DICT_STRING},
1!
1087
    {"func_name", ColumnType::DICT_STRING},
1!
1088
    {"pid", ColumnType::INT64},
1!
1089
    {"tid", ColumnType::INT64},
1!
1090
    {"file_hash", ColumnType::DICT_STRING},
1!
1091
    {"host_hash", ColumnType::DICT_STRING},
1!
1092
    {"file_name", ColumnType::DICT_STRING},
1!
1093
    {"host_name", ColumnType::DICT_STRING},
1!
1094
    {"proc_name", ColumnType::DICT_STRING},
1!
1095
    {"io_cat", ColumnType::INT64},
1!
1096
    {"acc_pat", ColumnType::INT64},
1!
1097
    {"count", ColumnType::INT64},
1!
1098
    {"time", ColumnType::DOUBLE},
1!
1099
    {"size", ColumnType::INT64},
1!
1100
    {"time_min", ColumnType::DOUBLE},
1!
1101
    {"time_max", ColumnType::DOUBLE},
1!
1102
    {"size_min", ColumnType::INT64},
1!
1103
    {"size_max", ColumnType::INT64},
1!
1104
    {"offset_min", ColumnType::INT64},
1!
1105
    {"offset_max", ColumnType::INT64},
1!
1106
    {"time_range", ColumnType::INT64},
1!
1107
    {"time_start", ColumnType::INT64},
1!
1108
    {"time_end", ColumnType::INT64},
1!
1109
};
1110

1111
// Bit flags naming the fields a coarse (grouped) scan can group by. Each
// enumerator occupies its own bit so several can be OR-ed into
// GroupByConfig::mask.
enum GroupByField : std::uint32_t {
    GB_CAT = 0x001u,
    GB_FUNC_NAME = 0x002u,
    GB_PID = 0x004u,
    GB_TID = 0x008u,
    GB_FILE_HASH = 0x010u,
    GB_HOST_HASH = 0x020u,
    GB_FILE_NAME = 0x040u,
    GB_HOST_NAME = 0x080u,
    GB_PROC_NAME = 0x100u,
    GB_IO_CAT = 0x200u,
    GB_ACC_PAT = 0x400u,
    GB_TIME_RANGE = 0x800u,
};
1125

1126
struct GroupByConfig {
10✔
1127
    std::uint32_t mask = 0;
10✔
1128
    std::vector<GroupByField> order;
1129
    std::vector<std::string> names;  // matches `order`, used for schema
1130
};
1131

1132
inline std::optional<GroupByField> parse_group_by_name(std::string_view name) {
×
1133
    if (name == "cat") return GB_CAT;
×
1134
    if (name == "func_name") return GB_FUNC_NAME;
×
1135
    if (name == "pid") return GB_PID;
×
1136
    if (name == "tid") return GB_TID;
×
1137
    if (name == "file_hash") return GB_FILE_HASH;
×
1138
    if (name == "host_hash") return GB_HOST_HASH;
×
1139
    if (name == "file_name") return GB_FILE_NAME;
×
1140
    if (name == "host_name") return GB_HOST_NAME;
×
1141
    if (name == "proc_name") return GB_PROC_NAME;
×
1142
    if (name == "io_cat") return GB_IO_CAT;
×
1143
    if (name == "acc_pat") return GB_ACC_PAT;
×
1144
    if (name == "time_range") return GB_TIME_RANGE;
×
1145
    return std::nullopt;
×
1146
}
1147

1148
// Grouping key for coarse (grouped) output. Fields that are not part of the
// active group-by keep their default (empty / zero) values, so they do not
// split groups.
struct CoarseKey {
    // String-valued key parts (views; storage is owned elsewhere).
    std::string_view cat;
    std::string_view func_name;
    // Integer key parts, default 0.
    std::uint64_t pid = 0;
    std::uint64_t tid = 0;
    std::string_view file_hash;
    std::string_view host_hash;
    std::string_view file_name;
    std::string_view host_name;
    std::string_view proc_name;
    std::int64_t io_cat = 0;
    std::int64_t acc_pat = 0;
    std::int64_t time_range = 0;

    bool operator==(const CoarseKey& o) const {
        // Compare the cheap integer fields before any string contents.
        if (pid != o.pid || tid != o.tid || io_cat != o.io_cat ||
            acc_pat != o.acc_pat || time_range != o.time_range) {
            return false;
        }
        return cat == o.cat && func_name == o.func_name &&
               file_hash == o.file_hash && host_hash == o.host_hash &&
               file_name == o.file_name && host_name == o.host_name &&
               proc_name == o.proc_name;
    }
};
1171

1172
struct CoarseKeyHash {
1173
    std::size_t operator()(const CoarseKey& k) const {
×
1174
        auto combine = [](std::size_t h, std::size_t v) {
×
NEW
1175
            dftracer::utils::hash_combine(h, v);
×
NEW
1176
            return h;
×
1177
        };
1178
        std::size_t h = std::hash<std::string_view>{}(k.cat);
×
1179
        h = combine(h, std::hash<std::string_view>{}(k.func_name));
×
1180
        h = combine(h, std::hash<std::uint64_t>{}(k.pid));
×
1181
        h = combine(h, std::hash<std::uint64_t>{}(k.tid));
×
1182
        h = combine(h, std::hash<std::string_view>{}(k.file_hash));
×
1183
        h = combine(h, std::hash<std::string_view>{}(k.host_hash));
×
1184
        h = combine(h, std::hash<std::string_view>{}(k.file_name));
×
1185
        h = combine(h, std::hash<std::string_view>{}(k.host_name));
×
1186
        h = combine(h, std::hash<std::string_view>{}(k.proc_name));
×
1187
        h = combine(h, std::hash<std::int64_t>{}(k.io_cat));
×
1188
        h = combine(h, std::hash<std::int64_t>{}(k.acc_pat));
×
1189
        h = combine(h, std::hash<std::int64_t>{}(k.time_range));
×
1190
        return h;
×
1191
    }
1192
};
1193

1194
// Running statistics for one coarse group. Minimum fields start at +inf /
// UINT64_MAX and maximum fields at -inf / 0, presumably so the first
// observation replaces them. has_size / has_time_bounds presumably record
// whether any size or time bound was seen (confirm at the accumulator).
struct CoarseMetrics {
    std::uint64_t count{0};

    // Duration statistics.
    double time_sum{0.0};
    double time_sq_sum{0.0};
    double time_min_val{std::numeric_limits<double>::infinity()};
    double time_max_val{-std::numeric_limits<double>::infinity()};
    double time_call_min_val{std::numeric_limits<double>::infinity()};
    double time_call_max_val{-std::numeric_limits<double>::infinity()};

    // Size statistics.
    std::uint64_t size_sum{0};
    double size_sq_sum{0.0};
    std::uint64_t size_min_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t size_max_val{0};
    std::uint64_t size_call_min_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t size_call_max_val{0};
    bool has_size{false};

    // Time bounds of the group.
    std::uint64_t time_start_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t time_end_val{0};
    bool has_time_bounds{false};
};
1213

1214
inline std::vector<ColumnSpec> make_coarse_schema(const GroupByConfig& cfg) {
×
1215
    std::vector<ColumnSpec> specs;
×
1216
    specs.reserve(cfg.order.size() + 16);
×
1217
    for (std::size_t i = 0; i < cfg.order.size(); ++i) {
×
1218
        GroupByField f = cfg.order[i];
×
1219
        const std::string& name = cfg.names[i];
×
1220
        switch (f) {
×
1221
            case GB_CAT:
1222
            case GB_FUNC_NAME:
1223
            case GB_FILE_HASH:
1224
            case GB_HOST_HASH:
1225
            case GB_FILE_NAME:
1226
            case GB_HOST_NAME:
1227
            case GB_PROC_NAME:
1228
                specs.push_back({name, ColumnType::DICT_STRING});
×
1229
                break;
×
1230
            case GB_PID:
1231
            case GB_TID:
1232
            case GB_IO_CAT:
1233
            case GB_ACC_PAT:
1234
            case GB_TIME_RANGE:
1235
                specs.push_back({name, ColumnType::INT64});
×
1236
                break;
×
1237
        }
1238
    }
1239
    specs.push_back({"count", ColumnType::INT64});
×
1240
    specs.push_back({"time", ColumnType::DOUBLE});
×
1241
    specs.push_back({"size", ColumnType::INT64});
×
1242
    specs.push_back({"time_sq", ColumnType::DOUBLE});
×
1243
    specs.push_back({"size_sq", ColumnType::DOUBLE});
×
1244
    specs.push_back({"time_min", ColumnType::DOUBLE});
×
1245
    specs.push_back({"time_max", ColumnType::DOUBLE});
×
1246
    specs.push_back({"size_min", ColumnType::INT64});
×
1247
    specs.push_back({"size_max", ColumnType::INT64});
×
1248
    specs.push_back({"time_call_min", ColumnType::DOUBLE});
×
1249
    specs.push_back({"time_call_max", ColumnType::DOUBLE});
×
1250
    specs.push_back({"size_call_min", ColumnType::INT64});
×
1251
    specs.push_back({"size_call_max", ColumnType::INT64});
×
1252
    specs.push_back({"time_start", ColumnType::INT64});
×
1253
    specs.push_back({"time_end", ColumnType::INT64});
×
1254
    return specs;
×
1255
}
×
1256

1257
struct DfanalyzerScanInput {
35✔
1258
    const EventAggregator* agg;
1259
    const DfanalyzerContext* ctx;
1260
    std::optional<AggMapType> type_filter;
1261
    Py_ssize_t batch_size;
1262
    std::uint16_t shard_begin;
1263
    std::uint16_t shard_end;
1264
    const GroupByConfig* group_by = nullptr;  // null = full granularity
35✔
1265
};
1266

1267
struct DfanalyzerScanOutput {
1268
    std::vector<ArrowExportResult> events;
1269
    std::vector<ArrowExportResult> profiles;
1270
    std::vector<ArrowExportResult> system;
1271
};
1272

1273
// Immutable per-scan context shared by the row emitters.
1274
struct DfaEmitCtx {
1275
    const DfanalyzerContext* ctx;
1276
    std::uint64_t bucket_width_us;
1277
    Py_ssize_t batch_size;
1278
};
1279

1280
// Finish the current batch into `results` and start a fresh one.
1281
static void flush_dfa_builder(RecordBatchBuilder& builder, std::size_t& count,
220✔
1282
                              std::vector<ArrowExportResult>& results,
1283
                              Py_ssize_t batch_size) {
1284
    if (count > 0) {
220✔
1285
        auto arrow = builder.finish();
76!
1286
        if (arrow.valid()) {
77✔
1287
            results.push_back(std::move(arrow));
76!
1288
        }
33✔
1289
        builder.reset(true);
78!
1290
        builder.reserve(static_cast<std::size_t>(batch_size));
77!
1291
        count = 0;
77✔
1292
    }
77✔
1293
}
220✔
1294

1295
// Emit one full-granularity row from a single aggregated key/value.
1296
static void append_fine_row(RecordBatchBuilder& builder, std::size_t& count,
752✔
1297
                            std::vector<ArrowExportResult>& results,
1298
                            const AggKeyView& kv, const AggMetricsView& mv,
1299
                            std::string_view file_name,
1300
                            std::string_view host_name,
1301
                            std::string_view proc_name, IOCategory io_cat,
1302
                            const DfaEmitCtx& ec) {
1303
    std::size_t ci = 0;
752✔
1304
    builder.append_dict_string(ci++, kv.cat);
752✔
1305
    builder.append_dict_string(ci++, kv.name);
754✔
1306
    builder.append_int64(ci++, static_cast<std::int64_t>(kv.pid));
754✔
1307
    builder.append_int64(ci++, static_cast<std::int64_t>(kv.tid));
750✔
1308
    builder.append_dict_string(ci++, kv.fhash);
752✔
1309
    builder.append_dict_string(ci++, kv.hhash);
756✔
1310
    builder.append_dict_string(ci++, file_name);
755✔
1311
    builder.append_dict_string(ci++, host_name);
756✔
1312
    builder.append_dict_string(ci++, proc_name);
756✔
1313
    builder.append_int64(ci++, static_cast<std::int64_t>(io_cat));
755✔
1314
    builder.append_int64(ci++, 0);
755✔
1315

1316
    builder.append_int64(ci++, static_cast<std::int64_t>(mv.count));
755✔
1317
    builder.append_double(
1,132✔
1318
        ci++, static_cast<double>(mv.dur_total) / ec.ctx->time_resolution);
754✔
1319

1320
    if (mv.size_total > 0) {
754✔
1321
        builder.append_int64(ci++, static_cast<std::int64_t>(mv.size_total));
564✔
1322
    } else {
282✔
1323
        builder.append_null(ci++);
190✔
1324
    }
1325

1326
    builder.append_double(ci++, mv.count > 0 ? static_cast<double>(mv.dur_min) /
1,132!
1327
                                                   ec.ctx->time_resolution
755✔
1328
                                             : 0.0);
1329
    builder.append_double(ci++, mv.count > 0 ? static_cast<double>(mv.dur_max) /
1,130!
1330
                                                   ec.ctx->time_resolution
754✔
1331
                                             : 0.0);
1332

1333
    if (mv.size_total > 0 && mv.count > 0) {
756✔
1334
        builder.append_int64(ci++, static_cast<std::int64_t>(mv.size_min));
564✔
1335
        builder.append_int64(ci++, static_cast<std::int64_t>(mv.size_max));
564✔
1336
    } else {
282✔
1337
        builder.append_null(ci++);
192✔
1338
        builder.append_null(ci++);
192✔
1339
    }
1340

1341
    // offset_min > offset_max only when no offset was ever recorded
1342
    // (MetricStats default min=UINT64_MAX, max=0); 0 is a valid offset.
1343
    if (mv.offset_min <= mv.offset_max) {
755!
1344
        builder.append_int64(ci++, static_cast<std::int64_t>(mv.offset_min));
755✔
1345
        builder.append_int64(ci++, static_cast<std::int64_t>(mv.offset_max));
754✔
1346
    } else {
378✔
NEW
1347
        builder.append_null(ci++);
×
NEW
1348
        builder.append_null(ci++);
×
1349
    }
1350

1351
    auto time_range =
756✔
1352
        ec.bucket_width_us > 0
756!
1353
            ? static_cast<std::int64_t>((kv.time_bucket - ec.ctx->time_origin) /
1,134!
1354
                                        ec.bucket_width_us)
756✔
1355
            : 0;
1356
    builder.append_int64(ci++, time_range);
756✔
1357
    // Counter (profile) rows align to the bucket grid: time_start is the
1358
    // bucket start, time_end one bucket later. Plain events keep the
1359
    // precise min/max event timestamps.
1360
    if (kv.map_type == AggMapType::PROFILE) {
756!
NEW
1361
        auto bucket_start =
×
NEW
1362
            static_cast<std::int64_t>(kv.time_bucket - ec.ctx->time_origin);
×
NEW
1363
        builder.append_int64(ci++, bucket_start);
×
NEW
1364
        builder.append_int64(
×
NEW
1365
            ci++, bucket_start + static_cast<std::int64_t>(ec.bucket_width_us));
×
1366
    } else {
1367
        builder.append_int64(
1,134✔
1368
            ci++, static_cast<std::int64_t>(mv.ts - ec.ctx->time_origin));
756✔
1369
        builder.append_int64(
1,134✔
1370
            ci++, static_cast<std::int64_t>(mv.te - ec.ctx->time_origin));
756✔
1371
    }
1372
    builder.end_row();
755✔
1373

1374
    count++;
754✔
1375
    if (static_cast<Py_ssize_t>(count) >= ec.batch_size) {
754✔
NEW
1376
        flush_dfa_builder(builder, count, results, ec.batch_size);
×
1377
    }
1378
}
754✔
1379

1380
// Emit one coarse (grouped) row from an accumulated key/metrics pair.
NEW
1381
static void append_coarse_row(RecordBatchBuilder& builder, const CoarseKey& key,
×
1382
                              const CoarseMetrics& m,
1383
                              const GroupByConfig& cfg) {
NEW
1384
    std::size_t ci = 0;
×
NEW
1385
    for (std::size_t i = 0; i < cfg.order.size(); ++i) {
×
NEW
1386
        switch (cfg.order[i]) {
×
1387
            case GB_CAT:
NEW
1388
                builder.append_dict_string(ci++, key.cat);
×
NEW
1389
                break;
×
1390
            case GB_FUNC_NAME:
NEW
1391
                builder.append_dict_string(ci++, key.func_name);
×
NEW
1392
                break;
×
1393
            case GB_PID:
NEW
1394
                builder.append_int64(ci++, static_cast<std::int64_t>(key.pid));
×
NEW
1395
                break;
×
1396
            case GB_TID:
NEW
1397
                builder.append_int64(ci++, static_cast<std::int64_t>(key.tid));
×
NEW
1398
                break;
×
1399
            case GB_FILE_HASH:
NEW
1400
                builder.append_dict_string(ci++, key.file_hash);
×
NEW
1401
                break;
×
1402
            case GB_HOST_HASH:
NEW
1403
                builder.append_dict_string(ci++, key.host_hash);
×
NEW
1404
                break;
×
1405
            case GB_FILE_NAME:
NEW
1406
                builder.append_dict_string(ci++, key.file_name);
×
NEW
1407
                break;
×
1408
            case GB_HOST_NAME:
NEW
1409
                builder.append_dict_string(ci++, key.host_name);
×
NEW
1410
                break;
×
1411
            case GB_PROC_NAME:
NEW
1412
                builder.append_dict_string(ci++, key.proc_name);
×
NEW
1413
                break;
×
1414
            case GB_IO_CAT:
NEW
1415
                builder.append_int64(ci++, key.io_cat);
×
NEW
1416
                break;
×
1417
            case GB_ACC_PAT:
NEW
1418
                builder.append_int64(ci++, key.acc_pat);
×
NEW
1419
                break;
×
1420
            case GB_TIME_RANGE:
NEW
1421
                builder.append_int64(ci++, key.time_range);
×
NEW
1422
                break;
×
1423
        }
1424
    }
NEW
1425
    builder.append_int64(ci++, static_cast<std::int64_t>(m.count));
×
NEW
1426
    builder.append_double(ci++, m.time_sum);
×
NEW
1427
    if (m.has_size) {
×
NEW
1428
        builder.append_int64(ci++, static_cast<std::int64_t>(m.size_sum));
×
1429
    } else {
NEW
1430
        builder.append_null(ci++);
×
1431
    }
NEW
1432
    builder.append_double(ci++, m.time_sq_sum);
×
NEW
1433
    if (m.has_size) {
×
NEW
1434
        builder.append_double(ci++, m.size_sq_sum);
×
1435
    } else {
NEW
1436
        builder.append_null(ci++);
×
1437
    }
NEW
1438
    builder.append_double(ci++, m.count > 0 ? m.time_min_val : 0.0);
×
NEW
1439
    builder.append_double(ci++, m.count > 0 ? m.time_max_val : 0.0);
×
NEW
1440
    if (m.has_size) {
×
NEW
1441
        builder.append_int64(ci++, static_cast<std::int64_t>(m.size_min_val));
×
NEW
1442
        builder.append_int64(ci++, static_cast<std::int64_t>(m.size_max_val));
×
1443
    } else {
NEW
1444
        builder.append_null(ci++);
×
NEW
1445
        builder.append_null(ci++);
×
1446
    }
NEW
1447
    builder.append_double(ci++, m.count > 0 ? m.time_call_min_val : 0.0);
×
NEW
1448
    builder.append_double(ci++, m.count > 0 ? m.time_call_max_val : 0.0);
×
NEW
1449
    if (m.has_size) {
×
NEW
1450
        builder.append_int64(ci++,
×
NEW
1451
                             static_cast<std::int64_t>(m.size_call_min_val));
×
NEW
1452
        builder.append_int64(ci++,
×
NEW
1453
                             static_cast<std::int64_t>(m.size_call_max_val));
×
1454
    } else {
NEW
1455
        builder.append_null(ci++);
×
NEW
1456
        builder.append_null(ci++);
×
1457
    }
NEW
1458
    builder.append_int64(ci++, m.has_time_bounds
×
NEW
1459
                                   ? static_cast<std::int64_t>(m.time_start_val)
×
1460
                                   : 0);
NEW
1461
    builder.append_int64(ci++, m.has_time_bounds
×
NEW
1462
                                   ? static_cast<std::int64_t>(m.time_end_val)
×
1463
                                   : 0);
NEW
1464
    builder.end_row();
×
NEW
1465
}
×
1466

1467
// Scans the aggregation shards input.shard_begin..input.shard_end of
// input.agg and exports them as Arrow record batches. The batches are split
// by map type into output.events / output.profiles / output.system.
// Whether shard_end is inclusive is decided by scan_shard_range_raw().
//
// Two modes:
//   * fine (input.group_by == nullptr): one row per aggregated entry, written
//     by append_fine_row() using DFANALYZER_SCHEMA.
//   * coarse (input.group_by != nullptr): entries are merged in per-type hash
//     maps keyed by the requested group-by fields. Rows are emitted only after
//     the whole range has been scanned, using make_coarse_schema(). Row order
//     in this mode follows std::unordered_map iteration and is unspecified.
//
// When input.type_filter is set, entries of other map types are skipped and
// the builders for those types are never initialized. Entries are skipped
// silently when their key or value fails to parse, or when the optional query
// filter rejects them. In both modes a flush is triggered once a per-type row
// count reaches input.batch_size.
DfanalyzerScanOutput scan_dfanalyzer_shards(DfanalyzerScanInput input) {
    DfanalyzerScanOutput output;

    const bool coarse = input.group_by != nullptr;
    const std::vector<ColumnSpec> coarse_schema =
        coarse ? make_coarse_schema(*input.group_by)
               : std::vector<ColumnSpec>{};

    // Builder with the schema of the active mode, pre-reserved for one batch.
    auto make_builder = [&]() {
        RecordBatchBuilder b;
        if (coarse) {
            b.declare_schema(coarse_schema);
        } else {
            b.declare_schema(DFANALYZER_SCHEMA);
        }
        b.reserve(static_cast<std::size_t>(input.batch_size));
        return b;
    };

    // A map type is active when no filter is given or it equals the filter;
    // only active types get a declared builder.
    RecordBatchBuilder event_builder, profile_builder, system_builder;
    bool use_events =
        !input.type_filter || *input.type_filter == AggMapType::EVENT;
    bool use_profiles =
        !input.type_filter || *input.type_filter == AggMapType::PROFILE;
    bool use_system =
        !input.type_filter || *input.type_filter == AggMapType::SYSTEM;

    if (use_events) event_builder = make_builder();
    if (use_profiles) profile_builder = make_builder();
    if (use_system) system_builder = make_builder();

    // Width of one time_range bucket: granularity * resolution, truncated to
    // an integer. A width of 0 makes every time_range 0 (the time_range
    // computations guard on bucket_width_us > 0).
    auto bucket_width_us = static_cast<std::uint64_t>(
        input.ctx->time_granularity * input.ctx->time_resolution);
    const DfaEmitCtx emit_ctx{input.ctx, bucket_width_us, input.batch_size};
    std::size_t event_count = 0, profile_count = 0, system_count = 0;

    // Per-call lookups and caches. Proc names are cached per
    // (hhash, pid, tid) and IO categories per event name, so repeated keys
    // skip the string building / classification below.
    HashResolver resolver(input.ctx->file_hashes, input.ctx->host_hashes);
    std::unordered_map<ProcKey, std::string, ProcKeyHash> proc_name_cache;
    std::unordered_map<std::string_view, IOCategory> io_cat_cache;

    // Coarse-mode accumulators, one per map type.
    std::unordered_map<CoarseKey, CoarseMetrics, CoarseKeyHash> event_coarse,
        profile_coarse, system_coarse;

    // Binds input.batch_size into flush_dfa_builder().
    auto flush_builder = [&](RecordBatchBuilder& builder, std::size_t& count,
                             std::vector<ArrowExportResult>& results) {
        flush_dfa_builder(builder, count, results, input.batch_size);
    };

    // Fine-mode emission: append_fine_row() writes the row and flushes when
    // count reaches the batch size.
    auto append_row = [&](RecordBatchBuilder& builder, std::size_t& count,
                          std::vector<ArrowExportResult>& results,
                          const AggKeyView& kv, const AggMetricsView& mv,
                          std::string_view file_name,
                          std::string_view host_name,
                          std::string_view proc_name, IOCategory io_cat) {
        append_fine_row(builder, count, results, kv, mv, file_name, host_name,
                        proc_name, io_cat, emit_ctx);
    };

    // Coarse-mode merge: folds one entry into the group it belongs to in
    // `map`. Only the fields selected by cfg.mask take part in the key.
    // Durations are converted by dividing by time_resolution.
    // time_call_min/max track each entry's dur_total, while time_min/max track
    // each entry's dur_min/dur_max (only for entries with count > 0).
    auto accumulate_coarse =
        [&](std::unordered_map<CoarseKey, CoarseMetrics, CoarseKeyHash>& map,
            const AggKeyView& kv, const AggMetricsView& mv,
            std::string_view file_name, std::string_view host_name,
            std::string_view proc_name, IOCategory io_cat) {
            const auto& cfg = *input.group_by;
            // Probe with non-interned views; hash/equality compare by content,
            // so string_view lifetime doesn't matter for lookup. We only copy
            // (intern) on first insert.
            CoarseKey probe;
            if (cfg.mask & GB_CAT) probe.cat = kv.cat;
            if (cfg.mask & GB_FUNC_NAME) probe.func_name = kv.name;
            if (cfg.mask & GB_PID) probe.pid = kv.pid;
            if (cfg.mask & GB_TID) probe.tid = kv.tid;
            if (cfg.mask & GB_FILE_HASH) probe.file_hash = kv.fhash;
            if (cfg.mask & GB_HOST_HASH) probe.host_hash = kv.hhash;
            if (cfg.mask & GB_FILE_NAME) probe.file_name = file_name;
            if (cfg.mask & GB_HOST_NAME) probe.host_name = host_name;
            if (cfg.mask & GB_PROC_NAME) probe.proc_name = proc_name;
            if (cfg.mask & GB_IO_CAT)
                probe.io_cat = static_cast<std::int64_t>(io_cat);
            if (cfg.mask & GB_TIME_RANGE) {
                probe.time_range =
                    bucket_width_us > 0
                        ? static_cast<std::int64_t>(
                              (kv.time_bucket - input.ctx->time_origin) /
                              bucket_width_us)
                        : 0;
            }
            // acc_pat is always 0 today; included for completeness.

            auto it = map.find(probe);
            if (it == map.end()) {
                // First sighting: promote views referencing unstable DB buffers
                // to interned copies. file_name/host_name come from the
                // resolver's intern pool, and proc_name from proc_name_cache;
                // both already stable across iterations, no copy needed.
                CoarseKey stable = probe;
                if (cfg.mask & GB_CAT) stable.cat = resolver.intern(kv.cat);
                if (cfg.mask & GB_FUNC_NAME)
                    stable.func_name = resolver.intern(kv.name);
                if (cfg.mask & GB_FILE_HASH)
                    stable.file_hash = resolver.intern(kv.fhash);
                if (cfg.mask & GB_HOST_HASH)
                    stable.host_hash = resolver.intern(kv.hhash);
                auto [nit, _] = map.emplace(std::move(stable), CoarseMetrics{});
                it = nit;
            }
            CoarseMetrics& m = it->second;
            m.count += mv.count;
            double time_val =
                static_cast<double>(mv.dur_total) / input.ctx->time_resolution;
            m.time_sum += time_val;
            m.time_sq_sum += time_val * time_val;
            if (time_val < m.time_call_min_val) m.time_call_min_val = time_val;
            if (time_val > m.time_call_max_val) m.time_call_max_val = time_val;
            if (mv.count > 0) {
                double dur_min_v = static_cast<double>(mv.dur_min) /
                                   input.ctx->time_resolution;
                double dur_max_v = static_cast<double>(mv.dur_max) /
                                   input.ctx->time_resolution;
                if (dur_min_v < m.time_min_val) m.time_min_val = dur_min_v;
                if (dur_max_v > m.time_max_val) m.time_max_val = dur_max_v;
            }
            // Size statistics are only gathered from entries that carry a
            // non-zero size; has_size records that at least one did.
            if (mv.size_total > 0) {
                m.has_size = true;
                m.size_sum += mv.size_total;
                double sz = static_cast<double>(mv.size_total);
                m.size_sq_sum += sz * sz;
                if (mv.size_total < m.size_call_min_val)
                    m.size_call_min_val = mv.size_total;
                if (mv.size_total > m.size_call_max_val)
                    m.size_call_max_val = mv.size_total;
                if (mv.count > 0) {
                    if (mv.size_min < m.size_min_val)
                        m.size_min_val = mv.size_min;
                    if (mv.size_max > m.size_max_val)
                        m.size_max_val = mv.size_max;
                }
            }
            // Time bounds are stored as offsets from time_origin. Entries that
            // start before the origin are left out so the unsigned
            // subtraction cannot wrap.
            if (mv.ts >= input.ctx->time_origin) {
                m.has_time_bounds = true;
                auto ts_off = mv.ts - input.ctx->time_origin;
                auto te_off = mv.te - input.ctx->time_origin;
                if (ts_off < m.time_start_val) m.time_start_val = ts_off;
                if (te_off > m.time_end_val) m.time_end_val = te_off;
            }
        };

    // Every path of this callback returns true. Presumably that tells
    // scan_shard_range_raw() to continue (NOTE(review): confirm), so the whole
    // range is always visited.
    input.agg->scan_shard_range_raw(
        input.shard_begin, input.shard_end,
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
            AggKeyView kv;
            if (!parse_agg_key_view(key_bytes, kv)) return true;

            if (input.type_filter && kv.map_type != *input.type_filter)
                return true;

            // Only the fields the query actually references are
            // materialized into the ValueMap.
            if (input.ctx->query_filter) {
                auto& q = *input.ctx->query_filter;
                dftracer::utils::utilities::common::query::ValueMap fields;
                if (q.references("cat")) fields["cat"] = std::string(kv.cat);
                if (q.references("name")) fields["name"] = std::string(kv.name);
                if (q.references("pid")) fields["pid"] = kv.pid;
                if (q.references("tid")) fields["tid"] = kv.tid;
                if (q.references("hhash"))
                    fields["hhash"] = std::string(kv.hhash);
                if (q.references("fhash"))
                    fields["fhash"] = std::string(kv.fhash);
                if (q.references("time_bucket"))
                    fields["time_bucket"] = kv.time_bucket;
                if (!q.evaluate(fields)) return true;
            }

            AggMetricsView mv;
            if (!parse_agg_value_view(val_bytes, mv)) return true;

            auto file_name = resolver.resolve_file(kv.fhash);
            auto host_name = resolver.resolve_host(kv.hhash);

            // Process label "app#<host name | hhash | unknown>#<pid>#<tid>",
            // built once per (hhash, pid, tid). The stored key uses an
            // interned hhash because kv.hhash views a DB buffer (see the
            // first-sighting note in accumulate_coarse).
            ProcKey pk{kv.hhash, kv.pid, kv.tid};
            auto proc_it = proc_name_cache.find(pk);
            std::string_view proc_name;
            if (proc_it != proc_name_cache.end()) {
                proc_name = proc_it->second;
            } else {
                std::string pn = "app#";
                if (!host_name.empty()) {
                    pn.append(host_name);
                } else if (!kv.hhash.empty()) {
                    pn.append(kv.hhash);
                } else {
                    pn.append("unknown");
                }
                pn.push_back('#');
                pn.append(std::to_string(kv.pid));
                pn.push_back('#');
                pn.append(std::to_string(kv.tid));
                ProcKey stable_pk{resolver.intern(kv.hhash), kv.pid, kv.tid};
                auto [it, _] =
                    proc_name_cache.emplace(stable_pk, std::move(pn));
                proc_name = it->second;
            }

            // IO category per event name. The cache key is interned for the
            // same lifetime reason as above.
            auto io_it = io_cat_cache.find(kv.name);
            IOCategory io_cat;
            if (io_it != io_cat_cache.end()) {
                io_cat = io_it->second;
            } else {
                io_cat = get_io_category(kv.name);
                io_cat_cache[resolver.intern(kv.name)] = io_cat;
            }

            if (coarse) {
                switch (kv.map_type) {
                    case AggMapType::EVENT:
                        if (use_events)
                            accumulate_coarse(event_coarse, kv, mv, file_name,
                                              host_name, proc_name, io_cat);
                        break;
                    case AggMapType::PROFILE:
                        if (use_profiles)
                            accumulate_coarse(profile_coarse, kv, mv, file_name,
                                              host_name, proc_name, io_cat);
                        break;
                    case AggMapType::SYSTEM:
                        if (use_system)
                            accumulate_coarse(system_coarse, kv, mv, file_name,
                                              host_name, proc_name, io_cat);
                        break;
                }
            } else {
                // No use_* checks needed here: the type_filter early return
                // above lets only active map types reach this point.
                switch (kv.map_type) {
                    case AggMapType::EVENT:
                        append_row(event_builder, event_count, output.events,
                                   kv, mv, file_name, host_name, proc_name,
                                   io_cat);
                        break;
                    case AggMapType::PROFILE:
                        append_row(profile_builder, profile_count,
                                   output.profiles, kv, mv, file_name,
                                   host_name, proc_name, io_cat);
                        break;
                    case AggMapType::SYSTEM:
                        append_row(system_builder, system_count, output.system,
                                   kv, mv, file_name, host_name, proc_name,
                                   io_cat);
                        break;
                }
            }
            return true;
        });

    if (coarse) {
        const auto& cfg = *input.group_by;
        // Emits every accumulated group, flushing each full batch along the
        // way, then flushes whatever remains.
        auto flush_coarse = [&](std::unordered_map<CoarseKey, CoarseMetrics,
                                                   CoarseKeyHash>& map,
                                RecordBatchBuilder& builder, std::size_t& count,
                                std::vector<ArrowExportResult>& results) {
            for (auto& [key, m] : map) {
                append_coarse_row(builder, key, m, cfg);
                ++count;
                if (static_cast<Py_ssize_t>(count) >= input.batch_size) {
                    flush_builder(builder, count, results);
                }
            }
            // Called unconditionally. How an empty builder is handled is up
            // to flush_dfa_builder() (NOTE(review): presumably a no-op).
            flush_builder(builder, count, results);
        };
        if (use_events)
            flush_coarse(event_coarse, event_builder, event_count,
                         output.events);
        if (use_profiles)
            flush_coarse(profile_coarse, profile_builder, profile_count,
                         output.profiles);
        if (use_system)
            flush_coarse(system_coarse, system_builder, system_count,
                         output.system);
    } else {
        // Fine mode: flush the partially filled final batch of each type.
        if (use_events)
            flush_builder(event_builder, event_count, output.events);
        if (use_profiles)
            flush_builder(profile_builder, profile_count, output.profiles);
        if (use_system)
            flush_builder(system_builder, system_count, output.system);
    }

    return output;
}
1753

1754
// Two-pass scan over SYSTEM_METRICS CF: pass 1 discovers metric column names
1755
// (dynamic per workload), pass 2 emits rows. Needed because RecordBatchBuilder
1756
// requires the schema up front.
1757
std::vector<ArrowExportResult> scan_system_metrics_buffer(
20✔
1758
    const EventAggregator* agg, const DfanalyzerContext* ctx,
1759
    Py_ssize_t batch_size) {
1760
    std::vector<ArrowExportResult> results;
20✔
1761
    if (!agg) return results;
20✔
1762

1763
    std::vector<std::string> metric_names_ordered;
20✔
1764
    std::unordered_set<std::string> metric_name_seen;
20✔
1765
    agg->scan_system_metrics_raw(
30!
1766
        [&](std::string_view, std::string_view val_bytes) -> bool {
10✔
1767
            auto m = deserialize_system_value(val_bytes);
×
1768
            if (m.metrics) {
×
1769
                for (const auto& [name, _] : *m.metrics) {
×
1770
                    if (metric_name_seen.insert(name).second) {
×
1771
                        metric_names_ordered.push_back(name);
×
1772
                    }
1773
                }
1774
            }
1775
            return true;
1776
        });
×
1777

1778
    if (metric_names_ordered.empty()) return results;
20✔
1779

1780
    // SystemAggregationMetrics::metrics is an unordered_map; sort the
1781
    // discovered column names so the emitted Arrow schema is deterministic
1782
    // across runs and builds.
1783
    std::sort(metric_names_ordered.begin(), metric_names_ordered.end());
×
1784

1785
    std::vector<ColumnSpec> schema;
×
1786
    schema.reserve(6 + metric_names_ordered.size());
×
1787
    schema.push_back({"host_hash", ColumnType::DICT_STRING});
×
1788
    schema.push_back({"name", ColumnType::DICT_STRING});
×
1789
    schema.push_back({"time_bucket", ColumnType::INT64});
×
1790
    schema.push_back({"ts", ColumnType::INT64});
×
1791
    schema.push_back({"te", ColumnType::INT64});
×
1792
    schema.push_back({"count", ColumnType::INT64});
×
1793
    for (const auto& mn : metric_names_ordered) {
×
1794
        schema.push_back({mn, ColumnType::DOUBLE});
×
1795
    }
1796

1797
    RecordBatchBuilder builder;
×
1798
    builder.declare_schema(schema);
×
1799
    builder.reserve(static_cast<std::size_t>(batch_size));
×
1800

1801
    auto flush = [&](std::size_t& row_count) {
×
1802
        if (row_count == 0) return;
×
1803
        auto arrow = builder.finish();
×
1804
        if (arrow.valid()) results.push_back(std::move(arrow));
×
1805
        builder.reset(true);
×
1806
        builder.reserve(static_cast<std::size_t>(batch_size));
×
1807
        row_count = 0;
×
1808
    };
×
1809

1810
    std::size_t row_count = 0;
×
1811
    const std::size_t n_metric_cols = metric_names_ordered.size();
×
1812

1813
    agg->scan_system_metrics_raw(
×
1814
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
×
1815
            auto k = deserialize_system_key(key_bytes);
×
1816
            auto m = deserialize_system_value(val_bytes);
×
1817

1818
            std::size_t ci = 0;
×
1819
            builder.append_dict_string(ci++, k.key.hhash);
×
1820
            builder.append_dict_string(ci++, k.key.name);
×
1821
            builder.append_int64(ci++,
×
1822
                                 static_cast<std::int64_t>(k.key.time_bucket));
×
1823
            builder.append_int64(ci++, static_cast<std::int64_t>(m.ts));
×
1824
            builder.append_int64(ci++, static_cast<std::int64_t>(m.te));
×
1825
            builder.append_int64(ci++, static_cast<std::int64_t>(m.count));
×
1826

1827
            for (std::size_t i = 0; i < n_metric_cols; ++i) {
×
1828
                const auto& mn = metric_names_ordered[i];
×
1829
                bool present = false;
×
1830
                if (m.metrics) {
×
1831
                    auto it = m.metrics->find(mn);
×
1832
                    if (it != m.metrics->end()) {
×
1833
                        builder.append_double(ci++, it->second.mean);
×
1834
                        present = true;
×
1835
                    }
1836
                }
1837
                if (!present) builder.append_null(ci++);
×
1838
            }
1839
            builder.end_row();
×
1840
            row_count++;
×
1841
            if (static_cast<Py_ssize_t>(row_count) >= batch_size) {
×
1842
                flush(row_count);
×
1843
            }
1844
            return true;
1845
        });
×
1846
    flush(row_count);
×
1847

1848
    (void)ctx;
1849
    return results;
×
1850
}
20!
1851

1852
}  // namespace
1853

1854
// Python binding: iter_aggregation(type="events", batch_size=10000).
//
// Opens the aggregation database behind this indexer and scans it in
// parallel shard ranges with scan_aggregation_shard_range(). Returns an
// iterator over the collected record batches. The scan runs with the GIL
// released.
//
// Errors:
//   * ValueError if batch_size is not positive;
//   * RuntimeError if opening or scanning the database fails;
//   * returns nullptr without raising here when parse_agg_type_str() or
//     resolve_index_path() fail (presumably they set the exception).
static PyObject* Indexer_iter_aggregation(IndexerObject* self, PyObject* args,
                                          PyObject* kwds) {
    static const char* kwlist[] = {"type", "batch_size", nullptr};
    const char* type_str = "events";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|sn", (char**)kwlist,
                                     &type_str, &batch_size)) {
        return nullptr;
    }

    // A batch must hold at least one row. The shard scanners also cast the
    // batch size to std::size_t for reserve(), where a negative value would
    // wrap to an enormous reservation.
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "batch_size must be a positive integer");
        return nullptr;
    }

    AggMapType target_type;
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;

    AggregationBatchType batch_type;
    if (target_type == AggMapType::EVENT) {
        batch_type = AggregationBatchType::EVENT;
    } else if (target_type == AggMapType::PROFILE) {
        batch_type = AggregationBatchType::PROFILE;
    } else {
        batch_type = AggregationBatchType::SYSTEM;
    }

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* batch_list = PyList_New(0);
    if (!batch_list) return nullptr;

    std::string error_msg;
    std::vector<dftracer::utils::utilities::common::arrow::ArrowExportResult>
        results;

    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<AggScanOutput> outputs;
            parallel_shard_scan<AggScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    AggScanInput input;
                    input.agg = handle->agg.get();
                    input.target_type = target_type;
                    input.batch_type = batch_type;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    return scan_aggregation_shard_range(input);
                },
                outputs);

            for (auto& out : outputs) {
                for (auto& r : out.results) {
                    results.push_back(std::move(r));
                }
            }
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
        // An empty message would otherwise be mistaken for success below.
        if (error_msg.empty()) error_msg = "aggregation scan failed";
    } catch (...) {
        // Nothing may unwind past Py_END_ALLOW_THREADS: the thread state
        // would never be restored and the exception would escape into the
        // C API caller.
        error_msg = "aggregation scan failed with an unknown error";
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(batch_list);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(batch_list, results);

    PyObject* iter = PyObject_GetIter(batch_list);
    Py_DECREF(batch_list);
    return iter;
}
1929

1930
// Python binding: iter_arrow_dfanalyzer(type="events", batch_size=10000,
// time_granularity=1.0, time_resolution=1000000.0, query=None).
//
// Opens the aggregation database and the index's file/host hash tables, then
// runs scan_dfanalyzer_shards() over parallel shard ranges with the GIL
// released. Time offsets are taken relative to the smallest time bucket, or 0
// when the database reports no valid time bounds. Returns an iterator over
// the record batches: events, then profiles, then system, per shard output.
//
// Errors:
//   * ValueError for a non-positive batch_size;
//   * ValueError for a non-positive time_resolution;
//   * ValueError when time_granularity * time_resolution is negative, NaN,
//     infinite or does not fit in 64 bits;
//   * RuntimeError if opening or scanning fails;
//   * returns nullptr when parse_agg_type_str(), parse_query_arg() or
//     resolve_index_path() fail (presumably with an exception already set).
static PyObject* Indexer_iter_arrow_dfanalyzer(IndexerObject* self,
                                               PyObject* args, PyObject* kwds) {
    static const char* kwlist[] = {
        "type",  "batch_size", "time_granularity", "time_resolution",
        "query", nullptr};
    const char* type_str = "events";
    Py_ssize_t batch_size = 10000;
    double time_granularity = 1.0;
    double time_resolution = 1000000.0;
    const char* query_str = nullptr;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|snddz", (char**)kwlist,
                                     &type_str, &batch_size, &time_granularity,
                                     &time_resolution, &query_str)) {
        return nullptr;
    }

    // A batch must hold at least one row. scan_dfanalyzer_shards() casts it
    // to std::size_t for reserve(), where a negative value would wrap.
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "batch_size must be a positive integer");
        return nullptr;
    }
    // Durations are divided by time_resolution during the scan. The negated
    // comparison also rejects NaN.
    if (!(time_resolution > 0.0)) {
        PyErr_SetString(PyExc_ValueError,
                        "time_resolution must be a positive number");
        return nullptr;
    }
    // scan_dfanalyzer_shards() converts this product to std::uint64_t. That
    // floating-to-integer conversion is undefined for negative, NaN, infinite
    // or >= 2^64 values, so reject them up front. Zero is allowed: it
    // disables time bucketing.
    const double bucket_width = time_granularity * time_resolution;
    if (!(bucket_width >= 0.0 && bucket_width < 18446744073709551616.0)) {
        PyErr_SetString(PyExc_ValueError,
                        "time_granularity * time_resolution must be a finite, "
                        "non-negative value below 2**64");
        return nullptr;
    }

    AggMapType target_type;
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;

    auto query_opt = parse_query_arg(query_str);
    if (!query_opt && PyErr_Occurred()) return nullptr;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* batch_list = PyList_New(0);
    if (!batch_list) return nullptr;

    std::string error_msg;
    std::vector<ArrowExportResult> results;

    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            dftracer::utils::utilities::indexer::IndexDatabase idx_db(
                index_path,
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto file_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::FILE);
            auto host_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::HOST);

            auto time_bounds = handle->agg->query_time_bounds();
            std::uint64_t time_origin =
                time_bounds.valid ? time_bounds.min_time_bucket : 0;

            DfanalyzerContext ctx;
            ctx.file_hashes = &file_hashes;
            ctx.host_hashes = &host_hashes;
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
            ctx.time_origin = time_origin;
            ctx.time_resolution = time_resolution;
            ctx.time_granularity = time_granularity;

            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<DfanalyzerScanOutput> outputs;
            parallel_shard_scan<DfanalyzerScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    DfanalyzerScanInput input;
                    input.agg = handle->agg.get();
                    input.ctx = &ctx;
                    input.type_filter = target_type;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    return scan_dfanalyzer_shards(input);
                },
                outputs);

            for (auto& out : outputs) {
                for (auto& r : out.events) results.push_back(std::move(r));
                for (auto& r : out.profiles) results.push_back(std::move(r));
                for (auto& r : out.system) results.push_back(std::move(r));
            }
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
        // An empty message would otherwise be mistaken for success below.
        if (error_msg.empty()) error_msg = "dfanalyzer scan failed";
    } catch (...) {
        // Nothing may unwind past Py_END_ALLOW_THREADS: the thread state
        // would never be restored and the exception would escape into the
        // C API caller.
        error_msg = "dfanalyzer scan failed with an unknown error";
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(batch_list);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(batch_list, results);

    PyObject* iter = PyObject_GetIter(batch_list);
    Py_DECREF(batch_list);
    return iter;
}
2027

2028
// Parse the optional `group_by` keyword into a GroupByConfig.
//
// Accepts None/NULL (no grouping; `out` is left untouched) or a sequence of
// column-name strings. Each name is mapped through parse_group_by_name();
// repeated names are ignored, so `out.order` / `out.names` hold each field
// once, in first-seen order, and `out.mask` accumulates the field bits.
//
// Returns true on success. On failure a Python exception is set and false is
// returned:
//   - TypeError  if `obj` is a bare str, is not a sequence, or contains a
//                non-str entry;
//   - ValueError if an entry names an unsupported group_by field;
//   - any error raised by PySequence_Length / PySequence_GetItem /
//     PyUnicode_AsUTF8AndSize.
static bool parse_group_by_arg(PyObject* obj, GroupByConfig& out) {
    if (!obj || obj == Py_None) return true;
    // A bare str satisfies PySequence_Check, but iterating it yields single
    // characters and produces a misleading "unsupported group_by field"
    // error. Reject it with the documented contract instead.
    if (PyUnicode_Check(obj) || !PySequence_Check(obj)) {
        PyErr_SetString(PyExc_TypeError,
                        "group_by must be a sequence of strings or None");
        return false;
    }
    const Py_ssize_t n = PySequence_Length(obj);
    // PySequence_Length returns -1 with an exception set (e.g. an object
    // with __getitem__ but no __len__). Propagate the error instead of
    // reporting success with an exception pending.
    if (n < 0) return false;
    for (Py_ssize_t i = 0; i < n; ++i) {
        PyObject* item = PySequence_GetItem(obj, i);  // new reference
        if (!item) return false;
        if (!PyUnicode_Check(item)) {
            Py_DECREF(item);
            PyErr_SetString(PyExc_TypeError,
                            "group_by entries must be strings");
            return false;
        }
        Py_ssize_t sz = 0;
        const char* s = PyUnicode_AsUTF8AndSize(item, &sz);
        if (!s) {
            Py_DECREF(item);
            return false;
        }
        // `sv` borrows the UTF-8 buffer owned by `item`; it must not be used
        // after `item` is released.
        std::string_view sv(s, static_cast<std::size_t>(sz));
        auto field = parse_group_by_name(sv);
        if (!field) {
            std::string msg = "unsupported group_by field: ";
            msg.append(sv);
            Py_DECREF(item);
            PyErr_SetString(PyExc_ValueError, msg.c_str());
            return false;
        }
        if (!(out.mask & *field)) {
            out.mask |= *field;
            out.order.push_back(*field);
            out.names.emplace_back(sv);
        }
        Py_DECREF(item);
    }
    return true;
}
10✔
2069

2070
// Indexer.iter_arrow_dfanalyzer_all(batch_size=10000, time_granularity=1.0,
//                                   time_resolution=1e6, query=None,
//                                   group_by=None) -> dict
//
// Runs a single parallel shard scan (no type filter) over the aggregation
// DB at this indexer's resolved index path. It returns a new dict
// {"events": [...], "profiles": [...], "system": [...]}, where each value is
// a list filled by append_results_to_list(). System-metric results from
// scan_system_metrics_buffer() are appended after the shard-scan system
// results.
//
// The GIL is released for the whole DB section. A std::exception thrown
// there, or an error message reported by open_agg_db(), is raised as
// RuntimeError. Returns nullptr with a Python exception set on failure.
static PyObject* Indexer_iter_arrow_dfanalyzer_all(IndexerObject* self,
                                                   PyObject* args,
                                                   PyObject* kwds) {
    static const char* kwlist[] = {"batch_size",      "time_granularity",
                                   "time_resolution", "query",
                                   "group_by",        nullptr};
    Py_ssize_t batch_size = 10000;
    double time_granularity = 1.0;
    double time_resolution = 1000000.0;
    const char* query_str = nullptr;
    PyObject* group_by_obj = nullptr;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nddzO", (char**)kwlist, &batch_size,
            &time_granularity, &time_resolution, &query_str, &group_by_obj)) {
        return nullptr;
    }

    auto query_opt = parse_query_arg(query_str);
    if (!query_opt && PyErr_Occurred()) return nullptr;

    GroupByConfig group_by_cfg;
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
    // An empty mask means "no grouping": pass nullptr to the scan.
    const GroupByConfig* group_by_ptr =
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* result_dict = PyDict_New();
    if (!result_dict) return nullptr;

    PyObject* events_list = PyList_New(0);
    PyObject* profiles_list = PyList_New(0);
    PyObject* system_list = PyList_New(0);
    if (!events_list || !profiles_list || !system_list) {
        Py_XDECREF(events_list);
        Py_XDECREF(profiles_list);
        Py_XDECREF(system_list);
        Py_DECREF(result_dict);
        return nullptr;
    }

    std::string error_msg;
    std::vector<ArrowExportResult> events_results, profiles_results,
        system_results;

    // The GIL is released between BEGIN and END: no Python API calls and no
    // Python exceptions in this section; failures are reported via error_msg.
    Py_BEGIN_ALLOW_THREADS try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            dftracer::utils::utilities::indexer::IndexDatabase idx_db(
                index_path,
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto file_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::FILE);
            auto host_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::HOST);

            // Earliest aggregated time bucket (0 when bounds are not valid);
            // presumably the origin for the normalized time_range column.
            auto time_bounds = handle->agg->query_time_bounds();
            std::uint64_t time_origin =
                time_bounds.valid ? time_bounds.min_time_bucket : 0;

            DfanalyzerContext ctx;
            ctx.file_hashes = &file_hashes;
            ctx.host_hashes = &host_hashes;
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
            ctx.time_origin = time_origin;
            ctx.time_resolution = time_resolution;
            ctx.time_granularity = time_granularity;

            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<DfanalyzerScanOutput> outputs;
            parallel_shard_scan<DfanalyzerScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    DfanalyzerScanInput input;
                    input.agg = handle->agg.get();
                    input.ctx = &ctx;
                    input.type_filter = std::nullopt;  // all types at once
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    input.group_by = group_by_ptr;
                    return scan_dfanalyzer_shards(input);
                },
                outputs);

            for (auto& out : outputs) {
                for (auto& r : out.events)
                    events_results.push_back(std::move(r));
                for (auto& r : out.profiles)
                    profiles_results.push_back(std::move(r));
                for (auto& r : out.system)
                    system_results.push_back(std::move(r));
            }

            auto sys_buf =
                scan_system_metrics_buffer(handle->agg.get(), &ctx, batch_size);
            for (auto& r : sys_buf) system_results.push_back(std::move(r));
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(events_list);
        Py_DECREF(profiles_list);
        Py_DECREF(system_list);
        Py_DECREF(result_dict);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(events_list, events_results);
    append_results_to_list(profiles_list, profiles_results);
    append_results_to_list(system_list, system_results);

    // PyDict_SetItemString takes its own reference to each value and can
    // fail (e.g. MemoryError). Returning the dict with an exception pending
    // would surface as SystemError, so propagate the failure instead.
    const bool stored =
        PyDict_SetItemString(result_dict, "events", events_list) == 0 &&
        PyDict_SetItemString(result_dict, "profiles", profiles_list) == 0 &&
        PyDict_SetItemString(result_dict, "system", system_list) == 0;
    Py_DECREF(events_list);
    Py_DECREF(profiles_list);
    Py_DECREF(system_list);
    if (!stored) {
        Py_DECREF(result_dict);
        return nullptr;
    }

    return result_dict;
}
22✔
2200

2201
// ---------------------------------------------------------------------------
2202
// scan_aggregation_manifest — module-level entry point for analyze_trace.
2203
//
2204
// Each Dask worker calls this with its slice of the agg manifest
2205
// (agg_ssts + sys_ssts) and optionally a [shard_begin, shard_end) range.
2206
// The function opens a scratch IndexDatabase at `scratch_dir`, ingests the
2207
// SSTs into its AGGREGATION/SYSTEM_METRICS CFs (nearly free when SSTs live
2208
// on the same filesystem as `scratch_dir` — RocksDB hard-links them), then
2209
// runs the same parallel shard scan that `iter_arrow_dfanalyzer_all` uses.
2210
//
2211
// AGG_GLOBAL_CONFIG_KEY is not written by worker SSTs, so we construct the
2212
// EventAggregator with config_hash=0 directly instead of going through
2213
// `open_agg_db` (which requires the config key). The config hash is used
2214
// by the aggregator only for write-time validation, not for reads.
2215
//
2216
// The scratch DB is NOT cleaned up here — the Python caller owns
2217
// `scratch_dir` lifetime and should remove it after gathering results.
2218
// ---------------------------------------------------------------------------
2219

2220
static bool collect_string_list(PyObject* obj, const char* name,
×
2221
                                std::vector<std::string>& out) {
2222
    if (!obj || obj == Py_None) return true;
×
2223
    PyObject* seq = PySequence_Fast(obj, name);
×
2224
    if (!seq) return false;
×
2225
    Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
2226
    out.reserve(static_cast<std::size_t>(n));
×
2227
    for (Py_ssize_t i = 0; i < n; ++i) {
×
2228
        PyObject* item = PySequence_Fast_GET_ITEM(seq, i);
×
2229
        if (!PyUnicode_Check(item)) {
×
2230
            Py_DECREF(seq);
2231
            PyErr_Format(PyExc_TypeError, "%s items must be str", name);
×
2232
            return false;
×
2233
        }
2234
        const char* s = PyUnicode_AsUTF8(item);
×
2235
        if (!s) {
×
2236
            Py_DECREF(seq);
2237
            return false;
×
2238
        }
2239
        out.emplace_back(s);
×
2240
    }
2241
    Py_DECREF(seq);
2242
    return true;
×
2243
}
2244

2245
static bool collect_string_string_dict(
×
2246
    PyObject* obj, const char* name,
2247
    std::unordered_map<std::string, std::string>& out) {
2248
    if (!obj || obj == Py_None) return true;
×
2249
    if (!PyDict_Check(obj)) {
×
2250
        PyErr_Format(PyExc_TypeError, "%s must be a dict[str, str] or None",
×
2251
                     name);
2252
        return false;
×
2253
    }
2254
    PyObject *k, *v;
2255
    Py_ssize_t pos = 0;
×
2256
    while (PyDict_Next(obj, &pos, &k, &v)) {
×
2257
        if (!PyUnicode_Check(k) || !PyUnicode_Check(v)) {
×
2258
            PyErr_Format(PyExc_TypeError, "%s must map str -> str", name);
×
2259
            return false;
×
2260
        }
2261
        const char* ks = PyUnicode_AsUTF8(k);
×
2262
        const char* vs = PyUnicode_AsUTF8(v);
×
2263
        if (!ks || !vs) return false;
×
2264
        out.emplace(ks, vs);
×
2265
    }
2266
    return true;
×
2267
}
2268

2269
static PyObject* scan_aggregation_manifest_fn(PyObject* /*self*/,
×
2270
                                              PyObject* args, PyObject* kwds) {
2271
    static const char* kwlist[] = {
2272
        "agg_ssts",        "sys_ssts",    "scratch_dir",
2273
        "meta_index_path", "batch_size",  "time_granularity",
2274
        "time_resolution", "query",       "group_by",
2275
        "shard_begin",     "shard_end",   "runtime",
2276
        "file_hashes",     "host_hashes", nullptr};
2277

2278
    PyObject* agg_ssts_obj = nullptr;
×
2279
    PyObject* sys_ssts_obj = nullptr;
×
2280
    const char* scratch_dir = nullptr;
×
2281
    const char* meta_index_path = nullptr;
×
2282
    Py_ssize_t batch_size = 10000;
×
2283
    double time_granularity = 1.0;
×
2284
    double time_resolution = 1000000.0;
×
2285
    const char* query_str = nullptr;
×
2286
    PyObject* group_by_obj = nullptr;
×
2287
    int shard_begin_i = 0;
×
2288
    int shard_end_i = DFT_NUM_SHARDS;
×
2289
    PyObject* runtime_obj = nullptr;
×
2290
    PyObject* file_hashes_obj = nullptr;
×
2291
    PyObject* host_hashes_obj = nullptr;
×
2292

2293
    if (!PyArg_ParseTupleAndKeywords(
×
2294
            args, kwds, "OOss|nddzOiiOOO", (char**)kwlist, &agg_ssts_obj,
2295
            &sys_ssts_obj, &scratch_dir, &meta_index_path, &batch_size,
2296
            &time_granularity, &time_resolution, &query_str, &group_by_obj,
2297
            &shard_begin_i, &shard_end_i, &runtime_obj, &file_hashes_obj,
2298
            &host_hashes_obj)) {
2299
        return nullptr;
×
2300
    }
2301

2302
    if (shard_begin_i < 0 || shard_end_i > DFT_NUM_SHARDS ||
×
2303
        shard_begin_i >= shard_end_i) {
×
2304
        PyErr_Format(PyExc_ValueError,
×
2305
                     "shard range [%d, %d) invalid (must be within [0, %d))",
2306
                     shard_begin_i, shard_end_i, (int)DFT_NUM_SHARDS);
2307
        return nullptr;
×
2308
    }
2309

2310
    std::vector<std::string> agg_ssts;
×
2311
    std::vector<std::string> sys_ssts;
×
2312
    if (!collect_string_list(agg_ssts_obj, "agg_ssts", agg_ssts))
×
2313
        return nullptr;
×
2314
    if (!collect_string_list(sys_ssts_obj, "sys_ssts", sys_ssts))
×
2315
        return nullptr;
×
2316

2317
    std::unordered_map<std::string, std::string> preloaded_file_hashes;
×
2318
    std::unordered_map<std::string, std::string> preloaded_host_hashes;
×
2319
    const bool hashes_preloaded =
×
2320
        (file_hashes_obj && file_hashes_obj != Py_None) ||
×
2321
        (host_hashes_obj && host_hashes_obj != Py_None);
×
2322
    if (!collect_string_string_dict(file_hashes_obj, "file_hashes",
×
2323
                                    preloaded_file_hashes))
2324
        return nullptr;
×
2325
    if (!collect_string_string_dict(host_hashes_obj, "host_hashes",
×
2326
                                    preloaded_host_hashes))
2327
        return nullptr;
×
2328

2329
    auto query_opt = parse_query_arg(query_str);
×
2330
    if (!query_opt && PyErr_Occurred()) return nullptr;
×
2331

2332
    GroupByConfig group_by_cfg;
×
2333
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
×
2334
    const GroupByConfig* group_by_ptr =
×
2335
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;
×
2336

2337
    Runtime* rt = nullptr;
×
2338
    if (runtime_obj && runtime_obj != Py_None) {
×
2339
        if (!PyObject_TypeCheck(runtime_obj, &RuntimeType)) {
×
2340
            PyErr_SetString(PyExc_TypeError,
×
2341
                            "runtime must be a Runtime instance or None");
2342
            return nullptr;
×
2343
        }
2344
        rt = ((RuntimeObject*)runtime_obj)->runtime.get();
×
2345
    } else {
2346
        rt = get_default_runtime();
×
2347
    }
2348

2349
    PyObject* result_dict = PyDict_New();
×
2350
    if (!result_dict) return nullptr;
×
2351
    PyObject* events_list = PyList_New(0);
×
2352
    PyObject* profiles_list = PyList_New(0);
×
2353
    PyObject* system_list = PyList_New(0);
×
2354
    if (!events_list || !profiles_list || !system_list) {
×
2355
        Py_XDECREF(events_list);
×
2356
        Py_XDECREF(profiles_list);
×
2357
        Py_XDECREF(system_list);
×
2358
        Py_DECREF(result_dict);
×
2359
        return nullptr;
×
2360
    }
2361

2362
    std::string error_msg;
×
2363
    std::vector<ArrowExportResult> events_results, profiles_results,
×
2364
        system_results;
×
2365
    std::string scratch_index_path = std::string(scratch_dir) + "/.dftindex";
×
2366
    std::string meta_index_path_str(meta_index_path);
×
2367

2368
    Py_BEGIN_ALLOW_THREADS try {
×
2369
        namespace rcf = dftracer::utils::rocksdb::cf;
2370
        using clock = std::chrono::steady_clock;
2371
        auto ms = [](clock::time_point a, clock::time_point b) -> long long {
×
2372
            return std::chrono::duration_cast<std::chrono::milliseconds>(b - a)
×
2373
                .count();
×
2374
        };
2375

2376
        auto t_start = clock::now();
×
2377
        dftracer::utils::utilities::indexer::IndexDatabase scratch_db(
×
2378
            scratch_index_path);
×
2379
        auto t_scratch_open = clock::now();
×
2380

2381
        auto raw_db = scratch_db.db();
×
2382
        for (const auto& p : agg_ssts) {
×
2383
            auto st = raw_db->ingest_external_files(rcf::AGGREGATION, {p},
×
2384
                                                    /*ingest_behind=*/false);
×
2385
            if (!st.ok()) {
×
2386
                error_msg =
2387
                    "ingest AGGREGATION sst '" + p + "': " + st.ToString();
×
2388
                break;
×
2389
            }
2390
        }
×
2391
        if (error_msg.empty()) {
×
2392
            for (const auto& p : sys_ssts) {
×
2393
                auto st = raw_db->ingest_external_files(
×
2394
                    rcf::SYSTEM_METRICS, {p}, /*ingest_behind=*/false);
×
2395
                if (!st.ok()) {
×
2396
                    error_msg = "ingest SYSTEM_METRICS sst '" + p +
×
2397
                                "': " + st.ToString();
×
2398
                    break;
×
2399
                }
2400
            }
×
2401
        }
2402
        auto t_ingest = clock::now();
×
2403

2404
        if (error_msg.empty()) {
×
2405
            auto agg =
2406
                std::make_unique<EventAggregator>(raw_db, /*cfg_hash=*/0);
×
2407

2408
            // If the caller passed pre-loaded hash tables, skip opening
2409
            // the meta DB on lustre. When many dask workers run
2410
            // scan_aggregation_manifest in parallel, loading the hash
2411
            // tables N times from the same file is significant lustre
2412
            // metadata pressure; loading once on the coordinator and
2413
            // passing them in eliminates the redundant reads.
2414
            std::unordered_map<std::string, std::string> loaded_file_hashes;
×
2415
            std::unordered_map<std::string, std::string> loaded_host_hashes;
×
2416
            std::unique_ptr<dftracer::utils::utilities::indexer::IndexDatabase>
2417
                meta_db;
×
2418
            if (!hashes_preloaded) {
×
2419
                meta_db = std::make_unique<
×
2420
                    dftracer::utils::utilities::indexer::IndexDatabase>(
2421
                    meta_index_path_str, dftracer::utils::rocksdb::
2422
                                             RocksDatabase::OpenMode::ReadOnly);
×
2423
                loaded_file_hashes = meta_db->query_hash_table(
×
2424
                    dftracer::utils::utilities::indexer::IndexDatabase::
2425
                        HashType::FILE);
2426
                loaded_host_hashes = meta_db->query_hash_table(
×
2427
                    dftracer::utils::utilities::indexer::IndexDatabase::
2428
                        HashType::HOST);
2429
            }
2430
            const auto& file_hashes =
×
2431
                hashes_preloaded ? preloaded_file_hashes : loaded_file_hashes;
×
2432
            const auto& host_hashes =
×
2433
                hashes_preloaded ? preloaded_host_hashes : loaded_host_hashes;
×
2434
            auto t_hash_tables = clock::now();
×
2435

2436
            auto time_bounds = agg->query_time_bounds();
×
2437
            std::uint64_t time_origin =
×
2438
                time_bounds.valid ? time_bounds.min_time_bucket : 0;
×
2439

2440
            DfanalyzerContext ctx;
×
2441
            ctx.file_hashes = &file_hashes;
×
2442
            ctx.host_hashes = &host_hashes;
×
2443
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
×
2444
            ctx.time_origin = time_origin;
×
2445
            ctx.time_resolution = time_resolution;
×
2446
            ctx.time_granularity = time_granularity;
×
2447

2448
            std::vector<DfanalyzerScanOutput> outputs;
×
2449
            parallel_shard_scan_range<DfanalyzerScanOutput>(
×
2450
                rt, static_cast<std::uint16_t>(shard_begin_i),
2451
                static_cast<std::uint16_t>(shard_end_i),
2452
                [&](std::uint16_t sb, std::uint16_t se) {
×
2453
                    DfanalyzerScanInput input;
×
2454
                    input.agg = agg.get();
×
2455
                    input.ctx = &ctx;
×
2456
                    input.type_filter = std::nullopt;
×
2457
                    input.batch_size = batch_size;
×
2458
                    input.shard_begin = sb;
×
2459
                    input.shard_end = se;
×
2460
                    input.group_by = group_by_ptr;
×
2461
                    return scan_dfanalyzer_shards(input);
×
2462
                },
2463
                outputs);
2464
            auto t_scan = clock::now();
×
2465

2466
            for (auto& out : outputs) {
×
2467
                for (auto& r : out.events)
×
2468
                    events_results.push_back(std::move(r));
×
2469
                for (auto& r : out.profiles)
×
2470
                    profiles_results.push_back(std::move(r));
×
2471
                for (auto& r : out.system)
×
2472
                    system_results.push_back(std::move(r));
×
2473
            }
2474

2475
            std::fprintf(
×
2476
                stderr,
2477
                "[scan_aggregation_manifest] n_agg=%zu n_sys=%zu "
2478
                "scratch_open=%lldms ingest=%lldms hash_tables=%lldms "
2479
                "scan=%lldms\n",
2480
                agg_ssts.size(), sys_ssts.size(), ms(t_start, t_scratch_open),
×
2481
                ms(t_scratch_open, t_ingest), ms(t_ingest, t_hash_tables),
×
2482
                ms(t_hash_tables, t_scan));
×
2483
            std::fflush(stderr);
×
2484
        }
×
2485
    } catch (const std::exception& e) {
×
2486
        error_msg = e.what();
×
2487
    }
×
2488
    Py_END_ALLOW_THREADS
×
2489

2490
        if (!error_msg.empty()) {
×
2491
        Py_DECREF(events_list);
×
2492
        Py_DECREF(profiles_list);
×
2493
        Py_DECREF(system_list);
×
2494
        Py_DECREF(result_dict);
×
2495
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
2496
        return nullptr;
×
2497
    }
2498

2499
    append_results_to_list(events_list, events_results);
×
2500
    append_results_to_list(profiles_list, profiles_results);
×
2501
    append_results_to_list(system_list, system_results);
×
2502

2503
    PyDict_SetItemString(result_dict, "events", events_list);
×
2504
    PyDict_SetItemString(result_dict, "profiles", profiles_list);
×
2505
    PyDict_SetItemString(result_dict, "system", system_list);
×
2506
    Py_DECREF(events_list);
×
2507
    Py_DECREF(profiles_list);
×
2508
    Py_DECREF(system_list);
×
2509

2510
    return result_dict;
×
2511
}
×
2512

2513
// Module-level functions added to the extension module by init_indexer()
// (Arrow-enabled builds only).
static PyMethodDef BatchIndexerModuleMethods[] = {
    {"scan_aggregation_manifest", (PyCFunction)scan_aggregation_manifest_fn,
     METH_VARARGS | METH_KEYWORDS,
     // Keep this signature in sync with the kwlist parsed by
     // scan_aggregation_manifest_fn (including file_hashes/host_hashes).
     "scan_aggregation_manifest(agg_ssts, sys_ssts, scratch_dir, "
     "meta_index_path, batch_size=10000, time_granularity=1.0, "
     "time_resolution=1e6, query=None, group_by=None, shard_begin=0, "
     "shard_end=4096, runtime=None, file_hashes=None, "
     "host_hashes=None) -> dict\n"
     "--\n\n"
     "Scan a worker's slice of the distributed aggregation manifest.\n\n"
     "Ingests agg_ssts + sys_ssts into a scratch IndexDatabase at "
     "scratch_dir (caller owns the directory lifecycle) and runs the "
     "dfanalyzer aggregation scan over [shard_begin, shard_end). "
     "meta_index_path is the unified .dftindex used to resolve file / "
     "host hashes. file_hashes / host_hashes optionally supply those "
     "tables as dict[str, str] so workers need not read them from "
     "meta_index_path. Returns the same dict shape as "
     "Indexer.iter_arrow_dfanalyzer_all."},
    {nullptr, nullptr, 0, nullptr}};
2529
#endif
2530

2531
// Method table for dftracer_utils_ext.Indexer (installed as tp_methods).
// Each docstring starts with "name(args)\n--\n\n", the CPython convention
// that exposes the first line as __text_signature__.
static PyMethodDef Indexer_methods[] = {
    {"get_checkpoint_indexer", (PyCFunction)Indexer_get_checkpoint_indexer,
     METH_VARARGS,
     "get_checkpoint_indexer(file_path)\n"
     "--\n\n"
     "Get a checkpoint indexer for a specific file.\n\n"
     "Args:\n"
     "    file_path: Path to the trace file (.pfw/.pfw.gz)\n\n"
     "Returns:\n"
     "    Indexer instance for checkpoint-level operations.\n"},
    {"resolve", (PyCFunction)Indexer_resolve, METH_NOARGS,
     "resolve()\n"
     "--\n\n"
     "Check what files exist vs need indexing.\n\n"
     "Returns:\n"
     "    dict with 'total_files', 'ready', 'needs_work', 'index_path'\n"},
    {"build", (PyCFunction)Indexer_build, METH_NOARGS,
     "build()\n"
     "--\n\n"
     "Build all missing index tiers based on require_* flags.\n"},
    {"ensure_indexed", (PyCFunction)Indexer_ensure_indexed, METH_NOARGS,
     "ensure_indexed()\n"
     "--\n\n"
     "Resolve and build if needed.\n\n"
     "Returns:\n"
     "    dict with index status after building.\n"},
    {"get_hash_table", (PyCFunction)Indexer_get_hash_table, METH_VARARGS,
     "get_hash_table(type)\n"
     "--\n\n"
     "Query hash table mappings.\n\n"
     "Args:\n"
     "    type: 'file', 'host', 'string', or 'proc'\n\n"
     "Returns:\n"
     "    dict mapping hash values to resolved names.\n"},
    {"query_file_pids", (PyCFunction)Indexer_query_file_pids, METH_VARARGS,
     "query_file_pids(file_id)\n"
     "--\n\n"
     "Query PIDs observed in a specific file.\n\n"
     "Args:\n"
     "    file_id: Integer file ID from index.\n\n"
     "Returns:\n"
     "    set of PIDs.\n"},
    {"query_all_file_pids", (PyCFunction)Indexer_query_all_file_pids,
     METH_NOARGS,
     "query_all_file_pids()\n"
     "--\n\n"
     "Query PIDs for all indexed files.\n\n"
     "Returns:\n"
     "    dict mapping file_id to set of PIDs.\n"},
    {"query_file_info", (PyCFunction)Indexer_query_file_info, METH_NOARGS,
     "query_file_info()\n"
     "--\n\n"
     "Query file ID to path mapping and per-file PIDs in one call.\n\n"
     "Returns:\n"
     "    tuple of (dict[int, str], dict[int, set[int]]).\n"},
// Aggregation / Arrow export methods are only compiled into Arrow-enabled
// builds.
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    {"iter_aggregation", (PyCFunction)Indexer_iter_aggregation,
     METH_VARARGS | METH_KEYWORDS,
     "iter_aggregation(type='events', batch_size=10000)\n"
     "--\n\n"
     "Iterate over aggregation data as Arrow batches.\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    {"iter_arrow_dfanalyzer", (PyCFunction)Indexer_iter_arrow_dfanalyzer,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer(type='events', batch_size=10000, "
     "time_granularity=1.0, time_resolution=1e6, query=None)\n"
     "--\n\n"
     "Iterate over aggregation data as dfanalyzer-compatible Arrow batches.\n\n"
     "Output schema matches dfanalyzer expectations with resolved hashes,\n"
     "normalized time_range, and computed columns (proc_name, io_cat).\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string (e.g., \"pid == 1234\")\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    // Single-scan variant that returns a dict of lists rather than an
    // iterator.
    {"iter_arrow_dfanalyzer_all",
     (PyCFunction)Indexer_iter_arrow_dfanalyzer_all,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer_all(batch_size=10000, time_granularity=1.0, "
     "time_resolution=1e6, query=None, group_by=None)\n"
     "--\n\n"
     "Iterate over all aggregation types in a single scan.\n\n"
     "Returns a dict with 'events', 'profiles', 'system' keys, each "
     "containing\n"
     "a list of Arrow batch capsules. This is ~3x faster than calling\n"
     "iter_arrow_dfanalyzer separately for each type.\n\n"
     "When group_by is provided, the scan collapses dimensions during "
     "aggregation\n"
     "and emits a reduced schema containing only the requested columns plus\n"
     "aggregated metrics (count, time, size, time_sq, size_sq, time_min,\n"
     "time_max, size_min, size_max, time_call_min, time_call_max, "
     "size_call_min,\n"
     "size_call_max, time_start, time_end). Supported group_by columns: "
     "cat,\n"
     "func_name, pid, tid, file_hash, host_hash, file_name, host_name, "
     "proc_name,\n"
     "io_cat, acc_pat, time_range.\n\n"
     "Args:\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string\n"
     "    group_by: Optional list of columns to group by; enables coarse\n"
     "        in-scan aggregation (default None = full granularity)\n\n"
     "Returns:\n"
     "    dict with 'events', 'profiles', 'system' lists of Arrow capsules.\n"},
#endif
    // Sentinel entry terminating the table.
    {nullptr}};
2646

2647
// Indexer exposes no computed attributes; the table holds only the
// terminating sentinel (name, get, set, doc, closure all null).
static PyGetSetDef Indexer_getsetters[] = {
    {nullptr, nullptr, nullptr, nullptr, nullptr}};
2648

2649
// Type object for dftracer_utils_ext.Indexer. Slots are filled positionally,
// so every unused slot must stay as a 0 placeholder. The trailing comments
// name each slot.
PyTypeObject IndexerType = {
    PyVarObject_HEAD_INIT(nullptr, 0) "dftracer_utils_ext.Indexer",  // tp_name
    sizeof(IndexerObject),                     // tp_basicsize
    0,                                         // tp_itemsize
    (destructor)Indexer_dealloc,               // tp_dealloc
    0,                                         // tp_vectorcall_offset
    0,                                         // tp_getattr
    0,                                         // tp_setattr
    0,                                         // tp_as_async
    0,                                         // tp_repr
    0,                                         // tp_as_number
    0,                                         // tp_as_sequence
    0,                                         // tp_as_mapping
    0,                                         // tp_hash
    0,                                         // tp_call
    0,                                         // tp_str
    0,                                         // tp_getattro
    0,                                         // tp_setattro
    0,                                         // tp_as_buffer
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,  // tp_flags
    // tp_doc. CPython only recognizes the "name(...)\n--\n\n" signature
    // header when it starts with the type's short name ("Indexer").
    // Otherwise __text_signature__ is None and the header plus "--" leak
    // into __doc__.
    "Indexer(directory='', files=None, index_dir='',\n"
    "        require_checkpoint=True, require_bloom=True,\n"
    "        require_manifest=True, require_aggregation=False,\n"
    "        time_interval_ms=5000.0, group_keys=None,\n"
    "        custom_metric_fields=None, compute_percentiles=False,\n"
    "        parallelism=0, force_rebuild=False, runtime=None)\n"
    "--\n\n"
    "Indexer with tiered index building.\n\n"
    "At least one of 'directory' or 'files' must be provided.\n"
    "- directory: scan for .pfw/.pfw.gz files\n"
    "- files: list of specific file paths\n\n"
    "Supports:\n"
    "- Tier 1: Checkpoints (require_checkpoint)\n"
    "- Tier 2: Bloom filters (require_bloom), Manifests (require_manifest)\n"
    "- Tier 3: Aggregation (require_aggregation + config params)\n",
    0,                     // tp_traverse
    0,                     // tp_clear
    0,                     // tp_richcompare
    0,                     // tp_weaklistoffset
    0,                     // tp_iter
    0,                     // tp_iternext
    Indexer_methods,       // tp_methods
    0,                     // tp_members
    Indexer_getsetters,    // tp_getset
    0,                     // tp_base
    0,                     // tp_dict
    0,                     // tp_descr_get
    0,                     // tp_descr_set
    0,                     // tp_dictoffset
    (initproc)Indexer_init,  // tp_init
    0,                     // tp_alloc
    Indexer_new,           // tp_new
};
2702

2703
// Register the Indexer type on extension module `m`. In Arrow-enabled
// builds, also add the module-level functions from
// BatchIndexerModuleMethods. Returns 0 on success, or -1 with a Python
// exception set.
int init_indexer(PyObject* m) {
    const bool type_registered = register_type(m, &IndexerType, "Indexer") >= 0;
    if (!type_registered) {
        return -1;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    const bool functions_added =
        PyModule_AddFunctions(m, BatchIndexerModuleMethods) >= 0;
    if (!functions_added) {
        return -1;
    }
#endif

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc