• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26195612357

20 May 2026 11:19PM UTC coverage: 49.859% (-2.3%) from 52.2%
26195612357

push

github

hariharan-devarajan
feat(aggregator): improve system metrics scanning and persistence error handling

16041 of 43831 branches covered (36.6%)

Branch coverage included in aggregate %.

6 of 17 new or added lines in 2 files covered. (35.29%)

1072 existing lines in 104 files now uncovered.

21423 of 31309 relevant lines covered (68.42%)

13054.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.71
/src/dftracer/utils/python/batch_indexer.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/common/string_intern.h>
3
#include <dftracer/utils/core/coro/task.h>
4
#include <dftracer/utils/core/coro/when_all.h>
5
#include <dftracer/utils/core/rocksdb/db_manager.h>
6
#include <dftracer/utils/core/runtime.h>
7
#include <dftracer/utils/core/tasks/coro_scope.h>
8
#include <dftracer/utils/python/batch_indexer.h>
9
#include <dftracer/utils/python/indexer.h>
10
#include <dftracer/utils/python/runtime.h>
11
#include <dftracer/utils/utilities/common/query/query.h>
12
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_config.h>
13
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.h>
14
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_types.h>
15
#include <dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h>
16
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics.h>
17
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.h>
18
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
19
#include <dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.h>
20
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
21
#include <dftracer/utils/utilities/indexer/index_database.h>
22

23
#ifdef DFTRACER_UTILS_ENABLE_ARROW
24
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
25
#endif
26

27
#include <algorithm>
28
#include <chrono>
29
#include <cstdio>
30
#include <limits>
31
#include <optional>
32
#include <string>
33
#include <unordered_map>
34
#include <unordered_set>
35
#include <vector>
36

37
using dftracer::utils::CoroScope;
38
using dftracer::utils::Runtime;
39
using dftracer::utils::coro::CoroTask;
40
using namespace dftracer::utils::utilities::composites::dft::indexing;
41
using namespace dftracer::utils::utilities::composites::dft::aggregators;
42

43
// ---------------------------------------------------------------------------
44
// BatchIndexer - directory-level indexer with resolve/build pattern
45
// ---------------------------------------------------------------------------
46

47
/// tp_dealloc for the batch Indexer: drops every Python reference the object
/// owns, then hands the memory back to the type's tp_free.
static void Indexer_dealloc(IndexerObject* self) {
    // Py_CLEAR tolerates NULL slots (e.g. when __init__ failed or never ran)
    // and nulls each slot before its reference is released.
    Py_CLEAR(self->runtime_obj);
    Py_CLEAR(self->directory);
    Py_CLEAR(self->files);
    Py_CLEAR(self->index_dir);
    Py_CLEAR(self->group_keys);
    Py_CLEAR(self->custom_metric_fields);

    PyTypeObject* tp = Py_TYPE(self);
    tp->tp_free((PyObject*)self);
}
56

57
/// tp_new: allocates an IndexerObject and seeds every field with the same
/// defaults Indexer_init uses, so an object whose __init__ never ran can
/// still be deallocated safely.
static PyObject* Indexer_new(PyTypeObject* type, PyObject* args,
                             PyObject* kwds) {
    IndexerObject* obj = (IndexerObject*)type->tp_alloc(type, 0);
    if (obj == nullptr) {
        return nullptr;
    }

    // Owned Python references start out empty.
    obj->runtime_obj = nullptr;
    obj->directory = nullptr;
    obj->files = nullptr;
    obj->index_dir = nullptr;
    obj->group_keys = nullptr;
    obj->custom_metric_fields = nullptr;

    // Tier requirements: checkpoint, bloom and manifest on; aggregation off.
    obj->require_checkpoint = 1;
    obj->require_bloom = 1;
    obj->require_manifest = 1;
    obj->require_aggregation = 0;

    // Aggregation / build tuning knobs.
    obj->time_interval_ms = 5000.0;
    obj->compute_percentiles = 0;
    obj->checkpoint_size = 32 * 1024 * 1024;  // 32 MiB
    obj->parallelism = 0;
    obj->force_rebuild = 0;

    return (PyObject*)obj;
}
79

80
/// tp_init: parses the constructor arguments and stores them on the object.
///
/// Keyword arguments (all optional, but at least one of `directory` or a
/// non-empty `files` list is required):
///   directory, files, index_dir, require_checkpoint, require_bloom,
///   require_manifest, require_aggregation, time_interval_ms, group_keys,
///   custom_metric_fields, compute_percentiles, checkpoint_size, parallelism,
///   force_rebuild, runtime
///
/// `runtime` may be a Runtime instance, an object exposing a Runtime as its
/// `_native` attribute, or None (use the default runtime).
///
/// Returns 0 on success, or -1 with a Python exception set (ValueError for
/// invalid values, TypeError for a bad runtime). All arguments are validated
/// before `self` is modified, and references held from a previous __init__
/// call on the same object are released instead of leaked.
static int Indexer_init(IndexerObject* self, PyObject* args, PyObject* kwds) {
    static const char* kwlist[] = {"directory",
                                   "files",
                                   "index_dir",
                                   "require_checkpoint",
                                   "require_bloom",
                                   "require_manifest",
                                   "require_aggregation",
                                   "time_interval_ms",
                                   "group_keys",
                                   "custom_metric_fields",
                                   "compute_percentiles",
                                   "checkpoint_size",
                                   "parallelism",
                                   "force_rebuild",
                                   "runtime",
                                   nullptr};

    const char* directory = "";
    PyObject* files_obj = Py_None;
    const char* index_dir = "";
    int require_checkpoint = 1;
    int require_bloom = 1;
    int require_manifest = 1;
    int require_aggregation = 0;
    double time_interval_ms = 5000.0;
    PyObject* group_keys_obj = Py_None;
    PyObject* custom_metrics_obj = Py_None;
    int compute_percentiles = 0;
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;  // 32MB default
    Py_ssize_t parallelism = 0;
    int force_rebuild = 0;
    PyObject* runtime_arg = nullptr;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|sOsppppdOOpnnpO", (char**)kwlist, &directory,
            &files_obj, &index_dir, &require_checkpoint, &require_bloom,
            &require_manifest, &require_aggregation, &time_interval_ms,
            &group_keys_obj, &custom_metrics_obj, &compute_percentiles,
            &checkpoint_size, &parallelism, &force_rebuild, &runtime_arg)) {
        return -1;
    }

    // Validate: at least one of directory or files must be provided
    const bool has_directory = directory && directory[0] != '\0';
    const bool has_files = files_obj && files_obj != Py_None &&
                           PyList_Check(files_obj) &&
                           PyList_Size(files_obj) > 0;
    if (!has_directory && !has_files) {
        PyErr_SetString(PyExc_ValueError,
                        "At least one of 'directory' or 'files' must be "
                        "provided");
        return -1;
    }

    // Both values are stored as std::size_t; a negative value would wrap
    // around to an enormous size.
    if (checkpoint_size < 0) {
        PyErr_SetString(PyExc_ValueError,
                        "checkpoint_size must be non-negative");
        return -1;
    }
    if (parallelism < 0) {
        PyErr_SetString(PyExc_ValueError, "parallelism must be non-negative");
        return -1;
    }

    // The interval is later converted to an unsigned microsecond count;
    // negative, NaN, infinite or >= 2^64 us values would make that
    // float-to-integer conversion undefined behaviour.
    const double interval_us = time_interval_ms * 1000.0;
    constexpr double kUint64Limit = 18446744073709551616.0;  // 2^64
    if (!(interval_us >= 0.0 && interval_us < kUint64Limit)) {
        PyErr_SetString(PyExc_ValueError,
                        "time_interval_ms must be non-negative and within "
                        "range");
        return -1;
    }

    // Resolve the runtime into an owned reference before touching self.
    PyObject* runtime = nullptr;
    if (runtime_arg && runtime_arg != Py_None) {
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
            Py_INCREF(runtime_arg);
            runtime = runtime_arg;
        } else {
            // Accept Python-level wrappers that expose the native Runtime
            // as `_native`.
            PyObject* native = PyObject_GetAttrString(runtime_arg, "_native");
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
                runtime = native;  // GetAttrString returned a new reference
            } else {
                Py_XDECREF(native);
                PyErr_SetString(PyExc_TypeError,
                                "runtime must be a Runtime instance or None");
                return -1;
            }
        }
    }

    PyObject* directory_str = PyUnicode_FromString(directory);
    PyObject* index_dir_str =
        directory_str ? PyUnicode_FromString(index_dir) : nullptr;
    if (!directory_str || !index_dir_str) {
        Py_XDECREF(directory_str);
        Py_XDECREF(index_dir_str);
        Py_XDECREF(runtime);
        return -1;
    }

    // Optional containers: keep a new reference, or NULL for None/absent.
    auto new_ref_or_null = [](PyObject* obj) -> PyObject* {
        if (!obj || obj == Py_None) return nullptr;
        Py_INCREF(obj);
        return obj;
    };
    PyObject* files = has_files ? new_ref_or_null(files_obj) : nullptr;
    PyObject* group_keys = new_ref_or_null(group_keys_obj);
    PyObject* custom_metrics = new_ref_or_null(custom_metrics_obj);

    // Install the new value first and release the old one afterwards
    // (the Py_XSETREF pattern), so re-running __init__ does not leak and
    // the slot never points at a freed object.
    auto replace = [](PyObject*& slot, PyObject* value) {
        PyObject* old = slot;
        slot = value;
        Py_XDECREF(old);
    };
    replace(self->runtime_obj, runtime);
    replace(self->directory, directory_str);
    replace(self->index_dir, index_dir_str);
    replace(self->files, files);
    replace(self->group_keys, group_keys);
    replace(self->custom_metric_fields, custom_metrics);

    self->require_checkpoint = require_checkpoint;
    self->require_bloom = require_bloom;
    self->require_manifest = require_manifest;
    self->require_aggregation = require_aggregation;
    self->time_interval_ms = time_interval_ms;
    self->compute_percentiles = compute_percentiles;
    self->checkpoint_size = static_cast<std::size_t>(checkpoint_size);
    self->parallelism = static_cast<std::size_t>(parallelism);
    self->force_rebuild = force_rebuild;

    return 0;
}
191

192
/// Picks the Runtime used to run this indexer's work. This is the Runtime
/// supplied through the `runtime=` constructor argument when one was given;
/// otherwise it is get_default_runtime().
/// NOTE: callers invoke this inside Py_BEGIN_ALLOW_THREADS sections, so it
/// must not rely on holding the GIL.
static Runtime* get_batch_indexer_runtime(IndexerObject* self) {
    RuntimeObject* explicit_rt = (RuntimeObject*)self->runtime_obj;
    return explicit_rt != nullptr ? explicit_rt->runtime.get()
                                  : get_default_runtime();
}
198

199
static std::optional<AggregationConfig> build_aggregation_config(
123✔
200
    IndexerObject* self) {
201
    if (!self->require_aggregation) {
123✔
202
        return std::nullopt;
76✔
203
    }
204

205
    AggregationConfig config;
47!
206
    config.time_interval_us =
47✔
207
        static_cast<std::uint64_t>(self->time_interval_ms * 1000.0);
47✔
208

209
    if (self->group_keys && PyList_Check(self->group_keys)) {
47!
210
        Py_ssize_t n = PyList_Size(self->group_keys);
×
211
        for (Py_ssize_t i = 0; i < n; i++) {
×
212
            const char* s =
213
                PyUnicode_AsUTF8(PyList_GetItem(self->group_keys, i));
×
214
            if (s) config.extra_group_keys.emplace_back(s);
×
215
        }
216
    }
217
    if (self->custom_metric_fields &&
47!
218
        PyList_Check(self->custom_metric_fields)) {
×
219
        Py_ssize_t n = PyList_Size(self->custom_metric_fields);
×
220
        for (Py_ssize_t i = 0; i < n; i++) {
×
221
            const char* s =
222
                PyUnicode_AsUTF8(PyList_GetItem(self->custom_metric_fields, i));
×
223
            if (s) config.custom_metric_fields.emplace_back(s);
×
224
        }
225
    }
226

227
    config.compute_percentiles = self->compute_percentiles != 0;
47✔
228
    return config;
47✔
229
}
47✔
230

231
// ---------------------------------------------------------------------------
232
// resolve() - check what exists vs needs building
233
// ---------------------------------------------------------------------------
234

235
/// Indexer.resolve() -> dict
///
/// Checks what already exists and what needs building, without building
/// anything. The resolver runs on this indexer's Runtime with the GIL
/// released. The returned dict contains:
///   "total_files": int       - size of the resolver's `all_files`
///   "index_path":  str       - index location reported by the resolver
///   "ready":       list[str] - paths from the resolver's `cached` list
///   "needs_work":  list[str] - union of the needs_checkpoint, needs_bloom,
///                              needs_manifest and needs_aggregation paths,
///                              de-duplicated and kept in first-seen order
/// Raises RuntimeError if the resolver throws. A str encoding failure or an
/// allocation failure raises the corresponding Python error.
static PyObject* Indexer_resolve(IndexerObject* self,
                                 PyObject* Py_UNUSED(ignored)) {
    // The string slots are NULL when __init__ never ran; treat that as "".
    const char* directory =
        self->directory ? PyUnicode_AsUTF8(self->directory) : "";
    const char* index_dir =
        self->index_dir ? PyUnicode_AsUTF8(self->index_dir) : "";
    if (!directory || !index_dir) {
        return nullptr;  // PyUnicode_AsUTF8 already set the exception
    }

    ResolverInput input;
    input.directory = directory;
    input.index_dir = index_dir;
    input.require_checkpoints = self->require_checkpoint;
    input.require_bloom = self->require_bloom;
    input.require_manifest = self->require_manifest;
    input.require_aggregation = self->require_aggregation;
    input.aggregation_config = build_aggregation_config(self);

    // Add files if provided; non-str entries are skipped.
    if (self->files && PyList_Check(self->files)) {
        const Py_ssize_t n = PyList_Size(self->files);
        for (Py_ssize_t i = 0; i < n; ++i) {
            const char* s = PyUnicode_AsUTF8(PyList_GetItem(self->files, i));
            if (s) {
                input.files.emplace_back(s);
            } else {
                PyErr_Clear();  // do not leave the TypeError pending
            }
        }
    }

    ResolverResult result;
    std::string error_msg;
    // Tracked separately: an exception with an empty what() is still a
    // failure, and `result` must not be used in that case.
    bool failed = false;

    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime* rt = get_batch_indexer_runtime(self);
        rt->submit(run_coro_scope(
                       rt->executor(),
                       [](CoroScope& scope, ResolverInput in,
                          ResolverResult* out) -> CoroTask<void> {
                           IndexResolverUtility resolver;
                           // Use scope.spawn(utility, input) which auto-binds
                           // context for utilities with NeedsContext tag
                           *out = co_await scope.spawn(resolver, std::move(in));
                       },
                       std::move(input), &result),
                   "batch-indexer-resolve")
            .get();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        // Never let a foreign exception unwind out while the GIL is released.
        failed = true;
        error_msg = "unknown error during index resolution";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    // Union of all needs_* lists, de-duplicated in O(n) with first-seen order.
    std::vector<std::string> needs_work;
    std::unordered_set<std::string> seen;
    auto collect = [&](const auto& items) {
        for (const auto& item : items) {
            if (seen.insert(item.file_path).second) {
                needs_work.push_back(item.file_path);
            }
        }
    };
    collect(result.needs_checkpoint);
    collect(result.needs_bloom);
    collect(result.needs_manifest);
    collect(result.needs_aggregation);

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    // PyDict_SetItemString does NOT steal the value reference, so drop ours
    // after inserting. A NULL value means its constructor already raised.
    auto put = [dict](const char* key, PyObject* value) -> bool {
        if (!value) return false;
        const int rc = PyDict_SetItemString(dict, key, value);
        Py_DECREF(value);
        return rc == 0;
    };

    // Builds a list[str] from `items`, using `path_of` to select each path.
    auto to_str_list = [](const auto& items, auto path_of) -> PyObject* {
        PyObject* list = PyList_New(static_cast<Py_ssize_t>(items.size()));
        if (!list) return nullptr;
        Py_ssize_t i = 0;
        for (const auto& item : items) {
            PyObject* s = PyUnicode_FromString(path_of(item).c_str());
            if (!s) {
                Py_DECREF(list);  // unfilled slots are NULL; dealloc is safe
                return nullptr;
            }
            PyList_SET_ITEM(list, i++, s);  // steals `s`
        }
        return list;
    };
    auto file_path_of = [](const auto& item) -> const auto& {
        return item.file_path;
    };
    auto identity = [](const std::string& p) -> const std::string& {
        return p;
    };

    if (!put("total_files", PyLong_FromSize_t(result.all_files.size())) ||
        !put("index_path", PyUnicode_FromString(result.index_path.c_str())) ||
        !put("ready", to_str_list(result.cached, file_path_of)) ||
        !put("needs_work", to_str_list(needs_work, identity))) {
        Py_DECREF(dict);
        return nullptr;
    }

    return dict;
}
348

349
// ---------------------------------------------------------------------------
350
// build() - build missing index tiers
351
// ---------------------------------------------------------------------------
352

353
/// Indexer.build() -> None
///
/// Builds the missing index tiers for the configured directory/files. It
/// forwards this indexer's checkpoint_size, parallelism and force_rebuild
/// settings to resolve_and_build_index. The work runs on the indexer's
/// Runtime with the GIL released.
/// Raises RuntimeError if the build throws.
static PyObject* Indexer_build(IndexerObject* self,
                               PyObject* Py_UNUSED(ignored)) {
    // The string slots are NULL when __init__ never ran; treat that as "".
    const char* directory =
        self->directory ? PyUnicode_AsUTF8(self->directory) : "";
    const char* index_dir =
        self->index_dir ? PyUnicode_AsUTF8(self->index_dir) : "";
    if (!directory || !index_dir) {
        return nullptr;  // PyUnicode_AsUTF8 already set the exception
    }

    ResolveAndBuildInput input;
    input.directory = directory;
    input.index_dir = index_dir;
    input.require_checkpoints = self->require_checkpoint;
    input.require_bloom = self->require_bloom;
    input.require_manifest = self->require_manifest;
    input.require_aggregation = self->require_aggregation;
    input.aggregation_config = build_aggregation_config(self);
    input.checkpoint_size = self->checkpoint_size;
    input.parallelism = self->parallelism;
    input.force_rebuild = self->force_rebuild;

    // Add files if provided; non-str entries are skipped.
    if (self->files && PyList_Check(self->files)) {
        const Py_ssize_t n = PyList_Size(self->files);
        for (Py_ssize_t i = 0; i < n; ++i) {
            const char* s = PyUnicode_AsUTF8(PyList_GetItem(self->files, i));
            if (s) {
                input.files.emplace_back(s);
            } else {
                PyErr_Clear();  // do not leave the TypeError pending
            }
        }
    }

    std::string error_msg;
    // Tracked separately so an exception with an empty what() still fails.
    bool failed = false;

    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime* rt = get_batch_indexer_runtime(self);
        rt->submit(run_coro_scope(
                       rt->executor(),
                       [](CoroScope& scope,
                          ResolveAndBuildInput in) -> CoroTask<void> {
                           co_await resolve_and_build_index(&scope,
                                                            std::move(in));
                       },
                       std::move(input)),
                   "batch-indexer-build")
            .get();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        // Never let a foreign exception unwind out while the GIL is released.
        failed = true;
        error_msg = "unknown error during index build";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    Py_RETURN_NONE;
}
405

406
// ---------------------------------------------------------------------------
407
// ensure_indexed() - resolve + build if needed
408
// ---------------------------------------------------------------------------
409

410
/// Indexer.ensure_indexed() -> dict
///
/// Runs resolve(). If its "needs_work" list is non-empty, it then runs
/// build() and resolves again. Returns the final resolve() dict, or NULL
/// (exception set) if any step fails.
static PyObject* Indexer_ensure_indexed(IndexerObject* self,
                                        PyObject* Py_UNUSED(ignored)) {
    PyObject* status = Indexer_resolve(self, nullptr);
    if (status == nullptr) {
        return nullptr;
    }

    // Borrowed reference: only valid while `status` is alive, so inspect it
    // before releasing `status`.
    PyObject* pending = PyDict_GetItemString(status, "needs_work");
    const bool up_to_date = (pending == nullptr) || !(PyList_Size(pending) > 0);
    if (up_to_date) {
        return status;
    }
    Py_DECREF(status);

    PyObject* built = Indexer_build(self, nullptr);
    if (built == nullptr) {
        return nullptr;
    }
    Py_DECREF(built);

    // Report the post-build state.
    return Indexer_resolve(self, nullptr);
}
432

433
// ---------------------------------------------------------------------------
434
// get_checkpoint_indexer() - get a single-file checkpoint indexer
435
// ---------------------------------------------------------------------------
436

437
/// Indexer.get_checkpoint_indexer(file_path: str) -> CheckpointIndexer
///
/// Creates a single-file checkpoint indexer for `file_path`. The index path
/// comes from determine_index_path() using this batch indexer's `index_dir`.
/// The new object shares this indexer's runtime reference and checkpoint
/// size, and has build_bloom/build_manifest set to 0.
/// Raises RuntimeError if the index path cannot be determined or the native
/// handle cannot be created.
static PyObject* Indexer_get_checkpoint_indexer(IndexerObject* self,
                                                PyObject* args) {
    const char* file_path = nullptr;
    if (!PyArg_ParseTuple(args, "s", &file_path)) {
        return nullptr;
    }

    // Determine index path using BatchIndexer's index_dir setting
    // (the slot is NULL if __init__ never ran; treat that as "").
    const char* index_dir =
        self->index_dir ? PyUnicode_AsUTF8(self->index_dir) : "";
    if (!index_dir) {
        return nullptr;  // PyUnicode_AsUTF8 already set the exception
    }

    std::string index_path;
    try {
        index_path = dftracer::utils::utilities::composites::dft::internal::
            determine_index_path(file_path, index_dir);
    } catch (const std::exception& e) {
        // Do not let a C++ exception propagate into CPython.
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return nullptr;
    }

    // Create the Python strings before the object. On failure we then
    // only release what we own and do not depend on CheckpointIndexer's
    // dealloc coping with half-initialised fields.
    PyObject* gz_path = PyUnicode_FromString(file_path);
    if (!gz_path) {
        return nullptr;
    }
    PyObject* index_path_str = PyUnicode_FromString(index_path.c_str());
    if (!index_path_str) {
        Py_DECREF(gz_path);
        return nullptr;
    }

    CheckpointIndexerObject* indexer =
        (CheckpointIndexerObject*)CheckpointIndexerType.tp_alloc(
            &CheckpointIndexerType, 0);
    if (!indexer) {
        Py_DECREF(gz_path);
        Py_DECREF(index_path_str);
        return nullptr;
    }

    indexer->handle = nullptr;
    indexer->gz_path = gz_path;               // ownership transferred
    indexer->index_path = index_path_str;     // ownership transferred
    indexer->checkpoint_size = self->checkpoint_size;
    indexer->build_bloom = 0;
    indexer->build_manifest = 0;

    // Share runtime reference (NULL means "use the default runtime").
    Py_XINCREF(self->runtime_obj);
    indexer->runtime_obj = self->runtime_obj;

    // Create the native handle
    indexer->handle = dft_indexer_create(file_path, index_path.c_str(),
                                         self->checkpoint_size, 0);
    if (!indexer->handle) {
        Py_DECREF((PyObject*)indexer);
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to create checkpoint indexer");
        return nullptr;
    }

    return (PyObject*)indexer;
}
484

485
/// Runs Indexer_resolve() and returns its "index_path" entry as a
/// std::string. Returns std::nullopt with a Python exception set when
/// resolution fails, or when the path is missing or empty (RuntimeError
/// "No index path available").
static std::optional<std::string> resolve_index_path(IndexerObject* self) {
    PyObject* status = Indexer_resolve(self, nullptr);
    if (status == nullptr) {
        return std::nullopt;  // resolve() already raised
    }

    // The str is owned by `status`, so copy it out before releasing `status`.
    std::optional<std::string> index_path;
    PyObject* entry = PyDict_GetItemString(status, "index_path");  // borrowed
    const char* utf8 = (entry != nullptr) ? PyUnicode_AsUTF8(entry) : nullptr;
    if (utf8 != nullptr && utf8[0] != '\0') {
        index_path.emplace(utf8);
    }
    Py_DECREF(status);

    if (!index_path) {
        PyErr_SetString(PyExc_RuntimeError, "No index path available");
    }
    return index_path;
}
499

500
/// Indexer.get_hash_table(type: str) -> dict[str, str]
///
/// Resolves the index path (via resolve()), opens the index database
/// read-only with the GIL released, and returns
/// IndexDatabase::query_hash_table() for the requested table. `type` must be
/// "file", "host", "string" or "proc".
/// Raises ValueError for an unknown type. Raises RuntimeError if no index
/// path is available or the database access throws.
static PyObject* Indexer_get_hash_table(IndexerObject* self, PyObject* args) {
    const char* type_str = nullptr;
    if (!PyArg_ParseTuple(args, "s", &type_str)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;
    using HashType = IndexDatabase::HashType;

    HashType type;
    if (std::strcmp(type_str, "file") == 0) {
        type = HashType::FILE;
    } else if (std::strcmp(type_str, "host") == 0) {
        type = HashType::HOST;
    } else if (std::strcmp(type_str, "string") == 0) {
        type = HashType::STRING;
    } else if (std::strcmp(type_str, "proc") == 0) {
        type = HashType::PROC;
    } else {
        PyErr_SetString(PyExc_ValueError,
                        "type must be 'file', 'host', 'string', or 'proc'");
        return nullptr;
    }

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, std::string> hash_map;
    std::string error_msg;
    bool failed = false;  // an exception with an empty what() still fails

    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        hash_map = db.query_hash_table(type);
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while reading hash table";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [hash, name] : hash_map) {
        // Either conversion can fail (e.g. bytes that are not valid UTF-8);
        // never pass NULL on to PyDict_SetItem.
        PyObject* key = PyUnicode_FromStringAndSize(
            hash.data(), static_cast<Py_ssize_t>(hash.size()));
        PyObject* val =
            key ? PyUnicode_FromStringAndSize(
                      name.data(), static_cast<Py_ssize_t>(name.size()))
                : nullptr;
        const bool ok = val && PyDict_SetItem(dict, key, val) == 0;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (!ok) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
559

560
/// Indexer.query_file_pids(file_id: int) -> set[int]
///
/// Resolves the index path (via resolve()), opens the index database
/// read-only with the GIL released, and returns
/// IndexDatabase::query_file_pids(file_id) as a Python set.
/// Raises RuntimeError if no index path is available or the database access
/// throws.
static PyObject* Indexer_query_file_pids(IndexerObject* self, PyObject* args) {
    int file_id;
    if (!PyArg_ParseTuple(args, "i", &file_id)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_set<std::uint64_t> pids;
    std::string error_msg;
    bool failed = false;  // an exception with an empty what() still fails

    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        pids = db.query_file_pids(file_id);
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while querying file pids";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* set = PySet_New(nullptr);
    if (!set) return nullptr;

    for (auto pid : pids) {
        PyObject* val = PyLong_FromUnsignedLongLong(pid);
        // Propagate allocation/insertion failures instead of passing NULL on.
        if (!val || PySet_Add(set, val) < 0) {
            Py_XDECREF(val);
            Py_DECREF(set);
            return nullptr;
        }
        Py_DECREF(val);
    }

    return set;
}
601

602
/// Indexer.query_all_file_pids() -> dict[int, set[int]]
///
/// Resolves the index path (via resolve()), opens the index database
/// read-only with the GIL released, and converts
/// IndexDatabase::query_all_file_pids() into a dict mapping each file id to
/// its set of pids.
/// Raises RuntimeError if no index path is available or the database access
/// throws.
static PyObject* Indexer_query_all_file_pids(IndexerObject* self,
                                             PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;
    std::string error_msg;
    bool failed = false;  // an exception with an empty what() still fails

    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        all_pids = db.query_all_file_pids();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while querying file pids";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [file_id, pids] : all_pids) {
        // Every C-API call below may fail; bail out on the first failure
        // without handing NULL to PySet_Add/PyDict_SetItem.
        PyObject* key = PyLong_FromLong(file_id);
        PyObject* set = key ? PySet_New(nullptr) : nullptr;
        bool ok = set != nullptr;
        for (auto it = pids.begin(); ok && it != pids.end(); ++it) {
            PyObject* val = PyLong_FromUnsignedLongLong(*it);
            ok = val && PySet_Add(set, val) == 0;
            Py_XDECREF(val);
        }
        ok = ok && PyDict_SetItem(dict, key, set) == 0;
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (!ok) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
646

647
/// Indexer.query_file_info() -> tuple[dict[int, str], dict[int, set[int]]]
///
/// Resolves the index path (via resolve()) and opens the index database
/// read-only with the GIL released. Returns a 2-tuple:
///   [0] file id -> path, where each stored logical name is joined onto the
///       parent directory of the weakly-canonical index path
///   [1] file id -> set of pids (IndexDatabase::query_all_file_pids())
/// Raises RuntimeError if no index path is available or the database access
/// or path canonicalisation throws.
static PyObject* Indexer_query_file_info(IndexerObject* self,
                                         PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, int> file_ids;
    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;
    fs::path data_dir;
    std::string error_msg;
    bool failed = false;  // an exception with an empty what() still fails

    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        file_ids = db.query_all_file_info_ids();
        all_pids = db.query_all_file_pids();
        // weakly_canonical may throw a filesystem error; do it inside the
        // try so it becomes a RuntimeError instead of unwinding into CPython.
        data_dir = fs::weakly_canonical(fs::path(index_path)).parent_path();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while querying file info";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* id_to_path = PyDict_New();
    if (!id_to_path) return nullptr;
    for (const auto& [logical_name, fid] : file_ids) {
        const std::string resolved = (data_dir / logical_name).string();
        PyObject* key = PyLong_FromLong(fid);
        PyObject* val =
            key ? PyUnicode_FromStringAndSize(
                      resolved.data(), static_cast<Py_ssize_t>(resolved.size()))
                : nullptr;
        const bool ok = val && PyDict_SetItem(id_to_path, key, val) == 0;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (!ok) {
            Py_DECREF(id_to_path);
            return nullptr;
        }
    }

    PyObject* pid_dict = PyDict_New();
    if (!pid_dict) {
        Py_DECREF(id_to_path);
        return nullptr;
    }
    for (const auto& [file_id, pids] : all_pids) {
        PyObject* key = PyLong_FromLong(file_id);
        PyObject* set = key ? PySet_New(nullptr) : nullptr;
        bool ok = set != nullptr;
        for (auto it = pids.begin(); ok && it != pids.end(); ++it) {
            PyObject* val = PyLong_FromUnsignedLongLong(*it);
            ok = val && PySet_Add(set, val) == 0;
            Py_XDECREF(val);
        }
        ok = ok && PyDict_SetItem(pid_dict, key, set) == 0;
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (!ok) {
            Py_DECREF(id_to_path);
            Py_DECREF(pid_dict);
            return nullptr;
        }
    }

    // PyTuple_Pack takes its own references (NULL + exception on failure).
    PyObject* result = PyTuple_Pack(2, id_to_path, pid_dict);
    Py_DECREF(id_to_path);
    Py_DECREF(pid_dict);
    return result;
}
712

713
#ifdef DFTRACER_UTILS_ENABLE_ARROW
714
#include <dftracer/utils/python/trace_reader_iterator.h>
715
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
716

717
/// Wraps an ArrowExportResult in a new ArrowBatchCapsule Python object.
///
/// The result is moved into a heap allocation owned by the capsule's
/// `result` field. Returns a new reference, or NULL with an exception set
/// (MemoryError if the C++ allocation fails).
static PyObject* create_arrow_batch_capsule(
    dftracer::utils::utilities::common::arrow::ArrowExportResult&& result) {
    using dftracer::utils::utilities::common::arrow::ArrowExportResult;

    // Build the C++ payload before the Python object. If `new` (or the move)
    // throws, nothing Python-side has been allocated yet, and no C++
    // exception crosses into CPython.
    std::unique_ptr<ArrowExportResult> payload;
    try {
        payload = std::make_unique<ArrowExportResult>(std::move(result));
    } catch (const std::bad_alloc&) {
        return PyErr_NoMemory();
    } catch (const std::exception& e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return nullptr;
    }

    auto* obj = (ArrowBatchCapsuleObject*)ArrowBatchCapsuleType.tp_alloc(
        &ArrowBatchCapsuleType, 0);
    if (!obj) return nullptr;  // `payload` is freed by unique_ptr

    obj->result = payload.release();  // ownership moves to the capsule
    return (PyObject*)obj;
}
727

728
namespace {
729

730
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
731
using dftracer::utils::utilities::common::arrow::ColumnSpec;
732
using dftracer::utils::utilities::common::arrow::ColumnType;
733
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
734

735
static bool parse_agg_type_str(const char* type_str, AggMapType& out) {
2✔
736
    if (strcmp(type_str, "events") == 0) {
2!
737
        out = AggMapType::EVENT;
2✔
738
        return true;
2✔
739
    }
740
    if (strcmp(type_str, "profiles") == 0) {
×
741
        out = AggMapType::PROFILE;
×
742
        return true;
×
743
    }
744
    if (strcmp(type_str, "system") == 0) {
×
745
        out = AggMapType::SYSTEM;
×
746
        return true;
×
747
    }
748
    PyErr_SetString(PyExc_ValueError,
×
749
                    "type must be 'events', 'profiles', or 'system'");
750
    return false;
×
751
}
752

753
struct AggDbHandle {
754
    std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db;
755
    std::unique_ptr<EventAggregator> agg;
756
};
757

758
static std::unique_ptr<AggDbHandle> open_agg_db(const std::string& index_path,
11✔
759
                                                std::string& error_msg) {
760
    std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db;
11✔
761
    try {
762
        db = EventAggregator::open_with_merge_operator(index_path);
11!
UNCOV
763
    } catch (...) {
×
764
        auto& mgr = dftracer::utils::rocksdb::RocksDBManager::instance();
×
765
        mgr.reset(index_path);
×
766
        db = mgr.get_or_open(
×
767
            index_path,
UNCOV
768
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
×
769
        if (db && db->is_open()) {
×
770
            load_intern_dictionary(*db);
×
771
        }
772
    }
×
773
    if (!db || !db->is_open()) {
11!
774
        error_msg = "Failed to open aggregation database";
×
775
        return nullptr;
×
776
    }
777
    std::string config_val;
11✔
778
    auto key = std::string_view(AGG_GLOBAL_CONFIG_KEY,
11✔
779
                                sizeof(AGG_GLOBAL_CONFIG_KEY) - 1);
780
    if (!db->get(key, &config_val, dftracer::utils::rocksdb::cf::AGGREGATION)
22!
781
             .ok()) {
11!
782
        error_msg = "No aggregation config found - was aggregation enabled?";
×
783
        return nullptr;
×
784
    }
785
    auto cfg = deserialize_agg_global_config(config_val);
11✔
786
    auto handle = std::make_unique<AggDbHandle>();
11!
787
    handle->db = db;
11✔
788
    handle->agg = std::make_unique<EventAggregator>(db, cfg.config_hash);
11!
789
    return handle;
11✔
790
}
11✔
791

792
static std::optional<dftracer::utils::utilities::common::query::Query>
793
parse_query_arg(const char* query_str) {
12✔
794
    if (!query_str || query_str[0] == '\0') return std::nullopt;
12!
795
    auto result = dftracer::utils::utilities::common::query::Query::from_string(
796
        query_str);
9!
797
    if (!result) {
9✔
798
        PyErr_SetString(PyExc_ValueError, result.error().message.c_str());
1!
799
        return std::nullopt;
1✔
800
    }
801
    return std::move(*result);
8!
802
}
9✔
803

804
// Total number of shards covered by parallel_shard_scan (range [0, 4096)).
// NOTE(review): presumably must match the shard count used when the
// aggregation data is written — confirm against EventAggregator.
constexpr std::uint16_t DFT_NUM_SHARDS{4096};
805

806
template <typename Output, typename ScanFn>
807
void parallel_shard_scan_range(Runtime* rt, std::uint16_t outer_begin,
11✔
808
                               std::uint16_t outer_end, ScanFn&& scan_fn,
809
                               std::vector<Output>& outputs) {
810
    if (outer_end <= outer_begin) return;
11!
811
    const std::size_t span = static_cast<std::size_t>(outer_end - outer_begin);
11✔
812
    const std::size_t num_tasks = std::min<std::size_t>(rt->threads(), span);
11!
813
    const std::size_t shards_per_task = (span + num_tasks - 1) / num_tasks;
11✔
814
    rt->submit(run_coro_scope(
11✔
815
                   rt->executor(),
816
                   [&](CoroScope& scope) -> CoroTask<void> {
22!
817
                       std::vector<dftracer::utils::coro::SpawnFuture<Output>>
818
                           futures;
819
                       futures.reserve(num_tasks);
820
                       for (std::size_t t = 0; t < num_tasks; ++t) {
821
                           auto shard_begin = static_cast<std::uint16_t>(
822
                               outer_begin + t * shards_per_task);
823
                           auto shard_end =
824
                               static_cast<std::uint16_t>(std::min<std::size_t>(
825
                                   outer_begin + (t + 1) * shards_per_task,
826
                                   outer_end));
827
                           futures.push_back(
828
                               scope.spawn([&scan_fn, shard_begin, shard_end](
85!
829
                                               CoroScope&) -> CoroTask<Output> {
830
                                   co_return scan_fn(shard_begin, shard_end);
831
                               }));
832
                       }
833
                       outputs.reserve(num_tasks);
834
                       for (auto& f : futures) {
835
                           outputs.push_back(co_await f);
836
                       }
837
                   }),
838
               "parallel-shard-scan-range")
839
        .get();
11!
840
}
841

842
template <typename Output, typename ScanFn>
843
void parallel_shard_scan(Runtime* rt, ScanFn&& scan_fn,
11✔
844
                         std::vector<Output>& outputs) {
845
    parallel_shard_scan_range<Output>(rt, 0, DFT_NUM_SHARDS,
11✔
846
                                      std::forward<ScanFn>(scan_fn), outputs);
847
}
11✔
848

849
static void append_results_to_list(PyObject* list,
29✔
850
                                   std::vector<ArrowExportResult>& results) {
851
    for (auto& r : results) {
69✔
852
        PyObject* capsule = create_arrow_batch_capsule(std::move(r));
40!
853
        if (capsule) {
40!
854
            PyList_Append(list, capsule);
40!
855
            Py_DECREF(capsule);
856
        }
857
    }
858
}
29✔
859

860
struct AggScanInput {
861
    const EventAggregator* agg;
862
    AggMapType target_type;
863
    AggregationBatchType batch_type;
864
    Py_ssize_t batch_size;
865
    std::uint16_t shard_begin;
866
    std::uint16_t shard_end;
867
};
868

869
struct AggScanOutput {
870
    std::vector<ArrowExportResult> results;
871
};
872

873
// Scan the aggregation shards [input.shard_begin, input.shard_end) and
// convert every entry whose map_type equals input.target_type into one Arrow
// row of the raw aggregation schema below. The range is presumably
// half-open, matching how parallel_shard_scan_range splits it.
//
// Rows are cut into batches of input.batch_size. A value below 1 is treated
// as 1 row per batch. A batch size of 0 already behaved that way; a negative
// size used to reach reserve() as a huge size_t. Entries whose key or value
// fail to parse are skipped. Batches whose export is not valid() are dropped.
AggScanOutput scan_aggregation_shard_range(AggScanInput input) {
    AggScanOutput output;

    static const std::vector<ColumnSpec> schema = {
        {"batch_type", ColumnType::INT64},  {"cat", ColumnType::DICT_STRING},
        {"name", ColumnType::DICT_STRING},  {"pid", ColumnType::UINT64},
        {"tid", ColumnType::UINT64},        {"hhash", ColumnType::DICT_STRING},
        {"fhash", ColumnType::DICT_STRING}, {"time_bucket", ColumnType::UINT64},
        {"count", ColumnType::UINT64},      {"dur_total", ColumnType::UINT64},
        {"dur_min", ColumnType::UINT64},    {"dur_max", ColumnType::UINT64},
        {"dur_mean", ColumnType::DOUBLE},   {"dur_std", ColumnType::DOUBLE},
        {"size_total", ColumnType::UINT64}, {"size_min", ColumnType::UINT64},
        {"size_max", ColumnType::UINT64},   {"size_mean", ColumnType::DOUBLE},
        {"size_std", ColumnType::DOUBLE},   {"ts", ColumnType::UINT64},
        {"te", ColumnType::UINT64},
    };

    const std::size_t rows_per_batch =
        input.batch_size > 0 ? static_cast<std::size_t>(input.batch_size) : 1;

    RecordBatchBuilder builder;
    builder.declare_schema(schema);
    builder.reserve(rows_per_batch);

    std::size_t row_count = 0;

    // Finish the current batch into output.results, dropping an invalid
    // export. Does not reset the builder.
    auto emit_batch = [&]() {
        auto arrow = builder.finish();
        if (arrow.valid()) {
            output.results.push_back(std::move(arrow));
        }
    };

    input.agg->scan_shard_range_raw(
        input.shard_begin, input.shard_end,
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
            AggKeyView kv;
            if (!parse_agg_key_view(key_bytes, kv)) return true;
            if (kv.map_type != input.target_type) return true;

            AggMetricsFullView mv;
            if (!parse_agg_value_full_view(val_bytes, mv)) return true;

            // Column order must match `schema` above.
            std::size_t ci = 0;
            builder.append_int64(ci++,
                                 static_cast<std::int64_t>(input.batch_type));
            builder.append_dict_string(ci++, kv.cat);
            builder.append_dict_string(ci++, kv.name);
            builder.append_uint64(ci++, kv.pid);
            builder.append_uint64(ci++, kv.tid);
            builder.append_dict_string(ci++, kv.hhash);
            builder.append_dict_string(ci++, kv.fhash);
            builder.append_uint64(ci++, kv.time_bucket);
            builder.append_uint64(ci++, mv.count);
            builder.append_uint64(ci++, mv.dur_total);
            // With no samples, report 0 instead of the stored minimum.
            builder.append_uint64(ci++, mv.count > 0 ? mv.dur_min : 0);
            builder.append_uint64(ci++, mv.dur_max);
            builder.append_double(ci++, mv.dur_mean);
            builder.append_double(ci++, mv.dur_stddev());
            builder.append_uint64(ci++, mv.size_total);
            builder.append_uint64(ci++, mv.count > 0 ? mv.size_min : 0);
            builder.append_uint64(ci++, mv.size_max);
            builder.append_double(ci++, mv.size_mean);
            builder.append_double(ci++, mv.size_stddev());
            builder.append_uint64(ci++, mv.ts);
            builder.append_uint64(ci++, mv.te);
            builder.end_row();

            if (++row_count >= rows_per_batch) {
                emit_batch();
                builder.reset(true);
                builder.reserve(rows_per_batch);
                row_count = 0;
            }
            // Every path returns true (presumably "continue scanning").
            return true;
        });

    if (row_count > 0) {
        emit_batch();
    }

    return output;
}
953

954
// I/O category codes; scan_dfanalyzer_shards writes them, cast to int64,
// into the "io_cat" column. PCTL and IPC are part of the code space, but
// get_io_category() never returns them.
enum class IOCategory : std::int8_t {
    READ = 1,
    WRITE = 2,
    METADATA = 3,
    PCTL = 4,
    IPC = 5,
    OTHER = 6,
    SYNC = 7,
};

// Classify a traced function name into an IOCategory. Matching is exact and
// case-sensitive; any name not listed below maps to OTHER.
inline IOCategory get_io_category(std::string_view func_name) {
    static constexpr std::string_view kReadFns[] = {
        "read", "pread", "readv", "preadv", "fread"};
    static constexpr std::string_view kWriteFns[] = {
        "write", "pwrite", "writev", "pwritev", "fwrite"};
    static constexpr std::string_view kSyncFns[] = {
        "fsync", "fdatasync", "msync", "sync"};
    static constexpr std::string_view kMetadataFns[] = {
        "open",       "open64",    "close",    "fopen",      "fopen64",
        "fclose",     "stat",      "fstat",    "lstat",      "fstatat",
        "__xstat",    "__xstat64", "__lxstat", "__lxstat64", "__fxstat",
        "__fxstat64", "access",    "lseek",    "lseek64",    "fseek",
        "ftell",      "seek",      "fcntl",    "ftruncate",  "mkdir",
        "rmdir",      "unlink",    "remove",   "rename",     "link",
        "readlink",   "opendir",   "closedir", "readdir"};

    auto listed_in = [func_name](const auto& names) {
        for (std::string_view candidate : names) {
            if (candidate == func_name) return true;
        }
        return false;
    };

    if (listed_in(kReadFns)) return IOCategory::READ;
    if (listed_in(kWriteFns)) return IOCategory::WRITE;
    if (listed_in(kSyncFns)) return IOCategory::SYNC;
    if (listed_in(kMetadataFns)) return IOCategory::METADATA;
    return IOCategory::OTHER;
}
997

998
// Write the decimal digits of `val` into `buf`, most significant first, and
// return a pointer one past the last digit. No NUL terminator is written.
// `buf` needs room for up to 20 characters (the width of UINT64_MAX).
inline char* fast_itoa(std::uint64_t val, char* buf) {
    // Count the digits first so they can be written right-to-left in place.
    std::size_t digits = 1;
    for (std::uint64_t rest = val / 10; rest != 0; rest /= 10) {
        ++digits;
    }
    char* const end = buf + digits;
    char* cursor = end;
    do {
        *--cursor = static_cast<char>('0' + val % 10);
        val /= 10;
    } while (val != 0);
    return end;
}
1007

1008
class HashResolver {
1009
   public:
1010
    HashResolver(
43✔
1011
        const std::unordered_map<std::string, std::string>* file_hashes,
1012
        const std::unordered_map<std::string, std::string>* host_hashes)
1013
        : file_hashes_(file_hashes), host_hashes_(host_hashes) {
43✔
1014
        if (file_hashes_) {
43!
1015
            for (const auto& [hash, name] : *file_hashes_) {
108✔
1016
                auto hash_sv = intern_.intern(hash);
68!
1017
                auto name_sv = intern_.intern(name);
66!
1018
                file_map_[hash_sv] = name_sv;
68!
1019
            }
1020
        }
1021
        if (host_hashes_) {
44!
1022
            for (const auto& [hash, name] : *host_hashes_) {
112✔
1023
                auto hash_sv = intern_.intern(hash);
65!
1024
                auto name_sv = intern_.intern(name);
68!
1025
                host_map_[hash_sv] = name_sv;
66!
1026
            }
1027
        }
1028
    }
44✔
1029

1030
    // Unresolved hashes resolve to empty (not the hash itself): the
1031
    // dfanalyzer side treats empty file_name/host_name as missing (NA).
1032
    std::string_view resolve_file(std::string_view hash) {
325✔
1033
        if (hash.empty()) return hash;
325!
1034
        auto it = file_map_.find(intern_.intern(hash));
323!
1035
        return it != file_map_.end() ? it->second : std::string_view{};
640!
1036
    }
1037

1038
    std::string_view resolve_host(std::string_view hash) {
320✔
1039
        if (hash.empty()) return hash;
320!
1040
        auto it = host_map_.find(intern_.intern(hash));
315!
1041
        return it != host_map_.end() ? it->second : std::string_view{};
652!
1042
    }
1043

1044
    std::string_view intern(std::string_view sv) { return intern_.intern(sv); }
354✔
1045

1046
   private:
1047
    const std::unordered_map<std::string, std::string>* file_hashes_;
1048
    const std::unordered_map<std::string, std::string>* host_hashes_;
1049
    dftracer::utils::StringIntern intern_;
1050
    std::unordered_map<std::string_view, std::string_view> file_map_;
1051
    std::unordered_map<std::string_view, std::string_view> host_map_;
1052
};
1053

1054
// Identity of a traced thread: host hash plus pid/tid. Used as the key of
// the proc-name cache built while scanning dfanalyzer shards.
struct ProcKey {
    std::string_view hhash;
    std::uint64_t pid;
    std::uint64_t tid;
    bool operator==(const ProcKey& o) const {
        return hhash == o.hhash && pid == o.pid && tid == o.tid;
    }
};

// Hash for ProcKey. Mixes the fields with the same boost-style hash_combine
// step that CoarseKeyHash uses.
// A plain `h ^ (pid << 1) ^ (tid << 2)` collides systematically when
// std::hash<uint64_t> is the identity, as it is in libstdc++ and libc++.
// For example, (pid=2, tid=0) and (pid=0, tid=1) both contribute 4.
struct ProcKeyHash {
    std::size_t operator()(const ProcKey& k) const {
        auto combine = [](std::size_t h, std::size_t v) {
            return h ^ (v + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2));
        };
        std::size_t h = std::hash<std::string_view>{}(k.hhash);
        h = combine(h, std::hash<std::uint64_t>{}(k.pid));
        h = combine(h, std::hash<std::uint64_t>{}(k.tid));
        return h;
    }
};
1070

1071
static const std::vector<ColumnSpec> DFANALYZER_SCHEMA = {
1072
    {"cat", ColumnType::DICT_STRING},
1073
    {"func_name", ColumnType::DICT_STRING},
1074
    {"pid", ColumnType::INT64},
1075
    {"tid", ColumnType::INT64},
1076
    {"file_hash", ColumnType::DICT_STRING},
1077
    {"host_hash", ColumnType::DICT_STRING},
1078
    {"file_name", ColumnType::DICT_STRING},
1079
    {"host_name", ColumnType::DICT_STRING},
1080
    {"proc_name", ColumnType::DICT_STRING},
1081
    {"io_cat", ColumnType::INT64},
1082
    {"acc_pat", ColumnType::INT64},
1083
    {"count", ColumnType::INT64},
1084
    {"time", ColumnType::DOUBLE},
1085
    {"size", ColumnType::INT64},
1086
    {"time_min", ColumnType::DOUBLE},
1087
    {"time_max", ColumnType::DOUBLE},
1088
    {"size_min", ColumnType::INT64},
1089
    {"size_max", ColumnType::INT64},
1090
    {"offset_min", ColumnType::INT64},
1091
    {"offset_max", ColumnType::INT64},
1092
    {"time_range", ColumnType::INT64},
1093
    {"time_start", ColumnType::INT64},
1094
    {"time_end", ColumnType::INT64},
1095
};
1096

1097
// Bit flags for the fields a coarse (group-by) dfanalyzer scan can key on.
// GroupByConfig::mask is the OR of the selected flags.
enum GroupByField : std::uint32_t {
    GB_CAT = 1u << 0,
    GB_FUNC_NAME = 1u << 1,
    GB_PID = 1u << 2,
    GB_TID = 1u << 3,
    GB_FILE_HASH = 1u << 4,
    GB_HOST_HASH = 1u << 5,
    GB_FILE_NAME = 1u << 6,
    GB_HOST_NAME = 1u << 7,
    GB_PROC_NAME = 1u << 8,
    GB_IO_CAT = 1u << 9,
    GB_ACC_PAT = 1u << 10,
    GB_TIME_RANGE = 1u << 11,
};

// A parsed group-by selection.
struct GroupByConfig {
    std::uint32_t mask = 0;           // OR of the selected GroupByField bits
    std::vector<GroupByField> order;  // selected fields, in output column order
    std::vector<std::string> names;   // matches `order`, used for schema
};

// Map a group-by column name (same spelling as the DFANALYZER_SCHEMA column)
// to its GroupByField flag. Returns std::nullopt for unknown names.
inline std::optional<GroupByField> parse_group_by_name(std::string_view name) {
    struct Entry {
        std::string_view name;
        GroupByField field;
    };
    static constexpr Entry kFields[] = {
        {"cat", GB_CAT},
        {"func_name", GB_FUNC_NAME},
        {"pid", GB_PID},
        {"tid", GB_TID},
        {"file_hash", GB_FILE_HASH},
        {"host_hash", GB_HOST_HASH},
        {"file_name", GB_FILE_NAME},
        {"host_name", GB_HOST_NAME},
        {"proc_name", GB_PROC_NAME},
        {"io_cat", GB_IO_CAT},
        {"acc_pat", GB_ACC_PAT},
        {"time_range", GB_TIME_RANGE},
    };
    for (const Entry& entry : kFields) {
        if (entry.name == name) return entry.field;
    }
    return std::nullopt;
}
1133

1134
// Group-by key for coarse dfanalyzer aggregation. Fields not selected by the
// GroupByConfig keep their default (empty / 0) values, so they never split
// groups. The string fields are views. Equality and hashing compare them by
// content, not by pointer.
struct CoarseKey {
    std::string_view cat;
    std::string_view func_name;
    std::uint64_t pid{0};
    std::uint64_t tid{0};
    std::string_view file_hash;
    std::string_view host_hash;
    std::string_view file_name;
    std::string_view host_name;
    std::string_view proc_name;
    std::int64_t io_cat{0};
    std::int64_t acc_pat{0};
    std::int64_t time_range{0};

    bool operator==(const CoarseKey& o) const {
        if (cat != o.cat || func_name != o.func_name) return false;
        if (pid != o.pid || tid != o.tid) return false;
        if (file_hash != o.file_hash || host_hash != o.host_hash) return false;
        if (file_name != o.file_name || host_name != o.host_name) return false;
        if (proc_name != o.proc_name) return false;
        return io_cat == o.io_cat && acc_pat == o.acc_pat &&
               time_range == o.time_range;
    }
};

// Hash for CoarseKey: folds every field, in declaration order, with a
// boost-style hash_combine step.
struct CoarseKeyHash {
    std::size_t operator()(const CoarseKey& k) const {
        const std::hash<std::string_view> str_hash{};
        const std::hash<std::uint64_t> u64_hash{};
        const std::hash<std::int64_t> i64_hash{};
        std::size_t h = str_hash(k.cat);
        h = mix(h, str_hash(k.func_name));
        h = mix(h, u64_hash(k.pid));
        h = mix(h, u64_hash(k.tid));
        h = mix(h, str_hash(k.file_hash));
        h = mix(h, str_hash(k.host_hash));
        h = mix(h, str_hash(k.file_name));
        h = mix(h, str_hash(k.host_name));
        h = mix(h, str_hash(k.proc_name));
        h = mix(h, i64_hash(k.io_cat));
        h = mix(h, i64_hash(k.acc_pat));
        h = mix(h, i64_hash(k.time_range));
        return h;
    }

   private:
    // One hash_combine step: fold `value` into `seed`.
    static std::size_t mix(std::size_t seed, std::size_t value) {
        return seed ^
               (value + 0x9e3779b97f4a7c15ULL + (seed << 6) + (seed >> 2));
    }
};
1178

1179
// Running aggregates for one coarse group-by bucket.
// Minimum fields start at +inf / UINT64_MAX and maximum fields at -inf / 0,
// so the first folded sample always replaces them. has_size is set once a
// sample with a non-zero size is folded in. has_time_bounds presumably
// tracks time_start_val / time_end_val the same way (its writer is not
// visible here — confirm).
struct CoarseMetrics {
    // Call count and durations.
    std::uint64_t count{0};
    double time_sum{0.0};
    double time_sq_sum{0.0};
    double time_min_val{std::numeric_limits<double>::infinity()};
    double time_max_val{-std::numeric_limits<double>::infinity()};
    double time_call_min_val{std::numeric_limits<double>::infinity()};
    double time_call_max_val{-std::numeric_limits<double>::infinity()};
    // Transfer sizes.
    std::uint64_t size_sum{0};
    double size_sq_sum{0.0};
    std::uint64_t size_min_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t size_max_val{0};
    std::uint64_t size_call_min_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t size_call_max_val{0};
    bool has_size{false};
    // Observed time bounds.
    std::uint64_t time_start_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t time_end_val{0};
    bool has_time_bounds{false};
};
1198

1199
inline std::vector<ColumnSpec> make_coarse_schema(const GroupByConfig& cfg) {
×
1200
    std::vector<ColumnSpec> specs;
×
1201
    specs.reserve(cfg.order.size() + 16);
×
1202
    for (std::size_t i = 0; i < cfg.order.size(); ++i) {
×
1203
        GroupByField f = cfg.order[i];
×
UNCOV
1204
        const std::string& name = cfg.names[i];
×
UNCOV
1205
        switch (f) {
×
UNCOV
1206
            case GB_CAT:
×
1207
            case GB_FUNC_NAME:
1208
            case GB_FILE_HASH:
1209
            case GB_HOST_HASH:
1210
            case GB_FILE_NAME:
1211
            case GB_HOST_NAME:
1212
            case GB_PROC_NAME:
UNCOV
1213
                specs.push_back({name, ColumnType::DICT_STRING});
×
UNCOV
1214
                break;
×
UNCOV
1215
            case GB_PID:
×
1216
            case GB_TID:
1217
            case GB_IO_CAT:
1218
            case GB_ACC_PAT:
1219
            case GB_TIME_RANGE:
UNCOV
1220
                specs.push_back({name, ColumnType::INT64});
×
UNCOV
1221
                break;
×
1222
        }
1223
    }
1224
    specs.push_back({"count", ColumnType::INT64});
×
1225
    specs.push_back({"time", ColumnType::DOUBLE});
×
1226
    specs.push_back({"size", ColumnType::INT64});
×
1227
    specs.push_back({"time_sq", ColumnType::DOUBLE});
×
1228
    specs.push_back({"size_sq", ColumnType::DOUBLE});
×
1229
    specs.push_back({"time_min", ColumnType::DOUBLE});
×
1230
    specs.push_back({"time_max", ColumnType::DOUBLE});
×
1231
    specs.push_back({"size_min", ColumnType::INT64});
×
1232
    specs.push_back({"size_max", ColumnType::INT64});
×
1233
    specs.push_back({"time_call_min", ColumnType::DOUBLE});
×
1234
    specs.push_back({"time_call_max", ColumnType::DOUBLE});
×
1235
    specs.push_back({"size_call_min", ColumnType::INT64});
×
1236
    specs.push_back({"size_call_max", ColumnType::INT64});
×
1237
    specs.push_back({"time_start", ColumnType::INT64});
×
1238
    specs.push_back({"time_end", ColumnType::INT64});
×
UNCOV
1239
    return specs;
×
UNCOV
1240
}
×
1241

1242
struct DfanalyzerScanInput {
1243
    const EventAggregator* agg;
1244
    const DfanalyzerContext* ctx;
1245
    std::optional<AggMapType> type_filter;
1246
    Py_ssize_t batch_size;
1247
    std::uint16_t shard_begin;
1248
    std::uint16_t shard_end;
1249
    const GroupByConfig* group_by = nullptr;  // null = full granularity
1250
};
1251

1252
struct DfanalyzerScanOutput {
1253
    std::vector<ArrowExportResult> events;
1254
    std::vector<ArrowExportResult> profiles;
1255
    std::vector<ArrowExportResult> system;
1256
};
1257

1258
DfanalyzerScanOutput scan_dfanalyzer_shards(DfanalyzerScanInput input) {
39✔
1259
    DfanalyzerScanOutput output;
39✔
1260

1261
    const bool coarse = input.group_by != nullptr;
44✔
1262
    const std::vector<ColumnSpec> coarse_schema =
1263
        coarse ? make_coarse_schema(*input.group_by)
44✔
1264
               : std::vector<ColumnSpec>{};
44!
1265

1266
    auto make_builder = [&]() {
102✔
1267
        RecordBatchBuilder b;
102✔
1268
        if (coarse) {
103!
UNCOV
1269
            b.declare_schema(coarse_schema);
×
1270
        } else {
1271
            b.declare_schema(DFANALYZER_SCHEMA);
103!
1272
        }
1273
        b.reserve(static_cast<std::size_t>(input.batch_size));
111!
1274
        return b;
115✔
UNCOV
1275
    };
×
1276

1277
    RecordBatchBuilder event_builder, profile_builder, system_builder;
42!
1278
    bool use_events =
1279
        !input.type_filter || *input.type_filter == AggMapType::EVENT;
41!
1280
    bool use_profiles =
1281
        !input.type_filter || *input.type_filter == AggMapType::PROFILE;
40!
1282
    bool use_system =
1283
        !input.type_filter || *input.type_filter == AggMapType::SYSTEM;
41!
1284

1285
    if (use_events) event_builder = make_builder();
41!
1286
    if (use_profiles) profile_builder = make_builder();
37!
1287
    if (use_system) system_builder = make_builder();
43!
1288

1289
    auto bucket_width_us = static_cast<std::uint64_t>(
45✔
1290
        input.ctx->time_granularity * input.ctx->time_resolution);
45✔
1291
    std::size_t event_count = 0, profile_count = 0, system_count = 0;
45✔
1292

1293
    HashResolver resolver(input.ctx->file_hashes, input.ctx->host_hashes);
45!
1294
    std::unordered_map<ProcKey, std::string, ProcKeyHash> proc_name_cache;
44✔
1295
    std::unordered_map<std::string_view, IOCategory> io_cat_cache;
42✔
1296

1297
    std::unordered_map<CoarseKey, CoarseMetrics, CoarseKeyHash> event_coarse,
42✔
1298
        profile_coarse, system_coarse;
42✔
1299

1300
    auto flush_builder = [&](RecordBatchBuilder& builder, std::size_t& count,
115✔
1301
                             std::vector<ArrowExportResult>& results) {
1302
        if (count > 0) {
115✔
1303
            auto arrow = builder.finish();
39!
1304
            if (arrow.valid()) {
40!
1305
                results.push_back(std::move(arrow));
40!
1306
            }
1307
            builder.reset(true);
38!
1308
            builder.reserve(static_cast<std::size_t>(input.batch_size));
40!
1309
            count = 0;
39✔
1310
        }
39✔
1311
    };
114✔
1312

1313
    auto append_row = [&](RecordBatchBuilder& builder, std::size_t& count,
327✔
1314
                          std::vector<ArrowExportResult>& results,
1315
                          const AggKeyView& kv, const AggMetricsView& mv,
1316
                          std::string_view file_name,
1317
                          std::string_view host_name,
1318
                          std::string_view proc_name, IOCategory io_cat) {
1319
        std::size_t ci = 0;
327✔
1320
        builder.append_dict_string(ci++, kv.cat);
327✔
1321
        builder.append_dict_string(ci++, kv.name);
330✔
1322
        builder.append_int64(ci++, static_cast<std::int64_t>(kv.pid));
328✔
1323
        builder.append_int64(ci++, static_cast<std::int64_t>(kv.tid));
323✔
1324
        builder.append_dict_string(ci++, kv.fhash);
326✔
1325
        builder.append_dict_string(ci++, kv.hhash);
326✔
1326
        builder.append_dict_string(ci++, file_name);
330✔
1327
        builder.append_dict_string(ci++, host_name);
330✔
1328
        builder.append_dict_string(ci++, proc_name);
330✔
1329
        builder.append_int64(ci++, static_cast<std::int64_t>(io_cat));
329✔
1330
        builder.append_int64(ci++, 0);
327✔
1331

1332
        builder.append_int64(ci++, static_cast<std::int64_t>(mv.count));
328✔
1333
        builder.append_double(ci++, static_cast<double>(mv.dur_total) /
328✔
1334
                                        input.ctx->time_resolution);
328✔
1335

1336
        if (mv.size_total > 0) {
327✔
1337
            builder.append_int64(ci++,
243✔
1338
                                 static_cast<std::int64_t>(mv.size_total));
243✔
1339
        } else {
1340
            builder.append_null(ci++);
84✔
1341
        }
1342

1343
        builder.append_double(ci++, mv.count > 0
654!
1344
                                        ? static_cast<double>(mv.dur_min) /
327✔
1345
                                              input.ctx->time_resolution
327✔
1346
                                        : 0.0);
1347
        builder.append_double(ci++, mv.count > 0
654!
1348
                                        ? static_cast<double>(mv.dur_max) /
327✔
1349
                                              input.ctx->time_resolution
327✔
1350
                                        : 0.0);
1351

1352
        if (mv.size_total > 0 && mv.count > 0) {
327!
1353
            builder.append_int64(ci++, static_cast<std::int64_t>(mv.size_min));
244✔
1354
            builder.append_int64(ci++, static_cast<std::int64_t>(mv.size_max));
245✔
1355
        } else {
1356
            builder.append_null(ci++);
83✔
1357
            builder.append_null(ci++);
84✔
1358
        }
1359

1360
        // offset_min > offset_max only when no offset was ever recorded
1361
        // (MetricStats default min=UINT64_MAX, max=0); 0 is a valid offset.
1362
        if (mv.offset_min <= mv.offset_max) {
328!
1363
            builder.append_int64(ci++,
328✔
1364
                                 static_cast<std::int64_t>(mv.offset_min));
328✔
1365
            builder.append_int64(ci++,
327✔
1366
                                 static_cast<std::int64_t>(mv.offset_max));
327✔
1367
        } else {
UNCOV
1368
            builder.append_null(ci++);
×
UNCOV
1369
            builder.append_null(ci++);
×
1370
        }
1371

1372
        auto time_range = bucket_width_us > 0
984✔
1373
                              ? static_cast<std::int64_t>(
328!
1374
                                    (kv.time_bucket - input.ctx->time_origin) /
328✔
1375
                                    bucket_width_us)
328✔
1376
                              : 0;
1377
        builder.append_int64(ci++, time_range);
328✔
1378
        // Counter (profile) rows align to the bucket grid: time_start is the
1379
        // bucket start, time_end one bucket later. Plain events keep the
1380
        // precise min/max event timestamps.
1381
        if (kv.map_type == AggMapType::PROFILE) {
328!
UNCOV
1382
            auto bucket_start = static_cast<std::int64_t>(
×
UNCOV
1383
                kv.time_bucket - input.ctx->time_origin);
×
UNCOV
1384
            builder.append_int64(ci++, bucket_start);
×
1385
            builder.append_int64(ci++, bucket_start + static_cast<std::int64_t>(
×
1386
                                                          bucket_width_us));
×
1387
        } else {
1388
            builder.append_int64(ci++, static_cast<std::int64_t>(
328✔
1389
                                           mv.ts - input.ctx->time_origin));
328✔
1390
            builder.append_int64(ci++, static_cast<std::int64_t>(
329✔
1391
                                           mv.te - input.ctx->time_origin));
329✔
1392
        }
1393
        builder.end_row();
328✔
1394

1395
        count++;
330✔
1396
        if (static_cast<Py_ssize_t>(count) >= input.batch_size) {
330!
1397
            flush_builder(builder, count, results);
×
1398
        }
1399
    };
373✔
1400

1401
    auto accumulate_coarse =
1402
        [&](std::unordered_map<CoarseKey, CoarseMetrics, CoarseKeyHash>& map,
×
1403
            const AggKeyView& kv, const AggMetricsView& mv,
1404
            std::string_view file_name, std::string_view host_name,
1405
            std::string_view proc_name, IOCategory io_cat) {
UNCOV
1406
            const auto& cfg = *input.group_by;
×
1407
            // Probe with non-interned views; hash/equality compare by content,
1408
            // so string_view lifetime doesn't matter for lookup. We only copy
1409
            // (intern) on first insert.
UNCOV
1410
            CoarseKey probe;
×
UNCOV
1411
            if (cfg.mask & GB_CAT) probe.cat = kv.cat;
×
UNCOV
1412
            if (cfg.mask & GB_FUNC_NAME) probe.func_name = kv.name;
×
1413
            if (cfg.mask & GB_PID) probe.pid = kv.pid;
×
1414
            if (cfg.mask & GB_TID) probe.tid = kv.tid;
×
1415
            if (cfg.mask & GB_FILE_HASH) probe.file_hash = kv.fhash;
×
1416
            if (cfg.mask & GB_HOST_HASH) probe.host_hash = kv.hhash;
×
1417
            if (cfg.mask & GB_FILE_NAME) probe.file_name = file_name;
×
1418
            if (cfg.mask & GB_HOST_NAME) probe.host_name = host_name;
×
1419
            if (cfg.mask & GB_PROC_NAME) probe.proc_name = proc_name;
×
1420
            if (cfg.mask & GB_IO_CAT)
×
1421
                probe.io_cat = static_cast<std::int64_t>(io_cat);
×
1422
            if (cfg.mask & GB_TIME_RANGE) {
×
UNCOV
1423
                probe.time_range =
×
1424
                    bucket_width_us > 0
×
1425
                        ? static_cast<std::int64_t>(
×
1426
                              (kv.time_bucket - input.ctx->time_origin) /
×
1427
                              bucket_width_us)
×
1428
                        : 0;
1429
            }
1430
            // acc_pat is always 0 today; included for completeness.
1431

1432
            auto it = map.find(probe);
×
1433
            if (it == map.end()) {
×
1434
                // First sighting: promote views referencing unstable DB buffers
1435
                // to interned copies. file_name/host_name come from the
1436
                // resolver's intern pool, and proc_name from proc_name_cache;
1437
                // both already stable across iterations, no copy needed.
1438
                CoarseKey stable = probe;
×
UNCOV
1439
                if (cfg.mask & GB_CAT) stable.cat = resolver.intern(kv.cat);
×
1440
                if (cfg.mask & GB_FUNC_NAME)
×
1441
                    stable.func_name = resolver.intern(kv.name);
×
1442
                if (cfg.mask & GB_FILE_HASH)
×
1443
                    stable.file_hash = resolver.intern(kv.fhash);
×
1444
                if (cfg.mask & GB_HOST_HASH)
×
1445
                    stable.host_hash = resolver.intern(kv.hhash);
×
1446
                auto [nit, _] = map.emplace(std::move(stable), CoarseMetrics{});
×
1447
                it = nit;
×
1448
            }
1449
            CoarseMetrics& m = it->second;
×
1450
            m.count += mv.count;
×
1451
            double time_val =
×
1452
                static_cast<double>(mv.dur_total) / input.ctx->time_resolution;
×
1453
            m.time_sum += time_val;
×
UNCOV
1454
            m.time_sq_sum += time_val * time_val;
×
UNCOV
1455
            if (time_val < m.time_call_min_val) m.time_call_min_val = time_val;
×
1456
            if (time_val > m.time_call_max_val) m.time_call_max_val = time_val;
×
1457
            if (mv.count > 0) {
×
1458
                double dur_min_v = static_cast<double>(mv.dur_min) /
×
1459
                                   input.ctx->time_resolution;
×
1460
                double dur_max_v = static_cast<double>(mv.dur_max) /
×
1461
                                   input.ctx->time_resolution;
×
UNCOV
1462
                if (dur_min_v < m.time_min_val) m.time_min_val = dur_min_v;
×
1463
                if (dur_max_v > m.time_max_val) m.time_max_val = dur_max_v;
×
1464
            }
UNCOV
1465
            if (mv.size_total > 0) {
×
UNCOV
1466
                m.has_size = true;
×
UNCOV
1467
                m.size_sum += mv.size_total;
×
UNCOV
1468
                double sz = static_cast<double>(mv.size_total);
×
UNCOV
1469
                m.size_sq_sum += sz * sz;
×
UNCOV
1470
                if (mv.size_total < m.size_call_min_val)
×
UNCOV
1471
                    m.size_call_min_val = mv.size_total;
×
1472
                if (mv.size_total > m.size_call_max_val)
×
UNCOV
1473
                    m.size_call_max_val = mv.size_total;
×
UNCOV
1474
                if (mv.count > 0) {
×
UNCOV
1475
                    if (mv.size_min < m.size_min_val)
×
UNCOV
1476
                        m.size_min_val = mv.size_min;
×
UNCOV
1477
                    if (mv.size_max > m.size_max_val)
×
UNCOV
1478
                        m.size_max_val = mv.size_max;
×
1479
                }
1480
            }
UNCOV
1481
            if (mv.ts >= input.ctx->time_origin) {
×
1482
                m.has_time_bounds = true;
×
UNCOV
1483
                auto ts_off = mv.ts - input.ctx->time_origin;
×
1484
                auto te_off = mv.te - input.ctx->time_origin;
×
UNCOV
1485
                if (ts_off < m.time_start_val) m.time_start_val = ts_off;
×
1486
                if (te_off > m.time_end_val) m.time_end_val = te_off;
×
1487
            }
UNCOV
1488
        };
×
1489

UNCOV
1490
    input.agg->scan_shard_range_raw(
×
1491
        input.shard_begin, input.shard_end,
43!
1492
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
405✔
1493
            AggKeyView kv;
405✔
1494
            if (!parse_agg_key_view(key_bytes, kv)) return true;
393!
1495

1496
            if (input.type_filter && kv.map_type != *input.type_filter)
401!
UNCOV
1497
                return true;
×
1498

1499
            if (input.ctx->query_filter) {
396✔
1500
                auto& q = *input.ctx->query_filter;
257✔
1501
                dftracer::utils::utilities::common::query::ValueMap fields;
257!
1502
                if (q.references("cat")) fields["cat"] = std::string(kv.cat);
260!
1503
                if (q.references("name")) fields["name"] = std::string(kv.name);
255!
1504
                if (q.references("pid")) fields["pid"] = kv.pid;
260!
1505
                if (q.references("tid")) fields["tid"] = kv.tid;
244!
1506
                if (q.references("hhash"))
257!
UNCOV
1507
                    fields["hhash"] = std::string(kv.hhash);
×
1508
                if (q.references("fhash"))
261!
UNCOV
1509
                    fields["fhash"] = std::string(kv.fhash);
×
1510
                if (q.references("time_bucket"))
262!
UNCOV
1511
                    fields["time_bucket"] = kv.time_bucket;
×
1512
                if (!q.evaluate(fields)) return true;
258!
1513
            }
263✔
1514

1515
            AggMetricsView mv;
1516
            if (!parse_agg_value_view(val_bytes, mv)) return true;
320!
1517

1518
            auto file_name = resolver.resolve_file(kv.fhash);
324!
1519
            auto host_name = resolver.resolve_host(kv.hhash);
319!
1520

1521
            ProcKey pk{kv.hhash, kv.pid, kv.tid};
326✔
1522
            auto proc_it = proc_name_cache.find(pk);
326!
1523
            std::string_view proc_name;
325✔
1524
            if (proc_it != proc_name_cache.end()) {
325✔
1525
                proc_name = proc_it->second;
191✔
1526
            } else {
1527
                std::string pn = "app#";
133!
1528
                if (!host_name.empty()) {
134!
1529
                    pn.append(host_name);
134!
1530
                } else if (!kv.hhash.empty()) {
×
UNCOV
1531
                    pn.append(kv.hhash);
×
1532
                } else {
1533
                    pn.append("unknown");
×
1534
                }
1535
                pn.push_back('#');
134!
1536
                pn.append(std::to_string(kv.pid));
131!
1537
                pn.push_back('#');
132!
1538
                pn.append(std::to_string(kv.tid));
131!
1539
                ProcKey stable_pk{resolver.intern(kv.hhash), kv.pid, kv.tid};
135!
1540
                auto [it, _] =
134✔
1541
                    proc_name_cache.emplace(stable_pk, std::move(pn));
136!
1542
                proc_name = it->second;
131✔
1543
            }
132✔
1544

1545
            auto io_it = io_cat_cache.find(kv.name);
325!
1546
            IOCategory io_cat;
1547
            if (io_it != io_cat_cache.end()) {
324✔
1548
                io_cat = io_it->second;
109✔
1549
            } else {
1550
                io_cat = get_io_category(kv.name);
213✔
1551
                io_cat_cache[resolver.intern(kv.name)] = io_cat;
219!
1552
            }
1553

1554
            if (coarse) {
325!
1555
                switch (kv.map_type) {
×
1556
                    case AggMapType::EVENT:
×
UNCOV
1557
                        if (use_events)
×
1558
                            accumulate_coarse(event_coarse, kv, mv, file_name,
×
1559
                                              host_name, proc_name, io_cat);
1560
                        break;
×
UNCOV
1561
                    case AggMapType::PROFILE:
×
UNCOV
1562
                        if (use_profiles)
×
1563
                            accumulate_coarse(profile_coarse, kv, mv, file_name,
×
1564
                                              host_name, proc_name, io_cat);
UNCOV
1565
                        break;
×
UNCOV
1566
                    case AggMapType::SYSTEM:
×
UNCOV
1567
                        if (use_system)
×
UNCOV
1568
                            accumulate_coarse(system_coarse, kv, mv, file_name,
×
1569
                                              host_name, proc_name, io_cat);
1570
                        break;
×
1571
                }
1572
            } else {
1573
                switch (kv.map_type) {
325!
1574
                    case AggMapType::EVENT:
326✔
1575
                        append_row(event_builder, event_count, output.events,
326!
1576
                                   kv, mv, file_name, host_name, proc_name,
1577
                                   io_cat);
1578
                        break;
330✔
UNCOV
1579
                    case AggMapType::PROFILE:
×
1580
                        append_row(profile_builder, profile_count,
×
1581
                                   output.profiles, kv, mv, file_name,
×
1582
                                   host_name, proc_name, io_cat);
1583
                        break;
×
1584
                    case AggMapType::SYSTEM:
×
UNCOV
1585
                        append_row(system_builder, system_count, output.system,
×
1586
                                   kv, mv, file_name, host_name, proc_name,
1587
                                   io_cat);
1588
                        break;
×
1589
                }
1590
            }
1591
            return true;
329✔
1592
        });
1593

1594
    if (coarse) {
44!
1595
        const auto& cfg = *input.group_by;
×
UNCOV
1596
        auto flush_coarse = [&](std::unordered_map<CoarseKey, CoarseMetrics,
×
1597
                                                   CoarseKeyHash>& map,
1598
                                RecordBatchBuilder& builder, std::size_t& count,
1599
                                std::vector<ArrowExportResult>& results) {
1600
            for (auto& [key, m] : map) {
×
1601
                std::size_t ci = 0;
×
UNCOV
1602
                for (std::size_t i = 0; i < cfg.order.size(); ++i) {
×
1603
                    switch (cfg.order[i]) {
×
1604
                        case GB_CAT:
×
UNCOV
1605
                            builder.append_dict_string(ci++, key.cat);
×
1606
                            break;
×
1607
                        case GB_FUNC_NAME:
×
UNCOV
1608
                            builder.append_dict_string(ci++, key.func_name);
×
1609
                            break;
×
1610
                        case GB_PID:
×
UNCOV
1611
                            builder.append_int64(
×
1612
                                ci++, static_cast<std::int64_t>(key.pid));
×
1613
                            break;
×
UNCOV
1614
                        case GB_TID:
×
1615
                            builder.append_int64(
×
1616
                                ci++, static_cast<std::int64_t>(key.tid));
×
UNCOV
1617
                            break;
×
UNCOV
1618
                        case GB_FILE_HASH:
×
1619
                            builder.append_dict_string(ci++, key.file_hash);
×
1620
                            break;
×
1621
                        case GB_HOST_HASH:
×
1622
                            builder.append_dict_string(ci++, key.host_hash);
×
1623
                            break;
×
UNCOV
1624
                        case GB_FILE_NAME:
×
1625
                            builder.append_dict_string(ci++, key.file_name);
×
UNCOV
1626
                            break;
×
1627
                        case GB_HOST_NAME:
×
1628
                            builder.append_dict_string(ci++, key.host_name);
×
1629
                            break;
×
UNCOV
1630
                        case GB_PROC_NAME:
×
1631
                            builder.append_dict_string(ci++, key.proc_name);
×
UNCOV
1632
                            break;
×
1633
                        case GB_IO_CAT:
×
1634
                            builder.append_int64(ci++, key.io_cat);
×
1635
                            break;
×
1636
                        case GB_ACC_PAT:
×
1637
                            builder.append_int64(ci++, key.acc_pat);
×
1638
                            break;
×
1639
                        case GB_TIME_RANGE:
×
UNCOV
1640
                            builder.append_int64(ci++, key.time_range);
×
1641
                            break;
×
1642
                    }
1643
                }
1644
                builder.append_int64(ci++, static_cast<std::int64_t>(m.count));
×
1645
                builder.append_double(ci++, m.time_sum);
×
1646
                if (m.has_size) {
×
1647
                    builder.append_int64(ci++,
×
1648
                                         static_cast<std::int64_t>(m.size_sum));
×
1649
                } else {
1650
                    builder.append_null(ci++);
×
1651
                }
1652
                builder.append_double(ci++, m.time_sq_sum);
×
UNCOV
1653
                if (m.has_size) {
×
1654
                    builder.append_double(ci++, m.size_sq_sum);
×
1655
                } else {
UNCOV
1656
                    builder.append_null(ci++);
×
1657
                }
1658
                builder.append_double(ci++, m.count > 0 ? m.time_min_val : 0.0);
×
1659
                builder.append_double(ci++, m.count > 0 ? m.time_max_val : 0.0);
×
UNCOV
1660
                if (m.has_size) {
×
1661
                    builder.append_int64(
×
1662
                        ci++, static_cast<std::int64_t>(m.size_min_val));
×
1663
                    builder.append_int64(
×
UNCOV
1664
                        ci++, static_cast<std::int64_t>(m.size_max_val));
×
1665
                } else {
1666
                    builder.append_null(ci++);
×
1667
                    builder.append_null(ci++);
×
1668
                }
UNCOV
1669
                builder.append_double(ci++,
×
UNCOV
1670
                                      m.count > 0 ? m.time_call_min_val : 0.0);
×
1671
                builder.append_double(ci++,
×
1672
                                      m.count > 0 ? m.time_call_max_val : 0.0);
×
1673
                if (m.has_size) {
×
1674
                    builder.append_int64(
×
1675
                        ci++, static_cast<std::int64_t>(m.size_call_min_val));
×
1676
                    builder.append_int64(
×
1677
                        ci++, static_cast<std::int64_t>(m.size_call_max_val));
×
1678
                } else {
1679
                    builder.append_null(ci++);
×
1680
                    builder.append_null(ci++);
×
1681
                }
UNCOV
1682
                builder.append_int64(
×
UNCOV
1683
                    ci++, m.has_time_bounds
×
UNCOV
1684
                              ? static_cast<std::int64_t>(m.time_start_val)
×
1685
                              : 0);
UNCOV
1686
                builder.append_int64(
×
UNCOV
1687
                    ci++, m.has_time_bounds
×
UNCOV
1688
                              ? static_cast<std::int64_t>(m.time_end_val)
×
1689
                              : 0);
UNCOV
1690
                builder.end_row();
×
UNCOV
1691
                ++count;
×
UNCOV
1692
                if (static_cast<Py_ssize_t>(count) >= input.batch_size) {
×
UNCOV
1693
                    flush_builder(builder, count, results);
×
1694
                }
1695
            }
1696
            flush_builder(builder, count, results);
×
UNCOV
1697
        };
×
UNCOV
1698
        if (use_events)
×
1699
            flush_coarse(event_coarse, event_builder, event_count,
×
1700
                         output.events);
×
UNCOV
1701
        if (use_profiles)
×
1702
            flush_coarse(profile_coarse, profile_builder, profile_count,
×
UNCOV
1703
                         output.profiles);
×
1704
        if (use_system)
×
UNCOV
1705
            flush_coarse(system_coarse, system_builder, system_count,
×
UNCOV
1706
                         output.system);
×
1707
    } else {
1708
        if (use_events)
44!
1709
            flush_builder(event_builder, event_count, output.events);
44!
1710
        if (use_profiles)
44✔
1711
            flush_builder(profile_builder, profile_count, output.profiles);
36!
1712
        if (use_system)
44✔
1713
            flush_builder(system_builder, system_count, output.system);
36!
1714
    }
1715

1716
    return output;
88✔
1717
}
44✔
1718

1719
// Two-pass scan over SYSTEM_METRICS CF: pass 1 discovers metric column names
1720
// (dynamic per workload), pass 2 emits rows. Needed because RecordBatchBuilder
1721
// requires the schema up front.
1722
std::vector<ArrowExportResult> scan_system_metrics_buffer(
9✔
1723
    const EventAggregator* agg, const DfanalyzerContext* ctx,
1724
    Py_ssize_t batch_size) {
1725
    std::vector<ArrowExportResult> results;
9✔
1726
    if (!agg) return results;
9!
1727

1728
    std::vector<std::string> metric_names_ordered;
9✔
1729
    std::unordered_set<std::string> metric_name_seen;
9✔
1730
    agg->scan_system_metrics_raw(
9!
NEW
1731
        [&](std::string_view, std::string_view val_bytes) -> bool {
×
NEW
1732
            auto m = deserialize_system_value(val_bytes);
×
NEW
1733
            if (m.metrics) {
×
NEW
1734
                for (const auto& [name, _] : *m.metrics) {
×
NEW
1735
                    if (metric_name_seen.insert(name).second) {
×
NEW
1736
                        metric_names_ordered.push_back(name);
×
1737
                    }
1738
                }
1739
            }
NEW
1740
            return true;
×
NEW
1741
        });
×
1742

1743
    if (metric_names_ordered.empty()) return results;
9!
1744

1745
    // SystemAggregationMetrics::metrics is an unordered_map; sort the
1746
    // discovered column names so the emitted Arrow schema is deterministic
1747
    // across runs and builds.
NEW
1748
    std::sort(metric_names_ordered.begin(), metric_names_ordered.end());
×
1749

1750
    std::vector<ColumnSpec> schema;
×
1751
    schema.reserve(6 + metric_names_ordered.size());
×
1752
    schema.push_back({"host_hash", ColumnType::DICT_STRING});
×
1753
    schema.push_back({"name", ColumnType::DICT_STRING});
×
UNCOV
1754
    schema.push_back({"time_bucket", ColumnType::INT64});
×
UNCOV
1755
    schema.push_back({"ts", ColumnType::INT64});
×
1756
    schema.push_back({"te", ColumnType::INT64});
×
1757
    schema.push_back({"count", ColumnType::INT64});
×
1758
    for (const auto& mn : metric_names_ordered) {
×
1759
        schema.push_back({mn, ColumnType::DOUBLE});
×
1760
    }
1761

1762
    RecordBatchBuilder builder;
×
UNCOV
1763
    builder.declare_schema(schema);
×
1764
    builder.reserve(static_cast<std::size_t>(batch_size));
×
1765

UNCOV
1766
    auto flush = [&](std::size_t& row_count) {
×
UNCOV
1767
        if (row_count == 0) return;
×
1768
        auto arrow = builder.finish();
×
UNCOV
1769
        if (arrow.valid()) results.push_back(std::move(arrow));
×
1770
        builder.reset(true);
×
UNCOV
1771
        builder.reserve(static_cast<std::size_t>(batch_size));
×
1772
        row_count = 0;
×
1773
    };
×
1774

UNCOV
1775
    std::size_t row_count = 0;
×
UNCOV
1776
    const std::size_t n_metric_cols = metric_names_ordered.size();
×
1777

UNCOV
1778
    agg->scan_system_metrics_raw(
×
UNCOV
1779
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
×
UNCOV
1780
            auto k = deserialize_system_key(key_bytes);
×
UNCOV
1781
            auto m = deserialize_system_value(val_bytes);
×
1782

UNCOV
1783
            std::size_t ci = 0;
×
UNCOV
1784
            builder.append_dict_string(ci++, k.key.hhash);
×
UNCOV
1785
            builder.append_dict_string(ci++, k.key.name);
×
UNCOV
1786
            builder.append_int64(ci++,
×
UNCOV
1787
                                 static_cast<std::int64_t>(k.key.time_bucket));
×
UNCOV
1788
            builder.append_int64(ci++, static_cast<std::int64_t>(m.ts));
×
1789
            builder.append_int64(ci++, static_cast<std::int64_t>(m.te));
×
UNCOV
1790
            builder.append_int64(ci++, static_cast<std::int64_t>(m.count));
×
1791

UNCOV
1792
            for (std::size_t i = 0; i < n_metric_cols; ++i) {
×
UNCOV
1793
                const auto& mn = metric_names_ordered[i];
×
UNCOV
1794
                bool present = false;
×
UNCOV
1795
                if (m.metrics) {
×
UNCOV
1796
                    auto it = m.metrics->find(mn);
×
UNCOV
1797
                    if (it != m.metrics->end()) {
×
UNCOV
1798
                        builder.append_double(ci++, it->second.mean);
×
UNCOV
1799
                        present = true;
×
1800
                    }
1801
                }
UNCOV
1802
                if (!present) builder.append_null(ci++);
×
1803
            }
UNCOV
1804
            builder.end_row();
×
UNCOV
1805
            row_count++;
×
UNCOV
1806
            if (static_cast<Py_ssize_t>(row_count) >= batch_size) {
×
UNCOV
1807
                flush(row_count);
×
1808
            }
UNCOV
1809
            return true;
×
UNCOV
1810
        });
×
UNCOV
1811
    flush(row_count);
×
1812

1813
    (void)ctx;
UNCOV
1814
    return results;
×
1815
}
9✔
1816

1817
}  // namespace
1818

UNCOV
1819
// Python-callable method (keywords: type="events", batch_size=10000) that
// scans the aggregation database behind this Indexer and returns an iterator
// over the exported batches.
//
// `type` is parsed by parse_agg_type_str() into the AggMapType to scan. The
// matching AggregationBatchType is derived from it: EVENT, PROFILE, otherwise
// SYSTEM. The scan runs through parallel_shard_scan() with the GIL released.
// Per-shard results are concatenated in `outputs` order.
//
// Returns a new reference to an iterator over a list filled by
// append_results_to_list(), or nullptr with a Python exception set:
//   - ValueError for a non-positive batch_size;
//   - RuntimeError when open_agg_db() reports an error or the scan throws;
//   - whatever the argument, type or index-path helpers raised.
static PyObject* Indexer_iter_aggregation(IndexerObject* self, PyObject* args,
                                          PyObject* kwds) {
    static const char* kwlist[] = {"type", "batch_size", nullptr};
    const char* type_str = "events";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|sn", (char**)kwlist,
                                     &type_str, &batch_size)) {
        return nullptr;
    }

    // batch_size is a per-batch row limit. Zero or negative values are
    // meaningless, and a negative one wraps to a huge value wherever it is
    // cast to std::size_t (e.g. for buffer reservation).
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "batch_size must be a positive integer");
        return nullptr;
    }

    AggMapType target_type;
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;

    AggregationBatchType batch_type;
    if (target_type == AggMapType::EVENT)
        batch_type = AggregationBatchType::EVENT;
    else if (target_type == AggMapType::PROFILE)
        batch_type = AggregationBatchType::PROFILE;
    else
        batch_type = AggregationBatchType::SYSTEM;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* batch_list = PyList_New(0);
    if (!batch_list) return nullptr;

    std::string error_msg;
    std::vector<dftracer::utils::utilities::common::arrow::ArrowExportResult>
        results;

    // Every exception must be caught inside this region. Unwinding past
    // Py_END_ALLOW_THREADS would leave this thread without the GIL.
    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<AggScanOutput> outputs;
            parallel_shard_scan<AggScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    AggScanInput input;
                    input.agg = handle->agg.get();
                    input.target_type = target_type;
                    input.batch_type = batch_type;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    return scan_aggregation_shard_range(input);
                },
                outputs);

            for (auto& out : outputs) {
                for (auto& r : out.results) {
                    results.push_back(std::move(r));
                }
            }
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
        // An empty what() must not be mistaken for success below.
        if (error_msg.empty()) error_msg = "aggregation scan failed";
    } catch (...) {
        error_msg = "aggregation scan failed with a non-standard exception";
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(batch_list);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(batch_list, results);

    PyObject* iter = PyObject_GetIter(batch_list);
    Py_DECREF(batch_list);
    return iter;
}
×
1894

1895
// Python-callable method (keywords: type="events", batch_size=10000,
// time_granularity=1.0, time_resolution=1000000.0, query=None) that scans the
// aggregation database of one AggMapType and returns an iterator over the
// exported batches.
//
// File/host hash tables are read from the IndexDatabase at the same path.
// time_origin is the aggregator's min_time_bucket when query_time_bounds() is
// valid, otherwise 0. `query`, when given, is parsed by parse_query_arg() and
// applied as a filter. The scan runs through parallel_shard_scan() with the
// GIL released. For each shard output, in order, its event, profile and then
// system batches are appended.
//
// Returns a new reference to an iterator over a list filled by
// append_results_to_list(), or nullptr with a Python exception set:
//   - ValueError for a non-positive batch_size or time_resolution;
//   - RuntimeError when open_agg_db() reports an error or the scan throws;
//   - whatever the argument, type, query or index-path helpers raised.
static PyObject* Indexer_iter_arrow_dfanalyzer(IndexerObject* self,
                                               PyObject* args, PyObject* kwds) {
    static const char* kwlist[] = {
        "type",  "batch_size", "time_granularity", "time_resolution",
        "query", nullptr};
    const char* type_str = "events";
    Py_ssize_t batch_size = 10000;
    double time_granularity = 1.0;
    double time_resolution = 1000000.0;
    const char* query_str = nullptr;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|snddz", (char**)kwlist,
                                     &type_str, &batch_size, &time_granularity,
                                     &time_resolution, &query_str)) {
        return nullptr;
    }

    // batch_size is a per-batch row limit. Zero or negative values are
    // meaningless, and a negative one wraps to a huge value wherever it is
    // cast to std::size_t (e.g. for buffer reservation).
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "batch_size must be a positive integer");
        return nullptr;
    }
    // The scan divides raw durations by time_resolution. Zero, negative or
    // NaN would yield inf/NaN metrics; `!(x > 0)` also rejects NaN.
    if (!(time_resolution > 0.0)) {
        PyErr_SetString(PyExc_ValueError,
                        "time_resolution must be a positive number");
        return nullptr;
    }

    AggMapType target_type;
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;

    auto query_opt = parse_query_arg(query_str);
    if (!query_opt && PyErr_Occurred()) return nullptr;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* batch_list = PyList_New(0);
    if (!batch_list) return nullptr;

    std::string error_msg;
    std::vector<ArrowExportResult> results;

    // Every exception must be caught inside this region. Unwinding past
    // Py_END_ALLOW_THREADS would leave this thread without the GIL.
    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            dftracer::utils::utilities::indexer::IndexDatabase idx_db(
                index_path,
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto file_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::FILE);
            auto host_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::HOST);

            auto time_bounds = handle->agg->query_time_bounds();
            std::uint64_t time_origin =
                time_bounds.valid ? time_bounds.min_time_bucket : 0;

            DfanalyzerContext ctx;
            ctx.file_hashes = &file_hashes;
            ctx.host_hashes = &host_hashes;
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
            ctx.time_origin = time_origin;
            ctx.time_resolution = time_resolution;
            ctx.time_granularity = time_granularity;

            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<DfanalyzerScanOutput> outputs;
            parallel_shard_scan<DfanalyzerScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    DfanalyzerScanInput input;
                    input.agg = handle->agg.get();
                    input.ctx = &ctx;
                    input.type_filter = target_type;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    return scan_dfanalyzer_shards(input);
                },
                outputs);

            for (auto& out : outputs) {
                for (auto& r : out.events) results.push_back(std::move(r));
                for (auto& r : out.profiles) results.push_back(std::move(r));
                for (auto& r : out.system) results.push_back(std::move(r));
            }
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
        // An empty what() must not be mistaken for success below.
        if (error_msg.empty()) error_msg = "dfanalyzer scan failed";
    } catch (...) {
        error_msg = "dfanalyzer scan failed with a non-standard exception";
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(batch_list);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(batch_list, results);

    PyObject* iter = PyObject_GetIter(batch_list);
    Py_DECREF(batch_list);
    return iter;
}
2✔
1992

1993
static bool parse_group_by_arg(PyObject* obj, GroupByConfig& out) {
9✔
1994
    if (!obj || obj == Py_None) return true;
9!
UNCOV
1995
    if (!PySequence_Check(obj)) {
×
UNCOV
1996
        PyErr_SetString(PyExc_TypeError,
×
1997
                        "group_by must be a sequence of strings or None");
UNCOV
1998
        return false;
×
1999
    }
UNCOV
2000
    Py_ssize_t n = PySequence_Length(obj);
×
UNCOV
2001
    for (Py_ssize_t i = 0; i < n; ++i) {
×
UNCOV
2002
        PyObject* item = PySequence_GetItem(obj, i);
×
UNCOV
2003
        if (!item) return false;
×
UNCOV
2004
        if (!PyUnicode_Check(item)) {
×
2005
            Py_DECREF(item);
UNCOV
2006
            PyErr_SetString(PyExc_TypeError,
×
2007
                            "group_by entries must be strings");
UNCOV
2008
            return false;
×
2009
        }
UNCOV
2010
        Py_ssize_t sz = 0;
×
2011
        const char* s = PyUnicode_AsUTF8AndSize(item, &sz);
×
UNCOV
2012
        if (!s) {
×
2013
            Py_DECREF(item);
UNCOV
2014
            return false;
×
2015
        }
2016
        std::string_view sv(s, static_cast<std::size_t>(sz));
×
UNCOV
2017
        auto field = parse_group_by_name(sv);
×
UNCOV
2018
        if (!field) {
×
UNCOV
2019
            std::string msg = "unsupported group_by field: ";
×
UNCOV
2020
            msg.append(sv);
×
2021
            Py_DECREF(item);
UNCOV
2022
            PyErr_SetString(PyExc_ValueError, msg.c_str());
×
UNCOV
2023
            return false;
×
2024
        }
×
2025
        if (!(out.mask & *field)) {
×
UNCOV
2026
            out.mask |= *field;
×
UNCOV
2027
            out.order.push_back(*field);
×
UNCOV
2028
            out.names.emplace_back(sv);
×
2029
        }
2030
        Py_DECREF(item);
2031
    }
UNCOV
2032
    return true;
×
2033
}
2034

2035
// Indexer.iter_arrow_dfanalyzer_all(batch_size=10000, time_granularity=1.0,
//                                   time_resolution=1e6, query=None,
//                                   group_by=None)
//
// Runs one parallel scan over all aggregation shards of this indexer's
// database and returns {"events": [...], "profiles": [...], "system": [...]}.
// Each list is filled by append_results_to_list() from the collected
// ArrowExportResult batches. System rows come from the shard scan plus
// scan_system_metrics_buffer().
//
// The scan runs with the GIL released. Any C++ exception raised there
// (including non-std exceptions and exceptions with an empty what()) is
// reported as RuntimeError. Returns nullptr with a Python exception set on
// failure.
static PyObject* Indexer_iter_arrow_dfanalyzer_all(IndexerObject* self,
                                                   PyObject* args,
                                                   PyObject* kwds) {
    static const char* kwlist[] = {"batch_size",      "time_granularity",
                                   "time_resolution", "query",
                                   "group_by",        nullptr};
    Py_ssize_t batch_size = 10000;
    double time_granularity = 1.0;
    double time_resolution = 1000000.0;
    const char* query_str = nullptr;
    PyObject* group_by_obj = nullptr;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nddzO", (char**)kwlist, &batch_size,
            &time_granularity, &time_resolution, &query_str, &group_by_obj)) {
        return nullptr;
    }

    auto query_opt = parse_query_arg(query_str);
    if (!query_opt && PyErr_Occurred()) return nullptr;

    GroupByConfig group_by_cfg;
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
    // An empty selection (None or []) means no grouping: full granularity.
    const GroupByConfig* group_by_ptr =
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::string error_msg;
    bool scan_threw = false;
    std::vector<ArrowExportResult> events_results, profiles_results,
        system_results;

    // No Python API calls are allowed in this region, and no exception may
    // escape it: Py_END_ALLOW_THREADS must run to reacquire the GIL.
    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            dftracer::utils::utilities::indexer::IndexDatabase idx_db(
                index_path,
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto file_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::FILE);
            auto host_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::HOST);

            // Earliest time bucket (0 when bounds are unavailable); presumably
            // used by the scan to normalize time_range — TODO confirm.
            auto time_bounds = handle->agg->query_time_bounds();
            std::uint64_t time_origin =
                time_bounds.valid ? time_bounds.min_time_bucket : 0;

            DfanalyzerContext ctx;
            ctx.file_hashes = &file_hashes;
            ctx.host_hashes = &host_hashes;
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
            ctx.time_origin = time_origin;
            ctx.time_resolution = time_resolution;
            ctx.time_granularity = time_granularity;

            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<DfanalyzerScanOutput> outputs;
            parallel_shard_scan<DfanalyzerScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    DfanalyzerScanInput input;
                    input.agg = handle->agg.get();
                    input.ctx = &ctx;
                    input.type_filter = std::nullopt;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    input.group_by = group_by_ptr;
                    return scan_dfanalyzer_shards(input);
                },
                outputs);

            for (auto& out : outputs) {
                for (auto& r : out.events)
                    events_results.push_back(std::move(r));
                for (auto& r : out.profiles)
                    profiles_results.push_back(std::move(r));
                for (auto& r : out.system)
                    system_results.push_back(std::move(r));
            }

            auto sys_buf =
                scan_system_metrics_buffer(handle->agg.get(), &ctx, batch_size);
            for (auto& r : sys_buf) system_results.push_back(std::move(r));
        }
    } catch (const std::exception& e) {
        scan_threw = true;
        error_msg = e.what();
    } catch (...) {
        // Non-std exceptions must not unwind past Py_END_ALLOW_THREADS.
        scan_threw = true;
    }
    Py_END_ALLOW_THREADS

    if (scan_threw || !error_msg.empty()) {
        // An exception with an empty what() is still a failure.
        if (error_msg.empty())
            error_msg = "iter_arrow_dfanalyzer_all: unknown C++ exception";
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* result_dict = PyDict_New();
    if (!result_dict) return nullptr;

    struct Section {
        const char* key;
        std::vector<ArrowExportResult>* results;
    };
    const Section sections[] = {{"events", &events_results},
                                {"profiles", &profiles_results},
                                {"system", &system_results}};
    for (const Section& section : sections) {
        PyObject* list = PyList_New(0);
        if (!list) {
            Py_DECREF(result_dict);
            return nullptr;
        }
        append_results_to_list(list, *section.results);
        // append_results_to_list may leave a Python error set; returning a
        // value with an exception pending would raise SystemError.
        if (PyErr_Occurred() ||
            PyDict_SetItemString(result_dict, section.key, list) < 0) {
            Py_DECREF(list);
            Py_DECREF(result_dict);
            return nullptr;
        }
        Py_DECREF(list);  // result_dict now holds its own reference
    }
    return result_dict;
}
10✔
2165

2166
// ---------------------------------------------------------------------------
2167
// scan_aggregation_manifest — module-level entry point for analyze_trace.
2168
//
2169
// Each Dask worker calls this with its slice of the agg manifest
2170
// (agg_ssts + sys_ssts) and optionally a [shard_begin, shard_end) range.
2171
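// The function opens a scratch IndexDatabase at `<scratch_dir>/.dftindex`,
// ingests the SSTs into its AGGREGATION/SYSTEM_METRICS CFs (cheap when the
// SSTs live on the same filesystem as `scratch_dir` and RocksDB can
// hard-link rather than copy them), then runs the same parallel shard scan
// that `iter_arrow_dfanalyzer_all` uses. Optional `file_hashes` /
// `host_hashes` dicts let the caller supply the hash tables instead of
// having every worker read them from `meta_index_path`.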
2175
//
2176
// AGG_GLOBAL_CONFIG_KEY is not written by worker SSTs, so we construct the
2177
// EventAggregator with config_hash=0 directly instead of going through
2178
// `open_agg_db` (which requires the config key). The config hash is used
2179
// by the aggregator only for write-time validation, not for reads.
2180
//
2181
// The scratch DB is NOT cleaned up here — the Python caller owns
2182
// `scratch_dir` lifetime and should remove it after gathering results.
2183
// ---------------------------------------------------------------------------
2184

2185
static bool collect_string_list(PyObject* obj, const char* name,
×
2186
                                std::vector<std::string>& out) {
2187
    if (!obj || obj == Py_None) return true;
×
UNCOV
2188
    PyObject* seq = PySequence_Fast(obj, name);
×
UNCOV
2189
    if (!seq) return false;
×
2190
    Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
2191
    out.reserve(static_cast<std::size_t>(n));
×
2192
    for (Py_ssize_t i = 0; i < n; ++i) {
×
2193
        PyObject* item = PySequence_Fast_GET_ITEM(seq, i);
×
2194
        if (!PyUnicode_Check(item)) {
×
2195
            Py_DECREF(seq);
2196
            PyErr_Format(PyExc_TypeError, "%s items must be str", name);
×
2197
            return false;
×
2198
        }
UNCOV
2199
        const char* s = PyUnicode_AsUTF8(item);
×
2200
        if (!s) {
×
2201
            Py_DECREF(seq);
UNCOV
2202
            return false;
×
2203
        }
2204
        out.emplace_back(s);
×
2205
    }
2206
    Py_DECREF(seq);
2207
    return true;
×
2208
}
2209

UNCOV
2210
static bool collect_string_string_dict(
×
2211
    PyObject* obj, const char* name,
2212
    std::unordered_map<std::string, std::string>& out) {
2213
    if (!obj || obj == Py_None) return true;
×
2214
    if (!PyDict_Check(obj)) {
×
UNCOV
2215
        PyErr_Format(PyExc_TypeError, "%s must be a dict[str, str] or None",
×
2216
                     name);
2217
        return false;
×
2218
    }
2219
    PyObject *k, *v;
2220
    Py_ssize_t pos = 0;
×
UNCOV
2221
    while (PyDict_Next(obj, &pos, &k, &v)) {
×
2222
        if (!PyUnicode_Check(k) || !PyUnicode_Check(v)) {
×
2223
            PyErr_Format(PyExc_TypeError, "%s must map str -> str", name);
×
2224
            return false;
×
2225
        }
2226
        const char* ks = PyUnicode_AsUTF8(k);
×
UNCOV
2227
        const char* vs = PyUnicode_AsUTF8(v);
×
2228
        if (!ks || !vs) return false;
×
2229
        out.emplace(ks, vs);
×
2230
    }
2231
    return true;
×
2232
}
2233

2234
static PyObject* scan_aggregation_manifest_fn(PyObject* /*self*/,
×
2235
                                              PyObject* args, PyObject* kwds) {
2236
    static const char* kwlist[] = {
2237
        "agg_ssts",        "sys_ssts",    "scratch_dir",
2238
        "meta_index_path", "batch_size",  "time_granularity",
2239
        "time_resolution", "query",       "group_by",
2240
        "shard_begin",     "shard_end",   "runtime",
2241
        "file_hashes",     "host_hashes", nullptr};
2242

2243
    PyObject* agg_ssts_obj = nullptr;
×
UNCOV
2244
    PyObject* sys_ssts_obj = nullptr;
×
2245
    const char* scratch_dir = nullptr;
×
UNCOV
2246
    const char* meta_index_path = nullptr;
×
2247
    Py_ssize_t batch_size = 10000;
×
UNCOV
2248
    double time_granularity = 1.0;
×
UNCOV
2249
    double time_resolution = 1000000.0;
×
UNCOV
2250
    const char* query_str = nullptr;
×
UNCOV
2251
    PyObject* group_by_obj = nullptr;
×
UNCOV
2252
    int shard_begin_i = 0;
×
UNCOV
2253
    int shard_end_i = DFT_NUM_SHARDS;
×
UNCOV
2254
    PyObject* runtime_obj = nullptr;
×
2255
    PyObject* file_hashes_obj = nullptr;
×
2256
    PyObject* host_hashes_obj = nullptr;
×
2257

2258
    if (!PyArg_ParseTupleAndKeywords(
×
2259
            args, kwds, "OOss|nddzOiiOOO", (char**)kwlist, &agg_ssts_obj,
2260
            &sys_ssts_obj, &scratch_dir, &meta_index_path, &batch_size,
2261
            &time_granularity, &time_resolution, &query_str, &group_by_obj,
2262
            &shard_begin_i, &shard_end_i, &runtime_obj, &file_hashes_obj,
2263
            &host_hashes_obj)) {
2264
        return nullptr;
×
2265
    }
2266

2267
    if (shard_begin_i < 0 || shard_end_i > DFT_NUM_SHARDS ||
×
UNCOV
2268
        shard_begin_i >= shard_end_i) {
×
UNCOV
2269
        PyErr_Format(PyExc_ValueError,
×
2270
                     "shard range [%d, %d) invalid (must be within [0, %d))",
2271
                     shard_begin_i, shard_end_i, (int)DFT_NUM_SHARDS);
UNCOV
2272
        return nullptr;
×
2273
    }
2274

2275
    std::vector<std::string> agg_ssts;
×
UNCOV
2276
    std::vector<std::string> sys_ssts;
×
2277
    if (!collect_string_list(agg_ssts_obj, "agg_ssts", agg_ssts))
×
2278
        return nullptr;
×
2279
    if (!collect_string_list(sys_ssts_obj, "sys_ssts", sys_ssts))
×
UNCOV
2280
        return nullptr;
×
2281

2282
    std::unordered_map<std::string, std::string> preloaded_file_hashes;
×
2283
    std::unordered_map<std::string, std::string> preloaded_host_hashes;
×
2284
    const bool hashes_preloaded =
×
2285
        (file_hashes_obj && file_hashes_obj != Py_None) ||
×
2286
        (host_hashes_obj && host_hashes_obj != Py_None);
×
2287
    if (!collect_string_string_dict(file_hashes_obj, "file_hashes",
×
2288
                                    preloaded_file_hashes))
2289
        return nullptr;
×
2290
    if (!collect_string_string_dict(host_hashes_obj, "host_hashes",
×
2291
                                    preloaded_host_hashes))
UNCOV
2292
        return nullptr;
×
2293

2294
    auto query_opt = parse_query_arg(query_str);
×
2295
    if (!query_opt && PyErr_Occurred()) return nullptr;
×
2296

2297
    GroupByConfig group_by_cfg;
×
2298
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
×
2299
    const GroupByConfig* group_by_ptr =
×
2300
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;
×
2301

2302
    Runtime* rt = nullptr;
×
UNCOV
2303
    if (runtime_obj && runtime_obj != Py_None) {
×
UNCOV
2304
        if (!PyObject_TypeCheck(runtime_obj, &RuntimeType)) {
×
2305
            PyErr_SetString(PyExc_TypeError,
×
2306
                            "runtime must be a Runtime instance or None");
2307
            return nullptr;
×
2308
        }
2309
        rt = ((RuntimeObject*)runtime_obj)->runtime.get();
×
2310
    } else {
2311
        rt = get_default_runtime();
×
2312
    }
2313

UNCOV
2314
    PyObject* result_dict = PyDict_New();
×
UNCOV
2315
    if (!result_dict) return nullptr;
×
2316
    PyObject* events_list = PyList_New(0);
×
UNCOV
2317
    PyObject* profiles_list = PyList_New(0);
×
UNCOV
2318
    PyObject* system_list = PyList_New(0);
×
UNCOV
2319
    if (!events_list || !profiles_list || !system_list) {
×
UNCOV
2320
        Py_XDECREF(events_list);
×
UNCOV
2321
        Py_XDECREF(profiles_list);
×
UNCOV
2322
        Py_XDECREF(system_list);
×
2323
        Py_DECREF(result_dict);
2324
        return nullptr;
×
2325
    }
2326

2327
    std::string error_msg;
×
2328
    std::vector<ArrowExportResult> events_results, profiles_results,
×
2329
        system_results;
×
UNCOV
2330
    std::string scratch_index_path = std::string(scratch_dir) + "/.dftindex";
×
2331
    std::string meta_index_path_str(meta_index_path);
×
2332

UNCOV
2333
    Py_BEGIN_ALLOW_THREADS try {
×
2334
        namespace rcf = dftracer::utils::rocksdb::cf;
2335
        using clock = std::chrono::steady_clock;
2336
        auto ms = [](clock::time_point a, clock::time_point b) -> long long {
×
2337
            return std::chrono::duration_cast<std::chrono::milliseconds>(b - a)
×
UNCOV
2338
                .count();
×
2339
        };
2340

2341
        auto t_start = clock::now();
×
2342
        dftracer::utils::utilities::indexer::IndexDatabase scratch_db(
UNCOV
2343
            scratch_index_path);
×
2344
        auto t_scratch_open = clock::now();
×
2345

2346
        auto raw_db = scratch_db.db();
×
UNCOV
2347
        for (const auto& p : agg_ssts) {
×
UNCOV
2348
            auto st = raw_db->ingest_external_files(rcf::AGGREGATION, {p},
×
UNCOV
2349
                                                    /*ingest_behind=*/false);
×
UNCOV
2350
            if (!st.ok()) {
×
2351
                error_msg =
2352
                    "ingest AGGREGATION sst '" + p + "': " + st.ToString();
×
UNCOV
2353
                break;
×
2354
            }
UNCOV
2355
        }
×
UNCOV
2356
        if (error_msg.empty()) {
×
UNCOV
2357
            for (const auto& p : sys_ssts) {
×
UNCOV
2358
                auto st = raw_db->ingest_external_files(
×
UNCOV
2359
                    rcf::SYSTEM_METRICS, {p}, /*ingest_behind=*/false);
×
UNCOV
2360
                if (!st.ok()) {
×
UNCOV
2361
                    error_msg = "ingest SYSTEM_METRICS sst '" + p +
×
UNCOV
2362
                                "': " + st.ToString();
×
UNCOV
2363
                    break;
×
2364
                }
UNCOV
2365
            }
×
2366
        }
UNCOV
2367
        auto t_ingest = clock::now();
×
2368

UNCOV
2369
        if (error_msg.empty()) {
×
2370
            auto agg =
UNCOV
2371
                std::make_unique<EventAggregator>(raw_db, /*cfg_hash=*/0);
×
2372

2373
            // If the caller passed pre-loaded hash tables, skip opening
2374
            // the meta DB on lustre. When many dask workers run
2375
            // scan_aggregation_manifest in parallel, loading the hash
2376
            // tables N times from the same file is significant lustre
2377
            // metadata pressure; loading once on the coordinator and
2378
            // passing them in eliminates the redundant reads.
UNCOV
2379
            std::unordered_map<std::string, std::string> loaded_file_hashes;
×
UNCOV
2380
            std::unordered_map<std::string, std::string> loaded_host_hashes;
×
2381
            std::unique_ptr<dftracer::utils::utilities::indexer::IndexDatabase>
UNCOV
2382
                meta_db;
×
UNCOV
2383
            if (!hashes_preloaded) {
×
2384
                meta_db = std::make_unique<
UNCOV
2385
                    dftracer::utils::utilities::indexer::IndexDatabase>(
×
2386
                    meta_index_path_str, dftracer::utils::rocksdb::
UNCOV
2387
                                             RocksDatabase::OpenMode::ReadOnly);
×
UNCOV
2388
                loaded_file_hashes = meta_db->query_hash_table(
×
2389
                    dftracer::utils::utilities::indexer::IndexDatabase::
UNCOV
2390
                        HashType::FILE);
×
UNCOV
2391
                loaded_host_hashes = meta_db->query_hash_table(
×
2392
                    dftracer::utils::utilities::indexer::IndexDatabase::
UNCOV
2393
                        HashType::HOST);
×
2394
            }
UNCOV
2395
            const auto& file_hashes =
×
2396
                hashes_preloaded ? preloaded_file_hashes : loaded_file_hashes;
UNCOV
2397
            const auto& host_hashes =
×
2398
                hashes_preloaded ? preloaded_host_hashes : loaded_host_hashes;
UNCOV
2399
            auto t_hash_tables = clock::now();
×
2400

UNCOV
2401
            auto time_bounds = agg->query_time_bounds();
×
UNCOV
2402
            std::uint64_t time_origin =
×
UNCOV
2403
                time_bounds.valid ? time_bounds.min_time_bucket : 0;
×
2404

UNCOV
2405
            DfanalyzerContext ctx;
×
UNCOV
2406
            ctx.file_hashes = &file_hashes;
×
UNCOV
2407
            ctx.host_hashes = &host_hashes;
×
UNCOV
2408
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
×
UNCOV
2409
            ctx.time_origin = time_origin;
×
UNCOV
2410
            ctx.time_resolution = time_resolution;
×
UNCOV
2411
            ctx.time_granularity = time_granularity;
×
2412

UNCOV
2413
            std::vector<DfanalyzerScanOutput> outputs;
×
UNCOV
2414
            parallel_shard_scan_range<DfanalyzerScanOutput>(
×
2415
                rt, static_cast<std::uint16_t>(shard_begin_i),
2416
                static_cast<std::uint16_t>(shard_end_i),
UNCOV
2417
                [&](std::uint16_t sb, std::uint16_t se) {
×
UNCOV
2418
                    DfanalyzerScanInput input;
×
UNCOV
2419
                    input.agg = agg.get();
×
UNCOV
2420
                    input.ctx = &ctx;
×
UNCOV
2421
                    input.type_filter = std::nullopt;
×
UNCOV
2422
                    input.batch_size = batch_size;
×
UNCOV
2423
                    input.shard_begin = sb;
×
UNCOV
2424
                    input.shard_end = se;
×
UNCOV
2425
                    input.group_by = group_by_ptr;
×
UNCOV
2426
                    return scan_dfanalyzer_shards(input);
×
2427
                },
2428
                outputs);
UNCOV
2429
            auto t_scan = clock::now();
×
2430

UNCOV
2431
            for (auto& out : outputs) {
×
UNCOV
2432
                for (auto& r : out.events)
×
UNCOV
2433
                    events_results.push_back(std::move(r));
×
UNCOV
2434
                for (auto& r : out.profiles)
×
UNCOV
2435
                    profiles_results.push_back(std::move(r));
×
UNCOV
2436
                for (auto& r : out.system)
×
UNCOV
2437
                    system_results.push_back(std::move(r));
×
2438
            }
2439

UNCOV
2440
            std::fprintf(
×
2441
                stderr,
2442
                "[scan_aggregation_manifest] n_agg=%zu n_sys=%zu "
2443
                "scratch_open=%lldms ingest=%lldms hash_tables=%lldms "
2444
                "scan=%lldms\n",
2445
                agg_ssts.size(), sys_ssts.size(), ms(t_start, t_scratch_open),
2446
                ms(t_scratch_open, t_ingest), ms(t_ingest, t_hash_tables),
2447
                ms(t_hash_tables, t_scan));
UNCOV
2448
            std::fflush(stderr);
×
UNCOV
2449
        }
×
UNCOV
2450
    } catch (const std::exception& e) {
×
UNCOV
2451
        error_msg = e.what();
×
UNCOV
2452
    }
×
UNCOV
2453
    Py_END_ALLOW_THREADS
×
2454

UNCOV
2455
        if (!error_msg.empty()) {
×
2456
        Py_DECREF(events_list);
2457
        Py_DECREF(profiles_list);
2458
        Py_DECREF(system_list);
2459
        Py_DECREF(result_dict);
UNCOV
2460
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
UNCOV
2461
        return nullptr;
×
2462
    }
2463

UNCOV
2464
    append_results_to_list(events_list, events_results);
×
UNCOV
2465
    append_results_to_list(profiles_list, profiles_results);
×
UNCOV
2466
    append_results_to_list(system_list, system_results);
×
2467

UNCOV
2468
    PyDict_SetItemString(result_dict, "events", events_list);
×
UNCOV
2469
    PyDict_SetItemString(result_dict, "profiles", profiles_list);
×
UNCOV
2470
    PyDict_SetItemString(result_dict, "system", system_list);
×
2471
    Py_DECREF(events_list);
2472
    Py_DECREF(profiles_list);
2473
    Py_DECREF(system_list);
2474

UNCOV
2475
    return result_dict;
×
UNCOV
2476
}
×
2477

2478
// Module-level functions added by init_indexer() in Arrow-enabled builds.
//
// CPython exposes the leading "name(args)" line as __text_signature__ only
// when it is followed by exactly ")\n--\n\n". A return annotation such as
// "-> dict" after the closing parenthesis defeats that detection, and the raw
// "--" marker then shows up in __doc__.
static PyMethodDef BatchIndexerModuleMethods[] = {
    {"scan_aggregation_manifest", (PyCFunction)scan_aggregation_manifest_fn,
     METH_VARARGS | METH_KEYWORDS,
     "scan_aggregation_manifest(agg_ssts, sys_ssts, scratch_dir, "
     "meta_index_path, batch_size=10000, time_granularity=1.0, "
     "time_resolution=1e6, query=None, group_by=None, shard_begin=0, "
     "shard_end=4096, runtime=None, file_hashes=None, host_hashes=None)\n"
     "--\n\n"
     "Scan a worker's slice of the distributed aggregation manifest.\n\n"
     "Ingests agg_ssts + sys_ssts into a scratch IndexDatabase at "
     "scratch_dir (caller owns the directory lifecycle) and runs the "
     "dfanalyzer aggregation scan over [shard_begin, shard_end). "
     "meta_index_path is the unified .dftindex used to resolve file / "
     "host hashes. file_hashes / host_hashes optionally supply those "
     "tables as dict[str, str] so they need not be read from "
     "meta_index_path. Returns a dict with the same shape as "
     "Indexer.iter_arrow_dfanalyzer_all."},
    {nullptr, nullptr, 0, nullptr}};
2494
#endif
2495

2496
// Method table for the Indexer type (installed as tp_methods).
//
// Each docstring begins with "name(args)\n--\n\n" so CPython can expose the
// first line as the method's __text_signature__; the text after the "--"
// marker is what __doc__ / help() shows.
static PyMethodDef Indexer_methods[] = {
    {"get_checkpoint_indexer", (PyCFunction)Indexer_get_checkpoint_indexer,
     METH_VARARGS,
     "get_checkpoint_indexer(file_path)\n"
     "--\n\n"
     "Get a checkpoint indexer for a specific file.\n\n"
     "Args:\n"
     "    file_path: Path to the trace file (.pfw/.pfw.gz)\n\n"
     "Returns:\n"
     "    Indexer instance for checkpoint-level operations.\n"},
    {"resolve", (PyCFunction)Indexer_resolve, METH_NOARGS,
     "resolve()\n"
     "--\n\n"
     "Check what files exist vs need indexing.\n\n"
     "Returns:\n"
     "    dict with 'total_files', 'ready', 'needs_work', 'index_path'\n"},
    {"build", (PyCFunction)Indexer_build, METH_NOARGS,
     "build()\n"
     "--\n\n"
     "Build all missing index tiers based on require_* flags.\n"},
    {"ensure_indexed", (PyCFunction)Indexer_ensure_indexed, METH_NOARGS,
     "ensure_indexed()\n"
     "--\n\n"
     "Resolve and build if needed.\n\n"
     "Returns:\n"
     "    dict with index status after building.\n"},
    {"get_hash_table", (PyCFunction)Indexer_get_hash_table, METH_VARARGS,
     "get_hash_table(type)\n"
     "--\n\n"
     "Query hash table mappings.\n\n"
     "Args:\n"
     "    type: 'file', 'host', 'string', or 'proc'\n\n"
     "Returns:\n"
     "    dict mapping hash values to resolved names.\n"},
    {"query_file_pids", (PyCFunction)Indexer_query_file_pids, METH_VARARGS,
     "query_file_pids(file_id)\n"
     "--\n\n"
     "Query PIDs observed in a specific file.\n\n"
     "Args:\n"
     "    file_id: Integer file ID from index.\n\n"
     "Returns:\n"
     "    set of PIDs.\n"},
    {"query_all_file_pids", (PyCFunction)Indexer_query_all_file_pids,
     METH_NOARGS,
     "query_all_file_pids()\n"
     "--\n\n"
     "Query PIDs for all indexed files.\n\n"
     "Returns:\n"
     "    dict mapping file_id to set of PIDs.\n"},
    {"query_file_info", (PyCFunction)Indexer_query_file_info, METH_NOARGS,
     "query_file_info()\n"
     "--\n\n"
     "Query file ID to path mapping and per-file PIDs in one call.\n\n"
     "Returns:\n"
     "    tuple of (dict[int, str], dict[int, set[int]]).\n"},
// Arrow-backed iteration methods exist only when the extension is built
// with DFTRACER_UTILS_ENABLE_ARROW.
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    {"iter_aggregation", (PyCFunction)Indexer_iter_aggregation,
     METH_VARARGS | METH_KEYWORDS,
     "iter_aggregation(type='events', batch_size=10000)\n"
     "--\n\n"
     "Iterate over aggregation data as Arrow batches.\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    {"iter_arrow_dfanalyzer", (PyCFunction)Indexer_iter_arrow_dfanalyzer,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer(type='events', batch_size=10000, "
     "time_granularity=1.0, time_resolution=1e6, query=None)\n"
     "--\n\n"
     "Iterate over aggregation data as dfanalyzer-compatible Arrow batches.\n\n"
     "Output schema matches dfanalyzer expectations with resolved hashes,\n"
     "normalized time_range, and computed columns (proc_name, io_cat).\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string (e.g., \"pid == 1234\")\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    {"iter_arrow_dfanalyzer_all",
     (PyCFunction)Indexer_iter_arrow_dfanalyzer_all,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer_all(batch_size=10000, time_granularity=1.0, "
     "time_resolution=1e6, query=None, group_by=None)\n"
     "--\n\n"
     "Iterate over all aggregation types in a single scan.\n\n"
     "Returns a dict with 'events', 'profiles', 'system' keys, each "
     "containing\n"
     "a list of Arrow batch capsules. This is ~3x faster than calling\n"
     "iter_arrow_dfanalyzer separately for each type.\n\n"
     "When group_by is provided, the scan collapses dimensions during "
     "aggregation\n"
     "and emits a reduced schema containing only the requested columns plus\n"
     "aggregated metrics (count, time, size, time_sq, size_sq, time_min,\n"
     "time_max, size_min, size_max, time_call_min, time_call_max, "
     "size_call_min,\n"
     "size_call_max, time_start, time_end). Supported group_by columns: "
     "cat,\n"
     "func_name, pid, tid, file_hash, host_hash, file_name, host_name, "
     "proc_name,\n"
     "io_cat, acc_pat, time_range.\n\n"
     "Args:\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string\n"
     "    group_by: Optional list of columns to group by; enables coarse\n"
     "        in-scan aggregation (default None = full granularity)\n\n"
     "Returns:\n"
     "    dict with 'events', 'profiles', 'system' lists of Arrow capsules.\n"},
#endif
    // Sentinel: a zero-filled entry (ml_name == nullptr) ends the table.
    {nullptr}};
2611

2612
// Indexer exposes no computed attributes; the table holds only the
// all-null sentinel entry (name, get, set, doc, closure).
static PyGetSetDef Indexer_getsetters[] = {
    {nullptr, nullptr, nullptr, nullptr, nullptr},
};
2613

2614
// Static type object for dftracer_utils_ext.Indexer. Slots are initialized
// positionally in PyTypeObject declaration order; unused slots are 0.
PyTypeObject IndexerType = {
    PyVarObject_HEAD_INIT(nullptr, 0) "dftracer_utils_ext.Indexer", /* tp_name */
    sizeof(IndexerObject),                    /* tp_basicsize */
    0,                                        /* tp_itemsize */
    (destructor)Indexer_dealloc,              /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    0,                                        /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    // tp_doc. CPython only turns the "...)\n--\n\n" prefix into
    // __text_signature__ when the doc starts with the type's short name
    // (tp_name after the last '.'), i.e. "Indexer(". With any other prefix
    // the raw signature and "--" line leak into __doc__.
    "Indexer(directory='', files=None, index_dir='',\n"
    "        require_checkpoint=True, require_bloom=True,\n"
    "        require_manifest=True, require_aggregation=False,\n"
    "        time_interval_ms=5000.0, group_keys=None,\n"
    "        custom_metric_fields=None, compute_percentiles=False,\n"
    "        parallelism=0, force_rebuild=False, runtime=None)\n"
    "--\n\n"
    "Indexer with tiered index building.\n\n"
    "At least one of 'directory' or 'files' must be provided.\n"
    "- directory: scan for .pfw/.pfw.gz files\n"
    "- files: list of specific file paths\n\n"
    "Supports:\n"
    "- Tier 1: Checkpoints (require_checkpoint)\n"
    "- Tier 2: Bloom filters (require_bloom), Manifests (require_manifest)\n"
    "- Tier 3: Aggregation (require_aggregation + config params)\n",
    0,                       /* tp_traverse */
    0,                       /* tp_clear */
    0,                       /* tp_richcompare */
    0,                       /* tp_weaklistoffset */
    0,                       /* tp_iter */
    0,                       /* tp_iternext */
    Indexer_methods,         /* tp_methods */
    0,                       /* tp_members */
    Indexer_getsetters,      /* tp_getset */
    0,                       /* tp_base */
    0,                       /* tp_dict */
    0,                       /* tp_descr_get */
    0,                       /* tp_descr_set */
    0,                       /* tp_dictoffset */
    (initproc)Indexer_init,  /* tp_init */
    0,                       /* tp_alloc */
    Indexer_new,             /* tp_new */
};
2667

2668
// Register the Indexer type on module `m` and, in Arrow-enabled builds, the
// module-level functions from BatchIndexerModuleMethods.
// Returns 0 on success, or -1 with a Python exception set.
int init_indexer(PyObject* m) {
    if (PyType_Ready(&IndexerType) < 0) {
        return -1;
    }

    // PyModule_AddObject steals the reference only when it succeeds, so take
    // one first and hand it back ourselves on failure.
    Py_INCREF(&IndexerType);
    PyObject* const type_obj = (PyObject*)&IndexerType;
    const int add_status = PyModule_AddObject(m, "Indexer", type_obj);
    if (add_status < 0) {
        Py_DECREF(&IndexerType);
        return -1;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    const int funcs_status = PyModule_AddFunctions(m, BatchIndexerModuleMethods);
    if (funcs_status < 0) {
        return -1;
    }
#endif

    return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc