• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26165774062

20 May 2026 01:29PM UTC coverage: 51.981% (-0.2%) from 52.2%
26165774062

Pull #68

github

web-flow
Merge a4eaed4d4 into 6c9aaa7c9
Pull Request #68: feat(aggregator): offset metrics, per-event-name system metrics, and time-bucket persistence

36911 of 92534 branches covered (39.89%)

Branch coverage included in aggregate %.

89 of 185 new or added lines in 8 files covered. (48.11%)

276 existing lines in 9 files now uncovered.

33359 of 42649 relevant lines covered (78.22%)

20407.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.75
/src/dftracer/utils/python/batch_indexer.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/common/string_intern.h>
3
#include <dftracer/utils/core/coro/task.h>
4
#include <dftracer/utils/core/coro/when_all.h>
5
#include <dftracer/utils/core/rocksdb/db_manager.h>
6
#include <dftracer/utils/core/runtime.h>
7
#include <dftracer/utils/core/tasks/coro_scope.h>
8
#include <dftracer/utils/python/batch_indexer.h>
9
#include <dftracer/utils/python/indexer.h>
10
#include <dftracer/utils/python/runtime.h>
11
#include <dftracer/utils/utilities/common/query/query.h>
12
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_config.h>
13
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.h>
14
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_types.h>
15
#include <dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h>
16
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics.h>
17
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.h>
18
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
19
#include <dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.h>
20
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
21
#include <dftracer/utils/utilities/indexer/index_database.h>
22

23
#ifdef DFTRACER_UTILS_ENABLE_ARROW
24
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
25
#endif
26

27
#include <algorithm>
#include <chrono>
#include <cstdio>
#include <cstring>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <vector>
35

36
using dftracer::utils::CoroScope;
37
using dftracer::utils::Runtime;
38
using dftracer::utils::coro::CoroTask;
39
using namespace dftracer::utils::utilities::composites::dft::indexing;
40
using namespace dftracer::utils::utilities::composites::dft::aggregators;
41

42
// ---------------------------------------------------------------------------
43
// BatchIndexer - directory-level indexer with resolve/build pattern
44
// ---------------------------------------------------------------------------
45

46
// tp_dealloc slot: releases every Python reference held by the indexer and
// then hands the object's memory back through the type's tp_free.
static void Indexer_dealloc(IndexerObject* self) {
    // Any of these slots may still be null (Indexer_new initialises them to
    // nullptr), hence Py_XDECREF. Release order matches the field order.
    PyObject* const held[] = {
        self->runtime_obj, self->directory,  self->files,
        self->index_dir,   self->group_keys, self->custom_metric_fields,
    };
    for (PyObject* ref : held) {
        Py_XDECREF(ref);
    }
    Py_TYPE(self)->tp_free(reinterpret_cast<PyObject*>(self));
}
94✔
55

56
// tp_new slot: allocates the object and seeds every field with its default so
// the instance is in a defined state before __init__ runs. The positional and
// keyword arguments are not inspected here.
static PyObject* Indexer_new(PyTypeObject* type, PyObject* args,
                             PyObject* kwds) {
    auto* obj = reinterpret_cast<IndexerObject*>(type->tp_alloc(type, 0));
    if (obj == nullptr) {
        return nullptr;  // tp_alloc failed
    }

    // Owned Python references start out empty.
    obj->runtime_obj = nullptr;
    obj->directory = nullptr;
    obj->files = nullptr;
    obj->index_dir = nullptr;
    obj->group_keys = nullptr;
    obj->custom_metric_fields = nullptr;

    // Index tiers required by default; aggregation is opt-in.
    obj->require_checkpoint = 1;
    obj->require_bloom = 1;
    obj->require_manifest = 1;
    obj->require_aggregation = 0;

    // Aggregation defaults.
    obj->time_interval_ms = 5000.0;
    obj->compute_percentiles = 0;

    // Build defaults (32 MiB checkpoints).
    obj->checkpoint_size = 32 * 1024 * 1024;
    obj->parallelism = 0;
    obj->force_rebuild = 0;

    return reinterpret_cast<PyObject*>(obj);
}
78

79
// tp_init slot for the batch indexer.
//
// Keyword arguments (all optional, in positional order): directory, files,
// index_dir, require_checkpoint, require_bloom, require_manifest,
// require_aggregation, time_interval_ms, group_keys, custom_metric_fields,
// compute_percentiles, checkpoint_size, parallelism, force_rebuild, runtime.
//
// Returns 0 on success, -1 with a Python exception set on failure:
//   - ValueError when neither a non-empty `directory` nor a non-empty `files`
//     list is supplied, or when checkpoint_size / parallelism is negative;
//   - TypeError when `runtime` is neither a Runtime nor an object exposing a
//     Runtime through its `_native` attribute.
//
// All new values are prepared first and only then committed, releasing
// whatever the slots held before, so calling __init__ again on the same
// object neither leaks references nor leaves it half-updated.
static int Indexer_init(IndexerObject* self, PyObject* args, PyObject* kwds) {
    static const char* kwlist[] = {"directory",
                                   "files",
                                   "index_dir",
                                   "require_checkpoint",
                                   "require_bloom",
                                   "require_manifest",
                                   "require_aggregation",
                                   "time_interval_ms",
                                   "group_keys",
                                   "custom_metric_fields",
                                   "compute_percentiles",
                                   "checkpoint_size",
                                   "parallelism",
                                   "force_rebuild",
                                   "runtime",
                                   nullptr};

    const char* directory = "";
    PyObject* files_obj = Py_None;
    const char* index_dir = "";
    int require_checkpoint = 1;
    int require_bloom = 1;
    int require_manifest = 1;
    int require_aggregation = 0;
    double time_interval_ms = 5000.0;
    PyObject* group_keys_obj = Py_None;
    PyObject* custom_metrics_obj = Py_None;
    int compute_percentiles = 0;
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;  // 32MB default
    Py_ssize_t parallelism = 0;
    int force_rebuild = 0;
    PyObject* runtime_arg = nullptr;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|sOsppppdOOpnnpO", (char**)kwlist, &directory,
            &files_obj, &index_dir, &require_checkpoint, &require_bloom,
            &require_manifest, &require_aggregation, &time_interval_ms,
            &group_keys_obj, &custom_metrics_obj, &compute_percentiles,
            &checkpoint_size, &parallelism, &force_rebuild, &runtime_arg)) {
        return -1;
    }

    // Validate: at least one of directory or files must be provided.
    // Only real list instances count as `files`.
    bool has_directory = directory && directory[0] != '\0';
    bool has_files = files_obj && files_obj != Py_None &&
                     PyList_Check(files_obj) && PyList_Size(files_obj) > 0;

    if (!has_directory && !has_files) {
        PyErr_SetString(PyExc_ValueError,
                        "At least one of 'directory' or 'files' must be "
                        "provided");
        return -1;
    }

    // Both values are stored as std::size_t; a negative input would wrap to
    // an enormous size, so reject it up front.
    if (checkpoint_size < 0) {
        PyErr_SetString(PyExc_ValueError,
                        "checkpoint_size must be non-negative");
        return -1;
    }
    if (parallelism < 0) {
        PyErr_SetString(PyExc_ValueError, "parallelism must be non-negative");
        return -1;
    }

    // Resolve the runtime into an owned reference (or nullptr when omitted).
    PyObject* new_runtime = nullptr;
    if (runtime_arg && runtime_arg != Py_None) {
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
            Py_INCREF(runtime_arg);
            new_runtime = runtime_arg;
        } else {
            // Accept a wrapper object that carries the Runtime in `_native`.
            PyObject* native = PyObject_GetAttrString(runtime_arg, "_native");
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
                new_runtime = native;  // GetAttrString returned a new ref
            } else {
                Py_XDECREF(native);
                PyErr_SetString(PyExc_TypeError,
                                "runtime must be a Runtime instance or None");
                return -1;
            }
        }
    }

    PyObject* new_directory = PyUnicode_FromString(directory);
    PyObject* new_index_dir =
        new_directory ? PyUnicode_FromString(index_dir) : nullptr;
    if (!new_directory || !new_index_dir) {
        Py_XDECREF(new_directory);
        Py_XDECREF(new_index_dir);
        Py_XDECREF(new_runtime);
        return -1;
    }

    // Store `value` (an owned reference or nullptr) in `slot`, then release
    // the previous occupant.
    auto assign = [](auto& slot, PyObject* value) {
        PyObject* previous = slot;
        slot = value;
        Py_XDECREF(previous);
    };

    // An omitted runtime leaves the currently stored one untouched.
    if (new_runtime) {
        assign(self->runtime_obj, new_runtime);
    }
    assign(self->directory, new_directory);
    assign(self->index_dir, new_index_dir);

    self->require_checkpoint = require_checkpoint;
    self->require_bloom = require_bloom;
    self->require_manifest = require_manifest;
    self->require_aggregation = require_aggregation;
    self->time_interval_ms = time_interval_ms;
    self->compute_percentiles = compute_percentiles;
    self->checkpoint_size = static_cast<std::size_t>(checkpoint_size);
    self->parallelism = static_cast<std::size_t>(parallelism);
    self->force_rebuild = force_rebuild;

    // Store files list
    if (has_files) {
        Py_INCREF(files_obj);
        assign(self->files, files_obj);
    } else {
        assign(self->files, nullptr);
    }

    // Store group_keys
    if (group_keys_obj && group_keys_obj != Py_None) {
        Py_INCREF(group_keys_obj);
        assign(self->group_keys, group_keys_obj);
    } else {
        assign(self->group_keys, nullptr);
    }

    // Store custom_metric_fields
    if (custom_metrics_obj && custom_metrics_obj != Py_None) {
        Py_INCREF(custom_metrics_obj);
        assign(self->custom_metric_fields, custom_metrics_obj);
    } else {
        assign(self->custom_metric_fields, nullptr);
    }

    return 0;
}
47✔
190

191
// Returns the Runtime this indexer should execute on: the one attached at
// construction time when present, otherwise the process-wide default.
static Runtime* get_batch_indexer_runtime(IndexerObject* self) {
    if (!self->runtime_obj) {
        return get_default_runtime();
    }
    auto* wrapper = reinterpret_cast<RuntimeObject*>(self->runtime_obj);
    return wrapper->runtime.get();
}
134✔
197

198
static std::optional<AggregationConfig> build_aggregation_config(
246✔
199
    IndexerObject* self) {
200
    if (!self->require_aggregation) {
246✔
201
        return std::nullopt;
152✔
202
    }
203

204
    AggregationConfig config;
94!
205
    config.time_interval_us =
94✔
206
        static_cast<std::uint64_t>(self->time_interval_ms * 1000.0);
94✔
207

208
    if (self->group_keys && PyList_Check(self->group_keys)) {
94!
UNCOV
209
        Py_ssize_t n = PyList_Size(self->group_keys);
×
210
        for (Py_ssize_t i = 0; i < n; i++) {
×
211
            const char* s =
UNCOV
212
                PyUnicode_AsUTF8(PyList_GetItem(self->group_keys, i));
×
213
            if (s) config.extra_group_keys.emplace_back(s);
×
214
        }
215
    }
216
    if (self->custom_metric_fields &&
94!
UNCOV
217
        PyList_Check(self->custom_metric_fields)) {
×
218
        Py_ssize_t n = PyList_Size(self->custom_metric_fields);
×
219
        for (Py_ssize_t i = 0; i < n; i++) {
×
220
            const char* s =
UNCOV
221
                PyUnicode_AsUTF8(PyList_GetItem(self->custom_metric_fields, i));
×
222
            if (s) config.custom_metric_fields.emplace_back(s);
×
223
        }
224
    }
225

226
    config.compute_percentiles = self->compute_percentiles != 0;
94✔
227
    return config;
94!
228
}
170✔
229

230
// ---------------------------------------------------------------------------
231
// resolve() - check what exists vs needs building
232
// ---------------------------------------------------------------------------
233

234
// resolve(): runs the index resolver for the configured directory / file list
// and reports which files are already indexed and which still need work.
//
// Returns a new dict with:
//   "total_files" - int, number of files the resolver saw
//   "index_path"  - str, index location reported by the resolver
//   "ready"       - list[str], files whose index is already cached
//   "needs_work"  - list[str], files missing a checkpoint, bloom, manifest or
//                   aggregation tier (first-seen order)
// Returns nullptr with a Python exception set on failure (RuntimeError for
// errors raised by the resolver itself).
static PyObject* Indexer_resolve(IndexerObject* self,
                                 PyObject* Py_UNUSED(ignored)) {
    // The slots are null when __init__ never ran; treat that as "unset"
    // instead of handing a null pointer to PyUnicode_AsUTF8.
    const char* directory = "";
    if (self->directory) {
        directory = PyUnicode_AsUTF8(self->directory);
        if (!directory) return nullptr;
    }
    const char* index_dir = "";
    if (self->index_dir) {
        index_dir = PyUnicode_AsUTF8(self->index_dir);
        if (!index_dir) return nullptr;
    }

    ResolverInput input;
    input.directory = directory;
    input.index_dir = index_dir;
    input.require_checkpoints = self->require_checkpoint;
    input.require_bloom = self->require_bloom;
    input.require_manifest = self->require_manifest;
    input.require_aggregation = self->require_aggregation;
    input.aggregation_config = build_aggregation_config(self);
    // Do not carry on with an exception pending from the config conversion.
    if (PyErr_Occurred()) return nullptr;

    // Add files if provided; a non-str entry is reported, not skipped.
    if (self->files && PyList_Check(self->files)) {
        const Py_ssize_t n = PyList_Size(self->files);
        for (Py_ssize_t i = 0; i < n; i++) {
            const char* s = PyUnicode_AsUTF8(PyList_GetItem(self->files, i));
            if (!s) return nullptr;
            input.files.emplace_back(s);
        }
    }

    ResolverResult result;
    std::string error_msg;
    bool failed = false;

    // The resolver runs on the runtime's executor; release the GIL while
    // blocking on it. Every C++ exception must be caught inside this region,
    // otherwise the thread state would never be restored.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime* rt = get_batch_indexer_runtime(self);
        rt->submit(run_coro_scope(
                       rt->executor(),
                       [](CoroScope& scope, ResolverInput in,
                          ResolverResult* out) -> CoroTask<void> {
                           IndexResolverUtility resolver;
                           // Use scope.spawn(utility, input) which auto-binds
                           // context for utilities with NeedsContext tag
                           *out = co_await scope.spawn(resolver, std::move(in));
                       },
                       std::move(input), &result),
                   "batch-indexer-resolve")
            .get();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while resolving index";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    // Union of all needs_* lists. Checkpoint entries are taken verbatim;
    // entries from the other lists are added only if not already present.
    std::vector<std::string> needs_work;
    std::unordered_set<std::string> seen;
    for (const auto& item : result.needs_checkpoint) {
        needs_work.push_back(item.file_path);
        seen.insert(item.file_path);
    }
    auto add_unique = [&needs_work, &seen](const auto& items) {
        for (const auto& item : items) {
            if (seen.insert(item.file_path).second) {
                needs_work.push_back(item.file_path);
            }
        }
    };
    add_unique(result.needs_bloom);
    add_unique(result.needs_manifest);
    add_unique(result.needs_aggregation);

    // Build result dict
    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    // PyDict_SetItemString takes its own reference, so the freshly created
    // value must be released here whether or not the insert succeeded.
    auto put = [dict](const char* key, PyObject* value) -> bool {
        if (!value) return false;
        const int rc = PyDict_SetItemString(dict, key, value);
        Py_DECREF(value);
        return rc == 0;
    };

    // Builds a list[str] from `items`; `text_of` yields each C string.
    auto make_path_list = [](const auto& items, auto text_of) -> PyObject* {
        PyObject* list = PyList_New(static_cast<Py_ssize_t>(items.size()));
        if (!list) return nullptr;
        for (std::size_t i = 0; i < items.size(); ++i) {
            PyObject* text = PyUnicode_FromString(text_of(items[i]));
            if (!text) {
                Py_DECREF(list);
                return nullptr;
            }
            // Steals the reference to `text`.
            PyList_SET_ITEM(list, static_cast<Py_ssize_t>(i), text);
        }
        return list;
    };

    const bool ok =
        put("total_files", PyLong_FromSize_t(result.all_files.size())) &&
        put("index_path", PyUnicode_FromString(result.index_path.c_str())) &&
        put("ready",
            make_path_list(result.cached,
                           [](const auto& entry) {
                               return entry.file_path.c_str();
                           })) &&
        put("needs_work",
            make_path_list(needs_work, [](const std::string& path) {
                return path.c_str();
            }));

    if (!ok) {
        Py_DECREF(dict);
        return nullptr;
    }
    return dict;
}
186✔
347

348
// ---------------------------------------------------------------------------
349
// build() - build missing index tiers
350
// ---------------------------------------------------------------------------
351

352
// build(): resolves and builds whatever index tiers are missing for the
// configured directory / file list, using the indexer's checkpoint size,
// parallelism and force_rebuild settings.
//
// Returns None on success, or nullptr with a Python exception set
// (RuntimeError for errors raised by the build itself).
static PyObject* Indexer_build(IndexerObject* self,
                               PyObject* Py_UNUSED(ignored)) {
    // The slots are null when __init__ never ran; treat that as "unset"
    // instead of handing a null pointer to PyUnicode_AsUTF8.
    const char* directory = "";
    if (self->directory) {
        directory = PyUnicode_AsUTF8(self->directory);
        if (!directory) return nullptr;
    }
    const char* index_dir = "";
    if (self->index_dir) {
        index_dir = PyUnicode_AsUTF8(self->index_dir);
        if (!index_dir) return nullptr;
    }

    ResolveAndBuildInput input;
    input.directory = directory;
    input.index_dir = index_dir;
    input.require_checkpoints = self->require_checkpoint;
    input.require_bloom = self->require_bloom;
    input.require_manifest = self->require_manifest;
    input.require_aggregation = self->require_aggregation;
    input.aggregation_config = build_aggregation_config(self);
    // Do not carry on with an exception pending from the config conversion.
    if (PyErr_Occurred()) return nullptr;
    input.checkpoint_size = self->checkpoint_size;
    input.parallelism = self->parallelism;
    input.force_rebuild = self->force_rebuild;

    // Add files if provided; a non-str entry is reported, not skipped.
    if (self->files && PyList_Check(self->files)) {
        const Py_ssize_t n = PyList_Size(self->files);
        for (Py_ssize_t i = 0; i < n; i++) {
            const char* s = PyUnicode_AsUTF8(PyList_GetItem(self->files, i));
            if (!s) return nullptr;
            input.files.emplace_back(s);
        }
    }

    std::string error_msg;
    bool failed = false;

    // The build runs on the runtime's executor; release the GIL while
    // blocking on it. Every C++ exception must be caught inside this region,
    // otherwise the thread state would never be restored.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime* rt = get_batch_indexer_runtime(self);
        rt->submit(run_coro_scope(
                       rt->executor(),
                       [](CoroScope& scope,
                          ResolveAndBuildInput in) -> CoroTask<void> {
                           co_await resolve_and_build_index(&scope,
                                                            std::move(in));
                       },
                       std::move(input)),
                   "batch-indexer-build")
            .get();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while building index";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    Py_RETURN_NONE;
}
60✔
404

405
// ---------------------------------------------------------------------------
406
// ensure_indexed() - resolve + build if needed
407
// ---------------------------------------------------------------------------
408

409
// ensure_indexed(): resolve, build when the resolver reports pending files,
// then resolve again. Returns the status dict of the last resolve, or nullptr
// with a Python exception set when a step fails.
static PyObject* Indexer_ensure_indexed(IndexerObject* self,
                                        PyObject* Py_UNUSED(ignored)) {
    PyObject* report = Indexer_resolve(self, nullptr);
    if (report == nullptr) {
        return nullptr;
    }

    // Borrowed reference owned by `report`.
    PyObject* pending = PyDict_GetItemString(report, "needs_work");
    const bool must_build = pending != nullptr && PyList_Size(pending) > 0;
    if (!must_build) {
        return report;
    }

    // The first report is stale once a build runs; drop it before building.
    Py_DECREF(report);

    PyObject* built = Indexer_build(self, nullptr);
    if (built == nullptr) {
        return nullptr;
    }
    Py_DECREF(built);

    // Hand back a fresh status reflecting the build.
    return Indexer_resolve(self, nullptr);
}
38✔
431

432
// ---------------------------------------------------------------------------
433
// get_checkpoint_indexer() - get a single-file checkpoint indexer
434
// ---------------------------------------------------------------------------
435

436
// get_checkpoint_indexer(file_path): creates a single-file checkpoint indexer
// for `file_path`, placing its index according to this batch indexer's
// index_dir and reusing its checkpoint size and runtime.
//
// Returns a new CheckpointIndexer object, or nullptr with a Python exception
// set (RuntimeError when the index path cannot be determined or the native
// handle cannot be created).
static PyObject* Indexer_get_checkpoint_indexer(IndexerObject* self,
                                                PyObject* args) {
    const char* file_path = nullptr;
    if (!PyArg_ParseTuple(args, "s", &file_path)) {
        return nullptr;
    }

    // index_dir is null when __init__ never ran; treat that as "unset"
    // instead of handing a null pointer to PyUnicode_AsUTF8.
    const char* index_dir = "";
    if (self->index_dir) {
        index_dir = PyUnicode_AsUTF8(self->index_dir);
        if (!index_dir) return nullptr;
    }

    // Determine index path using BatchIndexer's index_dir setting. A C++
    // exception must not unwind into the interpreter, so convert it here.
    std::string index_path;
    try {
        index_path = dftracer::utils::utilities::composites::dft::internal::
            determine_index_path(file_path, index_dir);
    } catch (const std::exception& e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return nullptr;
    }

    // Create the Python strings first so a failure never leaves a
    // half-initialised indexer object behind.
    PyObject* gz_path_obj = PyUnicode_FromString(file_path);
    if (!gz_path_obj) {
        return nullptr;
    }
    PyObject* index_path_obj = PyUnicode_FromString(index_path.c_str());
    if (!index_path_obj) {
        Py_DECREF(gz_path_obj);
        return nullptr;
    }

    // Create IndexerObject
    CheckpointIndexerObject* indexer =
        (CheckpointIndexerObject*)CheckpointIndexerType.tp_alloc(
            &CheckpointIndexerType, 0);
    if (!indexer) {
        Py_DECREF(gz_path_obj);
        Py_DECREF(index_path_obj);
        return nullptr;
    }

    indexer->handle = nullptr;
    indexer->gz_path = gz_path_obj;        // ownership moves to the object
    indexer->index_path = index_path_obj;  // ownership moves to the object
    indexer->checkpoint_size = self->checkpoint_size;
    indexer->build_bloom = 0;
    indexer->build_manifest = 0;

    // Share runtime reference
    if (self->runtime_obj) {
        Py_INCREF(self->runtime_obj);
        indexer->runtime_obj = self->runtime_obj;
    } else {
        indexer->runtime_obj = nullptr;
    }

    // Create the native handle
    indexer->handle = dft_indexer_create(file_path, index_path.c_str(),
                                         self->checkpoint_size, 0);
    if (!indexer->handle) {
        Py_DECREF((PyObject*)indexer);
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to create checkpoint indexer");
        return nullptr;
    }

    return (PyObject*)indexer;
}
12✔
483

484
// Runs resolve() and extracts the "index_path" entry as a std::string.
// Returns std::nullopt with a Python exception set when resolve fails or when
// the path is missing or empty (RuntimeError "No index path available").
static std::optional<std::string> resolve_index_path(IndexerObject* self) {
    PyObject* report = Indexer_resolve(self, nullptr);
    if (report == nullptr) {
        return std::nullopt;
    }

    // Borrowed reference; the text must be copied before `report` goes away.
    PyObject* entry = PyDict_GetItemString(report, "index_path");
    const char* text = nullptr;
    if (entry != nullptr) {
        text = PyUnicode_AsUTF8(entry);
    }

    std::optional<std::string> found;
    if (text != nullptr && text[0] != '\0') {
        found.emplace(text);
    }

    Py_DECREF(report);
    if (!found.has_value()) {
        PyErr_SetString(PyExc_RuntimeError, "No index path available");
    }
    return found;
}
42✔
498

499
// get_hash_table(type): reads one hash table from the index database.
//
// `type` must be "file", "host", "string" or "proc" (ValueError otherwise).
// Returns a new dict mapping hash (str) to name (str), or nullptr with a
// Python exception set: RuntimeError for database errors, and whatever the
// str conversion raises when a stored entry is not valid UTF-8.
static PyObject* Indexer_get_hash_table(IndexerObject* self, PyObject* args) {
    const char* type_str = nullptr;
    if (!PyArg_ParseTuple(args, "s", &type_str)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;
    using HashType = IndexDatabase::HashType;

    HashType type;
    if (std::strcmp(type_str, "file") == 0) {
        type = HashType::FILE;
    } else if (std::strcmp(type_str, "host") == 0) {
        type = HashType::HOST;
    } else if (std::strcmp(type_str, "string") == 0) {
        type = HashType::STRING;
    } else if (std::strcmp(type_str, "proc") == 0) {
        type = HashType::PROC;
    } else {
        PyErr_SetString(PyExc_ValueError,
                        "type must be 'file', 'host', 'string', or 'proc'");
        return nullptr;
    }

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, std::string> hash_map;
    std::string error_msg;
    bool failed = false;

    // Database I/O happens without the GIL. Every C++ exception must be
    // caught inside this region so the thread state is always restored.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        hash_map = db.query_hash_table(type);
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while reading hash table";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [hash, name] : hash_map) {
        PyObject* key = PyUnicode_FromStringAndSize(
            hash.data(), static_cast<Py_ssize_t>(hash.size()));
        PyObject* val =
            key ? PyUnicode_FromStringAndSize(
                      name.data(), static_cast<Py_ssize_t>(name.size()))
                : nullptr;
        // Either conversion can fail (allocation, invalid UTF-8); never pass
        // a null object on to the dict.
        const bool stored = key && val && PyDict_SetItem(dict, key, val) == 0;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (!stored) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
11✔
558

559
// query_file_pids(file_id): returns the set of process ids the index
// database associates with `file_id`.
//
// Returns a new set of ints, or nullptr with a Python exception set
// (RuntimeError for database errors).
static PyObject* Indexer_query_file_pids(IndexerObject* self, PyObject* args) {
    int file_id;
    if (!PyArg_ParseTuple(args, "i", &file_id)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_set<std::uint64_t> pids;
    std::string error_msg;
    bool failed = false;

    // Database I/O happens without the GIL. Every C++ exception must be
    // caught inside this region so the thread state is always restored.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        pids = db.query_file_pids(file_id);
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while querying file pids";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* set = PySet_New(nullptr);
    if (!set) return nullptr;

    for (auto pid : pids) {
        PyObject* val = PyLong_FromUnsignedLongLong(pid);
        // Never hand a null object to PySet_Add, and do not ignore a failed
        // insert.
        const bool added = val && PySet_Add(set, val) == 0;
        Py_XDECREF(val);
        if (!added) {
            Py_DECREF(set);
            return nullptr;
        }
    }

    return set;
}
4✔
600

601
// query_all_file_pids(): returns every file id known to the index database
// together with its process ids.
//
// Returns a new dict mapping file id (int) to a set of pids (int), or nullptr
// with a Python exception set (RuntimeError for database errors).
static PyObject* Indexer_query_all_file_pids(IndexerObject* self,
                                             PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;
    std::string error_msg;
    bool failed = false;

    // Database I/O happens without the GIL. Every C++ exception must be
    // caught inside this region so the thread state is always restored.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        all_pids = db.query_all_file_pids();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while querying file pids";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    // Converts one pid collection into a new Python set; nullptr on failure.
    auto make_pid_set = [](const auto& pids) -> PyObject* {
        PyObject* set = PySet_New(nullptr);
        if (!set) return nullptr;
        for (auto pid : pids) {
            PyObject* val = PyLong_FromUnsignedLongLong(pid);
            const bool added = val && PySet_Add(set, val) == 0;
            Py_XDECREF(val);
            if (!added) {
                Py_DECREF(set);
                return nullptr;
            }
        }
        return set;
    };

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [file_id, pids] : all_pids) {
        PyObject* key = PyLong_FromLong(file_id);
        PyObject* set = key ? make_pid_set(pids) : nullptr;
        // Never pass a null key or value on to the dict.
        const bool stored = key && set && PyDict_SetItem(dict, key, set) == 0;
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (!stored) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
6✔
645

UNCOV
646
// query_file_info(): returns a 2-tuple (id_to_path, pid_dict).
//   id_to_path - dict mapping file id (int) to a path (str) formed by joining
//                the index's parent directory with the stored logical name
//   pid_dict   - dict mapping file id (int) to a set of pids (int)
// Returns nullptr with a Python exception set on failure (RuntimeError for
// database or filesystem errors).
static PyObject* Indexer_query_file_info(IndexerObject* self,
                                         PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, int> file_ids;
    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;
    fs::path data_dir;
    std::string error_msg;
    bool failed = false;

    // Database and filesystem work happens without the GIL. The path
    // canonicalisation can throw as well, so it lives inside the same guarded
    // region; no C++ exception may unwind into the interpreter.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        file_ids = db.query_all_file_info_ids();
        all_pids = db.query_all_file_pids();
        data_dir = fs::weakly_canonical(fs::path(index_path)).parent_path();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown error while querying file info";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* id_to_path = PyDict_New();
    if (!id_to_path) return nullptr;
    for (const auto& [logical_name, fid] : file_ids) {
        auto resolved = (data_dir / logical_name).string();
        PyObject* key = PyLong_FromLong(fid);
        PyObject* val =
            key ? PyUnicode_FromStringAndSize(
                      resolved.data(),
                      static_cast<Py_ssize_t>(resolved.size()))
                : nullptr;
        // Never pass a null key or value on to the dict.
        const bool stored =
            key && val && PyDict_SetItem(id_to_path, key, val) == 0;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (!stored) {
            Py_DECREF(id_to_path);
            return nullptr;
        }
    }

    // Converts one pid collection into a new Python set; nullptr on failure.
    auto make_pid_set = [](const auto& pids) -> PyObject* {
        PyObject* set = PySet_New(nullptr);
        if (!set) return nullptr;
        for (auto pid : pids) {
            PyObject* val = PyLong_FromUnsignedLongLong(pid);
            const bool added = val && PySet_Add(set, val) == 0;
            Py_XDECREF(val);
            if (!added) {
                Py_DECREF(set);
                return nullptr;
            }
        }
        return set;
    };

    PyObject* pid_dict = PyDict_New();
    if (!pid_dict) {
        Py_DECREF(id_to_path);
        return nullptr;
    }
    for (const auto& [file_id, pids] : all_pids) {
        PyObject* key = PyLong_FromLong(file_id);
        PyObject* set = key ? make_pid_set(pids) : nullptr;
        const bool stored =
            key && set && PyDict_SetItem(pid_dict, key, set) == 0;
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (!stored) {
            Py_DECREF(id_to_path);
            Py_DECREF(pid_dict);
            return nullptr;
        }
    }

    // PyTuple_Pack takes its own references to both dicts.
    PyObject* result = PyTuple_Pack(2, id_to_path, pid_dict);
    Py_DECREF(id_to_path);
    Py_DECREF(pid_dict);
    return result;
}
×
711

712
#ifdef DFTRACER_UTILS_ENABLE_ARROW
713
#include <dftracer/utils/python/trace_reader_iterator.h>
714
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
715

716
// Wraps an Arrow export result in a new ArrowBatchCapsule Python object.
// The result is moved onto the heap and the capsule stores the raw pointer.
// Returns nullptr when the Python object cannot be allocated.
static PyObject* create_arrow_batch_capsule(
    dftracer::utils::utilities::common::arrow::ArrowExportResult&& result) {
    using Exported =
        dftracer::utils::utilities::common::arrow::ArrowExportResult;

    PyObject* raw = ArrowBatchCapsuleType.tp_alloc(&ArrowBatchCapsuleType, 0);
    if (raw == nullptr) {
        return nullptr;
    }

    auto* capsule = reinterpret_cast<ArrowBatchCapsuleObject*>(raw);
    capsule->result = new Exported(std::move(result));
    return raw;
}
30✔
726

727
namespace {
728

729
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
730
using dftracer::utils::utilities::common::arrow::ColumnSpec;
731
using dftracer::utils::utilities::common::arrow::ColumnType;
732
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
733

734
static bool parse_agg_type_str(const char* type_str, AggMapType& out) {
4✔
735
    if (strcmp(type_str, "events") == 0) {
4✔
736
        out = AggMapType::EVENT;
4✔
737
        return true;
4✔
738
    }
UNCOV
739
    if (strcmp(type_str, "profiles") == 0) {
×
740
        out = AggMapType::PROFILE;
×
741
        return true;
×
742
    }
UNCOV
743
    if (strcmp(type_str, "system") == 0) {
×
744
        out = AggMapType::SYSTEM;
×
745
        return true;
×
746
    }
UNCOV
747
    PyErr_SetString(PyExc_ValueError,
×
748
                    "type must be 'events', 'profiles', or 'system'");
UNCOV
749
    return false;
×
750
}
2✔
751

752
struct AggDbHandle {
753
    std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db;
754
    std::unique_ptr<EventAggregator> agg;
755
};
756

757
static std::unique_ptr<AggDbHandle> open_agg_db(const std::string& index_path,
22✔
758
                                                std::string& error_msg) {
759
    std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db;
22✔
760
    try {
761
        db = EventAggregator::open_with_merge_operator(index_path);
22!
762
    } catch (...) {
11✔
UNCOV
763
        auto& mgr = dftracer::utils::rocksdb::RocksDBManager::instance();
×
764
        mgr.reset(index_path);
×
765
        db = mgr.get_or_open(
×
766
            index_path,
767
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
UNCOV
768
        if (db && db->is_open()) {
×
769
            load_intern_dictionary(*db);
×
770
        }
UNCOV
771
    }
×
772
    if (!db || !db->is_open()) {
22!
UNCOV
773
        error_msg = "Failed to open aggregation database";
×
774
        return nullptr;
×
775
    }
776
    std::string config_val;
22✔
777
    auto key = std::string_view(AGG_GLOBAL_CONFIG_KEY,
22✔
778
                                sizeof(AGG_GLOBAL_CONFIG_KEY) - 1);
779
    if (!db->get(key, &config_val, dftracer::utils::rocksdb::cf::AGGREGATION)
44!
780
             .ok()) {
22!
UNCOV
781
        error_msg = "No aggregation config found - was aggregation enabled?";
×
782
        return nullptr;
×
783
    }
784
    auto cfg = deserialize_agg_global_config(config_val);
22!
785
    auto handle = std::make_unique<AggDbHandle>();
22!
786
    handle->db = db;
22✔
787
    handle->agg = std::make_unique<EventAggregator>(db, cfg.config_hash);
22!
788
    return handle;
22✔
789
}
33!
790

791
static std::optional<dftracer::utils::utilities::common::query::Query>
792
parse_query_arg(const char* query_str) {
24✔
793
    if (!query_str || query_str[0] == '\0') return std::nullopt;
24!
794
    auto result = dftracer::utils::utilities::common::query::Query::from_string(
9✔
795
        query_str);
18!
796
    if (!result) {
18✔
797
        PyErr_SetString(PyExc_ValueError, result.error().message.c_str());
2!
798
        return std::nullopt;
2✔
799
    }
800
    return std::move(*result);
16!
801
}
21✔
802

803
// Upper bound of the shard range that parallel_shard_scan() covers.
constexpr std::uint16_t DFT_NUM_SHARDS{4096};
804

805
template <typename Output, typename ScanFn>
806
void parallel_shard_scan_range(Runtime* rt, std::uint16_t outer_begin,
22✔
807
                               std::uint16_t outer_end, ScanFn&& scan_fn,
808
                               std::vector<Output>& outputs) {
809
    if (outer_end <= outer_begin) return;
22!
810
    const std::size_t span = static_cast<std::size_t>(outer_end - outer_begin);
22✔
811
    const std::size_t num_tasks = std::min<std::size_t>(rt->threads(), span);
22!
812
    const std::size_t shards_per_task = (span + num_tasks - 1) / num_tasks;
22✔
813
    rt->submit(run_coro_scope(
44!
814
                   rt->executor(),
11✔
815
                   [&](CoroScope& scope) -> CoroTask<void> {
172!
816
                       std::vector<dftracer::utils::coro::SpawnFuture<Output>>
11✔
817
                           futures;
11✔
818
                       futures.reserve(num_tasks);
11!
819
                       for (std::size_t t = 0; t < num_tasks; ++t) {
44!
820
                           auto shard_begin = static_cast<std::uint16_t>(
66✔
821
                               outer_begin + t * shards_per_task);
33✔
822
                           auto shard_end =
33✔
823
                               static_cast<std::uint16_t>(std::min<std::size_t>(
33!
824
                                   outer_begin + (t + 1) * shards_per_task,
33✔
825
                                   outer_end));
33✔
826
                           futures.push_back(
33!
827
                               scope.spawn([&scan_fn, shard_begin, shard_end](
285!
828
                                               CoroScope&) -> CoroTask<Output> {
34!
829
                                   co_return scan_fn(shard_begin, shard_end);
33!
830
                               }));
831
                       }
33✔
832
                       outputs.reserve(num_tasks);
11!
833
                       for (auto& f : futures) {
128!
834
                           outputs.push_back(co_await f);
96!
835
                       }
33!
836
                   }),
95!
837
               "parallel-shard-scan-range")
11!
838
        .get();
22!
839
}
11✔
840

841
template <typename Output, typename ScanFn>
842
void parallel_shard_scan(Runtime* rt, ScanFn&& scan_fn,
22✔
843
                         std::vector<Output>& outputs) {
844
    parallel_shard_scan_range<Output>(rt, 0, DFT_NUM_SHARDS,
33✔
845
                                      std::forward<ScanFn>(scan_fn), outputs);
11✔
846
}
22✔
847

848
static void append_results_to_list(PyObject* list,
58✔
849
                                   std::vector<ArrowExportResult>& results) {
850
    for (auto& r : results) {
128✔
851
        PyObject* capsule = create_arrow_batch_capsule(std::move(r));
70!
852
        if (capsule) {
70✔
853
            PyList_Append(list, capsule);
70!
854
            Py_DECREF(capsule);
30✔
855
        }
30✔
856
    }
857
}
58✔
858

859
struct AggScanInput {
860
    const EventAggregator* agg;
861
    AggMapType target_type;
862
    AggregationBatchType batch_type;
863
    Py_ssize_t batch_size;
864
    std::uint16_t shard_begin;
865
    std::uint16_t shard_end;
866
};
867

868
struct AggScanOutput {
869
    std::vector<ArrowExportResult> results;
870
};
871

UNCOV
872
/// Scan a shard range of the aggregation store and export the entries whose
/// map type equals `input.target_type` as Arrow record batches.
///
/// Entries whose key or value cannot be parsed are skipped. A batch is
/// finished as soon as it holds `input.batch_size` rows; any remaining rows
/// are emitted as a final, smaller batch. Batches for which finish() yields
/// an invalid result are dropped.
///
/// @param input Scan parameters (aggregator, type filter, batch size, range).
/// @return The finished batches, in production order.
AggScanOutput scan_aggregation_shard_range(AggScanInput input) {
    AggScanOutput output;

    // Column layout; the writer calls in the scan callback must follow this
    // order exactly.
    static const std::vector<ColumnSpec> schema = {
        {"batch_type", ColumnType::INT64},
        {"cat", ColumnType::DICT_STRING},
        {"name", ColumnType::DICT_STRING},
        {"pid", ColumnType::UINT64},
        {"tid", ColumnType::UINT64},
        {"hhash", ColumnType::DICT_STRING},
        {"fhash", ColumnType::DICT_STRING},
        {"time_bucket", ColumnType::UINT64},
        {"count", ColumnType::UINT64},
        {"dur_total", ColumnType::UINT64},
        {"dur_min", ColumnType::UINT64},
        {"dur_max", ColumnType::UINT64},
        {"dur_mean", ColumnType::DOUBLE},
        {"dur_std", ColumnType::DOUBLE},
        {"size_total", ColumnType::UINT64},
        {"size_min", ColumnType::UINT64},
        {"size_max", ColumnType::UINT64},
        {"size_mean", ColumnType::DOUBLE},
        {"size_std", ColumnType::DOUBLE},
        {"ts", ColumnType::UINT64},
        {"te", ColumnType::UINT64},
    };

    const auto batch_capacity = static_cast<std::size_t>(input.batch_size);

    RecordBatchBuilder builder;
    builder.declare_schema(schema);
    builder.reserve(batch_capacity);

    std::size_t pending_rows = 0;

    // Finish the builder's current contents and keep the batch if valid.
    auto emit_batch = [&]() {
        auto arrow = builder.finish();
        if (arrow.valid()) {
            output.results.push_back(std::move(arrow));
        }
    };

    input.agg->scan_shard_range_raw(
        input.shard_begin, input.shard_end,
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
            constexpr bool keep_scanning = true;

            AggKeyView kv;
            if (!parse_agg_key_view(key_bytes, kv)) return keep_scanning;
            if (kv.map_type != input.target_type) return keep_scanning;

            AggMetricsFullView mv;
            if (!parse_agg_value_full_view(val_bytes, mv)) return keep_scanning;

            // Each writer appends to the next column in schema order.
            std::size_t col = 0;
            auto put_i64 = [&](const auto& v) {
                builder.append_int64(col++, v);
            };
            auto put_u64 = [&](const auto& v) {
                builder.append_uint64(col++, v);
            };
            auto put_f64 = [&](const auto& v) {
                builder.append_double(col++, v);
            };
            auto put_str = [&](const auto& v) {
                builder.append_dict_string(col++, v);
            };

            put_i64(static_cast<std::int64_t>(input.batch_type));
            put_str(kv.cat);
            put_str(kv.name);
            put_u64(kv.pid);
            put_u64(kv.tid);
            put_str(kv.hhash);
            put_str(kv.fhash);
            put_u64(kv.time_bucket);
            put_u64(mv.count);
            put_u64(mv.dur_total);
            // Minimums are reported as 0 for entries with no samples.
            put_u64(mv.count > 0 ? mv.dur_min : 0);
            put_u64(mv.dur_max);
            put_f64(mv.dur_mean);
            put_f64(mv.dur_stddev());
            put_u64(mv.size_total);
            put_u64(mv.count > 0 ? mv.size_min : 0);
            put_u64(mv.size_max);
            put_f64(mv.size_mean);
            put_f64(mv.size_stddev());
            put_u64(mv.ts);
            put_u64(mv.te);
            builder.end_row();

            ++pending_rows;
            if (static_cast<Py_ssize_t>(pending_rows) >= input.batch_size) {
                emit_batch();
                builder.reset(true);
                builder.reserve(batch_capacity);
                pending_rows = 0;
            }
            return keep_scanning;
        });

    // Emit whatever is left in the partially filled last batch.
    if (pending_rows > 0) {
        emit_batch();
    }

    return output;
}
×
952

953
/// Coarse classification of a traced function. The numeric values are
/// exported as-is in the integer "io_cat" column, so they must stay stable.
enum class IOCategory : std::int8_t {
    READ = 1,      ///< read-family calls
    WRITE = 2,     ///< write-family calls
    METADATA = 3,  ///< open/close/stat/seek/directory-style calls
    PCTL = 4,      ///< not produced by get_io_category()
    IPC = 5,       ///< not produced by get_io_category()
    OTHER = 6,     ///< anything get_io_category() does not recognise
    SYNC = 7,      ///< fsync/fdatasync/msync/sync
};
962

963
/// Classify a function name into an IOCategory by exact, case-sensitive
/// match against fixed name lists.
///
/// @param func_name Name of the traced function.
/// @return READ, WRITE, SYNC or METADATA for a listed name, otherwise OTHER.
inline IOCategory get_io_category(std::string_view func_name) {
    static constexpr std::string_view kReadCalls[] = {
        "read", "pread", "readv", "preadv", "fread",
    };
    static constexpr std::string_view kWriteCalls[] = {
        "write", "pwrite", "writev", "pwritev", "fwrite",
    };
    static constexpr std::string_view kSyncCalls[] = {
        "fsync", "fdatasync", "msync", "sync",
    };
    static constexpr std::string_view kMetadataCalls[] = {
        "open",       "open64",   "close",      "fopen",    "fopen64",
        "fclose",     "stat",     "fstat",      "lstat",    "fstatat",
        "__xstat",    "__xstat64", "__lxstat",  "__lxstat64", "__fxstat",
        "__fxstat64", "access",   "lseek",      "lseek64",  "fseek",
        "ftell",      "seek",     "fcntl",      "ftruncate", "mkdir",
        "rmdir",      "unlink",   "remove",     "rename",   "link",
        "readlink",   "opendir",  "closedir",   "readdir",
    };

    // True when func_name equals one of the entries of `names`.
    const auto listed_in = [func_name](const auto& names) {
        for (std::string_view candidate : names) {
            if (candidate == func_name) {
                return true;
            }
        }
        return false;
    };

    if (listed_in(kReadCalls)) return IOCategory::READ;
    if (listed_in(kWriteCalls)) return IOCategory::WRITE;
    if (listed_in(kSyncCalls)) return IOCategory::SYNC;
    if (listed_in(kMetadataCalls)) return IOCategory::METADATA;
    return IOCategory::OTHER;
}
200✔
996

997
/// Write the decimal representation of `val` into `buf`.
///
/// No NUL terminator is written. `buf` must have room for the digits of
/// `val` (at most 20 for a 64-bit value).
///
/// @param val Value to format.
/// @param buf Destination buffer.
/// @return Pointer one past the last digit written.
inline char* fast_itoa(std::uint64_t val, char* buf) {
    // Digits are produced least-significant first, so fill a scratch buffer
    // from the back and copy the used tail out in one go.
    char scratch[20];
    char* const scratch_end = scratch + sizeof(scratch);
    char* first_digit = scratch_end;
    do {
        *--first_digit = static_cast<char>('0' + static_cast<int>(val % 10));
        val /= 10;
    } while (val != 0);
    return std::copy(first_digit, scratch_end, buf);
}
1006

1007
class HashResolver {
1008
   public:
1009
    HashResolver(
139✔
1010
        const std::unordered_map<std::string, std::string>* file_hashes,
1011
        const std::unordered_map<std::string, std::string>* host_hashes)
1012
        : file_hashes_(file_hashes), host_hashes_(host_hashes) {
107✔
1013
        if (file_hashes_) {
76✔
1014
            for (const auto& [hash, name] : *file_hashes_) {
194!
1015
                auto hash_sv = intern_.intern(hash);
169!
1016
                auto name_sv = intern_.intern(name);
170!
1017
                file_map_[hash_sv] = name_sv;
118!
1018
            }
1019
        }
33✔
1020
        if (host_hashes_) {
77✔
1021
            for (const auto& [hash, name] : *host_hashes_) {
195!
1022
                auto hash_sv = intern_.intern(hash);
170!
1023
                auto name_sv = intern_.intern(name);
168!
1024
                host_map_[hash_sv] = name_sv;
119!
1025
            }
1026
        }
33✔
1027
    }
110✔
1028

1029
    // Unresolved hashes resolve to empty (not the hash itself): the
1030
    // dfanalyzer side treats empty file_name/host_name as missing (NA).
1031
    std::string_view resolve_file(std::string_view hash) {
659✔
1032
        if (hash.empty()) return hash;
659!
1033
        auto it = file_map_.find(intern_.intern(hash));
624!
1034
        return it != file_map_.end() ? it->second : std::string_view{};
946!
1035
    }
330✔
1036

1037
    std::string_view resolve_host(std::string_view hash) {
641✔
1038
        if (hash.empty()) return hash;
641!
1039
        auto it = host_map_.find(intern_.intern(hash));
628!
1040
        return it != host_map_.end() ? it->second : std::string_view{};
982!
1041
    }
330✔
1042

1043
    std::string_view intern(std::string_view sv) { return intern_.intern(sv); }
672✔
1044

1045
   private:
1046
    const std::unordered_map<std::string, std::string>* file_hashes_;
1047
    const std::unordered_map<std::string, std::string>* host_hashes_;
1048
    dftracer::utils::StringIntern intern_;
1049
    std::unordered_map<std::string_view, std::string_view> file_map_;
1050
    std::unordered_map<std::string_view, std::string_view> host_map_;
1051
};
1052

1053
/// Identifies one thread of one process on one host.
struct ProcKey {
    std::string_view hhash;  ///< host hash (compared by content)
    std::uint64_t pid;
    std::uint64_t tid;
    bool operator==(const ProcKey& o) const {
        return hhash == o.hhash && pid == o.pid && tid == o.tid;
    }
};

/// Hash functor for ProcKey.
///
/// Uses the same golden-ratio combine step as CoarseKeyHash. A plain
/// shift-and-XOR of the field hashes collides systematically for small
/// pid/tid values when std::hash of an integer is the identity (e.g.
/// {pid=2, tid=0} and {pid=0, tid=1} both give 4).
struct ProcKeyHash {
    std::size_t operator()(const ProcKey& k) const {
        auto combine = [](std::size_t h, std::size_t v) {
            return h ^ (v + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2));
        };
        std::size_t h = std::hash<std::string_view>{}(k.hhash);
        h = combine(h, std::hash<std::uint64_t>{}(k.pid));
        h = combine(h, std::hash<std::uint64_t>{}(k.tid));
        return h;
    }
};
1069

1070
static const std::vector<ColumnSpec> DFANALYZER_SCHEMA = {
1!
1071
    {"cat", ColumnType::DICT_STRING},
1!
1072
    {"func_name", ColumnType::DICT_STRING},
1!
1073
    {"pid", ColumnType::INT64},
1!
1074
    {"tid", ColumnType::INT64},
1!
1075
    {"file_hash", ColumnType::DICT_STRING},
1!
1076
    {"host_hash", ColumnType::DICT_STRING},
1!
1077
    {"file_name", ColumnType::DICT_STRING},
1!
1078
    {"host_name", ColumnType::DICT_STRING},
1!
1079
    {"proc_name", ColumnType::DICT_STRING},
1!
1080
    {"io_cat", ColumnType::INT64},
1!
1081
    {"acc_pat", ColumnType::INT64},
1!
1082
    {"count", ColumnType::INT64},
1!
1083
    {"time", ColumnType::DOUBLE},
1!
1084
    {"size", ColumnType::INT64},
1!
1085
    {"time_min", ColumnType::DOUBLE},
1!
1086
    {"time_max", ColumnType::DOUBLE},
1!
1087
    {"size_min", ColumnType::INT64},
1!
1088
    {"size_max", ColumnType::INT64},
1!
1089
    {"offset_min", ColumnType::INT64},
1!
1090
    {"offset_max", ColumnType::INT64},
1!
1091
    {"time_range", ColumnType::INT64},
1!
1092
    {"time_start", ColumnType::INT64},
1!
1093
    {"time_end", ColumnType::INT64},
1!
1094
};
1095

1096
/// Bit flags naming the columns a coarse aggregation can group by. Each
/// field occupies one bit so that a set of fields fits in a single mask.
enum GroupByField : std::uint32_t {
    GB_CAT = 0x001u,
    GB_FUNC_NAME = 0x002u,
    GB_PID = 0x004u,
    GB_TID = 0x008u,
    GB_FILE_HASH = 0x010u,
    GB_HOST_HASH = 0x020u,
    GB_FILE_NAME = 0x040u,
    GB_HOST_NAME = 0x080u,
    GB_PROC_NAME = 0x100u,
    GB_IO_CAT = 0x200u,
    GB_ACC_PAT = 0x400u,
    GB_TIME_RANGE = 0x800u,
};
1110

1111
struct GroupByConfig {
9✔
1112
    std::uint32_t mask = 0;
9✔
1113
    std::vector<GroupByField> order;
1114
    std::vector<std::string> names;  // matches `order`, used for schema
1115
};
1116

UNCOV
1117
inline std::optional<GroupByField> parse_group_by_name(std::string_view name) {
×
1118
    if (name == "cat") return GB_CAT;
×
1119
    if (name == "func_name") return GB_FUNC_NAME;
×
1120
    if (name == "pid") return GB_PID;
×
1121
    if (name == "tid") return GB_TID;
×
1122
    if (name == "file_hash") return GB_FILE_HASH;
×
1123
    if (name == "host_hash") return GB_HOST_HASH;
×
1124
    if (name == "file_name") return GB_FILE_NAME;
×
1125
    if (name == "host_name") return GB_HOST_NAME;
×
1126
    if (name == "proc_name") return GB_PROC_NAME;
×
1127
    if (name == "io_cat") return GB_IO_CAT;
×
1128
    if (name == "acc_pat") return GB_ACC_PAT;
×
1129
    if (name == "time_range") return GB_TIME_RANGE;
×
1130
    return std::nullopt;
×
1131
}
1132

1133
/// Grouping key of a coarse aggregation. Fields that are not part of the
/// active group-by selection keep their default (empty / zero) value, so
/// they compare equal across all rows.
struct CoarseKey {
    std::string_view cat;
    std::string_view func_name;
    std::uint64_t pid = 0;
    std::uint64_t tid = 0;
    std::string_view file_hash;
    std::string_view host_hash;
    std::string_view file_name;
    std::string_view host_name;
    std::string_view proc_name;
    std::int64_t io_cat = 0;
    std::int64_t acc_pat = 0;
    std::int64_t time_range = 0;

    /// Field-wise equality; string fields are compared by content.
    bool operator==(const CoarseKey& o) const {
        // Cheap integer comparisons first, string contents afterwards.
        const bool numbers_match = pid == o.pid && tid == o.tid &&
                                   io_cat == o.io_cat &&
                                   acc_pat == o.acc_pat &&
                                   time_range == o.time_range;
        if (!numbers_match) {
            return false;
        }
        return cat == o.cat && func_name == o.func_name &&
               file_hash == o.file_hash && host_hash == o.host_hash &&
               file_name == o.file_name && host_name == o.host_name &&
               proc_name == o.proc_name;
    }
};
1156

1157
struct CoarseKeyHash {
UNCOV
1158
    std::size_t operator()(const CoarseKey& k) const {
×
1159
        auto combine = [](std::size_t h, std::size_t v) {
×
1160
            return h ^ (v + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2));
×
1161
        };
UNCOV
1162
        std::size_t h = std::hash<std::string_view>{}(k.cat);
×
1163
        h = combine(h, std::hash<std::string_view>{}(k.func_name));
×
1164
        h = combine(h, std::hash<std::uint64_t>{}(k.pid));
×
1165
        h = combine(h, std::hash<std::uint64_t>{}(k.tid));
×
1166
        h = combine(h, std::hash<std::string_view>{}(k.file_hash));
×
1167
        h = combine(h, std::hash<std::string_view>{}(k.host_hash));
×
1168
        h = combine(h, std::hash<std::string_view>{}(k.file_name));
×
1169
        h = combine(h, std::hash<std::string_view>{}(k.host_name));
×
1170
        h = combine(h, std::hash<std::string_view>{}(k.proc_name));
×
1171
        h = combine(h, std::hash<std::int64_t>{}(k.io_cat));
×
1172
        h = combine(h, std::hash<std::int64_t>{}(k.acc_pat));
×
1173
        h = combine(h, std::hash<std::int64_t>{}(k.time_range));
×
1174
        return h;
×
1175
    }
1176
};
1177

1178
/// Running metrics accumulated for one CoarseKey. Minimum trackers start at
/// the largest representable value and maximum trackers at the smallest, so
/// the first observation always replaces them.
struct CoarseMetrics {
    std::uint64_t count{0};

    // Time metrics.
    double time_sum{0.0};
    double time_sq_sum{0.0};
    double time_min_val{std::numeric_limits<double>::infinity()};
    double time_max_val{-std::numeric_limits<double>::infinity()};
    double time_call_min_val{std::numeric_limits<double>::infinity()};
    double time_call_max_val{-std::numeric_limits<double>::infinity()};

    // Size metrics.
    std::uint64_t size_sum{0};
    double size_sq_sum{0.0};
    std::uint64_t size_min_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t size_max_val{0};
    std::uint64_t size_call_min_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t size_call_max_val{0};
    bool has_size{false};

    // Covered time span.
    std::uint64_t time_start_val{std::numeric_limits<std::uint64_t>::max()};
    std::uint64_t time_end_val{0};
    bool has_time_bounds{false};
};
1197

UNCOV
1198
inline std::vector<ColumnSpec> make_coarse_schema(const GroupByConfig& cfg) {
×
1199
    std::vector<ColumnSpec> specs;
×
1200
    specs.reserve(cfg.order.size() + 16);
×
1201
    for (std::size_t i = 0; i < cfg.order.size(); ++i) {
×
1202
        GroupByField f = cfg.order[i];
×
1203
        const std::string& name = cfg.names[i];
×
1204
        switch (f) {
×
1205
            case GB_CAT:
1206
            case GB_FUNC_NAME:
1207
            case GB_FILE_HASH:
1208
            case GB_HOST_HASH:
1209
            case GB_FILE_NAME:
1210
            case GB_HOST_NAME:
1211
            case GB_PROC_NAME:
UNCOV
1212
                specs.push_back({name, ColumnType::DICT_STRING});
×
1213
                break;
×
1214
            case GB_PID:
1215
            case GB_TID:
1216
            case GB_IO_CAT:
1217
            case GB_ACC_PAT:
1218
            case GB_TIME_RANGE:
UNCOV
1219
                specs.push_back({name, ColumnType::INT64});
×
1220
                break;
×
1221
        }
1222
    }
UNCOV
1223
    specs.push_back({"count", ColumnType::INT64});
×
1224
    specs.push_back({"time", ColumnType::DOUBLE});
×
1225
    specs.push_back({"size", ColumnType::INT64});
×
1226
    specs.push_back({"time_sq", ColumnType::DOUBLE});
×
1227
    specs.push_back({"size_sq", ColumnType::DOUBLE});
×
1228
    specs.push_back({"time_min", ColumnType::DOUBLE});
×
1229
    specs.push_back({"time_max", ColumnType::DOUBLE});
×
1230
    specs.push_back({"size_min", ColumnType::INT64});
×
1231
    specs.push_back({"size_max", ColumnType::INT64});
×
1232
    specs.push_back({"time_call_min", ColumnType::DOUBLE});
×
1233
    specs.push_back({"time_call_max", ColumnType::DOUBLE});
×
1234
    specs.push_back({"size_call_min", ColumnType::INT64});
×
1235
    specs.push_back({"size_call_max", ColumnType::INT64});
×
1236
    specs.push_back({"time_start", ColumnType::INT64});
×
1237
    specs.push_back({"time_end", ColumnType::INT64});
×
1238
    return specs;
×
1239
}
×
1240

1241
struct DfanalyzerScanInput {
33✔
1242
    const EventAggregator* agg;
1243
    const DfanalyzerContext* ctx;
1244
    std::optional<AggMapType> type_filter;
1245
    Py_ssize_t batch_size;
1246
    std::uint16_t shard_begin;
1247
    std::uint16_t shard_end;
1248
    const GroupByConfig* group_by = nullptr;  // null = full granularity
33✔
1249
};
1250

1251
struct DfanalyzerScanOutput {
1252
    std::vector<ArrowExportResult> events;
1253
    std::vector<ArrowExportResult> profiles;
1254
    std::vector<ArrowExportResult> system;
1255
};
1256

1257
DfanalyzerScanOutput scan_dfanalyzer_shards(DfanalyzerScanInput input) {
71✔
1258
    DfanalyzerScanOutput output;
71✔
1259

1260
    const bool coarse = input.group_by != nullptr;
75✔
1261
    const std::vector<ColumnSpec> coarse_schema =
1262
        coarse ? make_coarse_schema(*input.group_by)
75!
1263
               : std::vector<ColumnSpec>{};
75!
1264

1265
    auto make_builder = [&]() {
229✔
1266
        RecordBatchBuilder b;
196✔
1267
        if (coarse) {
193!
UNCOV
1268
            b.declare_schema(coarse_schema);
×
1269
        } else {
1270
            b.declare_schema(DFANALYZER_SCHEMA);
193✔
1271
        }
1272
        b.reserve(static_cast<std::size_t>(input.batch_size));
200✔
1273
        return b;
200✔
1274
    };
90!
1275

1276
    RecordBatchBuilder event_builder, profile_builder, system_builder;
74!
1277
    bool use_events =
33✔
1278
        !input.type_filter || *input.type_filter == AggMapType::EVENT;
76!
1279
    bool use_profiles =
33✔
1280
        !input.type_filter || *input.type_filter == AggMapType::PROFILE;
75!
1281
    bool use_system =
33✔
1282
        !input.type_filter || *input.type_filter == AggMapType::SYSTEM;
76!
1283

1284
    if (use_events) event_builder = make_builder();
76✔
1285
    if (use_profiles) profile_builder = make_builder();
75✔
1286
    if (use_system) system_builder = make_builder();
78!
1287

1288
    auto bucket_width_us = static_cast<std::uint64_t>(
79✔
1289
        input.ctx->time_granularity * input.ctx->time_resolution);
79✔
1290
    std::size_t event_count = 0, profile_count = 0, system_count = 0;
79✔
1291

1292
    HashResolver resolver(input.ctx->file_hashes, input.ctx->host_hashes);
79✔
1293
    std::unordered_map<ProcKey, std::string, ProcKeyHash> proc_name_cache;
77✔
1294
    std::unordered_map<std::string_view, IOCategory> io_cat_cache;
76✔
1295

1296
    std::unordered_map<CoarseKey, CoarseMetrics, CoarseKeyHash> event_coarse,
74✔
1297
        profile_coarse, system_coarse;
75✔
1298

1299
    auto flush_builder = [&](RecordBatchBuilder& builder, std::size_t& count,
236✔
1300
                             std::vector<ArrowExportResult>& results) {
1301
        if (count > 0) {
203✔
1302
            auto arrow = builder.finish();
70!
1303
            if (arrow.valid()) {
70!
1304
                results.push_back(std::move(arrow));
70!
1305
            }
30✔
1306
            builder.reset(true);
70✔
1307
            builder.reserve(static_cast<std::size_t>(input.batch_size));
69✔
1308
            count = 0;
70✔
1309
        }
72✔
1310
    };
205✔
1311

1312
    auto append_row = [&](RecordBatchBuilder& builder, std::size_t& count,
690✔
1313
                          std::vector<ArrowExportResult>& results,
1314
                          const AggKeyView& kv, const AggMetricsView& mv,
1315
                          std::string_view file_name,
1316
                          std::string_view host_name,
1317
                          std::string_view proc_name, IOCategory io_cat) {
1318
        std::size_t ci = 0;
657✔
1319
        builder.append_dict_string(ci++, kv.cat);
657✔
1320
        builder.append_dict_string(ci++, kv.name);
655✔
1321
        builder.append_int64(ci++, static_cast<std::int64_t>(kv.pid));
657✔
1322
        builder.append_int64(ci++, static_cast<std::int64_t>(kv.tid));
649✔
1323
        builder.append_dict_string(ci++, kv.fhash);
654✔
1324
        builder.append_dict_string(ci++, kv.hhash);
656✔
1325
        builder.append_dict_string(ci++, file_name);
660✔
1326
        builder.append_dict_string(ci++, host_name);
660✔
1327
        builder.append_dict_string(ci++, proc_name);
660✔
1328
        builder.append_int64(ci++, static_cast<std::int64_t>(io_cat));
660✔
1329
        builder.append_int64(ci++, 0);
658✔
1330

1331
        builder.append_int64(ci++, static_cast<std::int64_t>(mv.count));
657✔
1332
        builder.append_double(ci++, static_cast<double>(mv.dur_total) /
988✔
1333
                                        input.ctx->time_resolution);
658✔
1334

1335
        if (mv.size_total > 0) {
658✔
1336
            builder.append_int64(ci++,
736✔
1337
                                 static_cast<std::int64_t>(mv.size_total));
490✔
1338
        } else {
246✔
1339
            builder.append_null(ci++);
168✔
1340
        }
1341

1342
        builder.append_double(ci++, mv.count > 0
988!
1343
                                        ? static_cast<double>(mv.dur_min) /
989✔
1344
                                              input.ctx->time_resolution
659✔
1345
                                        : 0.0);
1346
        builder.append_double(ci++, mv.count > 0
986!
1347
                                        ? static_cast<double>(mv.dur_max) /
988✔
1348
                                              input.ctx->time_resolution
658✔
1349
                                        : 0.0);
1350

1351
        if (mv.size_total > 0 && mv.count > 0) {
657✔
1352
            builder.append_int64(ci++, static_cast<std::int64_t>(mv.size_min));
490✔
1353
            builder.append_int64(ci++, static_cast<std::int64_t>(mv.size_max));
490✔
1354
        } else {
246✔
1355
            builder.append_null(ci++);
167✔
1356
            builder.append_null(ci++);
168✔
1357
        }
1358

1359
        // offset_min > offset_max only when no offset was ever recorded
1360
        // (MetricStats default min=UINT64_MAX, max=0); 0 is a valid offset.
1361
        if (mv.offset_min <= mv.offset_max) {
660!
1362
            builder.append_int64(ci++,
990✔
1363
                                 static_cast<std::int64_t>(mv.offset_min));
660✔
1364
            builder.append_int64(ci++,
989✔
1365
                                 static_cast<std::int64_t>(mv.offset_max));
659✔
1366
        } else {
330✔
NEW
1367
            builder.append_null(ci++);
×
NEW
1368
            builder.append_null(ci++);
×
1369
        }
1370

1371
        auto time_range = bucket_width_us > 0
1,308!
1372
                              ? static_cast<std::int64_t>(
326!
1373
                                    (kv.time_bucket - input.ctx->time_origin) /
986✔
1374
                                    bucket_width_us)
656✔
1375
                              : 0;
1376
        builder.append_int64(ci++, time_range);
656✔
1377
        // Counter (profile) rows align to the bucket grid: time_start is the
1378
        // bucket start, time_end one bucket later. Plain events keep the
1379
        // precise min/max event timestamps.
1380
        if (kv.map_type == AggMapType::PROFILE) {
658!
NEW
1381
            auto bucket_start = static_cast<std::int64_t>(
×
NEW
1382
                kv.time_bucket - input.ctx->time_origin);
×
NEW
1383
            builder.append_int64(ci++, bucket_start);
×
NEW
1384
            builder.append_int64(ci++, bucket_start + static_cast<std::int64_t>(
×
NEW
1385
                                                          bucket_width_us));
×
1386
        } else {
1387
            builder.append_int64(ci++, static_cast<std::int64_t>(
988✔
1388
                                           mv.ts - input.ctx->time_origin));
658✔
1389
            builder.append_int64(ci++, static_cast<std::int64_t>(
990✔
1390
                                           mv.te - input.ctx->time_origin));
660✔
1391
        }
1392
        builder.end_row();
660✔
1393

1394
        count++;
660✔
1395
        if (static_cast<Py_ssize_t>(count) >= input.batch_size) {
660✔
UNCOV
1396
            flush_builder(builder, count, results);
×
1397
        }
1398
    };
704✔
1399

1400
    auto accumulate_coarse =
1401
        [&](std::unordered_map<CoarseKey, CoarseMetrics, CoarseKeyHash>& map,
33✔
1402
            const AggKeyView& kv, const AggMetricsView& mv,
1403
            std::string_view file_name, std::string_view host_name,
1404
            std::string_view proc_name, IOCategory io_cat) {
UNCOV
1405
            const auto& cfg = *input.group_by;
×
1406
            // Probe with non-interned views; hash/equality compare by content,
1407
            // so string_view lifetime doesn't matter for lookup. We only copy
1408
            // (intern) on first insert.
UNCOV
1409
            CoarseKey probe;
×
1410
            if (cfg.mask & GB_CAT) probe.cat = kv.cat;
×
1411
            if (cfg.mask & GB_FUNC_NAME) probe.func_name = kv.name;
×
1412
            if (cfg.mask & GB_PID) probe.pid = kv.pid;
×
1413
            if (cfg.mask & GB_TID) probe.tid = kv.tid;
×
1414
            if (cfg.mask & GB_FILE_HASH) probe.file_hash = kv.fhash;
×
1415
            if (cfg.mask & GB_HOST_HASH) probe.host_hash = kv.hhash;
×
1416
            if (cfg.mask & GB_FILE_NAME) probe.file_name = file_name;
×
1417
            if (cfg.mask & GB_HOST_NAME) probe.host_name = host_name;
×
1418
            if (cfg.mask & GB_PROC_NAME) probe.proc_name = proc_name;
×
1419
            if (cfg.mask & GB_IO_CAT)
×
1420
                probe.io_cat = static_cast<std::int64_t>(io_cat);
×
1421
            if (cfg.mask & GB_TIME_RANGE) {
×
1422
                probe.time_range =
×
1423
                    bucket_width_us > 0
×
1424
                        ? static_cast<std::int64_t>(
×
UNCOV
1425
                              (kv.time_bucket - input.ctx->time_origin) /
×
1426
                              bucket_width_us)
×
1427
                        : 0;
1428
            }
1429
            // acc_pat is always 0 today; included for completeness.
1430

UNCOV
1431
            auto it = map.find(probe);
×
1432
            if (it == map.end()) {
×
1433
                // First sighting: promote views referencing unstable DB buffers
1434
                // to interned copies. file_name/host_name come from the
1435
                // resolver's intern pool, and proc_name from proc_name_cache;
1436
                // both already stable across iterations, no copy needed.
UNCOV
1437
                CoarseKey stable = probe;
×
1438
                if (cfg.mask & GB_CAT) stable.cat = resolver.intern(kv.cat);
×
1439
                if (cfg.mask & GB_FUNC_NAME)
×
1440
                    stable.func_name = resolver.intern(kv.name);
×
1441
                if (cfg.mask & GB_FILE_HASH)
×
1442
                    stable.file_hash = resolver.intern(kv.fhash);
×
1443
                if (cfg.mask & GB_HOST_HASH)
×
1444
                    stable.host_hash = resolver.intern(kv.hhash);
×
1445
                auto [nit, _] = map.emplace(std::move(stable), CoarseMetrics{});
×
1446
                it = nit;
×
1447
            }
UNCOV
1448
            CoarseMetrics& m = it->second;
×
1449
            m.count += mv.count;
×
1450
            double time_val =
×
1451
                static_cast<double>(mv.dur_total) / input.ctx->time_resolution;
×
1452
            m.time_sum += time_val;
×
1453
            m.time_sq_sum += time_val * time_val;
×
1454
            if (time_val < m.time_call_min_val) m.time_call_min_val = time_val;
×
1455
            if (time_val > m.time_call_max_val) m.time_call_max_val = time_val;
×
1456
            if (mv.count > 0) {
×
1457
                double dur_min_v = static_cast<double>(mv.dur_min) /
×
1458
                                   input.ctx->time_resolution;
×
1459
                double dur_max_v = static_cast<double>(mv.dur_max) /
×
1460
                                   input.ctx->time_resolution;
×
1461
                if (dur_min_v < m.time_min_val) m.time_min_val = dur_min_v;
×
1462
                if (dur_max_v > m.time_max_val) m.time_max_val = dur_max_v;
×
1463
            }
UNCOV
1464
            if (mv.size_total > 0) {
×
1465
                m.has_size = true;
×
1466
                m.size_sum += mv.size_total;
×
1467
                double sz = static_cast<double>(mv.size_total);
×
1468
                m.size_sq_sum += sz * sz;
×
1469
                if (mv.size_total < m.size_call_min_val)
×
1470
                    m.size_call_min_val = mv.size_total;
×
1471
                if (mv.size_total > m.size_call_max_val)
×
1472
                    m.size_call_max_val = mv.size_total;
×
1473
                if (mv.count > 0) {
×
1474
                    if (mv.size_min < m.size_min_val)
×
1475
                        m.size_min_val = mv.size_min;
×
1476
                    if (mv.size_max > m.size_max_val)
×
1477
                        m.size_max_val = mv.size_max;
×
1478
                }
1479
            }
UNCOV
1480
            if (mv.ts >= input.ctx->time_origin) {
×
1481
                m.has_time_bounds = true;
×
1482
                auto ts_off = mv.ts - input.ctx->time_origin;
×
1483
                auto te_off = mv.te - input.ctx->time_origin;
×
1484
                if (ts_off < m.time_start_val) m.time_start_val = ts_off;
×
1485
                if (te_off > m.time_end_val) m.time_end_val = te_off;
×
1486
            }
UNCOV
1487
        };
×
1488

1489
    input.agg->scan_shard_range_raw(
66!
1490
        input.shard_begin, input.shard_end,
77!
1491
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
860✔
1492
            AggKeyView kv;
827✔
1493
            if (!parse_agg_key_view(key_bytes, kv)) return true;
803!
1494

1495
            if (input.type_filter && kv.map_type != *input.type_filter)
826!
UNCOV
1496
                return true;
×
1497

1498
            if (input.ctx->query_filter) {
817✔
1499
                auto& q = *input.ctx->query_filter;
537✔
1500
                dftracer::utils::utilities::common::query::ValueMap fields;
537!
1501
                if (q.references("cat")) fields["cat"] = std::string(kv.cat);
536!
1502
                if (q.references("name")) fields["name"] = std::string(kv.name);
516!
1503
                if (q.references("pid")) fields["pid"] = kv.pid;
524✔
1504
                if (q.references("tid")) fields["tid"] = kv.tid;
507!
1505
                if (q.references("hhash"))
519!
UNCOV
1506
                    fields["hhash"] = std::string(kv.hhash);
×
1507
                if (q.references("fhash"))
521!
UNCOV
1508
                    fields["fhash"] = std::string(kv.fhash);
×
1509
                if (q.references("time_bucket"))
525!
UNCOV
1510
                    fields["time_bucket"] = kv.time_bucket;
×
1511
                if (!q.evaluate(fields)) return true;
524✔
1512
            }
521✔
1513

1514
            AggMetricsView mv;
1515
            if (!parse_agg_value_view(val_bytes, mv)) return true;
643✔
1516

1517
            auto file_name = resolver.resolve_file(kv.fhash);
659!
1518
            auto host_name = resolver.resolve_host(kv.hhash);
643!
1519

1520
            ProcKey pk{kv.hhash, kv.pid, kv.tid};
660✔
1521
            auto proc_it = proc_name_cache.find(pk);
660!
1522
            std::string_view proc_name;
660✔
1523
            if (proc_it != proc_name_cache.end()) {
660✔
1524
                proc_name = proc_it->second;
400✔
1525
            } else {
218✔
1526
                std::string pn = "app#";
253!
1527
                if (!host_name.empty()) {
251!
1528
                    pn.append(host_name);
247!
1529
                } else if (!kv.hhash.empty()) {
118!
UNCOV
1530
                    pn.append(kv.hhash);
×
1531
                } else {
UNCOV
1532
                    pn.append("unknown");
×
1533
                }
1534
                pn.push_back('#');
252✔
1535
                pn.append(std::to_string(kv.pid));
251!
1536
                pn.push_back('#');
250!
1537
                pn.append(std::to_string(kv.tid));
250!
1538
                ProcKey stable_pk{resolver.intern(kv.hhash), kv.pid, kv.tid};
253✔
1539
                auto [it, _] =
253✔
1540
                    proc_name_cache.emplace(stable_pk, std::move(pn));
250✔
1541
                proc_name = it->second;
250✔
1542
            }
245✔
1543

1544
            auto io_it = io_cat_cache.find(kv.name);
650!
1545
            IOCategory io_cat;
1546
            if (io_it != io_cat_cache.end()) {
654✔
1547
                io_cat = io_it->second;
239✔
1548
            } else {
130✔
1549
                io_cat = get_io_category(kv.name);
417✔
1550
                io_cat_cache[resolver.intern(kv.name)] = io_cat;
419!
1551
            }
1552

1553
            if (coarse) {
654!
UNCOV
1554
                switch (kv.map_type) {
×
1555
                    case AggMapType::EVENT:
UNCOV
1556
                        if (use_events)
×
1557
                            accumulate_coarse(event_coarse, kv, mv, file_name,
×
1558
                                              host_name, proc_name, io_cat);
UNCOV
1559
                        break;
×
1560
                    case AggMapType::PROFILE:
UNCOV
1561
                        if (use_profiles)
×
1562
                            accumulate_coarse(profile_coarse, kv, mv, file_name,
×
1563
                                              host_name, proc_name, io_cat);
UNCOV
1564
                        break;
×
1565
                    case AggMapType::SYSTEM:
UNCOV
1566
                        if (use_system)
×
1567
                            accumulate_coarse(system_coarse, kv, mv, file_name,
×
1568
                                              host_name, proc_name, io_cat);
UNCOV
1569
                        break;
×
1570
                }
1571
            } else {
1572
                switch (kv.map_type) {
654!
1573
                    case AggMapType::EVENT:
326✔
1574
                        append_row(event_builder, event_count, output.events,
986!
1575
                                   kv, mv, file_name, host_name, proc_name,
330✔
1576
                                   io_cat);
330✔
1577
                        break;
660✔
1578
                    case AggMapType::PROFILE:
UNCOV
1579
                        append_row(profile_builder, profile_count,
×
1580
                                   output.profiles, kv, mv, file_name,
×
1581
                                   host_name, proc_name, io_cat);
UNCOV
1582
                        break;
×
1583
                    case AggMapType::SYSTEM:
UNCOV
1584
                        append_row(system_builder, system_count, output.system,
×
1585
                                   kv, mv, file_name, host_name, proc_name,
1586
                                   io_cat);
UNCOV
1587
                        break;
×
1588
                }
1589
            }
1590
            return true;
658✔
1591
        });
456✔
1592

1593
    if (coarse) {
77!
UNCOV
1594
        const auto& cfg = *input.group_by;
×
1595
        auto flush_coarse = [&](std::unordered_map<CoarseKey, CoarseMetrics,
×
1596
                                                   CoarseKeyHash>& map,
1597
                                RecordBatchBuilder& builder, std::size_t& count,
1598
                                std::vector<ArrowExportResult>& results) {
UNCOV
1599
            for (auto& [key, m] : map) {
×
1600
                std::size_t ci = 0;
×
1601
                for (std::size_t i = 0; i < cfg.order.size(); ++i) {
×
1602
                    switch (cfg.order[i]) {
×
1603
                        case GB_CAT:
UNCOV
1604
                            builder.append_dict_string(ci++, key.cat);
×
1605
                            break;
×
1606
                        case GB_FUNC_NAME:
UNCOV
1607
                            builder.append_dict_string(ci++, key.func_name);
×
1608
                            break;
×
1609
                        case GB_PID:
UNCOV
1610
                            builder.append_int64(
×
1611
                                ci++, static_cast<std::int64_t>(key.pid));
×
1612
                            break;
×
1613
                        case GB_TID:
UNCOV
1614
                            builder.append_int64(
×
1615
                                ci++, static_cast<std::int64_t>(key.tid));
×
1616
                            break;
×
1617
                        case GB_FILE_HASH:
UNCOV
1618
                            builder.append_dict_string(ci++, key.file_hash);
×
1619
                            break;
×
1620
                        case GB_HOST_HASH:
UNCOV
1621
                            builder.append_dict_string(ci++, key.host_hash);
×
1622
                            break;
×
1623
                        case GB_FILE_NAME:
UNCOV
1624
                            builder.append_dict_string(ci++, key.file_name);
×
1625
                            break;
×
1626
                        case GB_HOST_NAME:
UNCOV
1627
                            builder.append_dict_string(ci++, key.host_name);
×
1628
                            break;
×
1629
                        case GB_PROC_NAME:
UNCOV
1630
                            builder.append_dict_string(ci++, key.proc_name);
×
1631
                            break;
×
1632
                        case GB_IO_CAT:
UNCOV
1633
                            builder.append_int64(ci++, key.io_cat);
×
1634
                            break;
×
1635
                        case GB_ACC_PAT:
UNCOV
1636
                            builder.append_int64(ci++, key.acc_pat);
×
1637
                            break;
×
1638
                        case GB_TIME_RANGE:
UNCOV
1639
                            builder.append_int64(ci++, key.time_range);
×
1640
                            break;
×
1641
                    }
1642
                }
UNCOV
1643
                builder.append_int64(ci++, static_cast<std::int64_t>(m.count));
×
1644
                builder.append_double(ci++, m.time_sum);
×
1645
                if (m.has_size) {
×
1646
                    builder.append_int64(ci++,
×
1647
                                         static_cast<std::int64_t>(m.size_sum));
×
1648
                } else {
UNCOV
1649
                    builder.append_null(ci++);
×
1650
                }
UNCOV
1651
                builder.append_double(ci++, m.time_sq_sum);
×
1652
                if (m.has_size) {
×
1653
                    builder.append_double(ci++, m.size_sq_sum);
×
1654
                } else {
UNCOV
1655
                    builder.append_null(ci++);
×
1656
                }
UNCOV
1657
                builder.append_double(ci++, m.count > 0 ? m.time_min_val : 0.0);
×
1658
                builder.append_double(ci++, m.count > 0 ? m.time_max_val : 0.0);
×
1659
                if (m.has_size) {
×
1660
                    builder.append_int64(
×
1661
                        ci++, static_cast<std::int64_t>(m.size_min_val));
×
1662
                    builder.append_int64(
×
1663
                        ci++, static_cast<std::int64_t>(m.size_max_val));
×
1664
                } else {
UNCOV
1665
                    builder.append_null(ci++);
×
1666
                    builder.append_null(ci++);
×
1667
                }
UNCOV
1668
                builder.append_double(ci++,
×
1669
                                      m.count > 0 ? m.time_call_min_val : 0.0);
×
1670
                builder.append_double(ci++,
×
1671
                                      m.count > 0 ? m.time_call_max_val : 0.0);
×
1672
                if (m.has_size) {
×
1673
                    builder.append_int64(
×
1674
                        ci++, static_cast<std::int64_t>(m.size_call_min_val));
×
1675
                    builder.append_int64(
×
1676
                        ci++, static_cast<std::int64_t>(m.size_call_max_val));
×
1677
                } else {
UNCOV
1678
                    builder.append_null(ci++);
×
1679
                    builder.append_null(ci++);
×
1680
                }
UNCOV
1681
                builder.append_int64(
×
1682
                    ci++, m.has_time_bounds
×
1683
                              ? static_cast<std::int64_t>(m.time_start_val)
×
1684
                              : 0);
UNCOV
1685
                builder.append_int64(
×
1686
                    ci++, m.has_time_bounds
×
1687
                              ? static_cast<std::int64_t>(m.time_end_val)
×
1688
                              : 0);
UNCOV
1689
                builder.end_row();
×
1690
                ++count;
×
1691
                if (static_cast<Py_ssize_t>(count) >= input.batch_size) {
×
1692
                    flush_builder(builder, count, results);
×
1693
                }
1694
            }
UNCOV
1695
            flush_builder(builder, count, results);
×
1696
        };
×
1697
        if (use_events)
×
1698
            flush_coarse(event_coarse, event_builder, event_count,
×
1699
                         output.events);
×
1700
        if (use_profiles)
×
1701
            flush_coarse(profile_coarse, profile_builder, profile_count,
×
1702
                         output.profiles);
×
1703
        if (use_system)
×
1704
            flush_coarse(system_coarse, system_builder, system_count,
×
1705
                         output.system);
×
1706
    } else {
1707
        if (use_events)
77!
1708
            flush_builder(event_builder, event_count, output.events);
77!
1709
        if (use_profiles)
77✔
1710
            flush_builder(profile_builder, profile_count, output.profiles);
63!
1711
        if (use_system)
77✔
1712
            flush_builder(system_builder, system_count, output.system);
63!
1713
    }
1714

1715
    return output;
121✔
1716
}
85!
1717

1718
// Two-pass scan over SYSTEM_METRICS CF: pass 1 discovers metric column names
1719
// (dynamic per workload), pass 2 emits rows. Needed because RecordBatchBuilder
1720
// requires the schema up front.
1721
std::vector<ArrowExportResult> scan_system_metrics_buffer(
18✔
1722
    const EventAggregator* agg, const DfanalyzerContext* ctx,
1723
    Py_ssize_t batch_size) {
1724
    std::vector<ArrowExportResult> results;
18✔
1725
    if (!agg) return results;
18✔
1726

1727
    std::vector<std::string> metric_names_ordered;
18✔
1728
    std::unordered_map<std::string, std::size_t> metric_name_index;
18✔
1729
    agg->scan_system_metrics_raw([&](std::string_view,
18!
1730
                                     std::string_view val_bytes) -> bool {
NEW
1731
        auto m = deserialize_system_value(val_bytes);
×
NEW
1732
        if (m.metrics) {
×
NEW
1733
            for (const auto& [name, _] : *m.metrics) {
×
NEW
1734
                if (metric_name_index.find(name) == metric_name_index.end()) {
×
NEW
1735
                    metric_name_index.emplace(name,
×
NEW
1736
                                              metric_names_ordered.size());
×
NEW
1737
                    metric_names_ordered.push_back(name);
×
1738
                }
1739
            }
1740
        }
1741
        return true;
NEW
1742
    });
×
1743

1744
    if (metric_names_ordered.empty()) return results;
18!
1745

NEW
1746
    std::vector<ColumnSpec> schema;
×
NEW
1747
    schema.reserve(6 + metric_names_ordered.size());
×
NEW
1748
    schema.push_back({"host_hash", ColumnType::DICT_STRING});
×
NEW
1749
    schema.push_back({"name", ColumnType::DICT_STRING});
×
NEW
1750
    schema.push_back({"time_bucket", ColumnType::INT64});
×
NEW
1751
    schema.push_back({"ts", ColumnType::INT64});
×
NEW
1752
    schema.push_back({"te", ColumnType::INT64});
×
NEW
1753
    schema.push_back({"count", ColumnType::INT64});
×
NEW
1754
    for (const auto& mn : metric_names_ordered) {
×
NEW
1755
        schema.push_back({mn, ColumnType::DOUBLE});
×
1756
    }
1757

NEW
1758
    RecordBatchBuilder builder;
×
NEW
1759
    builder.declare_schema(schema);
×
NEW
1760
    builder.reserve(static_cast<std::size_t>(batch_size));
×
1761

NEW
1762
    auto flush = [&](std::size_t& row_count) {
×
NEW
1763
        if (row_count == 0) return;
×
NEW
1764
        auto arrow = builder.finish();
×
NEW
1765
        if (arrow.valid()) results.push_back(std::move(arrow));
×
NEW
1766
        builder.reset(true);
×
NEW
1767
        builder.reserve(static_cast<std::size_t>(batch_size));
×
NEW
1768
        row_count = 0;
×
NEW
1769
    };
×
1770

NEW
1771
    std::size_t row_count = 0;
×
NEW
1772
    const std::size_t n_metric_cols = metric_names_ordered.size();
×
1773

NEW
1774
    agg->scan_system_metrics_raw(
×
NEW
1775
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
×
NEW
1776
            auto k = deserialize_system_key(key_bytes);
×
NEW
1777
            auto m = deserialize_system_value(val_bytes);
×
1778

NEW
1779
            std::size_t ci = 0;
×
NEW
1780
            builder.append_dict_string(ci++, k.key.hhash);
×
NEW
1781
            builder.append_dict_string(ci++, k.key.name);
×
NEW
1782
            builder.append_int64(ci++,
×
NEW
1783
                                 static_cast<std::int64_t>(k.key.time_bucket));
×
NEW
1784
            builder.append_int64(ci++, static_cast<std::int64_t>(m.ts));
×
NEW
1785
            builder.append_int64(ci++, static_cast<std::int64_t>(m.te));
×
NEW
1786
            builder.append_int64(ci++, static_cast<std::int64_t>(m.count));
×
1787

NEW
1788
            for (std::size_t i = 0; i < n_metric_cols; ++i) {
×
NEW
1789
                const auto& mn = metric_names_ordered[i];
×
NEW
1790
                bool present = false;
×
NEW
1791
                if (m.metrics) {
×
NEW
1792
                    auto it = m.metrics->find(mn);
×
NEW
1793
                    if (it != m.metrics->end()) {
×
NEW
1794
                        builder.append_double(ci++, it->second.mean);
×
NEW
1795
                        present = true;
×
1796
                    }
1797
                }
NEW
1798
                if (!present) builder.append_null(ci++);
×
1799
            }
NEW
1800
            builder.end_row();
×
NEW
1801
            row_count++;
×
NEW
1802
            if (static_cast<Py_ssize_t>(row_count) >= batch_size) {
×
NEW
1803
                flush(row_count);
×
1804
            }
1805
            return true;
NEW
1806
        });
×
NEW
1807
    flush(row_count);
×
1808

1809
    (void)ctx;
NEW
1810
    return results;
×
1811
}
18!
1812

1813
}  // namespace
1814

NEW
1815
// Python method taking keywords `type` (default "events") and `batch_size`
// (default 10000).
//
// Opens the aggregation database for this indexer's index path, runs
// scan_aggregation_shard_range over the shard ranges handed out by
// parallel_shard_scan while the GIL is released, and returns an iterator over
// a Python list filled from the collected Arrow results.
//
// Returns nullptr when argument parsing, parse_agg_type_str or
// resolve_index_path fails (those helpers are expected to have set the Python
// exception), and raises RuntimeError with the recorded message when opening
// or scanning the database reported an error.
static PyObject* Indexer_iter_aggregation(IndexerObject* self, PyObject* args,
                                          PyObject* kwds) {
    static const char* kwlist[] = {"type", "batch_size", nullptr};
    const char* type_str = "events";
    Py_ssize_t batch_size = 10000;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|sn", (char**)kwlist, &type_str, &batch_size);
    if (!parsed) {
        return nullptr;
    }

    AggMapType target_type;
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;

    // Anything that is neither EVENT nor PROFILE maps to SYSTEM.
    AggregationBatchType batch_type = AggregationBatchType::SYSTEM;
    if (target_type == AggMapType::EVENT) {
        batch_type = AggregationBatchType::EVENT;
    } else if (target_type == AggMapType::PROFILE) {
        batch_type = AggregationBatchType::PROFILE;
    }

    auto resolved_path = resolve_index_path(self);
    if (!resolved_path) return nullptr;
    std::string index_path = std::move(*resolved_path);

    PyObject* py_batches = PyList_New(0);
    if (py_batches == nullptr) return nullptr;

    std::string failure;
    std::vector<dftracer::utils::utilities::common::arrow::ArrowExportResult>
        collected;

    // The scan runs without the GIL; errors are carried out of the region as
    // text in `failure` and turned into a Python exception afterwards.
    Py_BEGIN_ALLOW_THREADS
    try {
        auto db = open_agg_db(index_path, failure);
        if (db) {
            Runtime* runtime = get_batch_indexer_runtime(self);
            std::vector<AggScanOutput> per_range;
            parallel_shard_scan<AggScanOutput>(
                runtime,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    AggScanInput input;
                    input.agg = db->agg.get();
                    input.target_type = target_type;
                    input.batch_type = batch_type;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    return scan_aggregation_shard_range(input);
                },
                per_range);

            // Flatten the per-range outputs into one list, keeping the order
            // in which parallel_shard_scan stored them.
            for (auto& range_output : per_range) {
                for (auto& batch : range_output.results) {
                    collected.push_back(std::move(batch));
                }
            }
        }
    } catch (const std::exception& e) {
        failure = e.what();
    }
    Py_END_ALLOW_THREADS

    if (!failure.empty()) {
        Py_DECREF(py_batches);
        PyErr_SetString(PyExc_RuntimeError, failure.c_str());
        return nullptr;
    }

    append_results_to_list(py_batches, collected);

    // The iterator holds its own reference to the list, so ours is dropped.
    PyObject* iterator = PyObject_GetIter(py_batches);
    Py_DECREF(py_batches);
    return iterator;
}
×
1890

1891
// Python method taking keywords `type` (default "events"), `batch_size`
// (default 10000), `time_granularity` (default 1.0), `time_resolution`
// (default 1000000.0) and `query` (str or None, default None).
//
// With the GIL released it opens the aggregation database, loads the FILE and
// HOST hash tables from the index database, takes the minimum time bucket as
// the time origin (0 when the bounds are not valid), and runs
// scan_dfanalyzer_shards over the shard ranges handed out by
// parallel_shard_scan. For each range output the events, profiles and system
// results are appended in that order. The return value is an iterator over a
// Python list filled from those results.
//
// Returns nullptr when argument parsing, parse_agg_type_str, parse_query_arg
// or resolve_index_path fails (those helpers are expected to have set the
// Python exception), and raises RuntimeError with the recorded message when
// opening or scanning the databases reported an error.
static PyObject* Indexer_iter_arrow_dfanalyzer(IndexerObject* self,
                                               PyObject* args, PyObject* kwds) {
    using IndexDb = dftracer::utils::utilities::indexer::IndexDatabase;
    using RocksDb = dftracer::utils::rocksdb::RocksDatabase;

    static const char* kwlist[] = {
        "type",  "batch_size", "time_granularity", "time_resolution",
        "query", nullptr};
    const char* type_str = "events";
    Py_ssize_t batch_size = 10000;
    double time_granularity = 1.0;
    double time_resolution = 1000000.0;
    const char* query_str = nullptr;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|snddz", (char**)kwlist, &type_str, &batch_size,
        &time_granularity, &time_resolution, &query_str);
    if (!parsed) {
        return nullptr;
    }

    AggMapType target_type;
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;

    // An empty result is only a failure when a Python error is pending;
    // otherwise it simply means no query filter is applied.
    auto query_opt = parse_query_arg(query_str);
    if (!query_opt && PyErr_Occurred()) return nullptr;

    auto resolved_path = resolve_index_path(self);
    if (!resolved_path) return nullptr;
    std::string index_path = std::move(*resolved_path);

    PyObject* py_batches = PyList_New(0);
    if (py_batches == nullptr) return nullptr;

    std::string failure;
    std::vector<ArrowExportResult> collected;

    // The scan runs without the GIL; errors are carried out of the region as
    // text in `failure` and turned into a Python exception afterwards.
    Py_BEGIN_ALLOW_THREADS
    try {
        auto db = open_agg_db(index_path, failure);
        if (db) {
            IndexDb index_db(index_path, RocksDb::OpenMode::ReadOnly);
            auto file_hashes =
                index_db.query_hash_table(IndexDb::HashType::FILE);
            auto host_hashes =
                index_db.query_hash_table(IndexDb::HashType::HOST);

            auto bounds = db->agg->query_time_bounds();
            std::uint64_t time_origin = 0;
            if (bounds.valid) {
                time_origin = bounds.min_time_bucket;
            }

            // Shared by every shard-range scan; the pointed-to tables and
            // query live until the end of this block.
            DfanalyzerContext ctx;
            ctx.file_hashes = &file_hashes;
            ctx.host_hashes = &host_hashes;
            ctx.query_filter = nullptr;
            if (query_opt) {
                ctx.query_filter = &*query_opt;
            }
            ctx.time_origin = time_origin;
            ctx.time_resolution = time_resolution;
            ctx.time_granularity = time_granularity;

            Runtime* runtime = get_batch_indexer_runtime(self);
            std::vector<DfanalyzerScanOutput> per_range;
            parallel_shard_scan<DfanalyzerScanOutput>(
                runtime,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    DfanalyzerScanInput input;
                    input.agg = db->agg.get();
                    input.ctx = &ctx;
                    input.type_filter = target_type;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    return scan_dfanalyzer_shards(input);
                },
                per_range);

            // Moves every batch of `source` to the back of `collected`.
            auto drain = [&collected](auto& source) {
                for (auto& batch : source) {
                    collected.push_back(std::move(batch));
                }
            };
            for (auto& range_output : per_range) {
                drain(range_output.events);
                drain(range_output.profiles);
                drain(range_output.system);
            }
        }
    } catch (const std::exception& e) {
        failure = e.what();
    }
    Py_END_ALLOW_THREADS

    if (!failure.empty()) {
        Py_DECREF(py_batches);
        PyErr_SetString(PyExc_RuntimeError, failure.c_str());
        return nullptr;
    }

    append_results_to_list(py_batches, collected);

    // The iterator holds its own reference to the list, so ours is dropped.
    PyObject* iterator = PyObject_GetIter(py_batches);
    Py_DECREF(py_batches);
    return iterator;
}
4✔
1988

1989
// Parses the Python `group_by` argument into `out`.
//
// A null pointer or None leaves `out` untouched and succeeds. Otherwise `obj`
// must be a sequence whose entries are str. Each entry is looked up with
// parse_group_by_name; the first time a field is seen its bit is OR-ed into
// out.mask and the field and its spelling are appended to out.order and
// out.names. Repeated fields are ignored.
//
// Returns true on success. Returns false with a Python exception set on
// failure: TypeError for a non-sequence or a non-str entry, ValueError for a
// field name parse_group_by_name does not recognise, or the error raised by
// the sequence protocol / UTF-8 conversion itself.
static bool parse_group_by_arg(PyObject* obj, GroupByConfig& out) {
    if (!obj || obj == Py_None) return true;
    if (!PySequence_Check(obj)) {
        PyErr_SetString(PyExc_TypeError,
                        "group_by must be a sequence of strings or None");
        return false;
    }

    // PySequence_Length reports failure as -1 with an exception set (e.g. an
    // object that passes PySequence_Check but has no usable __len__). Without
    // this check the loop below would be skipped and the function would
    // report success while an exception is still pending.
    Py_ssize_t n = PySequence_Length(obj);
    if (n < 0) return false;

    for (Py_ssize_t i = 0; i < n; ++i) {
        // New reference; released on every path out of this iteration.
        PyObject* item = PySequence_GetItem(obj, i);
        if (!item) return false;
        if (!PyUnicode_Check(item)) {
            Py_DECREF(item);
            PyErr_SetString(PyExc_TypeError,
                            "group_by entries must be strings");
            return false;
        }
        Py_ssize_t sz = 0;
        const char* s = PyUnicode_AsUTF8AndSize(item, &sz);
        if (!s) {
            Py_DECREF(item);
            return false;
        }
        // `sv` points into storage owned by `item`, so it must not be used
        // after the Py_DECREF calls below.
        std::string_view sv(s, static_cast<std::size_t>(sz));
        auto field = parse_group_by_name(sv);
        if (!field) {
            // Copy the name into the message before dropping the reference.
            std::string msg = "unsupported group_by field: ";
            msg.append(sv);
            Py_DECREF(item);
            PyErr_SetString(PyExc_ValueError, msg.c_str());
            return false;
        }
        if (!(out.mask & *field)) {
            out.mask |= *field;
            out.order.push_back(*field);
            out.names.emplace_back(sv);
        }
        Py_DECREF(item);
    }
    return true;
}
9✔
2030

2031
// Indexer.iter_arrow_dfanalyzer_all(batch_size=10000, time_granularity=1.0,
//                                   time_resolution=1e6, query=None,
//                                   group_by=None)
//
// Scans every aggregation shard once and returns a new dict with the keys
// "events", "profiles" and "system", each mapped to a list built by
// append_results_to_list() from the collected ArrowExportResult values.
//
// Returns nullptr with a Python exception set on argument errors, allocation
// failures, or (as RuntimeError) when the scan reports an error message or
// throws a C++ exception.
static PyObject* Indexer_iter_arrow_dfanalyzer_all(IndexerObject* self,
                                                   PyObject* args,
                                                   PyObject* kwds) {
    static const char* kwlist[] = {"batch_size",      "time_granularity",
                                   "time_resolution", "query",
                                   "group_by",        nullptr};
    Py_ssize_t batch_size = 10000;
    double time_granularity = 1.0;
    double time_resolution = 1000000.0;
    const char* query_str = nullptr;
    PyObject* group_by_obj = nullptr;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nddzO", (char**)kwlist, &batch_size,
            &time_granularity, &time_resolution, &query_str, &group_by_obj)) {
        return nullptr;
    }

    // An empty optional without a pending exception means "no filter".
    auto query_opt = parse_query_arg(query_str);
    if (!query_opt && PyErr_Occurred()) return nullptr;

    GroupByConfig group_by_cfg;
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
    // An empty mask means no grouping was requested; pass nullptr downstream.
    const GroupByConfig* group_by_ptr =
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* result_dict = PyDict_New();
    if (!result_dict) return nullptr;

    PyObject* events_list = PyList_New(0);
    PyObject* profiles_list = PyList_New(0);
    PyObject* system_list = PyList_New(0);
    if (!events_list || !profiles_list || !system_list) {
        Py_XDECREF(events_list);
        Py_XDECREF(profiles_list);
        Py_XDECREF(system_list);
        Py_DECREF(result_dict);
        return nullptr;
    }

    std::string error_msg;
    std::vector<ArrowExportResult> events_results, profiles_results,
        system_results;

    // The scan runs with the GIL released. No exception may leave this
    // region: unwinding past Py_END_ALLOW_THREADS would skip re-acquiring
    // the GIL, so every C++ exception is converted into `error_msg`.
    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            dftracer::utils::utilities::indexer::IndexDatabase idx_db(
                index_path,
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto file_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::FILE);
            auto host_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::HOST);

            auto time_bounds = handle->agg->query_time_bounds();
            std::uint64_t time_origin =
                time_bounds.valid ? time_bounds.min_time_bucket : 0;

            DfanalyzerContext ctx;
            ctx.file_hashes = &file_hashes;
            ctx.host_hashes = &host_hashes;
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
            ctx.time_origin = time_origin;
            ctx.time_resolution = time_resolution;
            ctx.time_granularity = time_granularity;

            // NOTE(review): called without the GIL; confirm that
            // get_batch_indexer_runtime() does not touch the Python C API.
            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<DfanalyzerScanOutput> outputs;
            parallel_shard_scan<DfanalyzerScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    DfanalyzerScanInput input;
                    input.agg = handle->agg.get();
                    input.ctx = &ctx;
                    input.type_filter = std::nullopt;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    input.group_by = group_by_ptr;
                    return scan_dfanalyzer_shards(input);
                },
                outputs);

            // Merge the per-range outputs into one vector per category.
            for (auto& out : outputs) {
                for (auto& r : out.events)
                    events_results.push_back(std::move(r));
                for (auto& r : out.profiles)
                    profiles_results.push_back(std::move(r));
                for (auto& r : out.system)
                    system_results.push_back(std::move(r));
            }

            auto sys_buf =
                scan_system_metrics_buffer(handle->agg.get(), &ctx, batch_size);
            for (auto& r : sys_buf) system_results.push_back(std::move(r));
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
        // An empty what() must still be reported as a failure.
        if (error_msg.empty()) error_msg = "unknown error";
    } catch (...) {
        error_msg = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(events_list);
        Py_DECREF(profiles_list);
        Py_DECREF(system_list);
        Py_DECREF(result_dict);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(events_list, events_results);
    append_results_to_list(profiles_list, profiles_results);
    append_results_to_list(system_list, system_results);

    // Returning a value while an exception is pending is an error in
    // CPython, so both the list building and the dict inserts are checked.
    const bool stored =
        !PyErr_Occurred() &&
        PyDict_SetItemString(result_dict, "events", events_list) == 0 &&
        PyDict_SetItemString(result_dict, "profiles", profiles_list) == 0 &&
        PyDict_SetItemString(result_dict, "system", system_list) == 0;
    // The dict holds its own references to whatever was inserted.
    Py_DECREF(events_list);
    Py_DECREF(profiles_list);
    Py_DECREF(system_list);
    if (!stored) {
        Py_DECREF(result_dict);
        return nullptr;
    }

    return result_dict;
}
20✔
2161

2162
// ---------------------------------------------------------------------------
2163
// scan_aggregation_manifest — module-level entry point for analyze_trace.
2164
//
2165
// Each Dask worker calls this with its slice of the agg manifest
2166
// (agg_ssts + sys_ssts) and optionally a [shard_begin, shard_end) range.
2167
// The function opens a scratch IndexDatabase at `scratch_dir`, ingests the
2168
// SSTs into its AGGREGATION/SYSTEM_METRICS CFs (nearly free when SSTs live
2169
// on the same filesystem as `scratch_dir` — RocksDB hard-links them), then
2170
// runs the same parallel shard scan that `iter_arrow_dfanalyzer_all` uses.
2171
//
2172
// AGG_GLOBAL_CONFIG_KEY is not written by worker SSTs, so we construct the
2173
// EventAggregator with config_hash=0 directly instead of going through
2174
// `open_agg_db` (which requires the config key). The config hash is used
2175
// by the aggregator only for write-time validation, not for reads.
2176
//
2177
// The scratch DB is NOT cleaned up here — the Python caller owns
2178
// `scratch_dir` lifetime and should remove it after gathering results.
2179
// ---------------------------------------------------------------------------
2180

UNCOV
2181
static bool collect_string_list(PyObject* obj, const char* name,
×
2182
                                std::vector<std::string>& out) {
UNCOV
2183
    if (!obj || obj == Py_None) return true;
×
UNCOV
2184
    PyObject* seq = PySequence_Fast(obj, name);
×
2185
    if (!seq) return false;
×
UNCOV
2186
    Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
2187
    out.reserve(static_cast<std::size_t>(n));
×
2188
    for (Py_ssize_t i = 0; i < n; ++i) {
×
2189
        PyObject* item = PySequence_Fast_GET_ITEM(seq, i);
×
2190
        if (!PyUnicode_Check(item)) {
×
2191
            Py_DECREF(seq);
2192
            PyErr_Format(PyExc_TypeError, "%s items must be str", name);
×
2193
            return false;
×
2194
        }
UNCOV
2195
        const char* s = PyUnicode_AsUTF8(item);
×
2196
        if (!s) {
×
2197
            Py_DECREF(seq);
UNCOV
2198
            return false;
×
2199
        }
2200
        out.emplace_back(s);
×
2201
    }
2202
    Py_DECREF(seq);
UNCOV
2203
    return true;
×
2204
}
2205

UNCOV
2206
static bool collect_string_string_dict(
×
2207
    PyObject* obj, const char* name,
2208
    std::unordered_map<std::string, std::string>& out) {
UNCOV
2209
    if (!obj || obj == Py_None) return true;
×
2210
    if (!PyDict_Check(obj)) {
×
UNCOV
2211
        PyErr_Format(PyExc_TypeError, "%s must be a dict[str, str] or None",
×
2212
                     name);
2213
        return false;
×
2214
    }
2215
    PyObject *k, *v;
UNCOV
2216
    Py_ssize_t pos = 0;
×
2217
    while (PyDict_Next(obj, &pos, &k, &v)) {
×
UNCOV
2218
        if (!PyUnicode_Check(k) || !PyUnicode_Check(v)) {
×
UNCOV
2219
            PyErr_Format(PyExc_TypeError, "%s must map str -> str", name);
×
2220
            return false;
×
2221
        }
2222
        const char* ks = PyUnicode_AsUTF8(k);
×
2223
        const char* vs = PyUnicode_AsUTF8(v);
×
2224
        if (!ks || !vs) return false;
×
UNCOV
2225
        out.emplace(ks, vs);
×
2226
    }
2227
    return true;
×
2228
}
2229

UNCOV
2230
static PyObject* scan_aggregation_manifest_fn(PyObject* /*self*/,
×
2231
                                              PyObject* args, PyObject* kwds) {
2232
    static const char* kwlist[] = {
2233
        "agg_ssts",        "sys_ssts",    "scratch_dir",
2234
        "meta_index_path", "batch_size",  "time_granularity",
2235
        "time_resolution", "query",       "group_by",
2236
        "shard_begin",     "shard_end",   "runtime",
2237
        "file_hashes",     "host_hashes", nullptr};
2238

UNCOV
2239
    PyObject* agg_ssts_obj = nullptr;
×
UNCOV
2240
    PyObject* sys_ssts_obj = nullptr;
×
UNCOV
2241
    const char* scratch_dir = nullptr;
×
UNCOV
2242
    const char* meta_index_path = nullptr;
×
2243
    Py_ssize_t batch_size = 10000;
×
2244
    double time_granularity = 1.0;
×
2245
    double time_resolution = 1000000.0;
×
2246
    const char* query_str = nullptr;
×
2247
    PyObject* group_by_obj = nullptr;
×
2248
    int shard_begin_i = 0;
×
2249
    int shard_end_i = DFT_NUM_SHARDS;
×
2250
    PyObject* runtime_obj = nullptr;
×
2251
    PyObject* file_hashes_obj = nullptr;
×
2252
    PyObject* host_hashes_obj = nullptr;
×
2253

2254
    if (!PyArg_ParseTupleAndKeywords(
×
2255
            args, kwds, "OOss|nddzOiiOOO", (char**)kwlist, &agg_ssts_obj,
2256
            &sys_ssts_obj, &scratch_dir, &meta_index_path, &batch_size,
2257
            &time_granularity, &time_resolution, &query_str, &group_by_obj,
2258
            &shard_begin_i, &shard_end_i, &runtime_obj, &file_hashes_obj,
2259
            &host_hashes_obj)) {
UNCOV
2260
        return nullptr;
×
2261
    }
2262

UNCOV
2263
    if (shard_begin_i < 0 || shard_end_i > DFT_NUM_SHARDS ||
×
2264
        shard_begin_i >= shard_end_i) {
×
UNCOV
2265
        PyErr_Format(PyExc_ValueError,
×
2266
                     "shard range [%d, %d) invalid (must be within [0, %d))",
2267
                     shard_begin_i, shard_end_i, (int)DFT_NUM_SHARDS);
2268
        return nullptr;
×
2269
    }
2270

UNCOV
2271
    std::vector<std::string> agg_ssts;
×
2272
    std::vector<std::string> sys_ssts;
×
UNCOV
2273
    if (!collect_string_list(agg_ssts_obj, "agg_ssts", agg_ssts))
×
UNCOV
2274
        return nullptr;
×
2275
    if (!collect_string_list(sys_ssts_obj, "sys_ssts", sys_ssts))
×
2276
        return nullptr;
×
2277

2278
    std::unordered_map<std::string, std::string> preloaded_file_hashes;
×
2279
    std::unordered_map<std::string, std::string> preloaded_host_hashes;
×
2280
    const bool hashes_preloaded =
×
UNCOV
2281
        (file_hashes_obj && file_hashes_obj != Py_None) ||
×
2282
        (host_hashes_obj && host_hashes_obj != Py_None);
×
2283
    if (!collect_string_string_dict(file_hashes_obj, "file_hashes",
×
2284
                                    preloaded_file_hashes))
2285
        return nullptr;
×
2286
    if (!collect_string_string_dict(host_hashes_obj, "host_hashes",
×
2287
                                    preloaded_host_hashes))
UNCOV
2288
        return nullptr;
×
2289

2290
    auto query_opt = parse_query_arg(query_str);
×
UNCOV
2291
    if (!query_opt && PyErr_Occurred()) return nullptr;
×
2292

UNCOV
2293
    GroupByConfig group_by_cfg;
×
2294
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
×
2295
    const GroupByConfig* group_by_ptr =
×
UNCOV
2296
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;
×
2297

2298
    Runtime* rt = nullptr;
×
2299
    if (runtime_obj && runtime_obj != Py_None) {
×
2300
        if (!PyObject_TypeCheck(runtime_obj, &RuntimeType)) {
×
UNCOV
2301
            PyErr_SetString(PyExc_TypeError,
×
2302
                            "runtime must be a Runtime instance or None");
2303
            return nullptr;
×
2304
        }
2305
        rt = ((RuntimeObject*)runtime_obj)->runtime.get();
×
2306
    } else {
2307
        rt = get_default_runtime();
×
2308
    }
2309

UNCOV
2310
    PyObject* result_dict = PyDict_New();
×
2311
    if (!result_dict) return nullptr;
×
UNCOV
2312
    PyObject* events_list = PyList_New(0);
×
UNCOV
2313
    PyObject* profiles_list = PyList_New(0);
×
2314
    PyObject* system_list = PyList_New(0);
×
2315
    if (!events_list || !profiles_list || !system_list) {
×
2316
        Py_XDECREF(events_list);
×
2317
        Py_XDECREF(profiles_list);
×
2318
        Py_XDECREF(system_list);
×
2319
        Py_DECREF(result_dict);
×
2320
        return nullptr;
×
2321
    }
2322

UNCOV
2323
    std::string error_msg;
×
2324
    std::vector<ArrowExportResult> events_results, profiles_results,
×
UNCOV
2325
        system_results;
×
UNCOV
2326
    std::string scratch_index_path = std::string(scratch_dir) + "/.dftindex";
×
2327
    std::string meta_index_path_str(meta_index_path);
×
2328

2329
    Py_BEGIN_ALLOW_THREADS try {
×
2330
        namespace rcf = dftracer::utils::rocksdb::cf;
2331
        using clock = std::chrono::steady_clock;
UNCOV
2332
        auto ms = [](clock::time_point a, clock::time_point b) -> long long {
×
2333
            return std::chrono::duration_cast<std::chrono::milliseconds>(b - a)
×
UNCOV
2334
                .count();
×
2335
        };
2336

2337
        auto t_start = clock::now();
×
2338
        dftracer::utils::utilities::indexer::IndexDatabase scratch_db(
×
2339
            scratch_index_path);
×
UNCOV
2340
        auto t_scratch_open = clock::now();
×
2341

UNCOV
2342
        auto raw_db = scratch_db.db();
×
UNCOV
2343
        for (const auto& p : agg_ssts) {
×
2344
            auto st = raw_db->ingest_external_files(rcf::AGGREGATION, {p},
×
2345
                                                    /*ingest_behind=*/false);
×
2346
            if (!st.ok()) {
×
2347
                error_msg =
2348
                    "ingest AGGREGATION sst '" + p + "': " + st.ToString();
×
UNCOV
2349
                break;
×
2350
            }
UNCOV
2351
        }
×
2352
        if (error_msg.empty()) {
×
2353
            for (const auto& p : sys_ssts) {
×
UNCOV
2354
                auto st = raw_db->ingest_external_files(
×
2355
                    rcf::SYSTEM_METRICS, {p}, /*ingest_behind=*/false);
×
2356
                if (!st.ok()) {
×
2357
                    error_msg = "ingest SYSTEM_METRICS sst '" + p +
×
2358
                                "': " + st.ToString();
×
2359
                    break;
×
2360
                }
2361
            }
×
2362
        }
2363
        auto t_ingest = clock::now();
×
2364

2365
        if (error_msg.empty()) {
×
2366
            auto agg =
2367
                std::make_unique<EventAggregator>(raw_db, /*cfg_hash=*/0);
×
2368

2369
            // If the caller passed pre-loaded hash tables, skip opening
2370
            // the meta DB on lustre. When many dask workers run
2371
            // scan_aggregation_manifest in parallel, loading the hash
2372
            // tables N times from the same file is significant lustre
2373
            // metadata pressure; loading once on the coordinator and
2374
            // passing them in eliminates the redundant reads.
UNCOV
2375
            std::unordered_map<std::string, std::string> loaded_file_hashes;
×
UNCOV
2376
            std::unordered_map<std::string, std::string> loaded_host_hashes;
×
2377
            std::unique_ptr<dftracer::utils::utilities::indexer::IndexDatabase>
UNCOV
2378
                meta_db;
×
2379
            if (!hashes_preloaded) {
×
2380
                meta_db = std::make_unique<
×
2381
                    dftracer::utils::utilities::indexer::IndexDatabase>(
2382
                    meta_index_path_str, dftracer::utils::rocksdb::
2383
                                             RocksDatabase::OpenMode::ReadOnly);
×
UNCOV
2384
                loaded_file_hashes = meta_db->query_hash_table(
×
2385
                    dftracer::utils::utilities::indexer::IndexDatabase::
2386
                        HashType::FILE);
UNCOV
2387
                loaded_host_hashes = meta_db->query_hash_table(
×
2388
                    dftracer::utils::utilities::indexer::IndexDatabase::
2389
                        HashType::HOST);
2390
            }
2391
            const auto& file_hashes =
×
2392
                hashes_preloaded ? preloaded_file_hashes : loaded_file_hashes;
×
UNCOV
2393
            const auto& host_hashes =
×
2394
                hashes_preloaded ? preloaded_host_hashes : loaded_host_hashes;
×
2395
            auto t_hash_tables = clock::now();
×
2396

2397
            auto time_bounds = agg->query_time_bounds();
×
UNCOV
2398
            std::uint64_t time_origin =
×
2399
                time_bounds.valid ? time_bounds.min_time_bucket : 0;
×
2400

2401
            DfanalyzerContext ctx;
×
2402
            ctx.file_hashes = &file_hashes;
×
2403
            ctx.host_hashes = &host_hashes;
×
UNCOV
2404
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
×
2405
            ctx.time_origin = time_origin;
×
2406
            ctx.time_resolution = time_resolution;
×
2407
            ctx.time_granularity = time_granularity;
×
2408

2409
            std::vector<DfanalyzerScanOutput> outputs;
×
2410
            parallel_shard_scan_range<DfanalyzerScanOutput>(
×
2411
                rt, static_cast<std::uint16_t>(shard_begin_i),
2412
                static_cast<std::uint16_t>(shard_end_i),
2413
                [&](std::uint16_t sb, std::uint16_t se) {
×
2414
                    DfanalyzerScanInput input;
×
UNCOV
2415
                    input.agg = agg.get();
×
UNCOV
2416
                    input.ctx = &ctx;
×
2417
                    input.type_filter = std::nullopt;
×
2418
                    input.batch_size = batch_size;
×
2419
                    input.shard_begin = sb;
×
2420
                    input.shard_end = se;
×
2421
                    input.group_by = group_by_ptr;
×
2422
                    return scan_dfanalyzer_shards(input);
×
2423
                },
2424
                outputs);
2425
            auto t_scan = clock::now();
×
2426

UNCOV
2427
            for (auto& out : outputs) {
×
UNCOV
2428
                for (auto& r : out.events)
×
2429
                    events_results.push_back(std::move(r));
×
UNCOV
2430
                for (auto& r : out.profiles)
×
2431
                    profiles_results.push_back(std::move(r));
×
2432
                for (auto& r : out.system)
×
2433
                    system_results.push_back(std::move(r));
×
2434
            }
2435

2436
            std::fprintf(
×
2437
                stderr,
2438
                "[scan_aggregation_manifest] n_agg=%zu n_sys=%zu "
2439
                "scratch_open=%lldms ingest=%lldms hash_tables=%lldms "
2440
                "scan=%lldms\n",
2441
                agg_ssts.size(), sys_ssts.size(), ms(t_start, t_scratch_open),
×
2442
                ms(t_scratch_open, t_ingest), ms(t_ingest, t_hash_tables),
×
2443
                ms(t_hash_tables, t_scan));
×
UNCOV
2444
            std::fflush(stderr);
×
UNCOV
2445
        }
×
UNCOV
2446
    } catch (const std::exception& e) {
×
UNCOV
2447
        error_msg = e.what();
×
2448
    }
×
2449
    Py_END_ALLOW_THREADS
×
2450

2451
        if (!error_msg.empty()) {
×
2452
        Py_DECREF(events_list);
×
2453
        Py_DECREF(profiles_list);
×
2454
        Py_DECREF(system_list);
×
2455
        Py_DECREF(result_dict);
×
UNCOV
2456
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
UNCOV
2457
        return nullptr;
×
2458
    }
2459

2460
    append_results_to_list(events_list, events_results);
×
2461
    append_results_to_list(profiles_list, profiles_results);
×
UNCOV
2462
    append_results_to_list(system_list, system_results);
×
2463

2464
    PyDict_SetItemString(result_dict, "events", events_list);
×
2465
    PyDict_SetItemString(result_dict, "profiles", profiles_list);
×
2466
    PyDict_SetItemString(result_dict, "system", system_list);
×
2467
    Py_DECREF(events_list);
×
2468
    Py_DECREF(profiles_list);
×
2469
    Py_DECREF(system_list);
×
2470

UNCOV
2471
    return result_dict;
×
UNCOV
2472
}
×
2473

2474
// Module-level functions registered by init_indexer().
//
// The first docstring line must end with ")" directly before "\n--\n\n";
// otherwise CPython does not extract a __text_signature__ and the raw "--"
// marker stays in __doc__. The return type is therefore described in the
// body text and not as a "-> dict" suffix on the signature line.
static PyMethodDef BatchIndexerModuleMethods[] = {
    {"scan_aggregation_manifest", (PyCFunction)scan_aggregation_manifest_fn,
     METH_VARARGS | METH_KEYWORDS,
     "scan_aggregation_manifest(agg_ssts, sys_ssts, scratch_dir, "
     "meta_index_path, batch_size=10000, time_granularity=1.0, "
     "time_resolution=1e6, query=None, group_by=None, shard_begin=0, "
     "shard_end=4096, runtime=None, file_hashes=None, host_hashes=None)\n"
     "--\n\n"
     "Scan a worker's slice of the distributed aggregation manifest.\n\n"
     "Ingests agg_ssts + sys_ssts into a scratch IndexDatabase at "
     "scratch_dir (caller owns the directory lifecycle) and runs the "
     "dfanalyzer aggregation scan over [shard_begin, shard_end). "
     "meta_index_path is the unified .dftindex used to resolve file / "
     "host hashes; it is not opened when file_hashes or host_hashes "
     "(dict[str, str]) is supplied. Returns the same dict shape as "
     "Indexer.iter_arrow_dfanalyzer_all."},
    {nullptr, nullptr, 0, nullptr}};
2490
#endif
2491

2492
// Method table for the Indexer type (installed through tp_methods).
// Each docstring starts with a "name(args)\n--\n\n" signature line.
static PyMethodDef Indexer_methods[] = {
    // --- index construction / resolution ---------------------------------
    {"get_checkpoint_indexer",
     reinterpret_cast<PyCFunction>(Indexer_get_checkpoint_indexer),
     METH_VARARGS,
     "get_checkpoint_indexer(file_path)\n"
     "--\n\n"
     "Get a checkpoint indexer for a specific file.\n\n"
     "Args:\n"
     "    file_path: Path to the trace file (.pfw/.pfw.gz)\n\n"
     "Returns:\n"
     "    Indexer instance for checkpoint-level operations.\n"},
    {"resolve", reinterpret_cast<PyCFunction>(Indexer_resolve), METH_NOARGS,
     "resolve()\n"
     "--\n\n"
     "Check what files exist vs need indexing.\n\n"
     "Returns:\n"
     "    dict with 'total_files', 'ready', 'needs_work', 'index_path'\n"},
    {"build", reinterpret_cast<PyCFunction>(Indexer_build), METH_NOARGS,
     "build()\n"
     "--\n\n"
     "Build all missing index tiers based on require_* flags.\n"},
    {"ensure_indexed", reinterpret_cast<PyCFunction>(Indexer_ensure_indexed),
     METH_NOARGS,
     "ensure_indexed()\n"
     "--\n\n"
     "Resolve and build if needed.\n\n"
     "Returns:\n"
     "    dict with index status after building.\n"},

    // --- metadata queries ------------------------------------------------
    {"get_hash_table", reinterpret_cast<PyCFunction>(Indexer_get_hash_table),
     METH_VARARGS,
     "get_hash_table(type)\n"
     "--\n\n"
     "Query hash table mappings.\n\n"
     "Args:\n"
     "    type: 'file', 'host', 'string', or 'proc'\n\n"
     "Returns:\n"
     "    dict mapping hash values to resolved names.\n"},
    {"query_file_pids", reinterpret_cast<PyCFunction>(Indexer_query_file_pids),
     METH_VARARGS,
     "query_file_pids(file_id)\n"
     "--\n\n"
     "Query PIDs observed in a specific file.\n\n"
     "Args:\n"
     "    file_id: Integer file ID from index.\n\n"
     "Returns:\n"
     "    set of PIDs.\n"},
    {"query_all_file_pids",
     reinterpret_cast<PyCFunction>(Indexer_query_all_file_pids), METH_NOARGS,
     "query_all_file_pids()\n"
     "--\n\n"
     "Query PIDs for all indexed files.\n\n"
     "Returns:\n"
     "    dict mapping file_id to set of PIDs.\n"},
    {"query_file_info", reinterpret_cast<PyCFunction>(Indexer_query_file_info),
     METH_NOARGS,
     "query_file_info()\n"
     "--\n\n"
     "Query file ID to path mapping and per-file PIDs in one call.\n\n"
     "Returns:\n"
     "    tuple of (dict[int, str], dict[int, set[int]]).\n"},

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // --- Arrow export (only built when Arrow support is enabled) ---------
    {"iter_aggregation", reinterpret_cast<PyCFunction>(Indexer_iter_aggregation),
     METH_VARARGS | METH_KEYWORDS,
     "iter_aggregation(type='events', batch_size=10000)\n"
     "--\n\n"
     "Iterate over aggregation data as Arrow batches.\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    {"iter_arrow_dfanalyzer",
     reinterpret_cast<PyCFunction>(Indexer_iter_arrow_dfanalyzer),
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer(type='events', batch_size=10000, "
     "time_granularity=1.0, time_resolution=1e6, query=None)\n"
     "--\n\n"
     "Iterate over aggregation data as dfanalyzer-compatible Arrow batches.\n\n"
     "Output schema matches dfanalyzer expectations with resolved hashes,\n"
     "normalized time_range, and computed columns (proc_name, io_cat).\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string (e.g., \"pid == 1234\")\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    {"iter_arrow_dfanalyzer_all",
     reinterpret_cast<PyCFunction>(Indexer_iter_arrow_dfanalyzer_all),
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer_all(batch_size=10000, time_granularity=1.0, "
     "time_resolution=1e6, query=None, group_by=None)\n"
     "--\n\n"
     "Iterate over all aggregation types in a single scan.\n\n"
     "Returns a dict with 'events', 'profiles', 'system' keys, each "
     "containing\n"
     "a list of Arrow batch capsules. This is ~3x faster than calling\n"
     "iter_arrow_dfanalyzer separately for each type.\n\n"
     "When group_by is provided, the scan collapses dimensions during "
     "aggregation\n"
     "and emits a reduced schema containing only the requested columns plus\n"
     "aggregated metrics (count, time, size, time_sq, size_sq, time_min,\n"
     "time_max, size_min, size_max, time_call_min, time_call_max, "
     "size_call_min,\n"
     "size_call_max, time_start, time_end). Supported group_by columns: "
     "cat,\n"
     "func_name, pid, tid, file_hash, host_hash, file_name, host_name, "
     "proc_name,\n"
     "io_cat, acc_pat, time_range.\n\n"
     "Args:\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string\n"
     "    group_by: Optional list of columns to group by; enables coarse\n"
     "        in-scan aggregation (default None = full granularity)\n\n"
     "Returns:\n"
     "    dict with 'events', 'profiles', 'system' lists of Arrow capsules.\n"},
#endif
    // Sentinel entry terminating the table.
    {nullptr, nullptr, 0, nullptr}};
2607

2608
// The Indexer type exposes no computed attributes; the table holds only the
// terminating sentinel entry.
static PyGetSetDef Indexer_getsetters[] = {
    {nullptr, nullptr, nullptr, nullptr, nullptr}};
2609

2610
// Static type object for the Indexer Python class. The initializer is
// positional, so every entry is labelled with the PyTypeObject slot it fills;
// slots given as 0 are left unset.
PyTypeObject IndexerType = {
    PyVarObject_HEAD_INIT(nullptr, 0)
    "dftracer_utils_ext.Indexer",  // tp_name
    sizeof(IndexerObject),         // tp_basicsize
    0,                             // tp_itemsize
    (destructor)Indexer_dealloc,   // tp_dealloc
    0,  // tp_vectorcall_offset (tp_print before Python 3.8)
    0,  // tp_getattr
    0,  // tp_setattr
    0,  // tp_as_async
    0,  // tp_repr
    0,  // tp_as_number
    0,  // tp_as_sequence
    0,  // tp_as_mapping
    0,  // tp_hash
    0,  // tp_call
    0,  // tp_str
    0,  // tp_getattro
    0,  // tp_setattro
    0,  // tp_as_buffer
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,  // tp_flags (subclassable)
    // tp_doc
    "BatchIndexer(directory='', files=None, index_dir='',\n"
    "             require_checkpoint=True, require_bloom=True,\n"
    "             require_manifest=True, require_aggregation=False,\n"
    "             time_interval_ms=5000.0, group_keys=None,\n"
    "             custom_metric_fields=None, compute_percentiles=False,\n"
    "             parallelism=0, force_rebuild=False, runtime=None)\n"
    "--\n\n"
    "Indexer with tiered index building.\n\n"
    "At least one of 'directory' or 'files' must be provided.\n"
    "- directory: scan for .pfw/.pfw.gz files\n"
    "- files: list of specific file paths\n\n"
    "Supports:\n"
    "- Tier 1: Checkpoints (require_checkpoint)\n"
    "- Tier 2: Bloom filters (require_bloom), Manifests (require_manifest)\n"
    "- Tier 3: Aggregation (require_aggregation + config params)\n",
    0,                       // tp_traverse
    0,                       // tp_clear
    0,                       // tp_richcompare
    0,                       // tp_weaklistoffset
    0,                       // tp_iter
    0,                       // tp_iternext
    Indexer_methods,         // tp_methods
    0,                       // tp_members
    Indexer_getsetters,      // tp_getset
    0,                       // tp_base
    0,                       // tp_dict
    0,                       // tp_descr_get
    0,                       // tp_descr_set
    0,                       // tp_dictoffset
    (initproc)Indexer_init,  // tp_init
    0,                       // tp_alloc
    Indexer_new,             // tp_new
};
2663

2664
// Register the Indexer type on module `m` and, when Arrow support is compiled
// in, the module-level functions from BatchIndexerModuleMethods.
// Returns 0 on success and -1 on failure.
int init_indexer(PyObject* m) {
    if (PyType_Ready(&IndexerType) < 0) {
        return -1;
    }

    // PyModule_AddObject takes over the reference only when it succeeds, so
    // the extra reference is released by hand on the failure path.
    Py_INCREF(&IndexerType);
    const int add_status =
        PyModule_AddObject(m, "Indexer", (PyObject*)&IndexerType);
    if (add_status < 0) {
        Py_DECREF(&IndexerType);
        return -1;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    if (PyModule_AddFunctions(m, BatchIndexerModuleMethods) < 0) {
        return -1;
    }
#endif

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc