• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27086550389

07 Jun 2026 07:48AM UTC coverage: 52.005% (+0.04%) from 51.969%
27086550389

Pull #75

github

web-flow
Merge 576326a8a into ebe16998d
Pull Request #75: fix(wheel): update Xcode version

36986 of 92663 branches covered (39.91%)

Branch coverage included in aggregate %.

33410 of 42702 relevant lines covered (78.24%)

20421.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.83
/src/dftracer/utils/python/batch_indexer.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/common/string_intern.h>
3
#include <dftracer/utils/core/coro/task.h>
4
#include <dftracer/utils/core/coro/when_all.h>
5
#include <dftracer/utils/core/rocksdb/db_manager.h>
6
#include <dftracer/utils/core/runtime.h>
7
#include <dftracer/utils/core/tasks/coro_scope.h>
8
#include <dftracer/utils/python/batch_indexer.h>
9
#include <dftracer/utils/python/indexer.h>
10
#include <dftracer/utils/python/runtime.h>
11
#include <dftracer/utils/utilities/common/query/query.h>
12
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_config.h>
13
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.h>
14
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_types.h>
15
#include <dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h>
16
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics.h>
17
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.h>
18
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
19
#include <dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.h>
20
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
21
#include <dftracer/utils/utilities/indexer/index_database.h>
22

23
#ifdef DFTRACER_UTILS_ENABLE_ARROW
24
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
25
#endif
26

27
#include <algorithm>
28
#include <chrono>
29
#include <cstdio>
30
#include <limits>
31
#include <optional>
32
#include <string>
33
#include <unordered_map>
34
#include <unordered_set>
35
#include <vector>
36

37
using dftracer::utils::CoroScope;
38
using dftracer::utils::Runtime;
39
using dftracer::utils::coro::CoroTask;
40
using namespace dftracer::utils::utilities::composites::dft::indexing;
41
using namespace dftracer::utils::utilities::composites::dft::aggregators;
42

43
// ---------------------------------------------------------------------------
44
// BatchIndexer - directory-level indexer with resolve/build pattern
45
// ---------------------------------------------------------------------------
46

47
// tp_dealloc for BatchIndexer: releases every object reference held by the
// instance (each may be null when __init__ never ran or stored nothing) and
// hands the memory back through the type's tp_free slot.
static void Indexer_dealloc(IndexerObject* self) {
    static PyObject* IndexerObject::* const kOwnedRefs[] = {
        &IndexerObject::runtime_obj, &IndexerObject::directory,
        &IndexerObject::files,       &IndexerObject::index_dir,
        &IndexerObject::group_keys,  &IndexerObject::custom_metric_fields,
    };
    for (auto member : kOwnedRefs) {
        Py_XDECREF(self->*member);
    }
    PyTypeObject* const tp = Py_TYPE(self);
    tp->tp_free(reinterpret_cast<PyObject*>(self));
}
102✔
56

57
// tp_new for BatchIndexer: allocates the instance and seeds every field with
// the same defaults Indexer_init uses for omitted keywords, so an object that
// never reaches __init__ still holds only null object references.
static PyObject* Indexer_new(PyTypeObject* type, PyObject* args,
                             PyObject* kwds) {
    auto* obj = reinterpret_cast<IndexerObject*>(type->tp_alloc(type, 0));
    if (obj == nullptr) {
        return nullptr;
    }

    // Owned object references; populated by __init__.
    obj->runtime_obj = nullptr;
    obj->directory = nullptr;
    obj->files = nullptr;
    obj->index_dir = nullptr;
    obj->group_keys = nullptr;
    obj->custom_metric_fields = nullptr;

    // Which index tiers are required.
    obj->require_checkpoint = 1;
    obj->require_bloom = 1;
    obj->require_manifest = 1;
    obj->require_aggregation = 0;

    // Aggregation and build tuning knobs.
    obj->time_interval_ms = 5000.0;
    obj->compute_percentiles = 0;
    obj->checkpoint_size = 32 * 1024 * 1024;  // 32 MiB
    obj->parallelism = 0;
    obj->force_rebuild = 0;

    return reinterpret_cast<PyObject*>(obj);
}
79

80
static int Indexer_init(IndexerObject* self, PyObject* args, PyObject* kwds) {
102✔
81
    static const char* kwlist[] = {"directory",
82
                                   "files",
83
                                   "index_dir",
84
                                   "require_checkpoint",
85
                                   "require_bloom",
86
                                   "require_manifest",
87
                                   "require_aggregation",
88
                                   "time_interval_ms",
89
                                   "group_keys",
90
                                   "custom_metric_fields",
91
                                   "compute_percentiles",
92
                                   "checkpoint_size",
93
                                   "parallelism",
94
                                   "force_rebuild",
95
                                   "runtime",
96
                                   nullptr};
97

98
    const char* directory = "";
102✔
99
    PyObject* files_obj = Py_None;
102✔
100
    const char* index_dir = "";
102✔
101
    int require_checkpoint = 1;
102✔
102
    int require_bloom = 1;
102✔
103
    int require_manifest = 1;
102✔
104
    int require_aggregation = 0;
102✔
105
    double time_interval_ms = 5000.0;
102✔
106
    PyObject* group_keys_obj = Py_None;
102✔
107
    PyObject* custom_metrics_obj = Py_None;
102✔
108
    int compute_percentiles = 0;
102✔
109
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;  // 32MB default
102✔
110
    Py_ssize_t parallelism = 0;
102✔
111
    int force_rebuild = 0;
102✔
112
    PyObject* runtime_arg = nullptr;
102✔
113

114
    if (!PyArg_ParseTupleAndKeywords(
102!
115
            args, kwds, "|sOsppppdOOpnnpO", (char**)kwlist, &directory,
51✔
116
            &files_obj, &index_dir, &require_checkpoint, &require_bloom,
117
            &require_manifest, &require_aggregation, &time_interval_ms,
118
            &group_keys_obj, &custom_metrics_obj, &compute_percentiles,
119
            &checkpoint_size, &parallelism, &force_rebuild, &runtime_arg)) {
120
        return -1;
×
121
    }
122

123
    // Validate: at least one of directory or files must be provided
124
    bool has_directory = directory && directory[0] != '\0';
102✔
125
    bool has_files = files_obj && files_obj != Py_None &&
158✔
126
                     PyList_Check(files_obj) && PyList_Size(files_obj) > 0;
158!
127

128
    if (!has_directory && !has_files) {
102✔
129
        PyErr_SetString(PyExc_ValueError,
2!
130
                        "At least one of 'directory' or 'files' must be "
131
                        "provided");
132
        return -1;
2✔
133
    }
134

135
    // Store runtime
136
    if (runtime_arg && runtime_arg != Py_None) {
100!
137
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
138
            Py_INCREF(runtime_arg);
×
139
            self->runtime_obj = runtime_arg;
×
140
        } else {
141
            PyObject* native = PyObject_GetAttrString(runtime_arg, "_native");
×
142
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
143
                self->runtime_obj = native;
×
144
            } else {
145
                Py_XDECREF(native);
×
146
                PyErr_SetString(PyExc_TypeError,
×
147
                                "runtime must be a Runtime instance or None");
148
                return -1;
×
149
            }
150
        }
151
    }
152

153
    self->directory = PyUnicode_FromString(directory);
100!
154
    self->index_dir = PyUnicode_FromString(index_dir);
100!
155
    self->require_checkpoint = require_checkpoint;
100✔
156
    self->require_bloom = require_bloom;
100✔
157
    self->require_manifest = require_manifest;
100✔
158
    self->require_aggregation = require_aggregation;
100✔
159
    self->time_interval_ms = time_interval_ms;
100✔
160
    self->compute_percentiles = compute_percentiles;
100✔
161
    self->checkpoint_size = static_cast<std::size_t>(checkpoint_size);
100✔
162
    self->parallelism = static_cast<std::size_t>(parallelism);
100✔
163
    self->force_rebuild = force_rebuild;
100✔
164

165
    // Store files list
166
    if (has_files) {
100✔
167
        Py_INCREF(files_obj);
56!
168
        self->files = files_obj;
56✔
169
    } else {
28✔
170
        self->files = nullptr;
44✔
171
    }
172

173
    // Store group_keys
174
    if (group_keys_obj && group_keys_obj != Py_None) {
100!
175
        Py_INCREF(group_keys_obj);
×
176
        self->group_keys = group_keys_obj;
×
177
    } else {
178
        self->group_keys = nullptr;
100✔
179
    }
180

181
    // Store custom_metric_fields
182
    if (custom_metrics_obj && custom_metrics_obj != Py_None) {
100!
183
        Py_INCREF(custom_metrics_obj);
×
184
        self->custom_metric_fields = custom_metrics_obj;
×
185
    } else {
186
        self->custom_metric_fields = nullptr;
100✔
187
    }
188

189
    return 0;
100✔
190
}
51✔
191

192
// Picks the runtime that this indexer's work is submitted to. If __init__
// stored a Runtime object, that runtime is used; otherwise the one returned by
// get_default_runtime() is used.
static Runtime* get_batch_indexer_runtime(IndexerObject* self) {
    if (self->runtime_obj == nullptr) {
        return get_default_runtime();
    }
    auto* explicit_rt = reinterpret_cast<RuntimeObject*>(self->runtime_obj);
    return explicit_rt->runtime.get();
}
148✔
198

199
static std::optional<AggregationConfig> build_aggregation_config(
272✔
200
    IndexerObject* self) {
201
    if (!self->require_aggregation) {
272✔
202
        return std::nullopt;
152✔
203
    }
204

205
    AggregationConfig config;
120!
206
    config.time_interval_us =
120✔
207
        static_cast<std::uint64_t>(self->time_interval_ms * 1000.0);
120✔
208

209
    if (self->group_keys && PyList_Check(self->group_keys)) {
120!
210
        Py_ssize_t n = PyList_Size(self->group_keys);
×
211
        for (Py_ssize_t i = 0; i < n; i++) {
×
212
            const char* s =
213
                PyUnicode_AsUTF8(PyList_GetItem(self->group_keys, i));
×
214
            if (s) config.extra_group_keys.emplace_back(s);
×
215
        }
216
    }
217
    if (self->custom_metric_fields &&
120!
218
        PyList_Check(self->custom_metric_fields)) {
×
219
        Py_ssize_t n = PyList_Size(self->custom_metric_fields);
×
220
        for (Py_ssize_t i = 0; i < n; i++) {
×
221
            const char* s =
222
                PyUnicode_AsUTF8(PyList_GetItem(self->custom_metric_fields, i));
×
223
            if (s) config.custom_metric_fields.emplace_back(s);
×
224
        }
225
    }
226

227
    config.compute_percentiles = self->compute_percentiles != 0;
120✔
228
    return config;
120!
229
}
196✔
230

231
// ---------------------------------------------------------------------------
232
// resolve() - check what exists vs needs building
233
// ---------------------------------------------------------------------------
234

235
static PyObject* Indexer_resolve(IndexerObject* self,
206✔
236
                                 PyObject* Py_UNUSED(ignored)) {
237
    const char* directory = PyUnicode_AsUTF8(self->directory);
206!
238
    const char* index_dir = PyUnicode_AsUTF8(self->index_dir);
206!
239

240
    ResolverInput input;
206✔
241
    input.directory = directory ? directory : "";
206!
242
    input.index_dir = index_dir ? index_dir : "";
206!
243
    input.require_checkpoints = self->require_checkpoint;
206✔
244
    input.require_bloom = self->require_bloom;
206✔
245
    input.require_manifest = self->require_manifest;
206✔
246
    input.require_aggregation = self->require_aggregation;
206✔
247
    input.aggregation_config = build_aggregation_config(self);
206!
248

249
    // Add files if provided
250
    if (self->files && PyList_Check(self->files)) {
206!
251
        Py_ssize_t n = PyList_Size(self->files);
128!
252
        for (Py_ssize_t i = 0; i < n; i++) {
278✔
253
            const char* s = PyUnicode_AsUTF8(PyList_GetItem(self->files, i));
150!
254
            if (s) input.files.emplace_back(s);
150!
255
        }
75✔
256
    }
64✔
257

258
    ResolverResult result;
206✔
259
    std::string error_msg;
206✔
260

261
    Py_BEGIN_ALLOW_THREADS try {
206!
262
        Runtime* rt = get_batch_indexer_runtime(self);
206!
263
        rt->submit(run_coro_scope(
618!
264
                       rt->executor(),
103!
265
                       [](CoroScope& scope, ResolverInput in,
824!
266
                          ResolverResult* out) -> CoroTask<void> {
103!
267
                           IndexResolverUtility resolver;
309!
268
                           // Use scope.spawn(utility, input) which auto-binds
269
                           // context for utilities with NeedsContext tag
270
                           *out = co_await scope.spawn(resolver, std::move(in));
515!
271
                       },
515!
272
                       std::move(input), &result),
206✔
273
                   "batch-indexer-resolve")
103!
274
            .get();
206!
275
    } catch (const std::exception& e) {
103!
276
        error_msg = e.what();
×
277
    }
×
278
    Py_END_ALLOW_THREADS
206!
279

280
        if (!error_msg.empty()) {
206!
281
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
282
        return nullptr;
×
283
    }
284

285
    // Build result dict
286
    PyObject* dict = PyDict_New();
206!
287
    if (!dict) return nullptr;
206✔
288

289
    PyDict_SetItemString(dict, "total_files",
206!
290
                         PyLong_FromSize_t(result.all_files.size()));
103!
291
    PyDict_SetItemString(dict, "index_path",
206!
292
                         PyUnicode_FromString(result.index_path.c_str()));
103!
293
    PyDict_SetItemString(
206!
294
        dict, "aggregation_interval_us",
103✔
295
        PyLong_FromUnsignedLongLong(result.stored_time_interval_us));
206!
296
    PyDict_SetItemString(dict, "needs_rebuild",
206!
297
                         PyBool_FromLong(result.needs_augmentation));
206!
298

299
    // Ready files
300
    PyObject* ready_list = PyList_New(result.cached.size());
206!
301
    for (std::size_t i = 0; i < result.cached.size(); ++i) {
380✔
302
        PyList_SetItem(
174!
303
            ready_list, i,
87✔
304
            PyUnicode_FromString(result.cached[i].file_path.c_str()));
174!
305
    }
87✔
306
    PyDict_SetItemString(dict, "ready", ready_list);
206!
307

308
    // Needs work files (union of all needs_* lists)
309
    std::vector<std::string> needs_work;
206✔
310
    for (const auto& item : result.needs_checkpoint) {
286✔
311
        needs_work.push_back(item.file_path);
80!
312
    }
313
    for (const auto& item : result.needs_bloom) {
206!
314
        bool found = false;
×
315
        for (const auto& existing : needs_work) {
×
316
            if (existing == item.file_path) {
×
317
                found = true;
×
318
                break;
×
319
            }
320
        }
321
        if (!found) needs_work.push_back(item.file_path);
×
322
    }
323
    for (const auto& item : result.needs_manifest) {
206!
324
        bool found = false;
×
325
        for (const auto& existing : needs_work) {
×
326
            if (existing == item.file_path) {
×
327
                found = true;
×
328
                break;
×
329
            }
330
        }
331
        if (!found) needs_work.push_back(item.file_path);
×
332
    }
333
    for (const auto& item : result.needs_aggregation) {
206✔
334
        bool found = false;
×
335
        for (const auto& existing : needs_work) {
×
336
            if (existing == item.file_path) {
×
337
                found = true;
×
338
                break;
×
339
            }
340
        }
341
        if (!found) needs_work.push_back(item.file_path);
×
342
    }
343

344
    PyObject* needs_list = PyList_New(needs_work.size());
206!
345
    for (std::size_t i = 0; i < needs_work.size(); ++i) {
286✔
346
        PyList_SetItem(needs_list, i,
80!
347
                       PyUnicode_FromString(needs_work[i].c_str()));
80!
348
    }
40✔
349
    PyDict_SetItemString(dict, "needs_work", needs_list);
206!
350

351
    return dict;
206✔
352
}
206✔
353

354
// ---------------------------------------------------------------------------
355
// build() - build missing index tiers
356
// ---------------------------------------------------------------------------
357

358
// build(): builds whatever index tiers are missing for the configured
// directory/files. The GIL is released while the build runs on this
// indexer's runtime.
// Returns None on success, or nullptr with an exception set (RuntimeError for
// build failures).
static PyObject* Indexer_build(IndexerObject* self,
                               PyObject* Py_UNUSED(ignored)) {
    if (!self->directory || !self->index_dir) {
        PyErr_SetString(PyExc_RuntimeError, "Indexer is not initialized");
        return nullptr;
    }
    const char* directory = PyUnicode_AsUTF8(self->directory);
    if (!directory) {
        return nullptr;
    }
    const char* index_dir = PyUnicode_AsUTF8(self->index_dir);
    if (!index_dir) {
        return nullptr;
    }

    ResolveAndBuildInput input;
    input.directory = directory;
    input.index_dir = index_dir;
    input.require_checkpoints = self->require_checkpoint;
    input.require_bloom = self->require_bloom;
    input.require_manifest = self->require_manifest;
    input.require_aggregation = self->require_aggregation;
    input.aggregation_config = build_aggregation_config(self);
    // Do not release the GIL or return a value while an exception is pending.
    if (PyErr_Occurred()) {
        return nullptr;
    }
    input.checkpoint_size = self->checkpoint_size;
    input.parallelism = self->parallelism;
    input.force_rebuild = self->force_rebuild;

    // Explicit file list, if one was provided. A non-str entry propagates the
    // TypeError raised by PyUnicode_AsUTF8.
    if (self->files && PyList_Check(self->files)) {
        const Py_ssize_t n = PyList_Size(self->files);
        for (Py_ssize_t i = 0; i < n; ++i) {
            const char* s = PyUnicode_AsUTF8(PyList_GetItem(self->files, i));
            if (!s) {
                return nullptr;
            }
            input.files.emplace_back(s);
        }
    }

    bool failed = false;
    std::string error_msg;

    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime* rt = get_batch_indexer_runtime(self);
        rt->submit(run_coro_scope(
                       rt->executor(),
                       [](CoroScope& scope,
                          ResolveAndBuildInput in) -> CoroTask<void> {
                           co_await resolve_and_build_index(&scope,
                                                            std::move(in));
                       },
                       std::move(input)),
                   "batch-indexer-build")
            .get();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        // Never let a C++ exception escape with the GIL released.
        failed = true;
        error_msg = "unknown error while building index";
    }
    Py_END_ALLOW_THREADS

    // Track failure explicitly: an exception with an empty what() must not be
    // mistaken for success.
    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    Py_RETURN_NONE;
}
66✔
410

411
// ---------------------------------------------------------------------------
412
// ensure_indexed() - resolve + build if needed
413
// ---------------------------------------------------------------------------
414

415
// ensure_indexed(): resolve(), then build() and resolve() again when files
// still need work or the stored aggregation tier must be rebuilt.
// Returns the latest resolve() status dict, or nullptr with an exception set.
static PyObject* Indexer_ensure_indexed(IndexerObject* self,
                                        PyObject* Py_UNUSED(ignored)) {
    PyObject* status = Indexer_resolve(self, nullptr);
    if (status == nullptr) {
        return nullptr;
    }

    // Borrowed references; only inspected while `status` is still alive.
    PyObject* const pending = PyDict_GetItemString(status, "needs_work");
    PyObject* const rebuild = PyDict_GetItemString(status, "needs_rebuild");
    const bool has_pending_files =
        pending != nullptr && PyList_Size(pending) > 0;
    const bool must_rebuild = rebuild != nullptr && PyObject_IsTrue(rebuild);

    if (!has_pending_files && !must_rebuild) {
        return status;  // nothing to do; hand back the first status as-is
    }

    Py_DECREF(status);

    PyObject* built = Indexer_build(self, nullptr);
    if (built == nullptr) {
        return nullptr;
    }
    Py_DECREF(built);

    // Report the post-build state.
    return Indexer_resolve(self, nullptr);
}
42✔
441

442
// ---------------------------------------------------------------------------
443
// get_checkpoint_indexer() - get a single-file checkpoint indexer
444
// ---------------------------------------------------------------------------
445

446
// get_checkpoint_indexer(file_path): creates a single-file CheckpointIndexer
// for `file_path`. Its index location comes from determine_index_path() with
// this BatchIndexer's index_dir. It shares this indexer's checkpoint_size and
// runtime (if any), and has bloom/manifest building disabled.
// Returns the new object, or nullptr with an exception set.
static PyObject* Indexer_get_checkpoint_indexer(IndexerObject* self,
                                                PyObject* args) {
    const char* file_path = nullptr;
    if (!PyArg_ParseTuple(args, "s", &file_path)) {
        return nullptr;
    }

    if (!self->index_dir) {
        PyErr_SetString(PyExc_RuntimeError, "Indexer is not initialized");
        return nullptr;
    }
    const char* index_dir = PyUnicode_AsUTF8(self->index_dir);
    if (!index_dir) {
        return nullptr;
    }

    // determine_index_path is C++ and may throw. Convert any exception into a
    // Python error instead of letting it unwind through the C API.
    std::string index_path;
    try {
        index_path = dftracer::utils::utilities::composites::dft::internal::
            determine_index_path(file_path, index_dir);
    } catch (const std::exception& e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return nullptr;
    }

    // Create the string attributes first, so a failure here never leaves a
    // half-initialised CheckpointIndexer object behind.
    PyObject* gz_path_obj = PyUnicode_FromString(file_path);
    if (!gz_path_obj) {
        return nullptr;
    }
    PyObject* index_path_obj = PyUnicode_FromString(index_path.c_str());
    if (!index_path_obj) {
        Py_DECREF(gz_path_obj);
        return nullptr;
    }

    auto* indexer = reinterpret_cast<CheckpointIndexerObject*>(
        CheckpointIndexerType.tp_alloc(&CheckpointIndexerType, 0));
    if (!indexer) {
        Py_DECREF(gz_path_obj);
        Py_DECREF(index_path_obj);
        return nullptr;
    }

    indexer->handle = nullptr;
    indexer->gz_path = gz_path_obj;        // ownership transferred
    indexer->index_path = index_path_obj;  // ownership transferred
    indexer->checkpoint_size = self->checkpoint_size;
    indexer->build_bloom = 0;
    indexer->build_manifest = 0;

    // Share the runtime reference (nullptr means "use the default runtime").
    Py_XINCREF(self->runtime_obj);
    indexer->runtime_obj = self->runtime_obj;

    // Create the native handle.
    indexer->handle = dft_indexer_create(file_path, index_path.c_str(),
                                         self->checkpoint_size, 0);
    if (!indexer->handle) {
        // Every field is set at this point, matching the object state that
        // the CheckpointIndexer dealloc already receives on this path.
        Py_DECREF(reinterpret_cast<PyObject*>(indexer));
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to create checkpoint indexer");
        return nullptr;
    }

    return reinterpret_cast<PyObject*>(indexer);
}
12✔
493

494
// Runs resolve() and extracts the "index_path" entry as a std::string.
// Returns std::nullopt with a Python exception set when resolve() fails or
// the path is missing or empty.
static std::optional<std::string> resolve_index_path(IndexerObject* self) {
    PyObject* status = Indexer_resolve(self, nullptr);
    if (status == nullptr) {
        return std::nullopt;
    }

    // Copy the path out before dropping `status`: the UTF-8 buffer is owned by
    // the (borrowed) dict entry.
    std::optional<std::string> found;
    if (PyObject* entry = PyDict_GetItemString(status, "index_path")) {
        if (const char* text = PyUnicode_AsUTF8(entry); text && *text) {
            found.emplace(text);
        }
    }
    Py_DECREF(status);

    if (!found) {
        PyErr_SetString(PyExc_RuntimeError, "No index path available");
    }
    return found;
}
44✔
508

509
// get_hash_table(type): returns the {hash: name} table of the given kind
// ('file', 'host', 'string' or 'proc'). The table is read from the index
// database that resolve() reports, opened read-only with the GIL released.
// Returns a dict of str -> str, or nullptr with an exception set.
static PyObject* Indexer_get_hash_table(IndexerObject* self, PyObject* args) {
    const char* type_str = nullptr;
    if (!PyArg_ParseTuple(args, "s", &type_str)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;
    using HashType = IndexDatabase::HashType;

    // Accepted spellings of `type` and the table each one selects.
    static const struct {
        const char* name;
        HashType type;
    } kHashTypes[] = {
        {"file", HashType::FILE},
        {"host", HashType::HOST},
        {"string", HashType::STRING},
        {"proc", HashType::PROC},
    };

    const HashType* selected = nullptr;
    for (const auto& entry : kHashTypes) {
        if (std::strcmp(type_str, entry.name) == 0) {
            selected = &entry.type;
            break;
        }
    }
    if (!selected) {
        PyErr_SetString(PyExc_ValueError,
                        "type must be 'file', 'host', 'string', or 'proc'");
        return nullptr;
    }
    const HashType type = *selected;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, std::string> hash_map;
    bool failed = false;
    std::string error_msg;

    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        hash_map = db.query_hash_table(type);
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while reading hash table";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [hash, name] : hash_map) {
        // Decoding fails (returns NULL) on invalid UTF-8, e.g. a non-UTF-8
        // file name, so check each object before using it.
        PyObject* key = PyUnicode_FromStringAndSize(
            hash.data(), static_cast<Py_ssize_t>(hash.size()));
        PyObject* val = key ? PyUnicode_FromStringAndSize(
                                  name.data(),
                                  static_cast<Py_ssize_t>(name.size()))
                            : nullptr;
        const bool ok = val && PyDict_SetItem(dict, key, val) == 0;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (!ok) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
11✔
568

569
// query_file_pids(file_id): returns the set of PIDs recorded for `file_id`.
// The PIDs are read from the index database that resolve() reports, opened
// read-only with the GIL released.
// Returns a set of int, or nullptr with an exception set.
static PyObject* Indexer_query_file_pids(IndexerObject* self, PyObject* args) {
    int file_id = 0;
    if (!PyArg_ParseTuple(args, "i", &file_id)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_set<std::uint64_t> pids;
    bool failed = false;
    std::string error_msg;

    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        pids = db.query_file_pids(file_id);
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while querying file pids";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* set = PySet_New(nullptr);
    if (!set) return nullptr;

    for (std::uint64_t pid : pids) {
        PyObject* val = PyLong_FromUnsignedLongLong(pid);
        if (!val || PySet_Add(set, val) < 0) {
            Py_XDECREF(val);
            Py_DECREF(set);
            return nullptr;
        }
        Py_DECREF(val);
    }

    return set;
}
4✔
610

611
// query_all_file_pids(): returns {file_id: set(pid, ...)} for every file in the
// index database that resolve() reports. The database is opened read-only
// with the GIL released.
// Returns a dict, or nullptr with an exception set.
static PyObject* Indexer_query_all_file_pids(IndexerObject* self,
                                             PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;
    bool failed = false;
    std::string error_msg;

    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        all_pids = db.query_all_file_pids();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while querying file pids";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    // Builds a Python set of ints from `pids`. Returns nullptr with an
    // exception set on failure.
    auto make_pid_set =
        [](const std::unordered_set<std::uint64_t>& pids) -> PyObject* {
        PyObject* set = PySet_New(nullptr);
        if (!set) return nullptr;
        for (std::uint64_t pid : pids) {
            PyObject* val = PyLong_FromUnsignedLongLong(pid);
            if (!val || PySet_Add(set, val) < 0) {
                Py_XDECREF(val);
                Py_DECREF(set);
                return nullptr;
            }
            Py_DECREF(val);
        }
        return set;
    };

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [file_id, pids] : all_pids) {
        PyObject* key = PyLong_FromLong(file_id);
        PyObject* set = key ? make_pid_set(pids) : nullptr;
        const bool ok = set && PyDict_SetItem(dict, key, set) == 0;
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (!ok) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
6✔
655

656
// query_file_info(): returns a 2-tuple (id_to_path, pid_dict).
// - id_to_path maps file_id -> path, where the path is the logical file name
//   joined onto the parent directory of the (weakly canonicalised) index path.
// - pid_dict maps file_id -> set(pid, ...).
// Both are read from the index database that resolve() reports, opened
// read-only with the GIL released. Returns nullptr with an exception set on
// failure.
static PyObject* Indexer_query_file_info(IndexerObject* self,
                                         PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, int> file_ids;
    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;
    fs::path data_dir;
    bool failed = false;
    std::string error_msg;

    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        file_ids = db.query_all_file_info_ids();
        all_pids = db.query_all_file_pids();
        // weakly_canonical touches the filesystem and throws on failure, so
        // it runs here: outside the GIL and inside the exception guard.
        data_dir = fs::weakly_canonical(fs::path(index_path)).parent_path();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while querying file info";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* id_to_path = PyDict_New();
    if (!id_to_path) return nullptr;
    for (const auto& [logical_name, fid] : file_ids) {
        const std::string resolved = (data_dir / logical_name).string();
        PyObject* key = PyLong_FromLong(fid);
        PyObject* val =
            key ? PyUnicode_FromStringAndSize(
                      resolved.data(), static_cast<Py_ssize_t>(resolved.size()))
                : nullptr;
        const bool ok = val && PyDict_SetItem(id_to_path, key, val) == 0;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (!ok) {
            Py_DECREF(id_to_path);
            return nullptr;
        }
    }

    // Builds a Python set of ints from `pids`. Returns nullptr with an
    // exception set on failure.
    auto make_pid_set =
        [](const std::unordered_set<std::uint64_t>& pids) -> PyObject* {
        PyObject* set = PySet_New(nullptr);
        if (!set) return nullptr;
        for (std::uint64_t pid : pids) {
            PyObject* val = PyLong_FromUnsignedLongLong(pid);
            if (!val || PySet_Add(set, val) < 0) {
                Py_XDECREF(val);
                Py_DECREF(set);
                return nullptr;
            }
            Py_DECREF(val);
        }
        return set;
    };

    PyObject* pid_dict = PyDict_New();
    if (!pid_dict) {
        Py_DECREF(id_to_path);
        return nullptr;
    }
    for (const auto& [file_id, pids] : all_pids) {
        PyObject* key = PyLong_FromLong(file_id);
        PyObject* set = key ? make_pid_set(pids) : nullptr;
        const bool ok = set && PyDict_SetItem(pid_dict, key, set) == 0;
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (!ok) {
            Py_DECREF(id_to_path);
            Py_DECREF(pid_dict);
            return nullptr;
        }
    }

    PyObject* result = PyTuple_Pack(2, id_to_path, pid_dict);
    Py_DECREF(id_to_path);
    Py_DECREF(pid_dict);
    return result;
}
×
721

722
#ifdef DFTRACER_UTILS_ENABLE_ARROW
723
#include <dftracer/utils/python/trace_reader_iterator.h>
724
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
725

726
// Wraps an ArrowExportResult in a new ArrowBatchCapsule Python object, which
// takes ownership of a heap copy (moved from `result`).
// Returns the new object, or nullptr with an exception set.
static PyObject* create_arrow_batch_capsule(
    dftracer::utils::utilities::common::arrow::ArrowExportResult&& result) {
    using ExportResult =
        dftracer::utils::utilities::common::arrow::ArrowExportResult;

    // Allocate the payload before the Python object. A throwing `new` (or
    // move) then cannot leak the Python object, and the C++ exception never
    // unwinds through the Python C API.
    ExportResult* payload = nullptr;
    try {
        payload = new ExportResult(std::move(result));
    } catch (const std::exception& e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return nullptr;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "failed to allocate Arrow batch result");
        return nullptr;
    }

    auto* capsule = reinterpret_cast<ArrowBatchCapsuleObject*>(
        ArrowBatchCapsuleType.tp_alloc(&ArrowBatchCapsuleType, 0));
    if (!capsule) {
        delete payload;
        return nullptr;
    }
    capsule->result = payload;  // ownership transferred to the capsule
    return reinterpret_cast<PyObject*>(capsule);
}
33✔
736

737
namespace {
738

739
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
740
using dftracer::utils::utilities::common::arrow::ColumnSpec;
741
using dftracer::utils::utilities::common::arrow::ColumnType;
742
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
743

744
static bool parse_agg_type_str(const char* type_str, AggMapType& out) {
4✔
745
    if (strcmp(type_str, "events") == 0) {
4✔
746
        out = AggMapType::EVENT;
4✔
747
        return true;
4✔
748
    }
749
    if (strcmp(type_str, "profiles") == 0) {
×
750
        out = AggMapType::PROFILE;
×
751
        return true;
×
752
    }
753
    if (strcmp(type_str, "system") == 0) {
×
754
        out = AggMapType::SYSTEM;
×
755
        return true;
×
756
    }
757
    PyErr_SetString(PyExc_ValueError,
×
758
                    "type must be 'events', 'profiles', or 'system'");
759
    return false;
×
760
}
2✔
761

762
struct AggDbHandle {
763
    std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db;
764
    std::unique_ptr<EventAggregator> agg;
765
};
766

767
static std::unique_ptr<AggDbHandle> open_agg_db(const std::string& index_path,
24✔
768
                                                std::string& error_msg) {
769
    std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db;
24✔
770
    try {
771
        db = EventAggregator::open_with_merge_operator(index_path);
24!
772
    } catch (...) {
12✔
773
        auto& mgr = dftracer::utils::rocksdb::RocksDBManager::instance();
×
774
        mgr.reset(index_path);
×
775
        db = mgr.get_or_open(
×
776
            index_path,
777
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
778
        if (db && db->is_open()) {
×
779
            load_intern_dictionary(*db);
×
780
        }
781
    }
×
782
    if (!db || !db->is_open()) {
24!
783
        error_msg = "Failed to open aggregation database";
×
784
        return nullptr;
×
785
    }
786
    std::string config_val;
24✔
787
    auto key = std::string_view(AGG_GLOBAL_CONFIG_KEY,
24✔
788
                                sizeof(AGG_GLOBAL_CONFIG_KEY) - 1);
789
    if (!db->get(key, &config_val, dftracer::utils::rocksdb::cf::AGGREGATION)
48!
790
             .ok()) {
24!
791
        error_msg = "No aggregation config found - was aggregation enabled?";
×
792
        return nullptr;
×
793
    }
794
    auto cfg = deserialize_agg_global_config(config_val);
24!
795
    auto handle = std::make_unique<AggDbHandle>();
24!
796
    handle->db = db;
24✔
797
    handle->agg = std::make_unique<EventAggregator>(db, cfg.config_hash);
24!
798
    return handle;
24✔
799
}
36!
800

801
static std::optional<dftracer::utils::utilities::common::query::Query>
802
parse_query_arg(const char* query_str) {
26✔
803
    if (!query_str || query_str[0] == '\0') return std::nullopt;
26!
804
    auto result = dftracer::utils::utilities::common::query::Query::from_string(
9✔
805
        query_str);
18!
806
    if (!result) {
18✔
807
        PyErr_SetString(PyExc_ValueError, result.error().message.c_str());
2!
808
        return std::nullopt;
2✔
809
    }
810
    return std::move(*result);
16!
811
}
22✔
812

813
// Total number of key shards; parallel_shard_scan covers shards [0, 4096).
constexpr std::uint16_t DFT_NUM_SHARDS{4096};
814

815
template <typename Output, typename ScanFn>
816
void parallel_shard_scan_range(Runtime* rt, std::uint16_t outer_begin,
24✔
817
                               std::uint16_t outer_end, ScanFn&& scan_fn,
818
                               std::vector<Output>& outputs) {
819
    if (outer_end <= outer_begin) return;
24!
820
    const std::size_t span = static_cast<std::size_t>(outer_end - outer_begin);
24✔
821
    const std::size_t num_tasks = std::min<std::size_t>(rt->threads(), span);
24!
822
    const std::size_t shards_per_task = (span + num_tasks - 1) / num_tasks;
24✔
823
    rt->submit(run_coro_scope(
48!
824
                   rt->executor(),
12✔
825
                   [&](CoroScope& scope) -> CoroTask<void> {
184!
826
                       std::vector<dftracer::utils::coro::SpawnFuture<Output>>
12✔
827
                           futures;
12✔
828
                       futures.reserve(num_tasks);
12!
829
                       for (std::size_t t = 0; t < num_tasks; ++t) {
48!
830
                           auto shard_begin = static_cast<std::uint16_t>(
72✔
831
                               outer_begin + t * shards_per_task);
36✔
832
                           auto shard_end =
36✔
833
                               static_cast<std::uint16_t>(std::min<std::size_t>(
36!
834
                                   outer_begin + (t + 1) * shards_per_task,
36✔
835
                                   outer_end));
36✔
836
                           futures.push_back(
36!
837
                               scope.spawn([&scan_fn, shard_begin, shard_end](
301!
838
                                               CoroScope&) -> CoroTask<Output> {
36!
839
                                   co_return scan_fn(shard_begin, shard_end);
36!
840
                               }));
841
                       }
36✔
842
                       outputs.reserve(num_tasks);
12!
843
                       for (auto& f : futures) {
136!
844
                           outputs.push_back(co_await f);
102!
845
                       }
36!
846
                   }),
100!
847
               "parallel-shard-scan-range")
12!
848
        .get();
24!
849
}
12✔
850

851
template <typename Output, typename ScanFn>
852
void parallel_shard_scan(Runtime* rt, ScanFn&& scan_fn,
24✔
853
                         std::vector<Output>& outputs) {
854
    parallel_shard_scan_range<Output>(rt, 0, DFT_NUM_SHARDS,
36✔
855
                                      std::forward<ScanFn>(scan_fn), outputs);
12✔
856
}
24✔
857

858
static void append_results_to_list(PyObject* list,
64✔
859
                                   std::vector<ArrowExportResult>& results) {
860
    for (auto& r : results) {
141✔
861
        PyObject* capsule = create_arrow_batch_capsule(std::move(r));
77!
862
        if (capsule) {
77✔
863
            PyList_Append(list, capsule);
77!
864
            Py_DECREF(capsule);
33✔
865
        }
33✔
866
    }
867
}
64✔
868

869
struct AggScanInput {
870
    const EventAggregator* agg;
871
    AggMapType target_type;
872
    AggregationBatchType batch_type;
873
    Py_ssize_t batch_size;
874
    std::uint16_t shard_begin;
875
    std::uint16_t shard_end;
876
};
877

878
struct AggScanOutput {
879
    std::vector<ArrowExportResult> results;
880
};
881

882
// Scans shards [input.shard_begin, input.shard_end) of input.agg and converts
// every entry whose key map type equals input.target_type into rows of Arrow
// record batches. Each batch holds at most input.batch_size rows; the last
// may be shorter. Entries whose key or value fails to parse are skipped.
AggScanOutput scan_aggregation_shard_range(AggScanInput input) {
    AggScanOutput output;

    static const std::vector<ColumnSpec> schema = {
        {"batch_type", ColumnType::INT64},  {"cat", ColumnType::DICT_STRING},
        {"name", ColumnType::DICT_STRING},  {"pid", ColumnType::UINT64},
        {"tid", ColumnType::UINT64},        {"hhash", ColumnType::DICT_STRING},
        {"fhash", ColumnType::DICT_STRING}, {"time_bucket", ColumnType::UINT64},
        {"count", ColumnType::UINT64},      {"dur_total", ColumnType::UINT64},
        {"dur_min", ColumnType::UINT64},    {"dur_max", ColumnType::UINT64},
        {"dur_mean", ColumnType::DOUBLE},   {"dur_std", ColumnType::DOUBLE},
        {"size_total", ColumnType::UINT64}, {"size_min", ColumnType::UINT64},
        {"size_max", ColumnType::UINT64},   {"size_mean", ColumnType::DOUBLE},
        {"size_std", ColumnType::DOUBLE},   {"ts", ColumnType::UINT64},
        {"te", ColumnType::UINT64},
    };

    // A non-positive batch_size would wrap to an enormous std::size_t in
    // reserve() and throw; treat it as one row per batch instead.
    const std::size_t rows_per_batch =
        input.batch_size > 0 ? static_cast<std::size_t>(input.batch_size) : 1;

    RecordBatchBuilder builder;
    builder.declare_schema(schema);
    builder.reserve(rows_per_batch);

    std::size_t row_count = 0;

    // Finalizes the buffered rows into one batch; invalid exports are dropped.
    auto emit_batch = [&]() {
        auto arrow = builder.finish();
        if (arrow.valid()) {
            output.results.push_back(std::move(arrow));
        }
    };

    input.agg->scan_shard_range_raw(
        input.shard_begin, input.shard_end,
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
            AggKeyView kv;
            if (!parse_agg_key_view(key_bytes, kv)) return true;
            if (kv.map_type != input.target_type) return true;

            AggMetricsFullView mv;
            if (!parse_agg_value_full_view(val_bytes, mv)) return true;

            std::size_t ci = 0;
            builder.append_int64(ci++,
                                 static_cast<std::int64_t>(input.batch_type));
            builder.append_dict_string(ci++, kv.cat);
            builder.append_dict_string(ci++, kv.name);
            builder.append_uint64(ci++, kv.pid);
            builder.append_uint64(ci++, kv.tid);
            builder.append_dict_string(ci++, kv.hhash);
            builder.append_dict_string(ci++, kv.fhash);
            builder.append_uint64(ci++, kv.time_bucket);
            builder.append_uint64(ci++, mv.count);
            builder.append_uint64(ci++, mv.dur_total);
            builder.append_uint64(ci++, mv.count > 0 ? mv.dur_min : 0);
            builder.append_uint64(ci++, mv.dur_max);
            builder.append_double(ci++, mv.dur_mean);
            builder.append_double(ci++, mv.dur_stddev());
            builder.append_uint64(ci++, mv.size_total);
            builder.append_uint64(ci++, mv.count > 0 ? mv.size_min : 0);
            builder.append_uint64(ci++, mv.size_max);
            builder.append_double(ci++, mv.size_mean);
            builder.append_double(ci++, mv.size_stddev());
            builder.append_uint64(ci++, mv.ts);
            builder.append_uint64(ci++, mv.te);
            builder.end_row();

            if (++row_count >= rows_per_batch) {
                emit_batch();
                builder.reset(true);
                builder.reserve(rows_per_batch);
                row_count = 0;
            }
            return true;
        });

    // Emit the final, partially filled batch.
    if (row_count > 0) {
        emit_batch();
    }

    return output;
}
×
962

963
// I/O category codes written to the dfanalyzer `io_cat` column. The numeric
// values are part of the output, so each one is spelled out explicitly.
enum class IOCategory : std::int8_t {
    READ = 1,      // data reads (read, pread, fread, ...)
    WRITE = 2,     // data writes (write, pwrite, fwrite, ...)
    METADATA = 3,  // open/close/stat/seek/directory calls
    PCTL = 4,      // never returned by get_io_category
    IPC = 5,       // never returned by get_io_category
    OTHER = 6,     // unrecognized function names
    SYNC = 7,      // fsync, fdatasync, msync, sync
};
972

973
// Classifies a traced POSIX/stdio function name into an IOCategory.
// Names missing from the table below map to IOCategory::OTHER.
inline IOCategory get_io_category(std::string_view func_name) {
    static const std::unordered_map<std::string_view, IOCategory>
        kCategoryByName = [] {
            std::unordered_map<std::string_view, IOCategory> table;
            const auto add = [&table](
                                 IOCategory category,
                                 std::initializer_list<std::string_view> names) {
                for (std::string_view n : names) table.emplace(n, category);
            };
            add(IOCategory::READ, {"read", "pread", "readv", "preadv", "fread"});
            add(IOCategory::WRITE,
                {"write", "pwrite", "writev", "pwritev", "fwrite"});
            add(IOCategory::SYNC, {"fsync", "fdatasync", "msync", "sync"});
            add(IOCategory::METADATA,
                {"open",       "open64",     "close",    "fopen",
                 "fopen64",    "fclose",     "stat",     "fstat",
                 "lstat",      "fstatat",    "__xstat",  "__xstat64",
                 "__lxstat",   "__lxstat64", "__fxstat", "__fxstat64",
                 "access",     "lseek",      "lseek64",  "fseek",
                 "ftell",      "seek",       "fcntl",    "ftruncate",
                 "mkdir",      "rmdir",      "unlink",   "remove",
                 "rename",     "link",       "readlink", "opendir",
                 "closedir",   "readdir"});
            return table;
        }();

    const auto hit = kCategoryByName.find(func_name);
    return hit == kCategoryByName.end() ? IOCategory::OTHER : hit->second;
}
204✔
1006

1007
// Writes the decimal digits of `val` into `buf` (no sign, no NUL terminator)
// and returns a pointer one past the last digit written. `buf` needs room for
// up to 20 characters, the length of UINT64_MAX in decimal.
inline char* fast_itoa(std::uint64_t val, char* buf) {
    char scratch[20];
    char* const stop = scratch + sizeof(scratch);
    char* head = stop;
    // Produce digits least-significant first, filling scratch from the back,
    // so they end up in reading order without a reversal pass.
    do {
        *--head = static_cast<char>('0' + val % 10);
        val /= 10;
    } while (val != 0);
    return std::copy(head, stop, buf);
}
1016

1017
class HashResolver {
1018
   public:
1019
    HashResolver(
153✔
1020
        const std::unordered_map<std::string, std::string>* file_hashes,
1021
        const std::unordered_map<std::string, std::string>* host_hashes)
1022
        : file_hashes_(file_hashes), host_hashes_(host_hashes) {
117✔
1023
        if (file_hashes_) {
83✔
1024
            for (const auto& [hash, name] : *file_hashes_) {
215!
1025
                auto hash_sv = intern_.intern(hash);
189!
1026
                auto name_sv = intern_.intern(name);
189!
1027
                file_map_[hash_sv] = name_sv;
133!
1028
            }
1029
        }
36✔
1030
        if (host_hashes_) {
84!
1031
            for (const auto& [hash, name] : *host_hashes_) {
217!
1032
                auto hash_sv = intern_.intern(hash);
190!
1033
                auto name_sv = intern_.intern(name);
190!
1034
                host_map_[hash_sv] = name_sv;
133!
1035
            }
1036
        }
36✔
1037
    }
120✔
1038

1039
    // Unresolved hashes resolve to empty (not the hash itself): the
1040
    // dfanalyzer side treats empty file_name/host_name as missing (NA).
1041
    std::string_view resolve_file(std::string_view hash) {
749✔
1042
        if (hash.empty()) return hash;
749!
1043
        auto it = file_map_.find(intern_.intern(hash));
750!
1044
        return it != file_map_.end() ? it->second : std::string_view{};
1,126!
1045
    }
378✔
1046

1047
    std::string_view resolve_host(std::string_view hash) {
747✔
1048
        if (hash.empty()) return hash;
747!
1049
        auto it = host_map_.find(intern_.intern(hash));
749!
1050
        return it != host_map_.end() ? it->second : std::string_view{};
1,134!
1051
    }
378✔
1052

1053
    std::string_view intern(std::string_view sv) { return intern_.intern(sv); }
754✔
1054

1055
   private:
1056
    const std::unordered_map<std::string, std::string>* file_hashes_;
1057
    const std::unordered_map<std::string, std::string>* host_hashes_;
1058
    dftracer::utils::StringIntern intern_;
1059
    std::unordered_map<std::string_view, std::string_view> file_map_;
1060
    std::unordered_map<std::string_view, std::string_view> host_map_;
1061
};
1062

1063
struct ProcKey {
1064
    std::string_view hhash;
1065
    std::uint64_t pid;
1066
    std::uint64_t tid;
1067
    bool operator==(const ProcKey& o) const {
442✔
1068
        return hhash == o.hhash && pid == o.pid && tid == o.tid;
442!
1069
    }
1070
};
1071

1072
struct ProcKeyHash {
1073
    std::size_t operator()(const ProcKey& k) const {
1,061✔
1074
        return std::hash<std::string_view>{}(k.hhash) ^
2,095✔
1075
               (std::hash<std::uint64_t>{}(k.pid) << 1) ^
1,583✔
1076
               (std::hash<std::uint64_t>{}(k.tid) << 2);
1,063✔
1077
    }
1078
};
1079

1080
static const std::vector<ColumnSpec> DFANALYZER_SCHEMA = {
1!
1081
    {"cat", ColumnType::DICT_STRING},
1!
1082
    {"func_name", ColumnType::DICT_STRING},
1!
1083
    {"pid", ColumnType::INT64},
1!
1084
    {"tid", ColumnType::INT64},
1!
1085
    {"file_hash", ColumnType::DICT_STRING},
1!
1086
    {"host_hash", ColumnType::DICT_STRING},
1!
1087
    {"file_name", ColumnType::DICT_STRING},
1!
1088
    {"host_name", ColumnType::DICT_STRING},
1!
1089
    {"proc_name", ColumnType::DICT_STRING},
1!
1090
    {"io_cat", ColumnType::INT64},
1!
1091
    {"acc_pat", ColumnType::INT64},
1!
1092
    {"count", ColumnType::INT64},
1!
1093
    {"time", ColumnType::DOUBLE},
1!
1094
    {"size", ColumnType::INT64},
1!
1095
    {"time_min", ColumnType::DOUBLE},
1!
1096
    {"time_max", ColumnType::DOUBLE},
1!
1097
    {"size_min", ColumnType::INT64},
1!
1098
    {"size_max", ColumnType::INT64},
1!
1099
    {"offset_min", ColumnType::INT64},
1!
1100
    {"offset_max", ColumnType::INT64},
1!
1101
    {"time_range", ColumnType::INT64},
1!
1102
    {"time_start", ColumnType::INT64},
1!
1103
    {"time_end", ColumnType::INT64},
1!
1104
};
1105

1106
// Single-bit flags naming the columns available for coarse group-by; they are
// tested against GroupByConfig::mask.
enum GroupByField : std::uint32_t {
    GB_CAT = 0x001,
    GB_FUNC_NAME = 0x002,
    GB_PID = 0x004,
    GB_TID = 0x008,
    GB_FILE_HASH = 0x010,
    GB_HOST_HASH = 0x020,
    GB_FILE_NAME = 0x040,
    GB_HOST_NAME = 0x080,
    GB_PROC_NAME = 0x100,
    GB_IO_CAT = 0x200,
    GB_ACC_PAT = 0x400,
    GB_TIME_RANGE = 0x800,
};
1120

1121
struct GroupByConfig {
10✔
1122
    std::uint32_t mask = 0;
10✔
1123
    std::vector<GroupByField> order;
1124
    std::vector<std::string> names;  // matches `order`, used for schema
1125
};
1126

1127
inline std::optional<GroupByField> parse_group_by_name(std::string_view name) {
×
1128
    if (name == "cat") return GB_CAT;
×
1129
    if (name == "func_name") return GB_FUNC_NAME;
×
1130
    if (name == "pid") return GB_PID;
×
1131
    if (name == "tid") return GB_TID;
×
1132
    if (name == "file_hash") return GB_FILE_HASH;
×
1133
    if (name == "host_hash") return GB_HOST_HASH;
×
1134
    if (name == "file_name") return GB_FILE_NAME;
×
1135
    if (name == "host_name") return GB_HOST_NAME;
×
1136
    if (name == "proc_name") return GB_PROC_NAME;
×
1137
    if (name == "io_cat") return GB_IO_CAT;
×
1138
    if (name == "acc_pat") return GB_ACC_PAT;
×
1139
    if (name == "time_range") return GB_TIME_RANGE;
×
1140
    return std::nullopt;
×
1141
}
1142

1143
// Grouping key for coarse aggregation. Fields that are not part of the active
// group-by keep their default (empty / 0), so they never split groups.
// String fields are compared by content.
struct CoarseKey {
    std::string_view cat;
    std::string_view func_name;
    std::uint64_t pid = 0;
    std::uint64_t tid = 0;
    std::string_view file_hash;
    std::string_view host_hash;
    std::string_view file_name;
    std::string_view host_name;
    std::string_view proc_name;
    std::int64_t io_cat = 0;
    std::int64_t acc_pat = 0;
    std::int64_t time_range = 0;

    bool operator==(const CoarseKey& o) const {
        // Cheap integer checks first, then string contents.
        if (pid != o.pid || tid != o.tid) return false;
        if (io_cat != o.io_cat || acc_pat != o.acc_pat) return false;
        if (time_range != o.time_range) return false;
        if (cat != o.cat || func_name != o.func_name) return false;
        if (file_hash != o.file_hash || host_hash != o.host_hash) return false;
        if (file_name != o.file_name || host_name != o.host_name) return false;
        return proc_name == o.proc_name;
    }
};
1166

1167
struct CoarseKeyHash {
1168
    std::size_t operator()(const CoarseKey& k) const {
×
1169
        auto combine = [](std::size_t h, std::size_t v) {
×
1170
            return h ^ (v + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2));
×
1171
        };
1172
        std::size_t h = std::hash<std::string_view>{}(k.cat);
×
1173
        h = combine(h, std::hash<std::string_view>{}(k.func_name));
×
1174
        h = combine(h, std::hash<std::uint64_t>{}(k.pid));
×
1175
        h = combine(h, std::hash<std::uint64_t>{}(k.tid));
×
1176
        h = combine(h, std::hash<std::string_view>{}(k.file_hash));
×
1177
        h = combine(h, std::hash<std::string_view>{}(k.host_hash));
×
1178
        h = combine(h, std::hash<std::string_view>{}(k.file_name));
×
1179
        h = combine(h, std::hash<std::string_view>{}(k.host_name));
×
1180
        h = combine(h, std::hash<std::string_view>{}(k.proc_name));
×
1181
        h = combine(h, std::hash<std::int64_t>{}(k.io_cat));
×
1182
        h = combine(h, std::hash<std::int64_t>{}(k.acc_pat));
×
1183
        h = combine(h, std::hash<std::int64_t>{}(k.time_range));
×
1184
        return h;
×
1185
    }
1186
};
1187

1188
// Running aggregate for one coarse group. Min trackers start at +infinity /
// UINT64_MAX and max trackers at -infinity / 0, so the first sample always
// replaces them. The has_* flags record whether a size or time bound was
// ever observed.
struct CoarseMetrics {
    std::uint64_t count{0};
    double time_sum{0.0};
    double time_sq_sum{0.0};
    double time_min_val{std::numeric_limits<double>::infinity()};
    double time_max_val{-std::numeric_limits<double>::infinity()};
    double time_call_min_val{std::numeric_limits<double>::infinity()};
    double time_call_max_val{-std::numeric_limits<double>::infinity()};
    std::uint64_t size_sum{0};
    double size_sq_sum{0.0};
    std::uint64_t size_min_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t size_max_val{0};
    std::uint64_t size_call_min_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t size_call_max_val{0};
    bool has_size{false};
    std::uint64_t time_start_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t time_end_val{0};
    bool has_time_bounds{false};
};
1207

1208
inline std::vector<ColumnSpec> make_coarse_schema(const GroupByConfig& cfg) {
×
1209
    std::vector<ColumnSpec> specs;
×
1210
    specs.reserve(cfg.order.size() + 16);
×
1211
    for (std::size_t i = 0; i < cfg.order.size(); ++i) {
×
1212
        GroupByField f = cfg.order[i];
×
1213
        const std::string& name = cfg.names[i];
×
1214
        switch (f) {
×
1215
            case GB_CAT:
1216
            case GB_FUNC_NAME:
1217
            case GB_FILE_HASH:
1218
            case GB_HOST_HASH:
1219
            case GB_FILE_NAME:
1220
            case GB_HOST_NAME:
1221
            case GB_PROC_NAME:
1222
                specs.push_back({name, ColumnType::DICT_STRING});
×
1223
                break;
×
1224
            case GB_PID:
1225
            case GB_TID:
1226
            case GB_IO_CAT:
1227
            case GB_ACC_PAT:
1228
            case GB_TIME_RANGE:
1229
                specs.push_back({name, ColumnType::INT64});
×
1230
                break;
×
1231
        }
1232
    }
1233
    specs.push_back({"count", ColumnType::INT64});
×
1234
    specs.push_back({"time", ColumnType::DOUBLE});
×
1235
    specs.push_back({"size", ColumnType::INT64});
×
1236
    specs.push_back({"time_sq", ColumnType::DOUBLE});
×
1237
    specs.push_back({"size_sq", ColumnType::DOUBLE});
×
1238
    specs.push_back({"time_min", ColumnType::DOUBLE});
×
1239
    specs.push_back({"time_max", ColumnType::DOUBLE});
×
1240
    specs.push_back({"size_min", ColumnType::INT64});
×
1241
    specs.push_back({"size_max", ColumnType::INT64});
×
1242
    specs.push_back({"time_call_min", ColumnType::DOUBLE});
×
1243
    specs.push_back({"time_call_max", ColumnType::DOUBLE});
×
1244
    specs.push_back({"size_call_min", ColumnType::INT64});
×
1245
    specs.push_back({"size_call_max", ColumnType::INT64});
×
1246
    specs.push_back({"time_start", ColumnType::INT64});
×
1247
    specs.push_back({"time_end", ColumnType::INT64});
×
1248
    return specs;
×
1249
}
×
1250

1251
struct DfanalyzerScanInput {
36✔
1252
    const EventAggregator* agg;
1253
    const DfanalyzerContext* ctx;
1254
    std::optional<AggMapType> type_filter;
1255
    Py_ssize_t batch_size;
1256
    std::uint16_t shard_begin;
1257
    std::uint16_t shard_end;
1258
    const GroupByConfig* group_by = nullptr;  // null = full granularity
36✔
1259
};
1260

1261
struct DfanalyzerScanOutput {
1262
    std::vector<ArrowExportResult> events;
1263
    std::vector<ArrowExportResult> profiles;
1264
    std::vector<ArrowExportResult> system;
1265
};
1266

1267
DfanalyzerScanOutput scan_dfanalyzer_shards(DfanalyzerScanInput input) {
74✔
1268
    DfanalyzerScanOutput output;
74✔
1269

1270
    const bool coarse = input.group_by != nullptr;
82✔
1271
    const std::vector<ColumnSpec> coarse_schema =
1272
        coarse ? make_coarse_schema(*input.group_by)
82!
1273
               : std::vector<ColumnSpec>{};
82!
1274

1275
    auto make_builder = [&]() {
244✔
1276
        RecordBatchBuilder b;
208✔
1277
        if (coarse) {
217!
1278
            b.declare_schema(coarse_schema);
×
1279
        } else {
1280
            b.declare_schema(DFANALYZER_SCHEMA);
217✔
1281
        }
1282
        b.reserve(static_cast<std::size_t>(input.batch_size));
211✔
1283
        return b;
223✔
1284
    };
97!
1285

1286
    RecordBatchBuilder event_builder, profile_builder, system_builder;
76!
1287
    bool use_events =
36✔
1288
        !input.type_filter || *input.type_filter == AggMapType::EVENT;
81!
1289
    bool use_profiles =
36✔
1290
        !input.type_filter || *input.type_filter == AggMapType::PROFILE;
78!
1291
    bool use_system =
36✔
1292
        !input.type_filter || *input.type_filter == AggMapType::SYSTEM;
81!
1293

1294
    if (use_events) event_builder = make_builder();
79✔
1295
    if (use_profiles) profile_builder = make_builder();
83✔
1296
    if (use_system) system_builder = make_builder();
85!
1297

1298
    auto bucket_width_us = static_cast<std::uint64_t>(
85✔
1299
        input.ctx->time_granularity * input.ctx->time_resolution);
85✔
1300
    std::size_t event_count = 0, profile_count = 0, system_count = 0;
85✔
1301

1302
    HashResolver resolver(input.ctx->file_hashes, input.ctx->host_hashes);
85✔
1303
    std::unordered_map<ProcKey, std::string, ProcKeyHash> proc_name_cache;
84✔
1304
    std::unordered_map<std::string_view, IOCategory> io_cat_cache;
83✔
1305

1306
    std::unordered_map<CoarseKey, CoarseMetrics, CoarseKeyHash> event_coarse,
84✔
1307
        profile_coarse, system_coarse;
84✔
1308

1309
    auto flush_builder = [&](RecordBatchBuilder& builder, std::size_t& count,
260✔
1310
                             std::vector<ArrowExportResult>& results) {
1311
        if (count > 0) {
224✔
1312
            auto arrow = builder.finish();
77!
1313
            if (arrow.valid()) {
77!
1314
                results.push_back(std::move(arrow));
77!
1315
            }
33✔
1316
            builder.reset(true);
76!
1317
            builder.reserve(static_cast<std::size_t>(input.batch_size));
77!
1318
            count = 0;
77✔
1319
        }
77✔
1320
    };
224✔
1321

1322
    auto append_row = [&](RecordBatchBuilder& builder, std::size_t& count,
787✔
1323
                          std::vector<ArrowExportResult>& results,
1324
                          const AggKeyView& kv, const AggMetricsView& mv,
1325
                          std::string_view file_name,
1326
                          std::string_view host_name,
1327
                          std::string_view proc_name, IOCategory io_cat) {
1328
        std::size_t ci = 0;
751✔
1329
        builder.append_dict_string(ci++, kv.cat);
751✔
1330
        builder.append_dict_string(ci++, kv.name);
753✔
1331
        builder.append_int64(ci++, static_cast<std::int64_t>(kv.pid));
756✔
1332
        builder.append_int64(ci++, static_cast<std::int64_t>(kv.tid));
751✔
1333
        builder.append_dict_string(ci++, kv.fhash);
753✔
1334
        builder.append_dict_string(ci++, kv.hhash);
756✔
1335
        builder.append_dict_string(ci++, file_name);
756✔
1336
        builder.append_dict_string(ci++, host_name);
756✔
1337
        builder.append_dict_string(ci++, proc_name);
756✔
1338
        builder.append_int64(ci++, static_cast<std::int64_t>(io_cat));
756✔
1339
        builder.append_int64(ci++, 0);
754✔
1340

1341
        builder.append_int64(ci++, static_cast<std::int64_t>(mv.count));
755✔
1342
        builder.append_double(ci++, static_cast<double>(mv.dur_total) /
1,133✔
1343
                                        input.ctx->time_resolution);
755✔
1344

1345
        if (mv.size_total > 0) {
756✔
1346
            builder.append_int64(ci++,
846✔
1347
                                 static_cast<std::int64_t>(mv.size_total));
564✔
1348
        } else {
282✔
1349
            builder.append_null(ci++);
192✔
1350
        }
1351

1352
        builder.append_double(ci++, mv.count > 0
1,132!
1353
                                        ? static_cast<double>(mv.dur_min) /
1,133✔
1354
                                              input.ctx->time_resolution
755✔
1355
                                        : 0.0);
1356
        builder.append_double(ci++, mv.count > 0
1,134!
1357
                                        ? static_cast<double>(mv.dur_max) /
1,134✔
1358
                                              input.ctx->time_resolution
756✔
1359
                                        : 0.0);
1360

1361
        if (mv.size_total > 0 && mv.count > 0) {
756✔
1362
            builder.append_int64(ci++, static_cast<std::int64_t>(mv.size_min));
564✔
1363
            builder.append_int64(ci++, static_cast<std::int64_t>(mv.size_max));
564✔
1364
        } else {
282✔
1365
            builder.append_null(ci++);
192✔
1366
            builder.append_null(ci++);
192✔
1367
        }
1368

1369
        // offset_min > offset_max only when no offset was ever recorded
1370
        // (MetricStats default min=UINT64_MAX, max=0); 0 is a valid offset.
1371
        if (mv.offset_min <= mv.offset_max) {
756!
1372
            builder.append_int64(ci++,
1,134✔
1373
                                 static_cast<std::int64_t>(mv.offset_min));
756✔
1374
            builder.append_int64(ci++,
1,134✔
1375
                                 static_cast<std::int64_t>(mv.offset_max));
756✔
1376
        } else {
378✔
1377
            builder.append_null(ci++);
×
1378
            builder.append_null(ci++);
×
1379
        }
1380

1381
        auto time_range = bucket_width_us > 0
1,512!
1382
                              ? static_cast<std::int64_t>(
378!
1383
                                    (kv.time_bucket - input.ctx->time_origin) /
1,134✔
1384
                                    bucket_width_us)
756✔
1385
                              : 0;
1386
        builder.append_int64(ci++, time_range);
756✔
1387
        // Counter (profile) rows align to the bucket grid: time_start is the
1388
        // bucket start, time_end one bucket later. Plain events keep the
1389
        // precise min/max event timestamps.
1390
        if (kv.map_type == AggMapType::PROFILE) {
755!
1391
            auto bucket_start = static_cast<std::int64_t>(
×
1392
                kv.time_bucket - input.ctx->time_origin);
×
1393
            builder.append_int64(ci++, bucket_start);
×
1394
            builder.append_int64(ci++, bucket_start + static_cast<std::int64_t>(
×
1395
                                                          bucket_width_us));
×
1396
        } else {
1397
            builder.append_int64(ci++, static_cast<std::int64_t>(
1,133✔
1398
                                           mv.ts - input.ctx->time_origin));
755✔
1399
            builder.append_int64(ci++, static_cast<std::int64_t>(
1,133✔
1400
                                           mv.te - input.ctx->time_origin));
755✔
1401
        }
1402
        builder.end_row();
755✔
1403

1404
        count++;
755✔
1405
        if (static_cast<Py_ssize_t>(count) >= input.batch_size) {
755✔
1406
            flush_builder(builder, count, results);
×
1407
        }
1408
    };
803✔
1409

1410
    auto accumulate_coarse =
1411
        [&](std::unordered_map<CoarseKey, CoarseMetrics, CoarseKeyHash>& map,
36✔
1412
            const AggKeyView& kv, const AggMetricsView& mv,
1413
            std::string_view file_name, std::string_view host_name,
1414
            std::string_view proc_name, IOCategory io_cat) {
1415
            const auto& cfg = *input.group_by;
×
1416
            // Probe with non-interned views; hash/equality compare by content,
1417
            // so string_view lifetime doesn't matter for lookup. We only copy
1418
            // (intern) on first insert.
1419
            CoarseKey probe;
×
1420
            if (cfg.mask & GB_CAT) probe.cat = kv.cat;
×
1421
            if (cfg.mask & GB_FUNC_NAME) probe.func_name = kv.name;
×
1422
            if (cfg.mask & GB_PID) probe.pid = kv.pid;
×
1423
            if (cfg.mask & GB_TID) probe.tid = kv.tid;
×
1424
            if (cfg.mask & GB_FILE_HASH) probe.file_hash = kv.fhash;
×
1425
            if (cfg.mask & GB_HOST_HASH) probe.host_hash = kv.hhash;
×
1426
            if (cfg.mask & GB_FILE_NAME) probe.file_name = file_name;
×
1427
            if (cfg.mask & GB_HOST_NAME) probe.host_name = host_name;
×
1428
            if (cfg.mask & GB_PROC_NAME) probe.proc_name = proc_name;
×
1429
            if (cfg.mask & GB_IO_CAT)
×
1430
                probe.io_cat = static_cast<std::int64_t>(io_cat);
×
1431
            if (cfg.mask & GB_TIME_RANGE) {
×
1432
                probe.time_range =
×
1433
                    bucket_width_us > 0
×
1434
                        ? static_cast<std::int64_t>(
×
1435
                              (kv.time_bucket - input.ctx->time_origin) /
×
1436
                              bucket_width_us)
×
1437
                        : 0;
1438
            }
1439
            // acc_pat is always 0 today; included for completeness.
1440

1441
            auto it = map.find(probe);
×
1442
            if (it == map.end()) {
×
1443
                // First sighting: promote views referencing unstable DB buffers
1444
                // to interned copies. file_name/host_name come from the
1445
                // resolver's intern pool, and proc_name from proc_name_cache;
1446
                // both already stable across iterations, no copy needed.
1447
                CoarseKey stable = probe;
×
1448
                if (cfg.mask & GB_CAT) stable.cat = resolver.intern(kv.cat);
×
1449
                if (cfg.mask & GB_FUNC_NAME)
×
1450
                    stable.func_name = resolver.intern(kv.name);
×
1451
                if (cfg.mask & GB_FILE_HASH)
×
1452
                    stable.file_hash = resolver.intern(kv.fhash);
×
1453
                if (cfg.mask & GB_HOST_HASH)
×
1454
                    stable.host_hash = resolver.intern(kv.hhash);
×
1455
                auto [nit, _] = map.emplace(std::move(stable), CoarseMetrics{});
×
1456
                it = nit;
×
1457
            }
1458
            CoarseMetrics& m = it->second;
×
1459
            m.count += mv.count;
×
1460
            double time_val =
×
1461
                static_cast<double>(mv.dur_total) / input.ctx->time_resolution;
×
1462
            m.time_sum += time_val;
×
1463
            m.time_sq_sum += time_val * time_val;
×
1464
            if (time_val < m.time_call_min_val) m.time_call_min_val = time_val;
×
1465
            if (time_val > m.time_call_max_val) m.time_call_max_val = time_val;
×
1466
            if (mv.count > 0) {
×
1467
                double dur_min_v = static_cast<double>(mv.dur_min) /
×
1468
                                   input.ctx->time_resolution;
×
1469
                double dur_max_v = static_cast<double>(mv.dur_max) /
×
1470
                                   input.ctx->time_resolution;
×
1471
                if (dur_min_v < m.time_min_val) m.time_min_val = dur_min_v;
×
1472
                if (dur_max_v > m.time_max_val) m.time_max_val = dur_max_v;
×
1473
            }
1474
            if (mv.size_total > 0) {
×
1475
                m.has_size = true;
×
1476
                m.size_sum += mv.size_total;
×
1477
                double sz = static_cast<double>(mv.size_total);
×
1478
                m.size_sq_sum += sz * sz;
×
1479
                if (mv.size_total < m.size_call_min_val)
×
1480
                    m.size_call_min_val = mv.size_total;
×
1481
                if (mv.size_total > m.size_call_max_val)
×
1482
                    m.size_call_max_val = mv.size_total;
×
1483
                if (mv.count > 0) {
×
1484
                    if (mv.size_min < m.size_min_val)
×
1485
                        m.size_min_val = mv.size_min;
×
1486
                    if (mv.size_max > m.size_max_val)
×
1487
                        m.size_max_val = mv.size_max;
×
1488
                }
1489
            }
1490
            if (mv.ts >= input.ctx->time_origin) {
×
1491
                m.has_time_bounds = true;
×
1492
                auto ts_off = mv.ts - input.ctx->time_origin;
×
1493
                auto te_off = mv.te - input.ctx->time_origin;
×
1494
                if (ts_off < m.time_start_val) m.time_start_val = ts_off;
×
1495
                if (te_off > m.time_end_val) m.time_end_val = te_off;
×
1496
            }
1497
        };
×
1498

1499
    input.agg->scan_shard_range_raw(
72!
1500
        input.shard_begin, input.shard_end,
84!
1501
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
946✔
1502
            AggKeyView kv;
910✔
1503
            if (!parse_agg_key_view(key_bytes, kv)) return true;
921!
1504

1505
            if (input.type_filter && kv.map_type != *input.type_filter)
920!
1506
                return true;
×
1507

1508
            if (input.ctx->query_filter) {
915✔
1509
                auto& q = *input.ctx->query_filter;
531✔
1510
                dftracer::utils::utilities::common::query::ValueMap fields;
531!
1511
                if (q.references("cat")) fields["cat"] = std::string(kv.cat);
532!
1512
                if (q.references("name")) fields["name"] = std::string(kv.name);
526!
1513
                if (q.references("pid")) fields["pid"] = kv.pid;
527!
1514
                if (q.references("tid")) fields["tid"] = kv.tid;
529!
1515
                if (q.references("hhash"))
528!
1516
                    fields["hhash"] = std::string(kv.hhash);
×
1517
                if (q.references("fhash"))
528!
1518
                    fields["fhash"] = std::string(kv.fhash);
×
1519
                if (q.references("time_bucket"))
528!
1520
                    fields["time_bucket"] = kv.time_bucket;
×
1521
                if (!q.evaluate(fields)) return true;
527✔
1522
            }
520✔
1523

1524
            AggMetricsView mv;
1525
            if (!parse_agg_value_view(val_bytes, mv)) return true;
753✔
1526

1527
            auto file_name = resolver.resolve_file(kv.fhash);
756!
1528
            auto host_name = resolver.resolve_host(kv.hhash);
755!
1529

1530
            ProcKey pk{kv.hhash, kv.pid, kv.tid};
757✔
1531
            auto proc_it = proc_name_cache.find(pk);
757!
1532
            std::string_view proc_name;
757✔
1533
            if (proc_it != proc_name_cache.end()) {
757✔
1534
                proc_name = proc_it->second;
444✔
1535
            } else {
241✔
1536
                std::string pn = "app#";
314!
1537
                if (!host_name.empty()) {
314!
1538
                    pn.append(host_name);
314!
1539
                } else if (!kv.hhash.empty()) {
139!
1540
                    pn.append(kv.hhash);
×
1541
                } else {
1542
                    pn.append("unknown");
×
1543
                }
1544
                pn.push_back('#');
314!
1545
                pn.append(std::to_string(kv.pid));
314!
1546
                pn.push_back('#');
313✔
1547
                pn.append(std::to_string(kv.tid));
313!
1548
                ProcKey stable_pk{resolver.intern(kv.hhash), kv.pid, kv.tid};
314!
1549
                auto [it, _] =
309✔
1550
                    proc_name_cache.emplace(stable_pk, std::move(pn));
314!
1551
                proc_name = it->second;
313✔
1552
            }
312✔
1553

1554
            auto io_it = io_cat_cache.find(kv.name);
756!
1555
            IOCategory io_cat;
1556
            if (io_it != io_cat_cache.end()) {
754✔
1557
                io_cat = io_it->second;
312✔
1558
            } else {
174✔
1559
                io_cat = get_io_category(kv.name);
444✔
1560
                io_cat_cache[resolver.intern(kv.name)] = io_cat;
443!
1561
            }
1562

1563
            if (coarse) {
753!
1564
                switch (kv.map_type) {
×
1565
                    case AggMapType::EVENT:
1566
                        if (use_events)
×
1567
                            accumulate_coarse(event_coarse, kv, mv, file_name,
×
1568
                                              host_name, proc_name, io_cat);
1569
                        break;
×
1570
                    case AggMapType::PROFILE:
1571
                        if (use_profiles)
×
1572
                            accumulate_coarse(profile_coarse, kv, mv, file_name,
×
1573
                                              host_name, proc_name, io_cat);
1574
                        break;
×
1575
                    case AggMapType::SYSTEM:
1576
                        if (use_system)
×
1577
                            accumulate_coarse(system_coarse, kv, mv, file_name,
×
1578
                                              host_name, proc_name, io_cat);
1579
                        break;
×
1580
                }
1581
            } else {
1582
                switch (kv.map_type) {
753!
1583
                    case AggMapType::EVENT:
375✔
1584
                        append_row(event_builder, event_count, output.events,
1,131!
1585
                                   kv, mv, file_name, host_name, proc_name,
378✔
1586
                                   io_cat);
378✔
1587
                        break;
755✔
1588
                    case AggMapType::PROFILE:
1589
                        append_row(profile_builder, profile_count,
×
1590
                                   output.profiles, kv, mv, file_name,
×
1591
                                   host_name, proc_name, io_cat);
1592
                        break;
×
1593
                    case AggMapType::SYSTEM:
1594
                        append_row(system_builder, system_count, output.system,
×
1595
                                   kv, mv, file_name, host_name, proc_name,
1596
                                   io_cat);
1597
                        break;
×
1598
                }
1599
            }
1600
            return true;
755✔
1601
        });
473✔
1602

1603
    if (coarse) {
84!
1604
        const auto& cfg = *input.group_by;
×
1605
        auto flush_coarse = [&](std::unordered_map<CoarseKey, CoarseMetrics,
×
1606
                                                   CoarseKeyHash>& map,
1607
                                RecordBatchBuilder& builder, std::size_t& count,
1608
                                std::vector<ArrowExportResult>& results) {
1609
            for (auto& [key, m] : map) {
×
1610
                std::size_t ci = 0;
×
1611
                for (std::size_t i = 0; i < cfg.order.size(); ++i) {
×
1612
                    switch (cfg.order[i]) {
×
1613
                        case GB_CAT:
1614
                            builder.append_dict_string(ci++, key.cat);
×
1615
                            break;
×
1616
                        case GB_FUNC_NAME:
1617
                            builder.append_dict_string(ci++, key.func_name);
×
1618
                            break;
×
1619
                        case GB_PID:
1620
                            builder.append_int64(
×
1621
                                ci++, static_cast<std::int64_t>(key.pid));
×
1622
                            break;
×
1623
                        case GB_TID:
1624
                            builder.append_int64(
×
1625
                                ci++, static_cast<std::int64_t>(key.tid));
×
1626
                            break;
×
1627
                        case GB_FILE_HASH:
1628
                            builder.append_dict_string(ci++, key.file_hash);
×
1629
                            break;
×
1630
                        case GB_HOST_HASH:
1631
                            builder.append_dict_string(ci++, key.host_hash);
×
1632
                            break;
×
1633
                        case GB_FILE_NAME:
1634
                            builder.append_dict_string(ci++, key.file_name);
×
1635
                            break;
×
1636
                        case GB_HOST_NAME:
1637
                            builder.append_dict_string(ci++, key.host_name);
×
1638
                            break;
×
1639
                        case GB_PROC_NAME:
1640
                            builder.append_dict_string(ci++, key.proc_name);
×
1641
                            break;
×
1642
                        case GB_IO_CAT:
1643
                            builder.append_int64(ci++, key.io_cat);
×
1644
                            break;
×
1645
                        case GB_ACC_PAT:
1646
                            builder.append_int64(ci++, key.acc_pat);
×
1647
                            break;
×
1648
                        case GB_TIME_RANGE:
1649
                            builder.append_int64(ci++, key.time_range);
×
1650
                            break;
×
1651
                    }
1652
                }
1653
                builder.append_int64(ci++, static_cast<std::int64_t>(m.count));
×
1654
                builder.append_double(ci++, m.time_sum);
×
1655
                if (m.has_size) {
×
1656
                    builder.append_int64(ci++,
×
1657
                                         static_cast<std::int64_t>(m.size_sum));
×
1658
                } else {
1659
                    builder.append_null(ci++);
×
1660
                }
1661
                builder.append_double(ci++, m.time_sq_sum);
×
1662
                if (m.has_size) {
×
1663
                    builder.append_double(ci++, m.size_sq_sum);
×
1664
                } else {
1665
                    builder.append_null(ci++);
×
1666
                }
1667
                builder.append_double(ci++, m.count > 0 ? m.time_min_val : 0.0);
×
1668
                builder.append_double(ci++, m.count > 0 ? m.time_max_val : 0.0);
×
1669
                if (m.has_size) {
×
1670
                    builder.append_int64(
×
1671
                        ci++, static_cast<std::int64_t>(m.size_min_val));
×
1672
                    builder.append_int64(
×
1673
                        ci++, static_cast<std::int64_t>(m.size_max_val));
×
1674
                } else {
1675
                    builder.append_null(ci++);
×
1676
                    builder.append_null(ci++);
×
1677
                }
1678
                builder.append_double(ci++,
×
1679
                                      m.count > 0 ? m.time_call_min_val : 0.0);
×
1680
                builder.append_double(ci++,
×
1681
                                      m.count > 0 ? m.time_call_max_val : 0.0);
×
1682
                if (m.has_size) {
×
1683
                    builder.append_int64(
×
1684
                        ci++, static_cast<std::int64_t>(m.size_call_min_val));
×
1685
                    builder.append_int64(
×
1686
                        ci++, static_cast<std::int64_t>(m.size_call_max_val));
×
1687
                } else {
1688
                    builder.append_null(ci++);
×
1689
                    builder.append_null(ci++);
×
1690
                }
1691
                builder.append_int64(
×
1692
                    ci++, m.has_time_bounds
×
1693
                              ? static_cast<std::int64_t>(m.time_start_val)
×
1694
                              : 0);
1695
                builder.append_int64(
×
1696
                    ci++, m.has_time_bounds
×
1697
                              ? static_cast<std::int64_t>(m.time_end_val)
×
1698
                              : 0);
1699
                builder.end_row();
×
1700
                ++count;
×
1701
                if (static_cast<Py_ssize_t>(count) >= input.batch_size) {
×
1702
                    flush_builder(builder, count, results);
×
1703
                }
1704
            }
1705
            flush_builder(builder, count, results);
×
1706
        };
×
1707
        if (use_events)
×
1708
            flush_coarse(event_coarse, event_builder, event_count,
×
1709
                         output.events);
×
1710
        if (use_profiles)
×
1711
            flush_coarse(profile_coarse, profile_builder, profile_count,
×
1712
                         output.profiles);
×
1713
        if (use_system)
×
1714
            flush_coarse(system_coarse, system_builder, system_count,
×
1715
                         output.system);
×
1716
    } else {
1717
        if (use_events)
84!
1718
            flush_builder(event_builder, event_count, output.events);
84!
1719
        if (use_profiles)
83✔
1720
            flush_builder(profile_builder, profile_count, output.profiles);
70!
1721
        if (use_system)
83✔
1722
            flush_builder(system_builder, system_count, output.system);
70!
1723
    }
1724

1725
    return output;
130✔
1726
}
88!
1727

1728
// Two-pass scan over SYSTEM_METRICS CF: pass 1 discovers metric column names
1729
// (dynamic per workload), pass 2 emits rows. Needed because RecordBatchBuilder
1730
// requires the schema up front.
1731
std::vector<ArrowExportResult> scan_system_metrics_buffer(
20✔
1732
    const EventAggregator* agg, const DfanalyzerContext* ctx,
1733
    Py_ssize_t batch_size) {
1734
    std::vector<ArrowExportResult> results;
20✔
1735
    if (!agg) return results;
20✔
1736

1737
    std::vector<std::string> metric_names_ordered;
20✔
1738
    std::unordered_set<std::string> metric_name_seen;
20✔
1739
    agg->scan_system_metrics_raw(
30!
1740
        [&](std::string_view, std::string_view val_bytes) -> bool {
10✔
1741
            auto m = deserialize_system_value(val_bytes);
×
1742
            if (m.metrics) {
×
1743
                for (const auto& [name, _] : *m.metrics) {
×
1744
                    if (metric_name_seen.insert(name).second) {
×
1745
                        metric_names_ordered.push_back(name);
×
1746
                    }
1747
                }
1748
            }
1749
            return true;
1750
        });
×
1751

1752
    if (metric_names_ordered.empty()) return results;
20✔
1753

1754
    // SystemAggregationMetrics::metrics is an unordered_map; sort the
1755
    // discovered column names so the emitted Arrow schema is deterministic
1756
    // across runs and builds.
1757
    std::sort(metric_names_ordered.begin(), metric_names_ordered.end());
×
1758

1759
    std::vector<ColumnSpec> schema;
×
1760
    schema.reserve(6 + metric_names_ordered.size());
×
1761
    schema.push_back({"host_hash", ColumnType::DICT_STRING});
×
1762
    schema.push_back({"name", ColumnType::DICT_STRING});
×
1763
    schema.push_back({"time_bucket", ColumnType::INT64});
×
1764
    schema.push_back({"ts", ColumnType::INT64});
×
1765
    schema.push_back({"te", ColumnType::INT64});
×
1766
    schema.push_back({"count", ColumnType::INT64});
×
1767
    for (const auto& mn : metric_names_ordered) {
×
1768
        schema.push_back({mn, ColumnType::DOUBLE});
×
1769
    }
1770

1771
    RecordBatchBuilder builder;
×
1772
    builder.declare_schema(schema);
×
1773
    builder.reserve(static_cast<std::size_t>(batch_size));
×
1774

1775
    auto flush = [&](std::size_t& row_count) {
×
1776
        if (row_count == 0) return;
×
1777
        auto arrow = builder.finish();
×
1778
        if (arrow.valid()) results.push_back(std::move(arrow));
×
1779
        builder.reset(true);
×
1780
        builder.reserve(static_cast<std::size_t>(batch_size));
×
1781
        row_count = 0;
×
1782
    };
×
1783

1784
    std::size_t row_count = 0;
×
1785
    const std::size_t n_metric_cols = metric_names_ordered.size();
×
1786

1787
    agg->scan_system_metrics_raw(
×
1788
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
×
1789
            auto k = deserialize_system_key(key_bytes);
×
1790
            auto m = deserialize_system_value(val_bytes);
×
1791

1792
            std::size_t ci = 0;
×
1793
            builder.append_dict_string(ci++, k.key.hhash);
×
1794
            builder.append_dict_string(ci++, k.key.name);
×
1795
            builder.append_int64(ci++,
×
1796
                                 static_cast<std::int64_t>(k.key.time_bucket));
×
1797
            builder.append_int64(ci++, static_cast<std::int64_t>(m.ts));
×
1798
            builder.append_int64(ci++, static_cast<std::int64_t>(m.te));
×
1799
            builder.append_int64(ci++, static_cast<std::int64_t>(m.count));
×
1800

1801
            for (std::size_t i = 0; i < n_metric_cols; ++i) {
×
1802
                const auto& mn = metric_names_ordered[i];
×
1803
                bool present = false;
×
1804
                if (m.metrics) {
×
1805
                    auto it = m.metrics->find(mn);
×
1806
                    if (it != m.metrics->end()) {
×
1807
                        builder.append_double(ci++, it->second.mean);
×
1808
                        present = true;
×
1809
                    }
1810
                }
1811
                if (!present) builder.append_null(ci++);
×
1812
            }
1813
            builder.end_row();
×
1814
            row_count++;
×
1815
            if (static_cast<Py_ssize_t>(row_count) >= batch_size) {
×
1816
                flush(row_count);
×
1817
            }
1818
            return true;
1819
        });
×
1820
    flush(row_count);
×
1821

1822
    (void)ctx;
1823
    return results;
×
1824
}
20!
1825

1826
}  // namespace
1827

1828
// Python: Indexer.iter_aggregation(type="events", batch_size=10000)
//
// Opens the aggregation DB at the indexer's resolved index path and scans it
// in parallel shard ranges (via the batch-indexer runtime), exporting rows of
// the aggregation type named by `type` (parsed by parse_agg_type_str) in
// batches of at most `batch_size` rows.
//
// Returns a new reference to an iterator over the exported batches, or NULL
// with a Python exception set. A non-positive batch_size raises ValueError.
// Any C++ failure during the scan raises RuntimeError.
static PyObject* Indexer_iter_aggregation(IndexerObject* self, PyObject* args,
                                          PyObject* kwds) {
    static const char* kwlist[] = {"type", "batch_size", nullptr};
    const char* type_str = "events";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|sn", (char**)kwlist,
                                     &type_str, &batch_size)) {
        return nullptr;
    }
    // Scanners flush when their row count reaches batch_size and may reserve
    // batch_size rows; a non-positive value is meaningless there.
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "batch_size must be a positive integer");
        return nullptr;
    }

    AggMapType target_type;
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;

    AggregationBatchType batch_type;
    if (target_type == AggMapType::EVENT)
        batch_type = AggregationBatchType::EVENT;
    else if (target_type == AggMapType::PROFILE)
        batch_type = AggregationBatchType::PROFILE;
    else
        batch_type = AggregationBatchType::SYSTEM;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* batch_list = PyList_New(0);
    if (!batch_list) return nullptr;

    std::string error_msg;
    std::vector<dftracer::utils::utilities::common::arrow::ArrowExportResult>
        results;

    // The GIL is released for the whole scan. No exception may leave this
    // region, otherwise Py_END_ALLOW_THREADS would be skipped and the thread
    // state never restored.
    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<AggScanOutput> outputs;
            parallel_shard_scan<AggScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    AggScanInput input;
                    input.agg = handle->agg.get();
                    input.target_type = target_type;
                    input.batch_type = batch_type;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    return scan_aggregation_shard_range(input);
                },
                outputs);

            for (auto& out : outputs) {
                for (auto& r : out.results) {
                    results.push_back(std::move(r));
                }
            }
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
        // An empty message would otherwise be mistaken for success below.
        if (error_msg.empty()) error_msg = "iter_aggregation failed";
    } catch (...) {
        error_msg = "iter_aggregation failed with an unknown exception";
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(batch_list);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(batch_list, results);

    PyObject* iter = PyObject_GetIter(batch_list);
    Py_DECREF(batch_list);
    return iter;
}
1903

1904
// Python: Indexer.iter_arrow_dfanalyzer(type="events", batch_size=10000,
//                                       time_granularity=1.0,
//                                       time_resolution=1000000.0,
//                                       query=None)
//
// Opens the aggregation DB and the index DB (read-only) at the indexer's
// resolved index path. It loads the FILE and HOST hash tables and builds a
// DfanalyzerContext that is shared by all shard scans. Time values are made
// relative to the aggregator's minimum time bucket, or 0 when no time bounds
// are recorded. The shards are then scanned in parallel, keeping only entries
// of the aggregation `type` (parsed by parse_agg_type_str). When `query` is
// given, only entries accepted by the parsed query are kept.
//
// Returns a new reference to an iterator over the exported batches (per shard
// output: events, then profiles, then system), or NULL with a Python
// exception set. A non-positive batch_size raises ValueError. Any C++ failure
// during the scan raises RuntimeError.
static PyObject* Indexer_iter_arrow_dfanalyzer(IndexerObject* self,
                                               PyObject* args, PyObject* kwds) {
    static const char* kwlist[] = {
        "type",  "batch_size", "time_granularity", "time_resolution",
        "query", nullptr};
    const char* type_str = "events";
    Py_ssize_t batch_size = 10000;
    double time_granularity = 1.0;
    double time_resolution = 1000000.0;
    const char* query_str = nullptr;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|snddz", (char**)kwlist,
                                     &type_str, &batch_size, &time_granularity,
                                     &time_resolution, &query_str)) {
        return nullptr;
    }
    // Scanners flush when their row count reaches batch_size and may reserve
    // batch_size rows; a non-positive value is meaningless there.
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "batch_size must be a positive integer");
        return nullptr;
    }

    AggMapType target_type;
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;

    // An empty optional with no pending Python error simply means "no query".
    auto query_opt = parse_query_arg(query_str);
    if (!query_opt && PyErr_Occurred()) return nullptr;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* batch_list = PyList_New(0);
    if (!batch_list) return nullptr;

    std::string error_msg;
    std::vector<ArrowExportResult> results;

    // The GIL is released for the whole scan. No exception may leave this
    // region, otherwise Py_END_ALLOW_THREADS would be skipped and the thread
    // state never restored.
    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            dftracer::utils::utilities::indexer::IndexDatabase idx_db(
                index_path,
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto file_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::FILE);
            auto host_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::HOST);

            auto time_bounds = handle->agg->query_time_bounds();
            std::uint64_t time_origin =
                time_bounds.valid ? time_bounds.min_time_bucket : 0;

            // Shared read-only by every shard scan below. It points at locals
            // that outlive parallel_shard_scan.
            DfanalyzerContext ctx;
            ctx.file_hashes = &file_hashes;
            ctx.host_hashes = &host_hashes;
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
            ctx.time_origin = time_origin;
            ctx.time_resolution = time_resolution;
            ctx.time_granularity = time_granularity;

            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<DfanalyzerScanOutput> outputs;
            parallel_shard_scan<DfanalyzerScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    DfanalyzerScanInput input;
                    input.agg = handle->agg.get();
                    input.ctx = &ctx;
                    input.type_filter = target_type;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    return scan_dfanalyzer_shards(input);
                },
                outputs);

            for (auto& out : outputs) {
                for (auto& r : out.events) results.push_back(std::move(r));
                for (auto& r : out.profiles) results.push_back(std::move(r));
                for (auto& r : out.system) results.push_back(std::move(r));
            }
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
        // An empty message would otherwise be mistaken for success below.
        if (error_msg.empty()) error_msg = "iter_arrow_dfanalyzer failed";
    } catch (...) {
        error_msg = "iter_arrow_dfanalyzer failed with an unknown exception";
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(batch_list);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(batch_list, results);

    PyObject* iter = PyObject_GetIter(batch_list);
    Py_DECREF(batch_list);
    return iter;
}
2001

2002
static bool parse_group_by_arg(PyObject* obj, GroupByConfig& out) {
20✔
2003
    if (!obj || obj == Py_None) return true;
20!
2004
    if (!PySequence_Check(obj)) {
×
2005
        PyErr_SetString(PyExc_TypeError,
×
2006
                        "group_by must be a sequence of strings or None");
2007
        return false;
×
2008
    }
2009
    Py_ssize_t n = PySequence_Length(obj);
×
2010
    for (Py_ssize_t i = 0; i < n; ++i) {
×
2011
        PyObject* item = PySequence_GetItem(obj, i);
×
2012
        if (!item) return false;
×
2013
        if (!PyUnicode_Check(item)) {
×
2014
            Py_DECREF(item);
2015
            PyErr_SetString(PyExc_TypeError,
×
2016
                            "group_by entries must be strings");
2017
            return false;
×
2018
        }
2019
        Py_ssize_t sz = 0;
×
2020
        const char* s = PyUnicode_AsUTF8AndSize(item, &sz);
×
2021
        if (!s) {
×
2022
            Py_DECREF(item);
2023
            return false;
×
2024
        }
2025
        std::string_view sv(s, static_cast<std::size_t>(sz));
×
2026
        auto field = parse_group_by_name(sv);
×
2027
        if (!field) {
×
2028
            std::string msg = "unsupported group_by field: ";
×
2029
            msg.append(sv);
×
2030
            Py_DECREF(item);
×
2031
            PyErr_SetString(PyExc_ValueError, msg.c_str());
×
2032
            return false;
×
2033
        }
×
2034
        if (!(out.mask & *field)) {
×
2035
            out.mask |= *field;
×
2036
            out.order.push_back(*field);
×
2037
            out.names.emplace_back(sv);
×
2038
        }
2039
        Py_DECREF(item);
2040
    }
2041
    return true;
×
2042
}
10✔
2043

2044
// Indexer.iter_arrow_dfanalyzer_all(batch_size=10000, time_granularity=1.0,
//                                   time_resolution=1e6, query=None,
//                                   group_by=None) -> dict
//
// Runs one dfanalyzer scan over all shards of this indexer's aggregation
// database and returns {"events": [...], "profiles": [...], "system": [...]}.
// Each list is filled by append_results_to_list() from the ArrowExportResult
// batches collected during the scan. System metrics come from both the
// shard scan and scan_system_metrics_buffer().
//
// All database work runs with the GIL released. Any C++ exception raised
// there is converted into a RuntimeError after the GIL is reacquired.
static PyObject* Indexer_iter_arrow_dfanalyzer_all(IndexerObject* self,
                                                   PyObject* args,
                                                   PyObject* kwds) {
    static const char* kwlist[] = {"batch_size",      "time_granularity",
                                   "time_resolution", "query",
                                   "group_by",        nullptr};
    Py_ssize_t batch_size = 10000;
    double time_granularity = 1.0;
    double time_resolution = 1000000.0;
    const char* query_str = nullptr;
    PyObject* group_by_obj = nullptr;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nddzO", (char**)kwlist, &batch_size,
            &time_granularity, &time_resolution, &query_str, &group_by_obj)) {
        return nullptr;
    }

    auto query_opt = parse_query_arg(query_str);
    if (!query_opt && PyErr_Occurred()) return nullptr;

    GroupByConfig group_by_cfg;
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
    // An empty mask means "no grouping": pass nullptr for full granularity.
    const GroupByConfig* group_by_ptr =
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* result_dict = PyDict_New();
    if (!result_dict) return nullptr;

    PyObject* events_list = PyList_New(0);
    PyObject* profiles_list = PyList_New(0);
    PyObject* system_list = PyList_New(0);
    if (!events_list || !profiles_list || !system_list) {
        Py_XDECREF(events_list);
        Py_XDECREF(profiles_list);
        Py_XDECREF(system_list);
        Py_DECREF(result_dict);
        return nullptr;
    }

    std::string error_msg;
    std::vector<ArrowExportResult> events_results, profiles_results,
        system_results;

    // No exception may leave this region: Py_END_ALLOW_THREADS must run to
    // reacquire the GIL before any Python API call.
    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            dftracer::utils::utilities::indexer::IndexDatabase idx_db(
                index_path,
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto file_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::FILE);
            auto host_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::HOST);

            // time_range values are computed relative to the smallest time
            // bucket present (0 when the aggregator reports no valid bounds).
            auto time_bounds = handle->agg->query_time_bounds();
            std::uint64_t time_origin =
                time_bounds.valid ? time_bounds.min_time_bucket : 0;

            DfanalyzerContext ctx;
            ctx.file_hashes = &file_hashes;
            ctx.host_hashes = &host_hashes;
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
            ctx.time_origin = time_origin;
            ctx.time_resolution = time_resolution;
            ctx.time_granularity = time_granularity;

            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<DfanalyzerScanOutput> outputs;
            parallel_shard_scan<DfanalyzerScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    DfanalyzerScanInput input;
                    input.agg = handle->agg.get();
                    input.ctx = &ctx;
                    input.type_filter = std::nullopt;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    input.group_by = group_by_ptr;
                    return scan_dfanalyzer_shards(input);
                },
                outputs);

            // Flatten the per-shard-range outputs into one list per type.
            for (auto& out : outputs) {
                for (auto& r : out.events)
                    events_results.push_back(std::move(r));
                for (auto& r : out.profiles)
                    profiles_results.push_back(std::move(r));
                for (auto& r : out.system)
                    system_results.push_back(std::move(r));
            }

            auto sys_buf =
                scan_system_metrics_buffer(handle->agg.get(), &ctx, batch_size);
            for (auto& r : sys_buf) system_results.push_back(std::move(r));
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
    } catch (...) {
        error_msg = "iter_arrow_dfanalyzer_all: unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(events_list);
        Py_DECREF(profiles_list);
        Py_DECREF(system_list);
        Py_DECREF(result_dict);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(events_list, events_results);
    append_results_to_list(profiles_list, profiles_results);
    append_results_to_list(system_list, system_results);

    // PyDict_SetItemString does not steal references, so ours are dropped
    // unconditionally. A failed insert must not return a dict while an
    // exception is pending.
    const bool stored =
        PyDict_SetItemString(result_dict, "events", events_list) == 0 &&
        PyDict_SetItemString(result_dict, "profiles", profiles_list) == 0 &&
        PyDict_SetItemString(result_dict, "system", system_list) == 0;
    Py_DECREF(events_list);
    Py_DECREF(profiles_list);
    Py_DECREF(system_list);
    if (!stored) {
        Py_DECREF(result_dict);
        return nullptr;
    }

    return result_dict;
}
2174

2175
// ---------------------------------------------------------------------------
// scan_aggregation_manifest — module-level entry point for analyze_trace.
//
// Each Dask worker calls this with its slice of the aggregation manifest
// (agg_ssts + sys_ssts) and optionally a [shard_begin, shard_end) range.
// The function opens a scratch IndexDatabase at `scratch_dir` and ingests the
// SSTs into its AGGREGATION/SYSTEM_METRICS column families. This is nearly
// free when the SSTs live on the same filesystem as `scratch_dir`, because
// RocksDB can hard-link them. It then runs the same dfanalyzer shard scan
// that `iter_arrow_dfanalyzer_all` uses, restricted to the requested range.
//
// The file/host hash tables are normally read from `meta_index_path`.
// Callers may instead pass them as `file_hashes` / `host_hashes` so that
// many workers do not all reopen the same meta index.
//
// AGG_GLOBAL_CONFIG_KEY is not written by worker SSTs, so we construct the
// EventAggregator with config_hash=0 directly instead of going through
// `open_agg_db` (which requires the config key). The config hash is used
// by the aggregator only for write-time validation, not for reads.
//
// The scratch DB is NOT cleaned up here — the Python caller owns the
// `scratch_dir` lifetime and should remove it after gathering results.
// ---------------------------------------------------------------------------
2193

2194
static bool collect_string_list(PyObject* obj, const char* name,
×
2195
                                std::vector<std::string>& out) {
2196
    if (!obj || obj == Py_None) return true;
×
2197
    PyObject* seq = PySequence_Fast(obj, name);
×
2198
    if (!seq) return false;
×
2199
    Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
2200
    out.reserve(static_cast<std::size_t>(n));
×
2201
    for (Py_ssize_t i = 0; i < n; ++i) {
×
2202
        PyObject* item = PySequence_Fast_GET_ITEM(seq, i);
×
2203
        if (!PyUnicode_Check(item)) {
×
2204
            Py_DECREF(seq);
2205
            PyErr_Format(PyExc_TypeError, "%s items must be str", name);
×
2206
            return false;
×
2207
        }
2208
        const char* s = PyUnicode_AsUTF8(item);
×
2209
        if (!s) {
×
2210
            Py_DECREF(seq);
2211
            return false;
×
2212
        }
2213
        out.emplace_back(s);
×
2214
    }
2215
    Py_DECREF(seq);
2216
    return true;
×
2217
}
2218

2219
static bool collect_string_string_dict(
×
2220
    PyObject* obj, const char* name,
2221
    std::unordered_map<std::string, std::string>& out) {
2222
    if (!obj || obj == Py_None) return true;
×
2223
    if (!PyDict_Check(obj)) {
×
2224
        PyErr_Format(PyExc_TypeError, "%s must be a dict[str, str] or None",
×
2225
                     name);
2226
        return false;
×
2227
    }
2228
    PyObject *k, *v;
2229
    Py_ssize_t pos = 0;
×
2230
    while (PyDict_Next(obj, &pos, &k, &v)) {
×
2231
        if (!PyUnicode_Check(k) || !PyUnicode_Check(v)) {
×
2232
            PyErr_Format(PyExc_TypeError, "%s must map str -> str", name);
×
2233
            return false;
×
2234
        }
2235
        const char* ks = PyUnicode_AsUTF8(k);
×
2236
        const char* vs = PyUnicode_AsUTF8(v);
×
2237
        if (!ks || !vs) return false;
×
2238
        out.emplace(ks, vs);
×
2239
    }
2240
    return true;
×
2241
}
2242

2243
static PyObject* scan_aggregation_manifest_fn(PyObject* /*self*/,
×
2244
                                              PyObject* args, PyObject* kwds) {
2245
    static const char* kwlist[] = {
2246
        "agg_ssts",        "sys_ssts",    "scratch_dir",
2247
        "meta_index_path", "batch_size",  "time_granularity",
2248
        "time_resolution", "query",       "group_by",
2249
        "shard_begin",     "shard_end",   "runtime",
2250
        "file_hashes",     "host_hashes", nullptr};
2251

2252
    PyObject* agg_ssts_obj = nullptr;
×
2253
    PyObject* sys_ssts_obj = nullptr;
×
2254
    const char* scratch_dir = nullptr;
×
2255
    const char* meta_index_path = nullptr;
×
2256
    Py_ssize_t batch_size = 10000;
×
2257
    double time_granularity = 1.0;
×
2258
    double time_resolution = 1000000.0;
×
2259
    const char* query_str = nullptr;
×
2260
    PyObject* group_by_obj = nullptr;
×
2261
    int shard_begin_i = 0;
×
2262
    int shard_end_i = DFT_NUM_SHARDS;
×
2263
    PyObject* runtime_obj = nullptr;
×
2264
    PyObject* file_hashes_obj = nullptr;
×
2265
    PyObject* host_hashes_obj = nullptr;
×
2266

2267
    if (!PyArg_ParseTupleAndKeywords(
×
2268
            args, kwds, "OOss|nddzOiiOOO", (char**)kwlist, &agg_ssts_obj,
2269
            &sys_ssts_obj, &scratch_dir, &meta_index_path, &batch_size,
2270
            &time_granularity, &time_resolution, &query_str, &group_by_obj,
2271
            &shard_begin_i, &shard_end_i, &runtime_obj, &file_hashes_obj,
2272
            &host_hashes_obj)) {
2273
        return nullptr;
×
2274
    }
2275

2276
    if (shard_begin_i < 0 || shard_end_i > DFT_NUM_SHARDS ||
×
2277
        shard_begin_i >= shard_end_i) {
×
2278
        PyErr_Format(PyExc_ValueError,
×
2279
                     "shard range [%d, %d) invalid (must be within [0, %d))",
2280
                     shard_begin_i, shard_end_i, (int)DFT_NUM_SHARDS);
2281
        return nullptr;
×
2282
    }
2283

2284
    std::vector<std::string> agg_ssts;
×
2285
    std::vector<std::string> sys_ssts;
×
2286
    if (!collect_string_list(agg_ssts_obj, "agg_ssts", agg_ssts))
×
2287
        return nullptr;
×
2288
    if (!collect_string_list(sys_ssts_obj, "sys_ssts", sys_ssts))
×
2289
        return nullptr;
×
2290

2291
    std::unordered_map<std::string, std::string> preloaded_file_hashes;
×
2292
    std::unordered_map<std::string, std::string> preloaded_host_hashes;
×
2293
    const bool hashes_preloaded =
×
2294
        (file_hashes_obj && file_hashes_obj != Py_None) ||
×
2295
        (host_hashes_obj && host_hashes_obj != Py_None);
×
2296
    if (!collect_string_string_dict(file_hashes_obj, "file_hashes",
×
2297
                                    preloaded_file_hashes))
2298
        return nullptr;
×
2299
    if (!collect_string_string_dict(host_hashes_obj, "host_hashes",
×
2300
                                    preloaded_host_hashes))
2301
        return nullptr;
×
2302

2303
    auto query_opt = parse_query_arg(query_str);
×
2304
    if (!query_opt && PyErr_Occurred()) return nullptr;
×
2305

2306
    GroupByConfig group_by_cfg;
×
2307
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
×
2308
    const GroupByConfig* group_by_ptr =
×
2309
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;
×
2310

2311
    Runtime* rt = nullptr;
×
2312
    if (runtime_obj && runtime_obj != Py_None) {
×
2313
        if (!PyObject_TypeCheck(runtime_obj, &RuntimeType)) {
×
2314
            PyErr_SetString(PyExc_TypeError,
×
2315
                            "runtime must be a Runtime instance or None");
2316
            return nullptr;
×
2317
        }
2318
        rt = ((RuntimeObject*)runtime_obj)->runtime.get();
×
2319
    } else {
2320
        rt = get_default_runtime();
×
2321
    }
2322

2323
    PyObject* result_dict = PyDict_New();
×
2324
    if (!result_dict) return nullptr;
×
2325
    PyObject* events_list = PyList_New(0);
×
2326
    PyObject* profiles_list = PyList_New(0);
×
2327
    PyObject* system_list = PyList_New(0);
×
2328
    if (!events_list || !profiles_list || !system_list) {
×
2329
        Py_XDECREF(events_list);
×
2330
        Py_XDECREF(profiles_list);
×
2331
        Py_XDECREF(system_list);
×
2332
        Py_DECREF(result_dict);
×
2333
        return nullptr;
×
2334
    }
2335

2336
    std::string error_msg;
×
2337
    std::vector<ArrowExportResult> events_results, profiles_results,
×
2338
        system_results;
×
2339
    std::string scratch_index_path = std::string(scratch_dir) + "/.dftindex";
×
2340
    std::string meta_index_path_str(meta_index_path);
×
2341

2342
    Py_BEGIN_ALLOW_THREADS try {
×
2343
        namespace rcf = dftracer::utils::rocksdb::cf;
2344
        using clock = std::chrono::steady_clock;
2345
        auto ms = [](clock::time_point a, clock::time_point b) -> long long {
×
2346
            return std::chrono::duration_cast<std::chrono::milliseconds>(b - a)
×
2347
                .count();
×
2348
        };
2349

2350
        auto t_start = clock::now();
×
2351
        dftracer::utils::utilities::indexer::IndexDatabase scratch_db(
×
2352
            scratch_index_path);
×
2353
        auto t_scratch_open = clock::now();
×
2354

2355
        auto raw_db = scratch_db.db();
×
2356
        for (const auto& p : agg_ssts) {
×
2357
            auto st = raw_db->ingest_external_files(rcf::AGGREGATION, {p},
×
2358
                                                    /*ingest_behind=*/false);
×
2359
            if (!st.ok()) {
×
2360
                error_msg =
2361
                    "ingest AGGREGATION sst '" + p + "': " + st.ToString();
×
2362
                break;
×
2363
            }
2364
        }
×
2365
        if (error_msg.empty()) {
×
2366
            for (const auto& p : sys_ssts) {
×
2367
                auto st = raw_db->ingest_external_files(
×
2368
                    rcf::SYSTEM_METRICS, {p}, /*ingest_behind=*/false);
×
2369
                if (!st.ok()) {
×
2370
                    error_msg = "ingest SYSTEM_METRICS sst '" + p +
×
2371
                                "': " + st.ToString();
×
2372
                    break;
×
2373
                }
2374
            }
×
2375
        }
2376
        auto t_ingest = clock::now();
×
2377

2378
        if (error_msg.empty()) {
×
2379
            auto agg =
2380
                std::make_unique<EventAggregator>(raw_db, /*cfg_hash=*/0);
×
2381

2382
            // If the caller passed pre-loaded hash tables, skip opening
2383
            // the meta DB on lustre. When many dask workers run
2384
            // scan_aggregation_manifest in parallel, loading the hash
2385
            // tables N times from the same file is significant lustre
2386
            // metadata pressure; loading once on the coordinator and
2387
            // passing them in eliminates the redundant reads.
2388
            std::unordered_map<std::string, std::string> loaded_file_hashes;
×
2389
            std::unordered_map<std::string, std::string> loaded_host_hashes;
×
2390
            std::unique_ptr<dftracer::utils::utilities::indexer::IndexDatabase>
2391
                meta_db;
×
2392
            if (!hashes_preloaded) {
×
2393
                meta_db = std::make_unique<
×
2394
                    dftracer::utils::utilities::indexer::IndexDatabase>(
2395
                    meta_index_path_str, dftracer::utils::rocksdb::
2396
                                             RocksDatabase::OpenMode::ReadOnly);
×
2397
                loaded_file_hashes = meta_db->query_hash_table(
×
2398
                    dftracer::utils::utilities::indexer::IndexDatabase::
2399
                        HashType::FILE);
2400
                loaded_host_hashes = meta_db->query_hash_table(
×
2401
                    dftracer::utils::utilities::indexer::IndexDatabase::
2402
                        HashType::HOST);
2403
            }
2404
            const auto& file_hashes =
×
2405
                hashes_preloaded ? preloaded_file_hashes : loaded_file_hashes;
×
2406
            const auto& host_hashes =
×
2407
                hashes_preloaded ? preloaded_host_hashes : loaded_host_hashes;
×
2408
            auto t_hash_tables = clock::now();
×
2409

2410
            auto time_bounds = agg->query_time_bounds();
×
2411
            std::uint64_t time_origin =
×
2412
                time_bounds.valid ? time_bounds.min_time_bucket : 0;
×
2413

2414
            DfanalyzerContext ctx;
×
2415
            ctx.file_hashes = &file_hashes;
×
2416
            ctx.host_hashes = &host_hashes;
×
2417
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
×
2418
            ctx.time_origin = time_origin;
×
2419
            ctx.time_resolution = time_resolution;
×
2420
            ctx.time_granularity = time_granularity;
×
2421

2422
            std::vector<DfanalyzerScanOutput> outputs;
×
2423
            parallel_shard_scan_range<DfanalyzerScanOutput>(
×
2424
                rt, static_cast<std::uint16_t>(shard_begin_i),
2425
                static_cast<std::uint16_t>(shard_end_i),
2426
                [&](std::uint16_t sb, std::uint16_t se) {
×
2427
                    DfanalyzerScanInput input;
×
2428
                    input.agg = agg.get();
×
2429
                    input.ctx = &ctx;
×
2430
                    input.type_filter = std::nullopt;
×
2431
                    input.batch_size = batch_size;
×
2432
                    input.shard_begin = sb;
×
2433
                    input.shard_end = se;
×
2434
                    input.group_by = group_by_ptr;
×
2435
                    return scan_dfanalyzer_shards(input);
×
2436
                },
2437
                outputs);
2438
            auto t_scan = clock::now();
×
2439

2440
            for (auto& out : outputs) {
×
2441
                for (auto& r : out.events)
×
2442
                    events_results.push_back(std::move(r));
×
2443
                for (auto& r : out.profiles)
×
2444
                    profiles_results.push_back(std::move(r));
×
2445
                for (auto& r : out.system)
×
2446
                    system_results.push_back(std::move(r));
×
2447
            }
2448

2449
            std::fprintf(
×
2450
                stderr,
2451
                "[scan_aggregation_manifest] n_agg=%zu n_sys=%zu "
2452
                "scratch_open=%lldms ingest=%lldms hash_tables=%lldms "
2453
                "scan=%lldms\n",
2454
                agg_ssts.size(), sys_ssts.size(), ms(t_start, t_scratch_open),
×
2455
                ms(t_scratch_open, t_ingest), ms(t_ingest, t_hash_tables),
×
2456
                ms(t_hash_tables, t_scan));
×
2457
            std::fflush(stderr);
×
2458
        }
×
2459
    } catch (const std::exception& e) {
×
2460
        error_msg = e.what();
×
2461
    }
×
2462
    Py_END_ALLOW_THREADS
×
2463

2464
        if (!error_msg.empty()) {
×
2465
        Py_DECREF(events_list);
×
2466
        Py_DECREF(profiles_list);
×
2467
        Py_DECREF(system_list);
×
2468
        Py_DECREF(result_dict);
×
2469
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
2470
        return nullptr;
×
2471
    }
2472

2473
    append_results_to_list(events_list, events_results);
×
2474
    append_results_to_list(profiles_list, profiles_results);
×
2475
    append_results_to_list(system_list, system_results);
×
2476

2477
    PyDict_SetItemString(result_dict, "events", events_list);
×
2478
    PyDict_SetItemString(result_dict, "profiles", profiles_list);
×
2479
    PyDict_SetItemString(result_dict, "system", system_list);
×
2480
    Py_DECREF(events_list);
×
2481
    Py_DECREF(profiles_list);
×
2482
    Py_DECREF(system_list);
×
2483

2484
    return result_dict;
×
2485
}
×
2486

2487
// Module-level functions registered by init_indexer() when Arrow support is
// compiled in.
//
// CPython only splits the leading signature off into __text_signature__
// when it ends with ")\n--\n\n". A return annotation after the ')' prevents
// that, so the return type is described in the body text instead.
static PyMethodDef BatchIndexerModuleMethods[] = {
    {"scan_aggregation_manifest", (PyCFunction)scan_aggregation_manifest_fn,
     METH_VARARGS | METH_KEYWORDS,
     "scan_aggregation_manifest(agg_ssts, sys_ssts, scratch_dir, "
     "meta_index_path, batch_size=10000, time_granularity=1.0, "
     "time_resolution=1e6, query=None, group_by=None, shard_begin=0, "
     "shard_end=4096, runtime=None, file_hashes=None, host_hashes=None)\n"
     "--\n\n"
     "Scan a worker's slice of the distributed aggregation manifest.\n\n"
     "Ingests agg_ssts + sys_ssts into a scratch IndexDatabase at "
     "scratch_dir (caller owns the directory lifecycle) and runs the "
     "dfanalyzer aggregation scan over [shard_begin, shard_end). "
     "meta_index_path is the unified .dftindex used to resolve file / "
     "host hashes; file_hashes / host_hashes optionally supply pre-loaded "
     "hash tables (dict[str, str]) so the meta index need not be read. "
     "Returns a dict with the same shape as "
     "Indexer.iter_arrow_dfanalyzer_all."},
    {nullptr, nullptr, 0, nullptr}};
2503
#endif
2504

2505
// Method table for the Indexer type. Each docstring begins with a
// "name(args)\n--\n\n" header that CPython exposes as __text_signature__.
static PyMethodDef Indexer_methods[] = {
    // --- index resolution / building -------------------------------------
    {"get_checkpoint_indexer",
     (PyCFunction)Indexer_get_checkpoint_indexer,
     METH_VARARGS,
     "get_checkpoint_indexer(file_path)\n"
     "--\n\n"
     "Get a checkpoint indexer for a specific file.\n\n"
     "Args:\n"
     "    file_path: Path to the trace file (.pfw/.pfw.gz)\n\n"
     "Returns:\n"
     "    Indexer instance for checkpoint-level operations.\n"},
    {"resolve",
     (PyCFunction)Indexer_resolve,
     METH_NOARGS,
     "resolve()\n"
     "--\n\n"
     "Check what files exist vs need indexing.\n\n"
     "Returns:\n"
     "    dict with 'total_files', 'ready', 'needs_work', 'index_path'\n"},
    {"build",
     (PyCFunction)Indexer_build,
     METH_NOARGS,
     "build()\n"
     "--\n\n"
     "Build all missing index tiers based on require_* flags.\n"},
    {"ensure_indexed",
     (PyCFunction)Indexer_ensure_indexed,
     METH_NOARGS,
     "ensure_indexed()\n"
     "--\n\n"
     "Resolve and build if needed.\n\n"
     "Returns:\n"
     "    dict with index status after building.\n"},

    // --- metadata queries -------------------------------------------------
    {"get_hash_table",
     (PyCFunction)Indexer_get_hash_table,
     METH_VARARGS,
     "get_hash_table(type)\n"
     "--\n\n"
     "Query hash table mappings.\n\n"
     "Args:\n"
     "    type: 'file', 'host', 'string', or 'proc'\n\n"
     "Returns:\n"
     "    dict mapping hash values to resolved names.\n"},
    {"query_file_pids",
     (PyCFunction)Indexer_query_file_pids,
     METH_VARARGS,
     "query_file_pids(file_id)\n"
     "--\n\n"
     "Query PIDs observed in a specific file.\n\n"
     "Args:\n"
     "    file_id: Integer file ID from index.\n\n"
     "Returns:\n"
     "    set of PIDs.\n"},
    {"query_all_file_pids",
     (PyCFunction)Indexer_query_all_file_pids,
     METH_NOARGS,
     "query_all_file_pids()\n"
     "--\n\n"
     "Query PIDs for all indexed files.\n\n"
     "Returns:\n"
     "    dict mapping file_id to set of PIDs.\n"},
    {"query_file_info",
     (PyCFunction)Indexer_query_file_info,
     METH_NOARGS,
     "query_file_info()\n"
     "--\n\n"
     "Query file ID to path mapping and per-file PIDs in one call.\n\n"
     "Returns:\n"
     "    tuple of (dict[int, str], dict[int, set[int]]).\n"},

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // --- Arrow export (only when built with Arrow support) ----------------
    {"iter_aggregation",
     (PyCFunction)Indexer_iter_aggregation,
     METH_VARARGS | METH_KEYWORDS,
     "iter_aggregation(type='events', batch_size=10000)\n"
     "--\n\n"
     "Iterate over aggregation data as Arrow batches.\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    {"iter_arrow_dfanalyzer",
     (PyCFunction)Indexer_iter_arrow_dfanalyzer,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer(type='events', batch_size=10000, "
     "time_granularity=1.0, time_resolution=1e6, query=None)\n"
     "--\n\n"
     "Iterate over aggregation data as dfanalyzer-compatible Arrow batches.\n\n"
     "Output schema matches dfanalyzer expectations with resolved hashes,\n"
     "normalized time_range, and computed columns (proc_name, io_cat).\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string (e.g., \"pid == 1234\")\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    {"iter_arrow_dfanalyzer_all",
     (PyCFunction)Indexer_iter_arrow_dfanalyzer_all,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer_all(batch_size=10000, time_granularity=1.0, "
     "time_resolution=1e6, query=None, group_by=None)\n"
     "--\n\n"
     "Iterate over all aggregation types in a single scan.\n\n"
     "Returns a dict with 'events', 'profiles', 'system' keys, each containing\n"
     "a list of Arrow batch capsules. This is ~3x faster than calling\n"
     "iter_arrow_dfanalyzer separately for each type.\n\n"
     "When group_by is provided, the scan collapses dimensions during aggregation\n"
     "and emits a reduced schema containing only the requested columns plus\n"
     "aggregated metrics (count, time, size, time_sq, size_sq, time_min,\n"
     "time_max, size_min, size_max, time_call_min, time_call_max, size_call_min,\n"
     "size_call_max, time_start, time_end). Supported group_by columns: cat,\n"
     "func_name, pid, tid, file_hash, host_hash, file_name, host_name, proc_name,\n"
     "io_cat, acc_pat, time_range.\n\n"
     "Args:\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string\n"
     "    group_by: Optional list of columns to group by; enables coarse\n"
     "        in-scan aggregation (default None = full granularity)\n\n"
     "Returns:\n"
     "    dict with 'events', 'profiles', 'system' lists of Arrow capsules.\n"},
#endif

    // Sentinel terminating the table.
    {nullptr, nullptr, 0, nullptr}};
2620

2621
// Indexer exposes no computed attributes; the table holds only the
// sentinel entry (name, get, set, doc, closure all null).
static PyGetSetDef Indexer_getsetters[] = {
    {nullptr, nullptr, nullptr, nullptr, nullptr}};
2622

2623
// Static type object for dftracer_utils_ext.Indexer. Slots are given
// positionally; each one is labelled with its PyTypeObject field name.
PyTypeObject IndexerType = {
    PyVarObject_HEAD_INIT(nullptr, 0) "dftracer_utils_ext.Indexer", /* tp_name */
    sizeof(IndexerObject),        /* tp_basicsize */
    0,                            /* tp_itemsize */
    (destructor)Indexer_dealloc,  /* tp_dealloc */
    0,                            /* tp_vectorcall_offset (tp_print < 3.8) */
    0,                            /* tp_getattr */
    0,                            /* tp_setattr */
    0,                            /* tp_as_async */
    0,                            /* tp_repr */
    0,                            /* tp_as_number */
    0,                            /* tp_as_sequence */
    0,                            /* tp_as_mapping */
    0,                            /* tp_hash */
    0,                            /* tp_call */
    0,                            /* tp_str */
    0,                            /* tp_getattro */
    0,                            /* tp_setattro */
    0,                            /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    // tp_doc. CPython only splits the leading "name(...)\n--\n\n" header into
    // __text_signature__ when "name" equals the last component of tp_name
    // ("Indexer"); otherwise the raw header leaks into __doc__.
    "Indexer(directory='', files=None, index_dir='',\n"
    "        require_checkpoint=True, require_bloom=True,\n"
    "        require_manifest=True, require_aggregation=False,\n"
    "        time_interval_ms=5000.0, group_keys=None,\n"
    "        custom_metric_fields=None, compute_percentiles=False,\n"
    "        parallelism=0, force_rebuild=False, runtime=None)\n"
    "--\n\n"
    "Indexer with tiered index building.\n\n"
    "At least one of 'directory' or 'files' must be provided.\n"
    "- directory: scan for .pfw/.pfw.gz files\n"
    "- files: list of specific file paths\n\n"
    "Supports:\n"
    "- Tier 1: Checkpoints (require_checkpoint)\n"
    "- Tier 2: Bloom filters (require_bloom), Manifests (require_manifest)\n"
    "- Tier 3: Aggregation (require_aggregation + config params)\n",
    0,                            /* tp_traverse */
    0,                            /* tp_clear */
    0,                            /* tp_richcompare */
    0,                            /* tp_weaklistoffset */
    0,                            /* tp_iter */
    0,                            /* tp_iternext */
    Indexer_methods,              /* tp_methods */
    0,                            /* tp_members */
    Indexer_getsetters,           /* tp_getset */
    0,                            /* tp_base */
    0,                            /* tp_dict */
    0,                            /* tp_descr_get */
    0,                            /* tp_descr_set */
    0,                            /* tp_dictoffset */
    (initproc)Indexer_init,       /* tp_init */
    0,                            /* tp_alloc */
    Indexer_new,                  /* tp_new */
};
2676

2677
// Register the Indexer type on module `m` and, when Arrow support is
// compiled in, the module-level functions in BatchIndexerModuleMethods.
// Returns 0 on success, or -1 with a Python exception set on failure.
int init_indexer(PyObject* m) {
    if (PyType_Ready(&IndexerType) < 0) {
        return -1;
    }

    // PyModule_AddObject steals the reference only on success, so take one
    // up front and release it ourselves if registration fails.
    PyObject* type_obj = reinterpret_cast<PyObject*>(&IndexerType);
    Py_INCREF(type_obj);
    const int add_rc = PyModule_AddObject(m, "Indexer", type_obj);
    if (add_rc < 0) {
        Py_DECREF(type_obj);
        return -1;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    const int funcs_rc = PyModule_AddFunctions(m, BatchIndexerModuleMethods);
    if (funcs_rc < 0) {
        return -1;
    }
#endif

    return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc