• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26043728131

18 May 2026 03:37PM UTC coverage: 51.706% (-0.4%) from 52.076%
26043728131

push

github

hariharan-devarajan
feat(perf): performance improvements for parallel reading, indexing, and aggregation

Indexer
- Streaming parse-and-emit worker pipeline with bounded memory usage
- Concurrent SST artifact ingestion with staging support
- Gzip member slicing for parallel indexing
- Lazy decoding for compressed value counts
- Bypass DOM wrapper for indexer hot path (simdjson on_demand)
- Decoupled write workers from parse workers
- --rebuild-summaries flag and optimized root summary rebuild

Aggregator / MPI
- Task-based DAG execution for aggregator pipeline
- Shared staging for multi-node artifact relocation
- Per-node thread scaling to avoid oversubscription
- Unified distributed aggregation tracking, removed manifest consolidation
- Deterministic aggregation and intra-file parallelism

Trace reader / query
- Compiled predicate evaluation for AND-of-EQ queries
- Uniform-match shortcut for AND-of-EQ queries
- Line-range support for work items and checkpoint processing
- Optimized chunk pruning and checkpoint handling

Replay
- Pipelined replay with coroutines and channels
- JsonParser-based trace processing
- Optimized string handling and I/O buffering

Organize / writer / dft
- Parallel slice creation and merging in organize visitor
- Inline indexer in organize
- Gzip member tracking in writer
- Coroutine-based event dispatcher with extracted parse logic
- Batch flushing in organize visitor

Arrow / call_tree
- Optimized arrow conversion
- Arrow IPC support and improved save/load in call_tree

Build / infrastructure
- zlib-ng option, system simdjson fallback
- cgroup v1/v2 memory limit detection
- Auto-computed per-file memory estimates and batch sizes
- CI: perf branch trigger, formatting

Docs
- Rewritten indexer and trace reader API references

35907 of 90345 branches covered (39.74%)

Branch coverage included in aggregate %.

16869 of 21880 new or added lines in 137 files covered. (77.1%)

273 existing lines in 39 files now uncovered.

32021 of 41028 relevant lines covered (78.05%)

13164.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.77
/src/dftracer/utils/python/batch_indexer.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/common/string_intern.h>
3
#include <dftracer/utils/core/coro/task.h>
4
#include <dftracer/utils/core/coro/when_all.h>
5
#include <dftracer/utils/core/rocksdb/db_manager.h>
6
#include <dftracer/utils/core/runtime.h>
7
#include <dftracer/utils/core/tasks/coro_scope.h>
8
#include <dftracer/utils/python/batch_indexer.h>
9
#include <dftracer/utils/python/indexer.h>
10
#include <dftracer/utils/python/runtime.h>
11
#include <dftracer/utils/utilities/common/query/query.h>
12
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_config.h>
13
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregation_serialization.h>
14
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_types.h>
15
#include <dftracer/utils/utilities/composites/dft/aggregators/event_aggregator.h>
16
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics.h>
17
#include <dftracer/utils/utilities/composites/dft/aggregators/system_metrics_serialization.h>
18
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
19
#include <dftracer/utils/utilities/composites/dft/indexing/resolve_and_build.h>
20
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
21
#include <dftracer/utils/utilities/indexer/index_database.h>
22

23
#ifdef DFTRACER_UTILS_ENABLE_ARROW
24
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
25
#endif
26

27
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <limits>
#include <memory>
#include <new>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
35

36
using dftracer::utils::CoroScope;
37
using dftracer::utils::Runtime;
38
using dftracer::utils::coro::CoroTask;
39
using namespace dftracer::utils::utilities::composites::dft::indexing;
40
using namespace dftracer::utils::utilities::composites::dft::aggregators;
41

42
// ---------------------------------------------------------------------------
43
// BatchIndexer - directory-level indexer with resolve/build pattern
44
// ---------------------------------------------------------------------------
45

46
/// tp_dealloc for the batch Indexer: drops every Python reference the object
/// owns (any of them may still be NULL if __init__ never ran or failed early)
/// and then frees the object through its type's tp_free slot.
static void Indexer_dealloc(IndexerObject* self) {
    // Released in the same order the fields were historically released.
    PyObject** owned_slots[] = {
        &self->runtime_obj, &self->directory,  &self->files,
        &self->index_dir,   &self->group_keys, &self->custom_metric_fields,
    };
    for (PyObject** slot : owned_slots) {
        Py_CLEAR(*slot);
    }
    Py_TYPE(self)->tp_free(reinterpret_cast<PyObject*>(self));
}
55

56
/// tp_new for the batch Indexer: allocates the object and seeds every field
/// with the same defaults __init__ uses. All owned object references start
/// out NULL; they are filled in by __init__ and released by Indexer_dealloc.
/// Returns NULL (with the allocator's exception set) if allocation fails.
static PyObject* Indexer_new(PyTypeObject* type, PyObject* args,
                             PyObject* kwds) {
    auto* created = reinterpret_cast<IndexerObject*>(type->tp_alloc(type, 0));
    if (created == nullptr) {
        return nullptr;
    }

    // Owned Python references.
    created->runtime_obj = nullptr;
    created->directory = nullptr;
    created->files = nullptr;
    created->index_dir = nullptr;
    created->group_keys = nullptr;
    created->custom_metric_fields = nullptr;

    // Which index tiers are required.
    created->require_checkpoint = 1;
    created->require_bloom = 1;
    created->require_manifest = 1;
    created->require_aggregation = 0;

    // Aggregation and build tuning.
    created->time_interval_ms = 5000.0;
    created->compute_percentiles = 0;
    created->checkpoint_size = 32 * 1024 * 1024;
    created->parallelism = 0;
    created->force_rebuild = 0;

    return reinterpret_cast<PyObject*>(created);
}
78

79
/// tp_init for the batch Indexer.
///
/// Parses the keyword options, checks that a directory or a non-empty list
/// of files was supplied, and stores the settings on `self`.
///
/// Every new reference is acquired before any field of `self` is touched.
/// Previously held references are released only after the new ones are
/// installed. As a result, calling __init__ again on a live object does not
/// leak, and a failure part-way through leaves the object unchanged.
///
/// `runtime` may be a Runtime instance, an object whose `_native` attribute
/// is a Runtime, or None (use the default runtime).
///
/// Returns 0 on success, or -1 with a Python exception set:
///   - ValueError if neither `directory` nor a non-empty `files` list is given;
///   - TypeError for an unsupported `runtime`;
///   - argument-parsing errors from PyArg_ParseTupleAndKeywords.
static int Indexer_init(IndexerObject* self, PyObject* args, PyObject* kwds) {
    static const char* kwlist[] = {"directory",
                                   "files",
                                   "index_dir",
                                   "require_checkpoint",
                                   "require_bloom",
                                   "require_manifest",
                                   "require_aggregation",
                                   "time_interval_ms",
                                   "group_keys",
                                   "custom_metric_fields",
                                   "compute_percentiles",
                                   "checkpoint_size",
                                   "parallelism",
                                   "force_rebuild",
                                   "runtime",
                                   nullptr};

    const char* directory = "";
    PyObject* files_obj = Py_None;
    const char* index_dir = "";
    int require_checkpoint = 1;
    int require_bloom = 1;
    int require_manifest = 1;
    int require_aggregation = 0;
    double time_interval_ms = 5000.0;
    PyObject* group_keys_obj = Py_None;
    PyObject* custom_metrics_obj = Py_None;
    int compute_percentiles = 0;
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;  // 32MB default
    Py_ssize_t parallelism = 0;
    int force_rebuild = 0;
    PyObject* runtime_arg = nullptr;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|sOsppppdOOpnnpO", (char**)kwlist, &directory,
            &files_obj, &index_dir, &require_checkpoint, &require_bloom,
            &require_manifest, &require_aggregation, &time_interval_ms,
            &group_keys_obj, &custom_metrics_obj, &compute_percentiles,
            &checkpoint_size, &parallelism, &force_rebuild, &runtime_arg)) {
        return -1;
    }

    // At least one of directory or files must be provided. Only a non-empty
    // list counts as "files"; any other type is ignored.
    const bool has_directory = directory && directory[0] != '\0';
    const bool has_files = files_obj && files_obj != Py_None &&
                           PyList_Check(files_obj) &&
                           PyList_Size(files_obj) > 0;
    if (!has_directory && !has_files) {
        PyErr_SetString(PyExc_ValueError,
                        "At least one of 'directory' or 'files' must be "
                        "provided");
        return -1;
    }

    // Resolve the runtime into an owned reference (or NULL for "default").
    PyObject* new_runtime = nullptr;
    if (runtime_arg && runtime_arg != Py_None) {
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
            Py_INCREF(runtime_arg);
            new_runtime = runtime_arg;
        } else {
            // Accept a Python-level wrapper exposing the native runtime.
            PyObject* native = PyObject_GetAttrString(runtime_arg, "_native");
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
                new_runtime = native;  // already a new reference
            } else {
                Py_XDECREF(native);
                PyErr_SetString(PyExc_TypeError,
                                "runtime must be a Runtime instance or None");
                return -1;
            }
        }
    }

    PyObject* new_directory = PyUnicode_FromString(directory);
    PyObject* new_index_dir =
        new_directory ? PyUnicode_FromString(index_dir) : nullptr;
    if (!new_index_dir) {
        Py_XDECREF(new_directory);
        Py_XDECREF(new_runtime);
        return -1;
    }

    // Returns a new reference to an optional argument; None/NULL -> NULL.
    auto own_optional = [](PyObject* obj) -> PyObject* {
        if (!obj || obj == Py_None) return nullptr;
        Py_INCREF(obj);
        return obj;
    };
    // Installs `value` (owned or NULL) into `slot`, and only then releases
    // the previous value, so no destructor ever observes a dangling slot.
    auto replace_slot = [](PyObject*& slot, PyObject* value) {
        PyObject* old = slot;
        slot = value;
        Py_XDECREF(old);
    };

    replace_slot(self->runtime_obj, new_runtime);
    replace_slot(self->directory, new_directory);
    replace_slot(self->index_dir, new_index_dir);
    replace_slot(self->files, has_files ? own_optional(files_obj) : nullptr);
    replace_slot(self->group_keys, own_optional(group_keys_obj));
    replace_slot(self->custom_metric_fields, own_optional(custom_metrics_obj));

    self->require_checkpoint = require_checkpoint;
    self->require_bloom = require_bloom;
    self->require_manifest = require_manifest;
    self->require_aggregation = require_aggregation;
    self->time_interval_ms = time_interval_ms;
    self->compute_percentiles = compute_percentiles;
    self->checkpoint_size = static_cast<std::size_t>(checkpoint_size);
    self->parallelism = static_cast<std::size_t>(parallelism);
    self->force_rebuild = force_rebuild;

    return 0;
}
190

191
/// Returns the runtime this Indexer should submit work to: the Runtime bound
/// via the `runtime` constructor argument if one was stored, otherwise the
/// process-wide default runtime.
static Runtime* get_batch_indexer_runtime(IndexerObject* self) {
    if (self->runtime_obj == nullptr) {
        return get_default_runtime();
    }
    auto* bound = reinterpret_cast<RuntimeObject*>(self->runtime_obj);
    return bound->runtime.get();
}
197

198
static std::optional<AggregationConfig> build_aggregation_config(
246✔
199
    IndexerObject* self) {
200
    if (!self->require_aggregation) {
246✔
201
        return std::nullopt;
152✔
202
    }
203

204
    AggregationConfig config;
94!
205
    config.time_interval_us =
94✔
206
        static_cast<std::uint64_t>(self->time_interval_ms * 1000.0);
94✔
207

208
    if (self->group_keys && PyList_Check(self->group_keys)) {
94!
NEW
209
        Py_ssize_t n = PyList_Size(self->group_keys);
×
NEW
210
        for (Py_ssize_t i = 0; i < n; i++) {
×
211
            const char* s =
NEW
212
                PyUnicode_AsUTF8(PyList_GetItem(self->group_keys, i));
×
NEW
213
            if (s) config.extra_group_keys.emplace_back(s);
×
214
        }
215
    }
216
    if (self->custom_metric_fields &&
94!
NEW
217
        PyList_Check(self->custom_metric_fields)) {
×
NEW
218
        Py_ssize_t n = PyList_Size(self->custom_metric_fields);
×
NEW
219
        for (Py_ssize_t i = 0; i < n; i++) {
×
220
            const char* s =
NEW
221
                PyUnicode_AsUTF8(PyList_GetItem(self->custom_metric_fields, i));
×
NEW
222
            if (s) config.custom_metric_fields.emplace_back(s);
×
223
        }
224
    }
225

226
    config.compute_percentiles = self->compute_percentiles != 0;
94✔
227
    return config;
94!
228
}
170✔
229

230
// ---------------------------------------------------------------------------
231
// resolve() - check what exists vs needs building
232
// ---------------------------------------------------------------------------
233

234
static PyObject* Indexer_resolve(IndexerObject* self,
186✔
235
                                 PyObject* Py_UNUSED(ignored)) {
236
    const char* directory = PyUnicode_AsUTF8(self->directory);
186!
237
    const char* index_dir = PyUnicode_AsUTF8(self->index_dir);
186!
238

239
    ResolverInput input;
186✔
240
    input.directory = directory ? directory : "";
186!
241
    input.index_dir = index_dir ? index_dir : "";
186!
242
    input.require_checkpoints = self->require_checkpoint;
186✔
243
    input.require_bloom = self->require_bloom;
186✔
244
    input.require_manifest = self->require_manifest;
186✔
245
    input.require_aggregation = self->require_aggregation;
186✔
246
    input.aggregation_config = build_aggregation_config(self);
186!
247

248
    // Add files if provided
249
    if (self->files && PyList_Check(self->files)) {
186!
250
        Py_ssize_t n = PyList_Size(self->files);
120!
251
        for (Py_ssize_t i = 0; i < n; i++) {
258✔
252
            const char* s = PyUnicode_AsUTF8(PyList_GetItem(self->files, i));
138!
253
            if (s) input.files.emplace_back(s);
138!
254
        }
69✔
255
    }
60✔
256

257
    ResolverResult result;
186✔
258
    std::string error_msg;
186✔
259

260
    Py_BEGIN_ALLOW_THREADS try {
186!
261
        Runtime* rt = get_batch_indexer_runtime(self);
186!
262
        rt->submit(run_coro_scope(
558!
263
                       rt->executor(),
93!
264
                       [](CoroScope& scope, ResolverInput in,
744!
265
                          ResolverResult* out) -> CoroTask<void> {
93!
266
                           IndexResolverUtility resolver;
279!
267
                           // Use scope.spawn(utility, input) which auto-binds
268
                           // context for utilities with NeedsContext tag
269
                           *out = co_await scope.spawn(resolver, std::move(in));
465!
270
                       },
465!
271
                       std::move(input), &result),
186✔
272
                   "batch-indexer-resolve")
93!
273
            .get();
186!
274
    } catch (const std::exception& e) {
93!
NEW
275
        error_msg = e.what();
×
NEW
276
    }
×
277
    Py_END_ALLOW_THREADS
186!
278

279
        if (!error_msg.empty()) {
186!
NEW
280
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
NEW
281
        return nullptr;
×
282
    }
283

284
    // Build result dict
285
    PyObject* dict = PyDict_New();
186!
286
    if (!dict) return nullptr;
186!
287

288
    PyDict_SetItemString(dict, "total_files",
186!
289
                         PyLong_FromSize_t(result.all_files.size()));
93!
290
    PyDict_SetItemString(dict, "index_path",
186!
291
                         PyUnicode_FromString(result.index_path.c_str()));
93!
292

293
    // Ready files
294
    PyObject* ready_list = PyList_New(result.cached.size());
186!
295
    for (std::size_t i = 0; i < result.cached.size(); ++i) {
334✔
296
        PyList_SetItem(
148!
297
            ready_list, i,
74✔
298
            PyUnicode_FromString(result.cached[i].file_path.c_str()));
148!
299
    }
74✔
300
    PyDict_SetItemString(dict, "ready", ready_list);
186!
301

302
    // Needs work files (union of all needs_* lists)
303
    std::vector<std::string> needs_work;
186✔
304
    for (const auto& item : result.needs_checkpoint) {
260✔
305
        needs_work.push_back(item.file_path);
74!
306
    }
307
    for (const auto& item : result.needs_bloom) {
186!
NEW
308
        bool found = false;
×
NEW
309
        for (const auto& existing : needs_work) {
×
NEW
310
            if (existing == item.file_path) {
×
NEW
311
                found = true;
×
NEW
312
                break;
×
313
            }
314
        }
NEW
315
        if (!found) needs_work.push_back(item.file_path);
×
316
    }
317
    for (const auto& item : result.needs_manifest) {
186!
NEW
318
        bool found = false;
×
NEW
319
        for (const auto& existing : needs_work) {
×
NEW
320
            if (existing == item.file_path) {
×
NEW
321
                found = true;
×
NEW
322
                break;
×
323
            }
324
        }
NEW
325
        if (!found) needs_work.push_back(item.file_path);
×
326
    }
327
    for (const auto& item : result.needs_aggregation) {
186✔
NEW
328
        bool found = false;
×
NEW
329
        for (const auto& existing : needs_work) {
×
NEW
330
            if (existing == item.file_path) {
×
NEW
331
                found = true;
×
NEW
332
                break;
×
333
            }
334
        }
NEW
335
        if (!found) needs_work.push_back(item.file_path);
×
336
    }
337

338
    PyObject* needs_list = PyList_New(needs_work.size());
186!
339
    for (std::size_t i = 0; i < needs_work.size(); ++i) {
260✔
340
        PyList_SetItem(needs_list, i,
74!
341
                       PyUnicode_FromString(needs_work[i].c_str()));
74!
342
    }
37✔
343
    PyDict_SetItemString(dict, "needs_work", needs_list);
186!
344

345
    return dict;
186✔
346
}
186✔
347

348
// ---------------------------------------------------------------------------
349
// build() - build missing index tiers
350
// ---------------------------------------------------------------------------
351

352
/// build(): builds the missing index tiers for the configured directory
/// and/or files via resolve_and_build_index.
///
/// The work runs on the Indexer's runtime with the GIL released. Returns
/// None on success.
///
/// Returns NULL with an exception set on failure:
///   - RuntimeError if the object was never initialized or the build throws;
///   - TypeError if `files` contains a non-str entry.
static PyObject* Indexer_build(IndexerObject* self,
                               PyObject* Py_UNUSED(ignored)) {
    if (!self->directory || !self->index_dir) {
        PyErr_SetString(PyExc_RuntimeError, "Indexer is not initialized");
        return nullptr;
    }
    const char* directory = PyUnicode_AsUTF8(self->directory);
    if (!directory) return nullptr;
    const char* index_dir = PyUnicode_AsUTF8(self->index_dir);
    if (!index_dir) return nullptr;

    ResolveAndBuildInput input;
    input.directory = directory;
    input.index_dir = index_dir;
    input.require_checkpoints = self->require_checkpoint;
    input.require_bloom = self->require_bloom;
    input.require_manifest = self->require_manifest;
    input.require_aggregation = self->require_aggregation;
    input.aggregation_config = build_aggregation_config(self);
    // Surface any error the config builder left pending.
    if (PyErr_Occurred()) return nullptr;
    input.checkpoint_size = self->checkpoint_size;
    input.parallelism = self->parallelism;
    input.force_rebuild = self->force_rebuild;

    // Add files if provided.
    if (self->files && PyList_Check(self->files)) {
        const Py_ssize_t count = PyList_Size(self->files);
        for (Py_ssize_t i = 0; i < count; i++) {
            PyObject* item = PyList_GET_ITEM(self->files, i);
            if (!PyUnicode_Check(item)) {
                PyErr_SetString(PyExc_TypeError,
                                "files must contain only str paths");
                return nullptr;
            }
            const char* path = PyUnicode_AsUTF8(item);
            if (!path) return nullptr;
            input.files.emplace_back(path);
        }
    }

    std::string error_msg;
    bool failed = false;

    // No Python API may be used in this region. Every exception is caught so
    // that none unwinds while the GIL is released.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime* rt = get_batch_indexer_runtime(self);
        rt->submit(run_coro_scope(
                       rt->executor(),
                       [](CoroScope& scope,
                          ResolveAndBuildInput in) -> CoroTask<void> {
                           co_await resolve_and_build_index(&scope,
                                                            std::move(in));
                       },
                       std::move(input)),
                   "batch-indexer-build")
            .get();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "batch-indexer-build failed with an unknown exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    Py_RETURN_NONE;
}
404

405
// ---------------------------------------------------------------------------
406
// ensure_indexed() - resolve + build if needed
407
// ---------------------------------------------------------------------------
408

409
/// ensure_indexed(): resolve, then build and re-resolve if any file still
/// needs work.
///
/// Returns the status dict from the last resolve() call (a new reference),
/// or NULL with an exception set if resolve() or build() fails.
static PyObject* Indexer_ensure_indexed(IndexerObject* self,
                                        PyObject* Py_UNUSED(ignored)) {
    PyObject* initial = Indexer_resolve(self, nullptr);
    if (initial == nullptr) {
        return nullptr;
    }

    // Borrowed from `initial`; inspect it before `initial` is released.
    PyObject* pending = PyDict_GetItemString(initial, "needs_work");
    const bool up_to_date = pending == nullptr || PyList_Size(pending) <= 0;
    if (up_to_date) {
        return initial;
    }

    Py_DECREF(initial);

    PyObject* built = Indexer_build(self, nullptr);
    if (built == nullptr) {
        return nullptr;
    }
    Py_DECREF(built);

    return Indexer_resolve(self, nullptr);
}
431

432
// ---------------------------------------------------------------------------
433
// get_checkpoint_indexer() - get a single-file checkpoint indexer
434
// ---------------------------------------------------------------------------
435

436
/// get_checkpoint_indexer(file_path): returns a new single-file checkpoint
/// indexer for `file_path`.
///
/// The index location comes from determine_index_path(file_path, index_dir)
/// using this Indexer's `index_dir`. The new indexer uses the same
/// checkpoint_size, has bloom/manifest building disabled, and shares this
/// Indexer's runtime object (if any).
///
/// Returns NULL with an exception set on failure:
///   - argument errors;
///   - RuntimeError if the Indexer is uninitialized, the index path cannot be
///     determined, or dft_indexer_create fails.
static PyObject* Indexer_get_checkpoint_indexer(IndexerObject* self,
                                                PyObject* args) {
    const char* file_path = nullptr;
    if (!PyArg_ParseTuple(args, "s", &file_path)) {
        return nullptr;
    }

    if (!self->index_dir) {
        PyErr_SetString(PyExc_RuntimeError, "Indexer is not initialized");
        return nullptr;
    }
    const char* index_dir = PyUnicode_AsUTF8(self->index_dir);
    if (!index_dir) return nullptr;

    // determine_index_path is C++ code that may throw; an exception must not
    // unwind through the CPython call frame.
    std::string index_path;
    try {
        std::string computed = dftracer::utils::utilities::composites::dft::
            internal::determine_index_path(file_path, index_dir);
        index_path.swap(computed);
    } catch (const std::exception& e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return nullptr;
    }

    // Create the Python strings before allocating the indexer, so no failure
    // path has to tear down a partially initialized object.
    PyObject* gz_path_obj = PyUnicode_FromString(file_path);
    if (!gz_path_obj) return nullptr;
    PyObject* index_path_obj = PyUnicode_FromString(index_path.c_str());
    if (!index_path_obj) {
        Py_DECREF(gz_path_obj);
        return nullptr;
    }

    CheckpointIndexerObject* indexer =
        (CheckpointIndexerObject*)CheckpointIndexerType.tp_alloc(
            &CheckpointIndexerType, 0);
    if (!indexer) {
        Py_DECREF(gz_path_obj);
        Py_DECREF(index_path_obj);
        return nullptr;
    }

    indexer->handle = nullptr;
    indexer->gz_path = gz_path_obj;          // ownership moves to indexer
    indexer->index_path = index_path_obj;    // ownership moves to indexer
    indexer->checkpoint_size = self->checkpoint_size;
    indexer->build_bloom = 0;
    indexer->build_manifest = 0;

    // Share runtime reference.
    if (self->runtime_obj) {
        Py_INCREF(self->runtime_obj);
        indexer->runtime_obj = self->runtime_obj;
    } else {
        indexer->runtime_obj = nullptr;
    }

    // Create the native handle.
    indexer->handle = dft_indexer_create(file_path, index_path.c_str(),
                                         self->checkpoint_size, 0);
    if (!indexer->handle) {
        // The indexer's own dealloc releases gz_path/index_path/runtime_obj.
        Py_DECREF((PyObject*)indexer);
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to create checkpoint indexer");
        return nullptr;
    }

    return (PyObject*)indexer;
}
483

484
/// Runs resolve() and extracts its "index_path" entry.
///
/// Returns the path on success. Returns std::nullopt with a Python exception
/// set if resolve() fails, or a RuntimeError if the path is missing or empty.
static std::optional<std::string> resolve_index_path(IndexerObject* self) {
    PyObject* status = Indexer_resolve(self, nullptr);
    if (status == nullptr) {
        return std::nullopt;
    }

    std::optional<std::string> found;
    PyObject* entry = PyDict_GetItemString(status, "index_path");  // borrowed
    const char* text = entry != nullptr ? PyUnicode_AsUTF8(entry) : nullptr;
    if (text != nullptr && *text != '\0') {
        found.emplace(text);  // copy before `status` (and `entry`) go away
    }
    Py_DECREF(status);

    if (!found) {
        PyErr_SetString(PyExc_RuntimeError, "No index path available");
    }
    return found;
}
498

499
/// get_hash_table(type): returns the index's hash -> name table as a dict of
/// str -> str.
///
/// `type` selects the table: 'file', 'host', 'string' or 'proc'. The index is
/// located via resolve() and opened read-only with the GIL released.
///
/// Returns NULL with an exception set on failure:
///   - ValueError for an unknown type;
///   - RuntimeError if the database cannot be read;
///   - a decoding error if a stored value is not valid UTF-8.
static PyObject* Indexer_get_hash_table(IndexerObject* self, PyObject* args) {
    const char* type_str = nullptr;
    if (!PyArg_ParseTuple(args, "s", &type_str)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;
    using HashType = IndexDatabase::HashType;

    HashType type;
    if (std::strcmp(type_str, "file") == 0) {
        type = HashType::FILE;
    } else if (std::strcmp(type_str, "host") == 0) {
        type = HashType::HOST;
    } else if (std::strcmp(type_str, "string") == 0) {
        type = HashType::STRING;
    } else if (std::strcmp(type_str, "proc") == 0) {
        type = HashType::PROC;
    } else {
        PyErr_SetString(PyExc_ValueError,
                        "type must be 'file', 'host', 'string', or 'proc'");
        return nullptr;
    }

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, std::string> hash_map;
    std::string error_msg;
    bool failed = false;

    // No Python API in this region; catch everything while the GIL is free.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        hash_map = db.query_hash_table(type);
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown exception while reading the index database";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [hash, name] : hash_map) {
        PyObject* key = PyUnicode_FromStringAndSize(
            hash.data(), static_cast<Py_ssize_t>(hash.size()));
        PyObject* val = key ? PyUnicode_FromStringAndSize(
                                  name.data(),
                                  static_cast<Py_ssize_t>(name.size()))
                            : nullptr;
        const bool ok = val && PyDict_SetItem(dict, key, val) == 0;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (!ok) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
558

559
/// query_file_pids(file_id): returns the set of pids (as int) recorded in the
/// index for `file_id`.
///
/// The index is located via resolve() and opened read-only with the GIL
/// released. Returns NULL with an exception set on argument errors, database
/// errors (RuntimeError) or allocation failures.
static PyObject* Indexer_query_file_pids(IndexerObject* self, PyObject* args) {
    int file_id;
    if (!PyArg_ParseTuple(args, "i", &file_id)) {
        return nullptr;
    }

    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_set<std::uint64_t> pids;
    std::string error_msg;
    bool failed = false;

    // No Python API in this region; catch everything while the GIL is free.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        pids = db.query_file_pids(file_id);
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown exception while reading the index database";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* set = PySet_New(nullptr);
    if (!set) return nullptr;

    for (auto pid : pids) {
        PyObject* val = PyLong_FromUnsignedLongLong(pid);
        const bool ok = val && PySet_Add(set, val) == 0;
        Py_XDECREF(val);
        if (!ok) {
            Py_DECREF(set);
            return nullptr;
        }
    }

    return set;
}
600

601
/// query_all_file_pids(): returns a dict mapping each file id (int) to the
/// set of pids (int) recorded for it in the index.
///
/// The index is located via resolve() and opened read-only with the GIL
/// released. Returns NULL with an exception set on database errors
/// (RuntimeError) or allocation failures.
static PyObject* Indexer_query_all_file_pids(IndexerObject* self,
                                             PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;
    std::string error_msg;
    bool failed = false;

    // No Python API in this region; catch everything while the GIL is free.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        all_pids = db.query_all_file_pids();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown exception while reading the index database";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* dict = PyDict_New();
    if (!dict) return nullptr;

    for (const auto& [file_id, pids] : all_pids) {
        PyObject* key = PyLong_FromLong(file_id);
        PyObject* set = key ? PySet_New(nullptr) : nullptr;
        bool ok = set != nullptr;
        for (auto it = pids.begin(); ok && it != pids.end(); ++it) {
            PyObject* val = PyLong_FromUnsignedLongLong(*it);
            ok = val && PySet_Add(set, val) == 0;
            Py_XDECREF(val);
        }
        ok = ok && PyDict_SetItem(dict, key, set) == 0;
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (!ok) {
            Py_DECREF(dict);
            return nullptr;
        }
    }

    return dict;
}
645

NEW
646
/// query_file_info(): returns a 2-tuple (id_to_path, pid_dict).
///
///   - id_to_path: maps each file id (int) to its logical name joined onto the
///     directory that contains the index (weakly canonicalized);
///   - pid_dict: maps each file id (int) to the set of pids (int) recorded
///     for it.
///
/// All database and filesystem work runs with the GIL released. Returns
/// NULL with an exception set on database/filesystem errors (RuntimeError)
/// or allocation failures.
static PyObject* Indexer_query_file_info(IndexerObject* self,
                                         PyObject* Py_UNUSED(ignored)) {
    using dftracer::utils::utilities::indexer::IndexDatabase;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    std::unordered_map<std::string, int> file_ids;
    std::unordered_map<int, std::unordered_set<std::uint64_t>> all_pids;
    fs::path data_dir;
    std::string error_msg;
    bool failed = false;

    // No Python API in this region; catch everything while the GIL is free.
    Py_BEGIN_ALLOW_THREADS
    try {
        IndexDatabase db(
            index_path,
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
        file_ids = db.query_all_file_info_ids();
        all_pids = db.query_all_file_pids();
        // weakly_canonical touches the filesystem and may throw, so it must
        // stay inside the guarded region.
        data_dir = fs::weakly_canonical(fs::path(index_path)).parent_path();
    } catch (const std::exception& e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown exception while reading the index database";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    PyObject* id_to_path = PyDict_New();
    if (!id_to_path) return nullptr;
    for (const auto& [logical_name, fid] : file_ids) {
        const auto resolved = (data_dir / logical_name).string();
        PyObject* key = PyLong_FromLong(fid);
        PyObject* val = key ? PyUnicode_FromStringAndSize(
                                  resolved.data(),
                                  static_cast<Py_ssize_t>(resolved.size()))
                            : nullptr;
        const bool ok = val && PyDict_SetItem(id_to_path, key, val) == 0;
        Py_XDECREF(key);
        Py_XDECREF(val);
        if (!ok) {
            Py_DECREF(id_to_path);
            return nullptr;
        }
    }

    PyObject* pid_dict = PyDict_New();
    if (!pid_dict) {
        Py_DECREF(id_to_path);
        return nullptr;
    }
    for (const auto& [file_id, pids] : all_pids) {
        PyObject* key = PyLong_FromLong(file_id);
        PyObject* set = key ? PySet_New(nullptr) : nullptr;
        bool ok = set != nullptr;
        for (auto it = pids.begin(); ok && it != pids.end(); ++it) {
            PyObject* val = PyLong_FromUnsignedLongLong(*it);
            ok = val && PySet_Add(set, val) == 0;
            Py_XDECREF(val);
        }
        ok = ok && PyDict_SetItem(pid_dict, key, set) == 0;
        Py_XDECREF(key);
        Py_XDECREF(set);
        if (!ok) {
            Py_DECREF(id_to_path);
            Py_DECREF(pid_dict);
            return nullptr;
        }
    }

    // PyTuple_Pack takes its own references; NULL propagates its exception.
    PyObject* result = PyTuple_Pack(2, id_to_path, pid_dict);
    Py_DECREF(id_to_path);
    Py_DECREF(pid_dict);
    return result;
}
711

712
#ifdef DFTRACER_UTILS_ENABLE_ARROW
713
#include <dftracer/utils/python/trace_reader_iterator.h>
714
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
715

716
/// Wrap an ArrowExportResult in a newly allocated ArrowBatchCapsule object.
///
/// @param result  Export result; moved into a heap-allocated copy whose
///                pointer is stored in the capsule's `result` field (release
///                is presumably done by the capsule type's dealloc — not
///                visible here).
/// @return New reference to the capsule, or nullptr when tp_alloc fails
///         (tp_alloc is expected to have set the Python error).
static PyObject* create_arrow_batch_capsule(
    dftracer::utils::utilities::common::arrow::ArrowExportResult&& result) {
    using dftracer::utils::utilities::common::arrow::ArrowExportResult;

    PyObject* raw = ArrowBatchCapsuleType.tp_alloc(&ArrowBatchCapsuleType, 0);
    if (raw == nullptr) {
        return nullptr;
    }
    auto* capsule = reinterpret_cast<ArrowBatchCapsuleObject*>(raw);
    capsule->result = new ArrowExportResult(std::move(result));
    return raw;
}
30✔
726

727
namespace {
728

729
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
730
using dftracer::utils::utilities::common::arrow::ColumnSpec;
731
using dftracer::utils::utilities::common::arrow::ColumnType;
732
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
733

734
static bool parse_agg_type_str(const char* type_str, AggMapType& out) {
4✔
735
    if (strcmp(type_str, "events") == 0) {
4✔
736
        out = AggMapType::EVENT;
4✔
737
        return true;
4✔
738
    }
NEW
739
    if (strcmp(type_str, "profiles") == 0) {
×
NEW
740
        out = AggMapType::PROFILE;
×
NEW
741
        return true;
×
742
    }
NEW
743
    if (strcmp(type_str, "system") == 0) {
×
NEW
744
        out = AggMapType::SYSTEM;
×
NEW
745
        return true;
×
746
    }
NEW
747
    PyErr_SetString(PyExc_ValueError,
×
748
                    "type must be 'events', 'profiles', or 'system'");
NEW
749
    return false;
×
750
}
2✔
751

752
struct AggDbHandle {
753
    std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db;
754
    std::unique_ptr<EventAggregator> agg;
755
};
756

757
static std::unique_ptr<AggDbHandle> open_agg_db(const std::string& index_path,
22✔
758
                                                std::string& error_msg) {
759
    std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db;
22✔
760
    try {
761
        db = EventAggregator::open_with_merge_operator(index_path);
22!
762
    } catch (...) {
11✔
NEW
763
        auto& mgr = dftracer::utils::rocksdb::RocksDBManager::instance();
×
NEW
764
        mgr.reset(index_path);
×
NEW
765
        db = mgr.get_or_open(
×
766
            index_path,
767
            dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
NEW
768
        if (db && db->is_open()) {
×
NEW
769
            load_intern_dictionary(*db);
×
770
        }
NEW
771
    }
×
772
    if (!db || !db->is_open()) {
22!
NEW
773
        error_msg = "Failed to open aggregation database";
×
NEW
774
        return nullptr;
×
775
    }
776
    std::string config_val;
22✔
777
    auto key = std::string_view(AGG_GLOBAL_CONFIG_KEY,
22✔
778
                                sizeof(AGG_GLOBAL_CONFIG_KEY) - 1);
779
    if (!db->get(key, &config_val, dftracer::utils::rocksdb::cf::AGGREGATION)
44!
780
             .ok()) {
22!
NEW
781
        error_msg = "No aggregation config found - was aggregation enabled?";
×
NEW
782
        return nullptr;
×
783
    }
784
    auto cfg = deserialize_agg_global_config(config_val);
22!
785
    auto handle = std::make_unique<AggDbHandle>();
22!
786
    handle->db = db;
22✔
787
    handle->agg = std::make_unique<EventAggregator>(db, cfg.config_hash);
22!
788
    return handle;
22✔
789
}
33!
790

791
static std::optional<dftracer::utils::utilities::common::query::Query>
792
parse_query_arg(const char* query_str) {
24✔
793
    if (!query_str || query_str[0] == '\0') return std::nullopt;
24!
794
    auto result = dftracer::utils::utilities::common::query::Query::from_string(
9✔
795
        query_str);
18!
796
    if (!result) {
18✔
797
        PyErr_SetString(PyExc_ValueError, result.error().message.c_str());
2!
798
        return std::nullopt;
2✔
799
    }
800
    return std::move(*result);
16!
801
}
21✔
802

803
constexpr std::uint16_t DFT_NUM_SHARDS = 4096;
804

805
template <typename Output, typename ScanFn>
806
void parallel_shard_scan_range(Runtime* rt, std::uint16_t outer_begin,
22✔
807
                               std::uint16_t outer_end, ScanFn&& scan_fn,
808
                               std::vector<Output>& outputs) {
809
    if (outer_end <= outer_begin) return;
22!
810
    const std::size_t span = static_cast<std::size_t>(outer_end - outer_begin);
22✔
811
    const std::size_t num_tasks = std::min<std::size_t>(rt->threads(), span);
22!
812
    const std::size_t shards_per_task = (span + num_tasks - 1) / num_tasks;
22✔
813
    rt->submit(run_coro_scope(
44!
814
                   rt->executor(),
11✔
815
                   [&](CoroScope& scope) -> CoroTask<void> {
180!
816
                       std::vector<dftracer::utils::coro::SpawnFuture<Output>>
11✔
817
                           futures;
11✔
818
                       futures.reserve(num_tasks);
11!
819
                       for (std::size_t t = 0; t < num_tasks; ++t) {
44!
820
                           auto shard_begin = static_cast<std::uint16_t>(
66✔
821
                               outer_begin + t * shards_per_task);
33✔
822
                           auto shard_end =
33✔
823
                               static_cast<std::uint16_t>(std::min<std::size_t>(
33!
824
                                   outer_begin + (t + 1) * shards_per_task,
33✔
825
                                   outer_end));
33✔
826
                           futures.push_back(
33!
827
                               scope.spawn([&scan_fn, shard_begin, shard_end](
265!
828
                                               CoroScope&) -> CoroTask<Output> {
33!
829
                                   co_return scan_fn(shard_begin, shard_end);
33!
830
                               }));
831
                       }
33✔
832
                       outputs.reserve(num_tasks);
11!
833
                       for (auto& f : futures) {
136!
834
                           outputs.push_back(co_await f);
102!
835
                       }
33!
836
                   }),
103!
837
               "parallel-shard-scan-range")
11!
838
        .get();
22!
839
}
11✔
840

841
template <typename Output, typename ScanFn>
842
void parallel_shard_scan(Runtime* rt, ScanFn&& scan_fn,
22✔
843
                         std::vector<Output>& outputs) {
844
    parallel_shard_scan_range<Output>(rt, 0, DFT_NUM_SHARDS,
33✔
845
                                      std::forward<ScanFn>(scan_fn), outputs);
11✔
846
}
22✔
847

848
static void append_results_to_list(PyObject* list,
58✔
849
                                   std::vector<ArrowExportResult>& results) {
850
    for (auto& r : results) {
128✔
851
        PyObject* capsule = create_arrow_batch_capsule(std::move(r));
70!
852
        if (capsule) {
70✔
853
            PyList_Append(list, capsule);
70!
854
            Py_DECREF(capsule);
30✔
855
        }
30✔
856
    }
857
}
58✔
858

859
struct AggScanInput {
860
    const EventAggregator* agg;
861
    AggMapType target_type;
862
    AggregationBatchType batch_type;
863
    Py_ssize_t batch_size;
864
    std::uint16_t shard_begin;
865
    std::uint16_t shard_end;
866
};
867

868
struct AggScanOutput {
869
    std::vector<ArrowExportResult> results;
870
};
871

NEW
872
AggScanOutput scan_aggregation_shard_range(AggScanInput input) {
×
NEW
873
    AggScanOutput output;
×
874

875
    static const std::vector<ColumnSpec> schema = {
×
876
        {"batch_type", ColumnType::INT64},  {"cat", ColumnType::DICT_STRING},
×
877
        {"name", ColumnType::DICT_STRING},  {"pid", ColumnType::UINT64},
×
878
        {"tid", ColumnType::UINT64},        {"hhash", ColumnType::DICT_STRING},
×
879
        {"fhash", ColumnType::DICT_STRING}, {"time_bucket", ColumnType::UINT64},
×
880
        {"count", ColumnType::UINT64},      {"dur_total", ColumnType::UINT64},
×
881
        {"dur_min", ColumnType::UINT64},    {"dur_max", ColumnType::UINT64},
×
882
        {"dur_mean", ColumnType::DOUBLE},   {"dur_std", ColumnType::DOUBLE},
×
883
        {"size_total", ColumnType::UINT64}, {"size_min", ColumnType::UINT64},
×
884
        {"size_max", ColumnType::UINT64},   {"size_mean", ColumnType::DOUBLE},
×
885
        {"size_std", ColumnType::DOUBLE},   {"ts", ColumnType::UINT64},
×
886
        {"te", ColumnType::UINT64},
×
887
    };
×
888

NEW
889
    RecordBatchBuilder builder;
×
NEW
890
    builder.declare_schema(schema);
×
NEW
891
    builder.reserve(static_cast<std::size_t>(input.batch_size));
×
892

NEW
893
    std::size_t row_count = 0;
×
894

NEW
895
    input.agg->scan_shard_range_raw(
×
NEW
896
        input.shard_begin, input.shard_end,
×
NEW
897
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
×
NEW
898
            AggKeyView kv;
×
NEW
899
            if (!parse_agg_key_view(key_bytes, kv)) return true;
×
NEW
900
            if (kv.map_type != input.target_type) return true;
×
901

902
            AggMetricsFullView mv;
NEW
903
            if (!parse_agg_value_full_view(val_bytes, mv)) return true;
×
904

NEW
905
            std::size_t ci = 0;
×
NEW
906
            builder.append_int64(ci++,
×
NEW
907
                                 static_cast<std::int64_t>(input.batch_type));
×
NEW
908
            builder.append_dict_string(ci++, kv.cat);
×
NEW
909
            builder.append_dict_string(ci++, kv.name);
×
NEW
910
            builder.append_uint64(ci++, kv.pid);
×
NEW
911
            builder.append_uint64(ci++, kv.tid);
×
NEW
912
            builder.append_dict_string(ci++, kv.hhash);
×
NEW
913
            builder.append_dict_string(ci++, kv.fhash);
×
NEW
914
            builder.append_uint64(ci++, kv.time_bucket);
×
NEW
915
            builder.append_uint64(ci++, mv.count);
×
NEW
916
            builder.append_uint64(ci++, mv.dur_total);
×
NEW
917
            builder.append_uint64(ci++, mv.count > 0 ? mv.dur_min : 0);
×
NEW
918
            builder.append_uint64(ci++, mv.dur_max);
×
NEW
919
            builder.append_double(ci++, mv.dur_mean);
×
NEW
920
            builder.append_double(ci++, mv.dur_stddev());
×
NEW
921
            builder.append_uint64(ci++, mv.size_total);
×
NEW
922
            builder.append_uint64(ci++, mv.count > 0 ? mv.size_min : 0);
×
NEW
923
            builder.append_uint64(ci++, mv.size_max);
×
NEW
924
            builder.append_double(ci++, mv.size_mean);
×
NEW
925
            builder.append_double(ci++, mv.size_stddev());
×
NEW
926
            builder.append_uint64(ci++, mv.ts);
×
NEW
927
            builder.append_uint64(ci++, mv.te);
×
NEW
928
            builder.end_row();
×
929

NEW
930
            row_count++;
×
NEW
931
            if (static_cast<Py_ssize_t>(row_count) >= input.batch_size) {
×
NEW
932
                auto arrow = builder.finish();
×
NEW
933
                if (arrow.valid()) {
×
NEW
934
                    output.results.push_back(std::move(arrow));
×
935
                }
NEW
936
                builder.reset(true);
×
NEW
937
                builder.reserve(static_cast<std::size_t>(input.batch_size));
×
NEW
938
                row_count = 0;
×
NEW
939
            }
×
NEW
940
            return true;
×
941
        });
942

NEW
943
    if (row_count > 0) {
×
NEW
944
        auto arrow = builder.finish();
×
NEW
945
        if (arrow.valid()) {
×
NEW
946
            output.results.push_back(std::move(arrow));
×
947
        }
NEW
948
    }
×
949

NEW
950
    return output;
×
NEW
951
}
×
952

953
/// I/O category codes.  The numeric values are presumably what ends up in the
/// "io_cat" output column, so they should stay stable.  PCTL and IPC are never
/// returned by get_io_category().
enum class IOCategory : std::int8_t {
    READ = 1,
    WRITE = 2,
    METADATA = 3,
    PCTL = 4,
    IPC = 5,
    OTHER = 6,
    SYNC = 7,
};

/// Classify a traced function name into an IOCategory.
///
/// Uses an exact, case-sensitive lookup in a fixed table.  Any name not listed
/// (including the empty string) maps to IOCategory::OTHER.
inline IOCategory get_io_category(std::string_view func_name) {
    using Cat = IOCategory;
    struct Entry {
        std::string_view name;
        IOCategory category;
    };
    static constexpr Entry kTable[] = {
        // Data reads.
        {"read", Cat::READ},
        {"pread", Cat::READ},
        {"readv", Cat::READ},
        {"preadv", Cat::READ},
        {"fread", Cat::READ},
        // Data writes.
        {"write", Cat::WRITE},
        {"pwrite", Cat::WRITE},
        {"writev", Cat::WRITE},
        {"pwritev", Cat::WRITE},
        {"fwrite", Cat::WRITE},
        // Flush / sync.
        {"fsync", Cat::SYNC},
        {"fdatasync", Cat::SYNC},
        {"msync", Cat::SYNC},
        {"sync", Cat::SYNC},
        // Metadata: open/close, stat family, positioning, namespace ops.
        {"open", Cat::METADATA},
        {"open64", Cat::METADATA},
        {"close", Cat::METADATA},
        {"fopen", Cat::METADATA},
        {"fopen64", Cat::METADATA},
        {"fclose", Cat::METADATA},
        {"stat", Cat::METADATA},
        {"fstat", Cat::METADATA},
        {"lstat", Cat::METADATA},
        {"fstatat", Cat::METADATA},
        {"__xstat", Cat::METADATA},
        {"__xstat64", Cat::METADATA},
        {"__lxstat", Cat::METADATA},
        {"__lxstat64", Cat::METADATA},
        {"__fxstat", Cat::METADATA},
        {"__fxstat64", Cat::METADATA},
        {"access", Cat::METADATA},
        {"lseek", Cat::METADATA},
        {"lseek64", Cat::METADATA},
        {"fseek", Cat::METADATA},
        {"ftell", Cat::METADATA},
        {"seek", Cat::METADATA},
        {"fcntl", Cat::METADATA},
        {"ftruncate", Cat::METADATA},
        {"mkdir", Cat::METADATA},
        {"rmdir", Cat::METADATA},
        {"unlink", Cat::METADATA},
        {"remove", Cat::METADATA},
        {"rename", Cat::METADATA},
        {"link", Cat::METADATA},
        {"readlink", Cat::METADATA},
        {"opendir", Cat::METADATA},
        {"closedir", Cat::METADATA},
        {"readdir", Cat::METADATA},
    };

    for (const Entry& entry : kTable) {
        if (entry.name == func_name) {
            return entry.category;
        }
    }
    return Cat::OTHER;
}
200✔
996

997
/// Write the decimal digits of `val` into `buf`, most significant first.
///
/// No sign and no terminating NUL are written.  `buf` needs room for up to 20
/// characters (the length of UINT64_MAX in decimal).
///
/// @return Pointer one past the last digit written.
inline char* fast_itoa(std::uint64_t val, char* buf) {
    // Count the digits first so the buffer can be filled right-to-left
    // without a separate reversal pass.
    std::size_t digits = 1;
    for (std::uint64_t rest = val / 10; rest != 0; rest /= 10) {
        ++digits;
    }
    char* const end = buf + digits;
    char* cursor = end;
    do {
        *--cursor = static_cast<char>('0' + val % 10);
        val /= 10;
    } while (val != 0);
    return end;
}
1006

1007
class HashResolver {
1008
   public:
1009
    HashResolver(
136✔
1010
        const std::unordered_map<std::string, std::string>* file_hashes,
1011
        const std::unordered_map<std::string, std::string>* host_hashes)
1012
        : file_hashes_(file_hashes), host_hashes_(host_hashes) {
105✔
1013
        if (file_hashes_) {
75✔
1014
            for (const auto& [hash, name] : *file_hashes_) {
195!
1015
                auto hash_sv = intern_.intern(hash);
168!
1016
                auto name_sv = intern_.intern(name);
169!
1017
                file_map_[hash_sv] = name_sv;
118✔
1018
            }
1019
        }
33✔
1020
        if (host_hashes_) {
80✔
1021
            for (const auto& [hash, name] : *host_hashes_) {
196!
1022
                auto hash_sv = intern_.intern(hash);
169!
1023
                auto name_sv = intern_.intern(name);
169!
1024
                host_map_[hash_sv] = name_sv;
119!
1025
            }
1026
        }
33✔
1027
    }
116✔
1028

1029
    std::string_view resolve_file(std::string_view hash) {
653✔
1030
        if (hash.empty()) return hash;
653!
1031
        auto interned = intern_.intern(hash);
657!
1032
        auto it = file_map_.find(interned);
657!
1033
        return it != file_map_.end() ? it->second : interned;
650!
1034
    }
330✔
1035

1036
    std::string_view resolve_host(std::string_view hash) {
651✔
1037
        if (hash.empty()) return hash;
651!
1038
        auto interned = intern_.intern(hash);
652!
1039
        auto it = host_map_.find(interned);
660!
1040
        return it != host_map_.end() ? it->second : interned;
658!
1041
    }
330✔
1042

1043
    std::string_view intern(std::string_view sv) { return intern_.intern(sv); }
673✔
1044

1045
   private:
1046
    const std::unordered_map<std::string, std::string>* file_hashes_;
1047
    const std::unordered_map<std::string, std::string>* host_hashes_;
1048
    dftracer::utils::StringIntern intern_;
1049
    std::unordered_map<std::string_view, std::string_view> file_map_;
1050
    std::unordered_map<std::string_view, std::string_view> host_map_;
1051
};
1052

1053
/// Identifies a (host hash, pid, tid) triple.  `hhash` is a non-owning view;
/// the referenced characters must outlive the key.
struct ProcKey {
    std::string_view hhash;
    std::uint64_t pid;
    std::uint64_t tid;
    bool operator==(const ProcKey& o) const {
        return hhash == o.hhash && pid == o.pid && tid == o.tid;
    }
};

/// Hash for ProcKey.
///
/// Field hashes are folded with the same hash_combine mixing step used by
/// CoarseKeyHash.  A plain `h(hhash) ^ (h(pid) << 1) ^ (h(tid) << 2)`
/// cancels systematically.  With identity integer hashing (as in libstdc++),
/// pid=2,tid=1 and pid=0,tid=0 collide for every host.
struct ProcKeyHash {
    std::size_t operator()(const ProcKey& k) const {
        auto combine = [](std::size_t h, std::size_t v) {
            return h ^ (v + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2));
        };
        std::size_t h = std::hash<std::string_view>{}(k.hhash);
        h = combine(h, std::hash<std::uint64_t>{}(k.pid));
        h = combine(h, std::hash<std::uint64_t>{}(k.tid));
        return h;
    }
};
1069

1070
static const std::vector<ColumnSpec> DFANALYZER_SCHEMA = {
1!
1071
    {"cat", ColumnType::DICT_STRING},
1!
1072
    {"func_name", ColumnType::DICT_STRING},
1!
1073
    {"pid", ColumnType::INT64},
1!
1074
    {"tid", ColumnType::INT64},
1!
1075
    {"file_hash", ColumnType::DICT_STRING},
1!
1076
    {"host_hash", ColumnType::DICT_STRING},
1!
1077
    {"file_name", ColumnType::DICT_STRING},
1!
1078
    {"host_name", ColumnType::DICT_STRING},
1!
1079
    {"proc_name", ColumnType::DICT_STRING},
1!
1080
    {"io_cat", ColumnType::INT64},
1!
1081
    {"acc_pat", ColumnType::INT64},
1!
1082
    {"count", ColumnType::INT64},
1!
1083
    {"time", ColumnType::DOUBLE},
1!
1084
    {"size", ColumnType::INT64},
1!
1085
    {"time_min", ColumnType::DOUBLE},
1!
1086
    {"time_max", ColumnType::DOUBLE},
1!
1087
    {"size_min", ColumnType::INT64},
1!
1088
    {"size_max", ColumnType::INT64},
1!
1089
    {"time_range", ColumnType::INT64},
1!
1090
    {"time_start", ColumnType::INT64},
1!
1091
    {"time_end", ColumnType::INT64},
1!
1092
};
1093

1094
/// Columns a coarse (grouped) dfanalyzer scan can group by.  Each enumerator
/// is a distinct bit, so a set of fields fits in one std::uint32_t mask.
enum GroupByField : std::uint32_t {
    GB_CAT = 1u << 0,
    GB_FUNC_NAME = 1u << 1,
    GB_PID = 1u << 2,
    GB_TID = 1u << 3,
    GB_FILE_HASH = 1u << 4,
    GB_HOST_HASH = 1u << 5,
    GB_FILE_NAME = 1u << 6,
    GB_HOST_NAME = 1u << 7,
    GB_PROC_NAME = 1u << 8,
    GB_IO_CAT = 1u << 9,
    GB_ACC_PAT = 1u << 10,
    GB_TIME_RANGE = 1u << 11,
};

/// Parsed group-by specification.
struct GroupByConfig {
    // Presumably the OR of the GroupByField bits listed in `order`.
    std::uint32_t mask = 0;
    // Selected fields; make_coarse_schema() emits key columns in this order.
    std::vector<GroupByField> order;
    std::vector<std::string> names;  // matches `order`, used for schema
};

/// Map a user-supplied column name to its GroupByField.
///
/// The match is exact and case-sensitive; names are identical to the
/// corresponding output column names.
/// @return The field, or std::nullopt for an unknown name.
inline std::optional<GroupByField> parse_group_by_name(std::string_view name) {
    struct Entry {
        std::string_view key;
        GroupByField field;
    };
    static constexpr Entry kFields[] = {
        {"cat", GB_CAT},
        {"func_name", GB_FUNC_NAME},
        {"pid", GB_PID},
        {"tid", GB_TID},
        {"file_hash", GB_FILE_HASH},
        {"host_hash", GB_HOST_HASH},
        {"file_name", GB_FILE_NAME},
        {"host_name", GB_HOST_NAME},
        {"proc_name", GB_PROC_NAME},
        {"io_cat", GB_IO_CAT},
        {"acc_pat", GB_ACC_PAT},
        {"time_range", GB_TIME_RANGE},
    };

    for (const Entry& entry : kFields) {
        if (entry.key == name) {
            return entry.field;
        }
    }
    return std::nullopt;
}
1130

1131
/// Group key for coarse (grouped) dfanalyzer aggregation.
/// The string_view members do not own their characters.  Fields outside the
/// active group-by presumably stay at their defaults (empty / 0) so they never
/// split groups — TODO confirm at the construction site.
struct CoarseKey {
    std::string_view cat;
    std::string_view func_name;
    std::uint64_t pid = 0;
    std::uint64_t tid = 0;
    std::string_view file_hash;
    std::string_view host_hash;
    std::string_view file_name;
    std::string_view host_name;
    std::string_view proc_name;
    std::int64_t io_cat = 0;
    std::int64_t acc_pat = 0;
    std::int64_t time_range = 0;

    bool operator==(const CoarseKey& o) const {
        // Integer fields first; string comparisons only when they all match.
        if (pid != o.pid || tid != o.tid || io_cat != o.io_cat ||
            acc_pat != o.acc_pat || time_range != o.time_range) {
            return false;
        }
        return cat == o.cat && func_name == o.func_name &&
               file_hash == o.file_hash && host_hash == o.host_hash &&
               file_name == o.file_name && host_name == o.host_name &&
               proc_name == o.proc_name;
    }
};

/// Hash for CoarseKey.  Starts from the hash of `cat` and folds in every
/// other field, in declaration order, with the boost::hash_combine mixing step
/// (golden-ratio constant plus shifts).
struct CoarseKeyHash {
    std::size_t operator()(const CoarseKey& k) const {
        const std::hash<std::string_view> hash_str{};
        const std::hash<std::uint64_t> hash_u64{};
        const std::hash<std::int64_t> hash_i64{};
        const std::size_t field_hashes[] = {
            hash_str(k.func_name), hash_u64(k.pid),       hash_u64(k.tid),
            hash_str(k.file_hash), hash_str(k.host_hash), hash_str(k.file_name),
            hash_str(k.host_name), hash_str(k.proc_name), hash_i64(k.io_cat),
            hash_i64(k.acc_pat),   hash_i64(k.time_range),
        };
        std::size_t seed = hash_str(k.cat);
        for (const std::size_t v : field_hashes) {
            seed ^= v + 0x9e3779b97f4a7c15ULL + (seed << 6) + (seed >> 2);
        }
        return seed;
    }
};

/// Running accumulators for one coarse group.
/// Min-style fields start at +infinity / the type's maximum and max-style
/// fields at -infinity / 0, i.e. the identities for min/max updates.
/// `has_size` and `has_time_bounds` are flags that presumably record whether
/// the size / time-bound accumulators were ever updated — TODO confirm.
struct CoarseMetrics {
    std::uint64_t count = 0;
    // Time: sum, sum of squares, extremes.
    double time_sum = 0.0;
    double time_sq_sum = 0.0;
    double time_min_val = std::numeric_limits<double>::infinity();
    double time_max_val = -std::numeric_limits<double>::infinity();
    // Second pair of time extremes (emitted as time_call_min/time_call_max).
    double time_call_min_val = std::numeric_limits<double>::infinity();
    double time_call_max_val = -std::numeric_limits<double>::infinity();
    // Size: sum, sum of squares, extremes.
    std::uint64_t size_sum = 0;
    double size_sq_sum = 0.0;
    std::uint64_t size_min_val = std::numeric_limits<std::uint64_t>::max();
    std::uint64_t size_max_val = 0;
    // Second pair of size extremes (emitted as size_call_min/size_call_max).
    std::uint64_t size_call_min_val = std::numeric_limits<std::uint64_t>::max();
    std::uint64_t size_call_max_val = 0;
    bool has_size = false;
    // Time bounds (emitted as time_start/time_end).
    std::uint64_t time_start_val = std::numeric_limits<std::uint64_t>::max();
    std::uint64_t time_end_val = 0;
    bool has_time_bounds = false;
};
1195

NEW
1196
inline std::vector<ColumnSpec> make_coarse_schema(const GroupByConfig& cfg) {
×
NEW
1197
    std::vector<ColumnSpec> specs;
×
NEW
1198
    specs.reserve(cfg.order.size() + 16);
×
NEW
1199
    for (std::size_t i = 0; i < cfg.order.size(); ++i) {
×
NEW
1200
        GroupByField f = cfg.order[i];
×
NEW
1201
        const std::string& name = cfg.names[i];
×
NEW
1202
        switch (f) {
×
1203
            case GB_CAT:
1204
            case GB_FUNC_NAME:
1205
            case GB_FILE_HASH:
1206
            case GB_HOST_HASH:
1207
            case GB_FILE_NAME:
1208
            case GB_HOST_NAME:
1209
            case GB_PROC_NAME:
NEW
1210
                specs.push_back({name, ColumnType::DICT_STRING});
×
NEW
1211
                break;
×
1212
            case GB_PID:
1213
            case GB_TID:
1214
            case GB_IO_CAT:
1215
            case GB_ACC_PAT:
1216
            case GB_TIME_RANGE:
NEW
1217
                specs.push_back({name, ColumnType::INT64});
×
NEW
1218
                break;
×
1219
        }
1220
    }
NEW
1221
    specs.push_back({"count", ColumnType::INT64});
×
NEW
1222
    specs.push_back({"time", ColumnType::DOUBLE});
×
NEW
1223
    specs.push_back({"size", ColumnType::INT64});
×
NEW
1224
    specs.push_back({"time_sq", ColumnType::DOUBLE});
×
NEW
1225
    specs.push_back({"size_sq", ColumnType::DOUBLE});
×
NEW
1226
    specs.push_back({"time_min", ColumnType::DOUBLE});
×
NEW
1227
    specs.push_back({"time_max", ColumnType::DOUBLE});
×
NEW
1228
    specs.push_back({"size_min", ColumnType::INT64});
×
NEW
1229
    specs.push_back({"size_max", ColumnType::INT64});
×
NEW
1230
    specs.push_back({"time_call_min", ColumnType::DOUBLE});
×
NEW
1231
    specs.push_back({"time_call_max", ColumnType::DOUBLE});
×
NEW
1232
    specs.push_back({"size_call_min", ColumnType::INT64});
×
NEW
1233
    specs.push_back({"size_call_max", ColumnType::INT64});
×
NEW
1234
    specs.push_back({"time_start", ColumnType::INT64});
×
NEW
1235
    specs.push_back({"time_end", ColumnType::INT64});
×
NEW
1236
    return specs;
×
NEW
1237
}
×
1238

1239
struct DfanalyzerScanInput {
33✔
1240
    const EventAggregator* agg;
1241
    const DfanalyzerContext* ctx;
1242
    std::optional<AggMapType> type_filter;
1243
    Py_ssize_t batch_size;
1244
    std::uint16_t shard_begin;
1245
    std::uint16_t shard_end;
1246
    const GroupByConfig* group_by = nullptr;  // null = full granularity
33✔
1247
};
1248

1249
struct DfanalyzerScanOutput {
1250
    std::vector<ArrowExportResult> events;
1251
    std::vector<ArrowExportResult> profiles;
1252
    std::vector<ArrowExportResult> system;
1253
};
1254

1255
DfanalyzerScanOutput scan_dfanalyzer_shards(DfanalyzerScanInput input) {
67✔
1256
    DfanalyzerScanOutput output;
67✔
1257

1258
    const bool coarse = input.group_by != nullptr;
76✔
1259
    const std::vector<ColumnSpec> coarse_schema =
1260
        coarse ? make_coarse_schema(*input.group_by)
76!
1261
               : std::vector<ColumnSpec>{};
76!
1262

1263
    auto make_builder = [&]() {
219✔
1264
        RecordBatchBuilder b;
186✔
1265
        if (coarse) {
195!
NEW
1266
            b.declare_schema(coarse_schema);
×
1267
        } else {
1268
            b.declare_schema(DFANALYZER_SCHEMA);
195✔
1269
        }
1270
        b.reserve(static_cast<std::size_t>(input.batch_size));
192✔
1271
        return b;
199✔
1272
    };
91!
1273

1274
    RecordBatchBuilder event_builder, profile_builder, system_builder;
70!
1275
    bool use_events =
33✔
1276
        !input.type_filter || *input.type_filter == AggMapType::EVENT;
75!
1277
    bool use_profiles =
33✔
1278
        !input.type_filter || *input.type_filter == AggMapType::PROFILE;
73!
1279
    bool use_system =
33✔
1280
        !input.type_filter || *input.type_filter == AggMapType::SYSTEM;
73!
1281

1282
    if (use_events) event_builder = make_builder();
76!
1283
    if (use_profiles) profile_builder = make_builder();
76✔
1284
    if (use_system) system_builder = make_builder();
80!
1285

1286
    auto bucket_width_us = static_cast<std::uint64_t>(
80✔
1287
        input.ctx->time_granularity * input.ctx->time_resolution);
80✔
1288
    std::size_t event_count = 0, profile_count = 0, system_count = 0;
80✔
1289

1290
    HashResolver resolver(input.ctx->file_hashes, input.ctx->host_hashes);
80✔
1291
    std::unordered_map<ProcKey, std::string, ProcKeyHash> proc_name_cache;
77✔
1292
    std::unordered_map<std::string_view, IOCategory> io_cat_cache;
74✔
1293

1294
    std::unordered_map<CoarseKey, CoarseMetrics, CoarseKeyHash> event_coarse,
75✔
1295
        profile_coarse, system_coarse;
75✔
1296

1297
    auto flush_builder = [&](RecordBatchBuilder& builder, std::size_t& count,
236✔
1298
                             std::vector<ArrowExportResult>& results) {
1299
        if (count > 0) {
203✔
1300
            auto arrow = builder.finish();
70!
1301
            if (arrow.valid()) {
68!
1302
                results.push_back(std::move(arrow));
68!
1303
            }
30✔
1304
            builder.reset(true);
68!
1305
            builder.reserve(static_cast<std::size_t>(input.batch_size));
70!
1306
            count = 0;
70✔
1307
        }
70✔
1308
    };
202✔
1309

1310
    auto append_row = [&](RecordBatchBuilder& builder, std::size_t& count,
689✔
1311
                          std::vector<ArrowExportResult>& results,
1312
                          const AggKeyView& kv, const AggMetricsView& mv,
1313
                          std::string_view file_name,
1314
                          std::string_view host_name,
1315
                          std::string_view proc_name, IOCategory io_cat) {
1316
        std::size_t ci = 0;
656✔
1317
        builder.append_dict_string(ci++, kv.cat);
656✔
1318
        builder.append_dict_string(ci++, kv.name);
655✔
1319
        builder.append_int64(ci++, static_cast<std::int64_t>(kv.pid));
659✔
1320
        builder.append_int64(ci++, static_cast<std::int64_t>(kv.tid));
656✔
1321
        builder.append_dict_string(ci++, kv.fhash);
656✔
1322
        builder.append_dict_string(ci++, kv.hhash);
658✔
1323
        builder.append_dict_string(ci++, file_name);
658✔
1324
        builder.append_dict_string(ci++, host_name);
658✔
1325
        builder.append_dict_string(ci++, proc_name);
658✔
1326
        builder.append_int64(ci++, static_cast<std::int64_t>(io_cat));
658✔
1327
        builder.append_int64(ci++, 0);
658✔
1328

1329
        builder.append_int64(ci++, static_cast<std::int64_t>(mv.count));
657✔
1330
        builder.append_double(ci++, static_cast<double>(mv.dur_total) /
986✔
1331
                                        input.ctx->time_resolution);
656✔
1332

1333
        if (mv.size_total > 0) {
657✔
1334
            builder.append_int64(ci++,
735✔
1335
                                 static_cast<std::int64_t>(mv.size_total));
489✔
1336
        } else {
246✔
1337
            builder.append_null(ci++);
168✔
1338
        }
1339

1340
        builder.append_double(ci++, mv.count > 0
988!
1341
                                        ? static_cast<double>(mv.dur_min) /
989✔
1342
                                              input.ctx->time_resolution
659✔
1343
                                        : 0.0);
1344
        builder.append_double(ci++, mv.count > 0
988!
1345
                                        ? static_cast<double>(mv.dur_max) /
989✔
1346
                                              input.ctx->time_resolution
659✔
1347
                                        : 0.0);
1348

1349
        if (mv.size_total > 0 && mv.count > 0) {
659✔
1350
            builder.append_int64(ci++, static_cast<std::int64_t>(mv.size_min));
491✔
1351
            builder.append_int64(ci++, static_cast<std::int64_t>(mv.size_max));
491✔
1352
        } else {
246✔
1353
            builder.append_null(ci++);
168✔
1354
            builder.append_null(ci++);
168✔
1355
        }
1356

1357
        auto time_range = bucket_width_us > 0
1,317!
1358
                              ? static_cast<std::int64_t>(
329!
1359
                                    (kv.time_bucket - input.ctx->time_origin) /
989✔
1360
                                    bucket_width_us)
659✔
1361
                              : 0;
1362
        builder.append_int64(ci++, time_range);
659✔
1363
        builder.append_int64(
989✔
1364
            ci++, static_cast<std::int64_t>(mv.ts - input.ctx->time_origin));
659✔
1365
        builder.append_int64(
988✔
1366
            ci++, static_cast<std::int64_t>(mv.te - input.ctx->time_origin));
658✔
1367
        builder.end_row();
657✔
1368

1369
        count++;
658✔
1370
        if (static_cast<Py_ssize_t>(count) >= input.batch_size) {
658✔
NEW
1371
            flush_builder(builder, count, results);
×
1372
        }
1373
    };
701✔
1374

1375
    auto accumulate_coarse =
1376
        [&](std::unordered_map<CoarseKey, CoarseMetrics, CoarseKeyHash>& map,
33✔
1377
            const AggKeyView& kv, const AggMetricsView& mv,
1378
            std::string_view file_name, std::string_view host_name,
1379
            std::string_view proc_name, IOCategory io_cat) {
NEW
1380
            const auto& cfg = *input.group_by;
×
1381
            // Probe with non-interned views; hash/equality compare by content,
1382
            // so string_view lifetime doesn't matter for lookup. We only copy
1383
            // (intern) on first insert.
NEW
1384
            CoarseKey probe;
×
NEW
1385
            if (cfg.mask & GB_CAT) probe.cat = kv.cat;
×
NEW
1386
            if (cfg.mask & GB_FUNC_NAME) probe.func_name = kv.name;
×
NEW
1387
            if (cfg.mask & GB_PID) probe.pid = kv.pid;
×
NEW
1388
            if (cfg.mask & GB_TID) probe.tid = kv.tid;
×
NEW
1389
            if (cfg.mask & GB_FILE_HASH) probe.file_hash = kv.fhash;
×
NEW
1390
            if (cfg.mask & GB_HOST_HASH) probe.host_hash = kv.hhash;
×
NEW
1391
            if (cfg.mask & GB_FILE_NAME) probe.file_name = file_name;
×
NEW
1392
            if (cfg.mask & GB_HOST_NAME) probe.host_name = host_name;
×
NEW
1393
            if (cfg.mask & GB_PROC_NAME) probe.proc_name = proc_name;
×
NEW
1394
            if (cfg.mask & GB_IO_CAT)
×
NEW
1395
                probe.io_cat = static_cast<std::int64_t>(io_cat);
×
NEW
1396
            if (cfg.mask & GB_TIME_RANGE) {
×
NEW
1397
                probe.time_range =
×
NEW
1398
                    bucket_width_us > 0
×
1399
                        ? static_cast<std::int64_t>(
×
NEW
1400
                              (kv.time_bucket - input.ctx->time_origin) /
×
NEW
1401
                              bucket_width_us)
×
1402
                        : 0;
1403
            }
1404
            // acc_pat is always 0 today; included for completeness.
1405

NEW
1406
            auto it = map.find(probe);
×
NEW
1407
            if (it == map.end()) {
×
1408
                // First sighting: promote views referencing unstable DB buffers
1409
                // to interned copies. file_name/host_name come from the
1410
                // resolver's intern pool, and proc_name from proc_name_cache;
1411
                // both already stable across iterations, no copy needed.
NEW
1412
                CoarseKey stable = probe;
×
NEW
1413
                if (cfg.mask & GB_CAT) stable.cat = resolver.intern(kv.cat);
×
NEW
1414
                if (cfg.mask & GB_FUNC_NAME)
×
NEW
1415
                    stable.func_name = resolver.intern(kv.name);
×
NEW
1416
                if (cfg.mask & GB_FILE_HASH)
×
NEW
1417
                    stable.file_hash = resolver.intern(kv.fhash);
×
NEW
1418
                if (cfg.mask & GB_HOST_HASH)
×
NEW
1419
                    stable.host_hash = resolver.intern(kv.hhash);
×
NEW
1420
                auto [nit, _] = map.emplace(std::move(stable), CoarseMetrics{});
×
NEW
1421
                it = nit;
×
1422
            }
NEW
1423
            CoarseMetrics& m = it->second;
×
NEW
1424
            m.count += mv.count;
×
NEW
1425
            double time_val =
×
NEW
1426
                static_cast<double>(mv.dur_total) / input.ctx->time_resolution;
×
NEW
1427
            m.time_sum += time_val;
×
NEW
1428
            m.time_sq_sum += time_val * time_val;
×
NEW
1429
            if (time_val < m.time_call_min_val) m.time_call_min_val = time_val;
×
NEW
1430
            if (time_val > m.time_call_max_val) m.time_call_max_val = time_val;
×
NEW
1431
            if (mv.count > 0) {
×
NEW
1432
                double dur_min_v = static_cast<double>(mv.dur_min) /
×
NEW
1433
                                   input.ctx->time_resolution;
×
NEW
1434
                double dur_max_v = static_cast<double>(mv.dur_max) /
×
NEW
1435
                                   input.ctx->time_resolution;
×
NEW
1436
                if (dur_min_v < m.time_min_val) m.time_min_val = dur_min_v;
×
NEW
1437
                if (dur_max_v > m.time_max_val) m.time_max_val = dur_max_v;
×
1438
            }
NEW
1439
            if (mv.size_total > 0) {
×
NEW
1440
                m.has_size = true;
×
NEW
1441
                m.size_sum += mv.size_total;
×
NEW
1442
                double sz = static_cast<double>(mv.size_total);
×
NEW
1443
                m.size_sq_sum += sz * sz;
×
NEW
1444
                if (mv.size_total < m.size_call_min_val)
×
NEW
1445
                    m.size_call_min_val = mv.size_total;
×
NEW
1446
                if (mv.size_total > m.size_call_max_val)
×
NEW
1447
                    m.size_call_max_val = mv.size_total;
×
NEW
1448
                if (mv.count > 0) {
×
NEW
1449
                    if (mv.size_min < m.size_min_val)
×
NEW
1450
                        m.size_min_val = mv.size_min;
×
NEW
1451
                    if (mv.size_max > m.size_max_val)
×
NEW
1452
                        m.size_max_val = mv.size_max;
×
1453
                }
1454
            }
NEW
1455
            if (mv.ts >= input.ctx->time_origin) {
×
NEW
1456
                m.has_time_bounds = true;
×
NEW
1457
                auto ts_off = mv.ts - input.ctx->time_origin;
×
NEW
1458
                auto te_off = mv.te - input.ctx->time_origin;
×
NEW
1459
                if (ts_off < m.time_start_val) m.time_start_val = ts_off;
×
NEW
1460
                if (te_off > m.time_end_val) m.time_end_val = te_off;
×
1461
            }
NEW
1462
        };
×
1463

1464
    input.agg->scan_shard_range_raw(
66!
1465
        input.shard_begin, input.shard_end,
76!
1466
        [&](std::string_view key_bytes, std::string_view val_bytes) -> bool {
850✔
1467
            AggKeyView kv;
817✔
1468
            if (!parse_agg_key_view(key_bytes, kv)) return true;
827!
1469

1470
            if (input.type_filter && kv.map_type != *input.type_filter)
828!
NEW
1471
                return true;
×
1472

1473
            if (input.ctx->query_filter) {
829✔
1474
                auto& q = *input.ctx->query_filter;
538✔
1475
                dftracer::utils::utilities::common::query::ValueMap fields;
538!
1476
                if (q.references("cat")) fields["cat"] = std::string(kv.cat);
537!
1477
                if (q.references("name")) fields["name"] = std::string(kv.name);
524!
1478
                if (q.references("pid")) fields["pid"] = kv.pid;
526!
1479
                if (q.references("tid")) fields["tid"] = kv.tid;
529!
1480
                if (q.references("hhash"))
525!
NEW
1481
                    fields["hhash"] = std::string(kv.hhash);
×
1482
                if (q.references("fhash"))
527!
NEW
1483
                    fields["fhash"] = std::string(kv.fhash);
×
1484
                if (q.references("time_bucket"))
527!
NEW
1485
                    fields["time_bucket"] = kv.time_bucket;
×
1486
                if (!q.evaluate(fields)) return true;
528✔
1487
            }
521✔
1488

1489
            AggMetricsView mv;
1490
            if (!parse_agg_value_view(val_bytes, mv)) return true;
663✔
1491

1492
            auto file_name = resolver.resolve_file(kv.fhash);
660!
1493
            auto host_name = resolver.resolve_host(kv.hhash);
657!
1494

1495
            ProcKey pk{kv.hhash, kv.pid, kv.tid};
660✔
1496
            auto proc_it = proc_name_cache.find(pk);
660!
1497
            std::string_view proc_name;
661✔
1498
            if (proc_it != proc_name_cache.end()) {
661✔
1499
                proc_name = proc_it->second;
408✔
1500
            } else {
215✔
1501
                std::string pn = "app#";
254!
1502
                if (!host_name.empty()) {
254!
1503
                    pn.append(host_name);
254!
1504
                } else if (!kv.hhash.empty()) {
117!
NEW
1505
                    pn.append(kv.hhash);
×
1506
                } else {
NEW
1507
                    pn.append("unknown");
×
1508
                }
1509
                pn.push_back('#');
254✔
1510
                pn.append(std::to_string(kv.pid));
254!
1511
                pn.push_back('#');
253!
1512
                pn.append(std::to_string(kv.tid));
254!
1513
                ProcKey stable_pk{resolver.intern(kv.hhash), kv.pid, kv.tid};
254!
1514
                auto [it, _] =
252✔
1515
                    proc_name_cache.emplace(stable_pk, std::move(pn));
254✔
1516
                proc_name = it->second;
254✔
1517
            }
252✔
1518

1519
            auto io_it = io_cat_cache.find(kv.name);
659!
1520
            IOCategory io_cat;
1521
            if (io_it != io_cat_cache.end()) {
658✔
1522
                io_cat = io_it->second;
240✔
1523
            } else {
131✔
1524
                io_cat = get_io_category(kv.name);
418✔
1525
                io_cat_cache[resolver.intern(kv.name)] = io_cat;
420!
1526
            }
1527

1528
            if (coarse) {
659!
NEW
1529
                switch (kv.map_type) {
×
1530
                    case AggMapType::EVENT:
NEW
1531
                        if (use_events)
×
NEW
1532
                            accumulate_coarse(event_coarse, kv, mv, file_name,
×
1533
                                              host_name, proc_name, io_cat);
NEW
1534
                        break;
×
1535
                    case AggMapType::PROFILE:
NEW
1536
                        if (use_profiles)
×
NEW
1537
                            accumulate_coarse(profile_coarse, kv, mv, file_name,
×
1538
                                              host_name, proc_name, io_cat);
NEW
1539
                        break;
×
1540
                    case AggMapType::SYSTEM:
NEW
1541
                        if (use_system)
×
NEW
1542
                            accumulate_coarse(system_coarse, kv, mv, file_name,
×
1543
                                              host_name, proc_name, io_cat);
NEW
1544
                        break;
×
1545
                }
1546
            } else {
1547
                switch (kv.map_type) {
659✔
1548
                    case AggMapType::EVENT:
327✔
1549
                        append_row(event_builder, event_count, output.events,
985!
1550
                                   kv, mv, file_name, host_name, proc_name,
329✔
1551
                                   io_cat);
329✔
1552
                        break;
656✔
1553
                    case AggMapType::PROFILE:
NEW
1554
                        append_row(profile_builder, profile_count,
×
NEW
1555
                                   output.profiles, kv, mv, file_name,
×
1556
                                   host_name, proc_name, io_cat);
NEW
1557
                        break;
×
1558
                    case AggMapType::SYSTEM:
NEW
1559
                        append_row(system_builder, system_count, output.system,
×
1560
                                   kv, mv, file_name, host_name, proc_name,
1561
                                   io_cat);
NEW
1562
                        break;
×
1563
                }
1564
            }
1565
            return true;
659✔
1566
        });
436✔
1567

1568
    if (coarse) {
77!
NEW
1569
        const auto& cfg = *input.group_by;
×
NEW
1570
        auto flush_coarse = [&](std::unordered_map<CoarseKey, CoarseMetrics,
×
1571
                                                   CoarseKeyHash>& map,
1572
                                RecordBatchBuilder& builder, std::size_t& count,
1573
                                std::vector<ArrowExportResult>& results) {
NEW
1574
            for (auto& [key, m] : map) {
×
NEW
1575
                std::size_t ci = 0;
×
NEW
1576
                for (std::size_t i = 0; i < cfg.order.size(); ++i) {
×
NEW
1577
                    switch (cfg.order[i]) {
×
1578
                        case GB_CAT:
NEW
1579
                            builder.append_dict_string(ci++, key.cat);
×
NEW
1580
                            break;
×
1581
                        case GB_FUNC_NAME:
NEW
1582
                            builder.append_dict_string(ci++, key.func_name);
×
NEW
1583
                            break;
×
1584
                        case GB_PID:
NEW
1585
                            builder.append_int64(
×
NEW
1586
                                ci++, static_cast<std::int64_t>(key.pid));
×
NEW
1587
                            break;
×
1588
                        case GB_TID:
NEW
1589
                            builder.append_int64(
×
NEW
1590
                                ci++, static_cast<std::int64_t>(key.tid));
×
NEW
1591
                            break;
×
1592
                        case GB_FILE_HASH:
NEW
1593
                            builder.append_dict_string(ci++, key.file_hash);
×
NEW
1594
                            break;
×
1595
                        case GB_HOST_HASH:
NEW
1596
                            builder.append_dict_string(ci++, key.host_hash);
×
NEW
1597
                            break;
×
1598
                        case GB_FILE_NAME:
NEW
1599
                            builder.append_dict_string(ci++, key.file_name);
×
NEW
1600
                            break;
×
1601
                        case GB_HOST_NAME:
NEW
1602
                            builder.append_dict_string(ci++, key.host_name);
×
NEW
1603
                            break;
×
1604
                        case GB_PROC_NAME:
NEW
1605
                            builder.append_dict_string(ci++, key.proc_name);
×
NEW
1606
                            break;
×
1607
                        case GB_IO_CAT:
NEW
1608
                            builder.append_int64(ci++, key.io_cat);
×
NEW
1609
                            break;
×
1610
                        case GB_ACC_PAT:
NEW
1611
                            builder.append_int64(ci++, key.acc_pat);
×
NEW
1612
                            break;
×
1613
                        case GB_TIME_RANGE:
NEW
1614
                            builder.append_int64(ci++, key.time_range);
×
NEW
1615
                            break;
×
1616
                    }
1617
                }
NEW
1618
                builder.append_int64(ci++, static_cast<std::int64_t>(m.count));
×
NEW
1619
                builder.append_double(ci++, m.time_sum);
×
NEW
1620
                if (m.has_size) {
×
NEW
1621
                    builder.append_int64(ci++,
×
NEW
1622
                                         static_cast<std::int64_t>(m.size_sum));
×
1623
                } else {
NEW
1624
                    builder.append_null(ci++);
×
1625
                }
NEW
1626
                builder.append_double(ci++, m.time_sq_sum);
×
NEW
1627
                if (m.has_size) {
×
NEW
1628
                    builder.append_double(ci++, m.size_sq_sum);
×
1629
                } else {
NEW
1630
                    builder.append_null(ci++);
×
1631
                }
NEW
1632
                builder.append_double(ci++, m.count > 0 ? m.time_min_val : 0.0);
×
NEW
1633
                builder.append_double(ci++, m.count > 0 ? m.time_max_val : 0.0);
×
NEW
1634
                if (m.has_size) {
×
NEW
1635
                    builder.append_int64(
×
NEW
1636
                        ci++, static_cast<std::int64_t>(m.size_min_val));
×
NEW
1637
                    builder.append_int64(
×
NEW
1638
                        ci++, static_cast<std::int64_t>(m.size_max_val));
×
1639
                } else {
NEW
1640
                    builder.append_null(ci++);
×
NEW
1641
                    builder.append_null(ci++);
×
1642
                }
NEW
1643
                builder.append_double(ci++,
×
NEW
1644
                                      m.count > 0 ? m.time_call_min_val : 0.0);
×
NEW
1645
                builder.append_double(ci++,
×
NEW
1646
                                      m.count > 0 ? m.time_call_max_val : 0.0);
×
NEW
1647
                if (m.has_size) {
×
NEW
1648
                    builder.append_int64(
×
NEW
1649
                        ci++, static_cast<std::int64_t>(m.size_call_min_val));
×
NEW
1650
                    builder.append_int64(
×
NEW
1651
                        ci++, static_cast<std::int64_t>(m.size_call_max_val));
×
1652
                } else {
NEW
1653
                    builder.append_null(ci++);
×
NEW
1654
                    builder.append_null(ci++);
×
1655
                }
NEW
1656
                builder.append_int64(
×
NEW
1657
                    ci++, m.has_time_bounds
×
NEW
1658
                              ? static_cast<std::int64_t>(m.time_start_val)
×
1659
                              : 0);
NEW
1660
                builder.append_int64(
×
NEW
1661
                    ci++, m.has_time_bounds
×
NEW
1662
                              ? static_cast<std::int64_t>(m.time_end_val)
×
1663
                              : 0);
NEW
1664
                builder.end_row();
×
NEW
1665
                ++count;
×
NEW
1666
                if (static_cast<Py_ssize_t>(count) >= input.batch_size) {
×
NEW
1667
                    flush_builder(builder, count, results);
×
1668
                }
1669
            }
NEW
1670
            flush_builder(builder, count, results);
×
NEW
1671
        };
×
NEW
1672
        if (use_events)
×
NEW
1673
            flush_coarse(event_coarse, event_builder, event_count,
×
NEW
1674
                         output.events);
×
NEW
1675
        if (use_profiles)
×
NEW
1676
            flush_coarse(profile_coarse, profile_builder, profile_count,
×
NEW
1677
                         output.profiles);
×
NEW
1678
        if (use_system)
×
NEW
1679
            flush_coarse(system_coarse, system_builder, system_count,
×
NEW
1680
                         output.system);
×
1681
    } else {
1682
        if (use_events)
77!
1683
            flush_builder(event_builder, event_count, output.events);
77!
1684
        if (use_profiles)
76✔
1685
            flush_builder(profile_builder, profile_count, output.profiles);
63!
1686
        if (use_system)
75✔
1687
            flush_builder(system_builder, system_count, output.system);
63!
1688
    }
1689

1690
    return output;
120✔
1691
}
80!
1692

1693
}  // namespace
1694

NEW
1695
/// Python: Indexer.iter_aggregation(type="events", batch_size=10000)
///
/// Scans every aggregation shard in parallel with the GIL released, collects
/// the Arrow batches for the requested map type, and returns an iterator over
/// the list built by append_results_to_list.
///
/// Returns nullptr with a Python exception set when:
///  - argument parsing fails, or parse_agg_type_str rejects `type`;
///  - batch_size is not positive (ValueError);
///  - the index path cannot be resolved (resolve_index_path);
///  - opening or scanning the aggregation DB fails (RuntimeError).
static PyObject* Indexer_iter_aggregation(IndexerObject* self, PyObject* args,
                                          PyObject* kwds) {
    static const char* kwlist[] = {"type", "batch_size", nullptr};
    const char* type_str = "events";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|sn", (char**)kwlist,
                                     &type_str, &batch_size)) {
        return nullptr;
    }

    AggMapType target_type;
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;

    // Reject non-positive sizes up front. The sibling dfanalyzer scanner in
    // this module casts batch_size to std::size_t for builder reservations,
    // where a negative value wraps to an enormous request; presumably
    // scan_aggregation_shard_range behaves the same (TODO confirm).
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "batch_size must be a positive integer");
        return nullptr;
    }

    AggregationBatchType batch_type;
    if (target_type == AggMapType::EVENT) {
        batch_type = AggregationBatchType::EVENT;
    } else if (target_type == AggMapType::PROFILE) {
        batch_type = AggregationBatchType::PROFILE;
    } else {
        batch_type = AggregationBatchType::SYSTEM;
    }

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* batch_list = PyList_New(0);
    if (!batch_list) return nullptr;

    std::string error_msg;
    std::vector<dftracer::utils::utilities::common::arrow::ArrowExportResult>
        results;

    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<AggScanOutput> outputs;
            parallel_shard_scan<AggScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    AggScanInput input;
                    input.agg = handle->agg.get();
                    input.target_type = target_type;
                    input.batch_type = batch_type;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    return scan_aggregation_shard_range(input);
                },
                outputs);

            for (auto& out : outputs) {
                for (auto& r : out.results) {
                    results.push_back(std::move(r));
                }
            }
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
        // An empty what() must still be reported as a failure below.
        if (error_msg.empty()) error_msg = "aggregation scan failed";
    } catch (...) {
        // Nothing may escape this region: the thread state saved by
        // Py_BEGIN_ALLOW_THREADS would never be restored.
        error_msg = "aggregation scan failed with an unknown exception";
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(batch_list);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(batch_list, results);

    PyObject* iter = PyObject_GetIter(batch_list);
    Py_DECREF(batch_list);
    return iter;
}
×
1770

1771
/// Python: Indexer.iter_arrow_dfanalyzer(type="events", batch_size=10000,
///     time_granularity=1.0, time_resolution=1000000.0, query=None)
///
/// Scans every aggregation shard in parallel with the GIL released, converting
/// records of the requested map type into full-granularity dfanalyzer rows
/// (scan_dfanalyzer_shards with no group_by), and returns an iterator over the
/// list built by append_results_to_list. Timestamps are offsets from the
/// aggregator's minimum time bucket (0 when no time bounds are recorded).
///
/// Returns nullptr with a Python exception set when:
///  - argument parsing, parse_agg_type_str or parse_query_arg fails;
///  - batch_size <= 0, time_resolution <= 0, or
///    time_granularity * time_resolution is negative, NaN or >= 2^64
///    (ValueError);
///  - the index path cannot be resolved (resolve_index_path);
///  - opening or scanning the databases fails (RuntimeError).
static PyObject* Indexer_iter_arrow_dfanalyzer(IndexerObject* self,
                                               PyObject* args, PyObject* kwds) {
    static const char* kwlist[] = {
        "type",  "batch_size", "time_granularity", "time_resolution",
        "query", nullptr};
    const char* type_str = "events";
    Py_ssize_t batch_size = 10000;
    double time_granularity = 1.0;
    double time_resolution = 1000000.0;
    const char* query_str = nullptr;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|snddz", (char**)kwlist,
                                     &type_str, &batch_size, &time_granularity,
                                     &time_resolution, &query_str)) {
        return nullptr;
    }

    AggMapType target_type;
    if (!parse_agg_type_str(type_str, target_type)) return nullptr;

    // Validate numeric knobs before releasing the GIL. The scanner casts
    // batch_size to std::size_t, divides durations by time_resolution, and
    // converts time_granularity * time_resolution to std::uint64_t, which is
    // undefined for negative, NaN or out-of-range values. The negated
    // comparisons below also reject NaN.
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "batch_size must be a positive integer");
        return nullptr;
    }
    if (!(time_resolution > 0.0)) {
        PyErr_SetString(PyExc_ValueError,
                        "time_resolution must be a positive number");
        return nullptr;
    }
    if (!(time_granularity >= 0.0) ||
        !(time_granularity * time_resolution <
          18446744073709551616.0 /* 2^64 */)) {
        PyErr_SetString(PyExc_ValueError,
                        "time_granularity * time_resolution must be a "
                        "finite, non-negative value");
        return nullptr;
    }

    auto query_opt = parse_query_arg(query_str);
    if (!query_opt && PyErr_Occurred()) return nullptr;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* batch_list = PyList_New(0);
    if (!batch_list) return nullptr;

    std::string error_msg;
    std::vector<ArrowExportResult> results;

    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            // Hash tables used to resolve file/host hashes to names.
            dftracer::utils::utilities::indexer::IndexDatabase idx_db(
                index_path,
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto file_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::FILE);
            auto host_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::HOST);

            auto time_bounds = handle->agg->query_time_bounds();
            std::uint64_t time_origin =
                time_bounds.valid ? time_bounds.min_time_bucket : 0;

            DfanalyzerContext ctx;
            ctx.file_hashes = &file_hashes;
            ctx.host_hashes = &host_hashes;
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
            ctx.time_origin = time_origin;
            ctx.time_resolution = time_resolution;
            ctx.time_granularity = time_granularity;

            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<DfanalyzerScanOutput> outputs;
            parallel_shard_scan<DfanalyzerScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    DfanalyzerScanInput input;
                    input.agg = handle->agg.get();
                    input.ctx = &ctx;
                    input.type_filter = target_type;
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    return scan_dfanalyzer_shards(input);
                },
                outputs);

            for (auto& out : outputs) {
                for (auto& r : out.events) results.push_back(std::move(r));
                for (auto& r : out.profiles) results.push_back(std::move(r));
                for (auto& r : out.system) results.push_back(std::move(r));
            }
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
        // An empty what() must still be reported as a failure below.
        if (error_msg.empty()) error_msg = "dfanalyzer scan failed";
    } catch (...) {
        // Nothing may escape this region: the thread state saved by
        // Py_BEGIN_ALLOW_THREADS would never be restored.
        error_msg = "dfanalyzer scan failed with an unknown exception";
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(batch_list);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(batch_list, results);

    PyObject* iter = PyObject_GetIter(batch_list);
    Py_DECREF(batch_list);
    return iter;
}
4✔
1868

1869
static bool parse_group_by_arg(PyObject* obj, GroupByConfig& out) {
18✔
1870
    if (!obj || obj == Py_None) return true;
18!
NEW
1871
    if (!PySequence_Check(obj)) {
×
NEW
1872
        PyErr_SetString(PyExc_TypeError,
×
1873
                        "group_by must be a sequence of strings or None");
NEW
1874
        return false;
×
1875
    }
NEW
1876
    Py_ssize_t n = PySequence_Length(obj);
×
NEW
1877
    for (Py_ssize_t i = 0; i < n; ++i) {
×
NEW
1878
        PyObject* item = PySequence_GetItem(obj, i);
×
NEW
1879
        if (!item) return false;
×
NEW
1880
        if (!PyUnicode_Check(item)) {
×
1881
            Py_DECREF(item);
NEW
1882
            PyErr_SetString(PyExc_TypeError,
×
1883
                            "group_by entries must be strings");
NEW
1884
            return false;
×
1885
        }
NEW
1886
        Py_ssize_t sz = 0;
×
NEW
1887
        const char* s = PyUnicode_AsUTF8AndSize(item, &sz);
×
NEW
1888
        if (!s) {
×
1889
            Py_DECREF(item);
NEW
1890
            return false;
×
1891
        }
NEW
1892
        std::string_view sv(s, static_cast<std::size_t>(sz));
×
NEW
1893
        auto field = parse_group_by_name(sv);
×
NEW
1894
        if (!field) {
×
NEW
1895
            std::string msg = "unsupported group_by field: ";
×
NEW
1896
            msg.append(sv);
×
1897
            Py_DECREF(item);
×
NEW
1898
            PyErr_SetString(PyExc_ValueError, msg.c_str());
×
NEW
1899
            return false;
×
NEW
1900
        }
×
NEW
1901
        if (!(out.mask & *field)) {
×
NEW
1902
            out.mask |= *field;
×
NEW
1903
            out.order.push_back(*field);
×
NEW
1904
            out.names.emplace_back(sv);
×
1905
        }
1906
        Py_DECREF(item);
1907
    }
NEW
1908
    return true;
×
1909
}
9✔
1910

1911
// Indexer.iter_arrow_dfanalyzer_all(batch_size=10000, time_granularity=1.0,
//     time_resolution=1e6, query=None, group_by=None)
//
// Runs one parallel shard scan over the aggregation DB behind this indexer.
// Returns {"events": [...], "profiles": [...], "system": [...]}, where each
// list is filled by append_results_to_list() from the scan output.
//
// The scan runs with the GIL released. Every C++ exception is caught inside
// that region (nothing may escape it, or the GIL would never be
// re-acquired) and is reported as RuntimeError once the GIL is held again.
static PyObject* Indexer_iter_arrow_dfanalyzer_all(IndexerObject* self,
                                                   PyObject* args,
                                                   PyObject* kwds) {
    static const char* kwlist[] = {"batch_size",      "time_granularity",
                                   "time_resolution", "query",
                                   "group_by",        nullptr};
    Py_ssize_t batch_size = 10000;
    double time_granularity = 1.0;
    double time_resolution = 1000000.0;
    const char* query_str = nullptr;
    PyObject* group_by_obj = nullptr;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nddzO", (char**)kwlist, &batch_size,
            &time_granularity, &time_resolution, &query_str, &group_by_obj)) {
        return nullptr;
    }

    auto query_opt = parse_query_arg(query_str);
    if (!query_opt && PyErr_Occurred()) return nullptr;

    GroupByConfig group_by_cfg;
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
    // nullptr = no grouping requested (full-granularity output).
    const GroupByConfig* group_by_ptr =
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;

    auto idx_opt = resolve_index_path(self);
    if (!idx_opt) return nullptr;
    std::string index_path = std::move(*idx_opt);

    PyObject* result_dict = PyDict_New();
    if (!result_dict) return nullptr;

    PyObject* events_list = PyList_New(0);
    PyObject* profiles_list = PyList_New(0);
    PyObject* system_list = PyList_New(0);
    if (!events_list || !profiles_list || !system_list) {
        Py_XDECREF(events_list);
        Py_XDECREF(profiles_list);
        Py_XDECREF(system_list);
        Py_DECREF(result_dict);
        return nullptr;
    }

    std::string error_msg;
    std::vector<ArrowExportResult> events_results, profiles_results,
        system_results;

    Py_BEGIN_ALLOW_THREADS
    try {
        auto handle = open_agg_db(index_path, error_msg);
        if (handle) {
            dftracer::utils::utilities::indexer::IndexDatabase idx_db(
                index_path,
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto file_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::FILE);
            auto host_hashes =
                idx_db.query_hash_table(dftracer::utils::utilities::indexer::
                                            IndexDatabase::HashType::HOST);

            auto time_bounds = handle->agg->query_time_bounds();
            std::uint64_t time_origin =
                time_bounds.valid ? time_bounds.min_time_bucket : 0;

            DfanalyzerContext ctx;
            ctx.file_hashes = &file_hashes;
            ctx.host_hashes = &host_hashes;
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
            ctx.time_origin = time_origin;
            ctx.time_resolution = time_resolution;
            ctx.time_granularity = time_granularity;

            Runtime* rt = get_batch_indexer_runtime(self);
            std::vector<DfanalyzerScanOutput> outputs;
            parallel_shard_scan<DfanalyzerScanOutput>(
                rt,
                [&](std::uint16_t shard_begin, std::uint16_t shard_end) {
                    DfanalyzerScanInput input;
                    input.agg = handle->agg.get();
                    input.ctx = &ctx;
                    input.type_filter = std::nullopt;  // all three types
                    input.batch_size = batch_size;
                    input.shard_begin = shard_begin;
                    input.shard_end = shard_end;
                    input.group_by = group_by_ptr;
                    return scan_dfanalyzer_shards(input);
                },
                outputs);

            for (auto& out : outputs) {
                for (auto& r : out.events)
                    events_results.push_back(std::move(r));
                for (auto& r : out.profiles)
                    profiles_results.push_back(std::move(r));
                for (auto& r : out.system)
                    system_results.push_back(std::move(r));
            }
        }
    } catch (const std::exception& e) {
        error_msg = e.what();
        // An empty what() must still be reported as a failure.
        if (error_msg.empty())
            error_msg = "iter_arrow_dfanalyzer_all: scan failed";
    } catch (...) {
        error_msg = "iter_arrow_dfanalyzer_all: unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        Py_DECREF(events_list);
        Py_DECREF(profiles_list);
        Py_DECREF(system_list);
        Py_DECREF(result_dict);
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return nullptr;
    }

    append_results_to_list(events_list, events_results);
    append_results_to_list(profiles_list, profiles_results);
    append_results_to_list(system_list, system_results);

    // Never return a result while an exception is pending (e.g. raised while
    // converting results), and propagate dict-insertion failures.
    if (PyErr_Occurred() ||
        PyDict_SetItemString(result_dict, "events", events_list) < 0 ||
        PyDict_SetItemString(result_dict, "profiles", profiles_list) < 0 ||
        PyDict_SetItemString(result_dict, "system", system_list) < 0) {
        Py_DECREF(events_list);
        Py_DECREF(profiles_list);
        Py_DECREF(system_list);
        Py_DECREF(result_dict);
        return nullptr;
    }
    // The dict now holds its own references to the lists.
    Py_DECREF(events_list);
    Py_DECREF(profiles_list);
    Py_DECREF(system_list);

    return result_dict;
}
20✔
2037

2038
// ---------------------------------------------------------------------------
2039
// scan_aggregation_manifest — module-level entry point for analyze_trace.
2040
//
2041
// Each Dask worker calls this with its slice of the agg manifest
2042
// (agg_ssts + sys_ssts) and optionally a [shard_begin, shard_end) range.
2043
// The function opens a scratch IndexDatabase at `<scratch_dir>/.dftindex`, ingests the
2044
// SSTs into its AGGREGATION/SYSTEM_METRICS CFs (nearly free when SSTs live
2045
// on the same filesystem as `scratch_dir` — RocksDB hard-links them), then
2046
// runs the same parallel shard scan that `iter_arrow_dfanalyzer_all` uses.
2047
//
2048
// AGG_GLOBAL_CONFIG_KEY is not written by worker SSTs, so we construct the
2049
// EventAggregator with config_hash=0 directly instead of going through
2050
// `open_agg_db` (which requires the config key). The config hash is used
2051
// by the aggregator only for write-time validation, not for reads.
2052
//
2053
// The scratch DB is NOT cleaned up here — the Python caller owns
2054
// `scratch_dir` lifetime and should remove it after gathering results.
2055
// ---------------------------------------------------------------------------
2056

NEW
2057
static bool collect_string_list(PyObject* obj, const char* name,
×
2058
                                std::vector<std::string>& out) {
NEW
2059
    if (!obj || obj == Py_None) return true;
×
NEW
2060
    PyObject* seq = PySequence_Fast(obj, name);
×
NEW
2061
    if (!seq) return false;
×
NEW
2062
    Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
×
NEW
2063
    out.reserve(static_cast<std::size_t>(n));
×
NEW
2064
    for (Py_ssize_t i = 0; i < n; ++i) {
×
NEW
2065
        PyObject* item = PySequence_Fast_GET_ITEM(seq, i);
×
NEW
2066
        if (!PyUnicode_Check(item)) {
×
2067
            Py_DECREF(seq);
NEW
2068
            PyErr_Format(PyExc_TypeError, "%s items must be str", name);
×
NEW
2069
            return false;
×
2070
        }
NEW
2071
        const char* s = PyUnicode_AsUTF8(item);
×
NEW
2072
        if (!s) {
×
2073
            Py_DECREF(seq);
NEW
2074
            return false;
×
2075
        }
NEW
2076
        out.emplace_back(s);
×
2077
    }
2078
    Py_DECREF(seq);
NEW
2079
    return true;
×
2080
}
2081

NEW
2082
static bool collect_string_string_dict(
×
2083
    PyObject* obj, const char* name,
2084
    std::unordered_map<std::string, std::string>& out) {
NEW
2085
    if (!obj || obj == Py_None) return true;
×
NEW
2086
    if (!PyDict_Check(obj)) {
×
NEW
2087
        PyErr_Format(PyExc_TypeError, "%s must be a dict[str, str] or None",
×
2088
                     name);
NEW
2089
        return false;
×
2090
    }
2091
    PyObject *k, *v;
NEW
2092
    Py_ssize_t pos = 0;
×
NEW
2093
    while (PyDict_Next(obj, &pos, &k, &v)) {
×
NEW
2094
        if (!PyUnicode_Check(k) || !PyUnicode_Check(v)) {
×
NEW
2095
            PyErr_Format(PyExc_TypeError, "%s must map str -> str", name);
×
NEW
2096
            return false;
×
2097
        }
NEW
2098
        const char* ks = PyUnicode_AsUTF8(k);
×
NEW
2099
        const char* vs = PyUnicode_AsUTF8(v);
×
NEW
2100
        if (!ks || !vs) return false;
×
NEW
2101
        out.emplace(ks, vs);
×
2102
    }
NEW
2103
    return true;
×
2104
}
2105

NEW
2106
static PyObject* scan_aggregation_manifest_fn(PyObject* /*self*/,
×
2107
                                              PyObject* args, PyObject* kwds) {
2108
    static const char* kwlist[] = {
2109
        "agg_ssts",        "sys_ssts",    "scratch_dir",
2110
        "meta_index_path", "batch_size",  "time_granularity",
2111
        "time_resolution", "query",       "group_by",
2112
        "shard_begin",     "shard_end",   "runtime",
2113
        "file_hashes",     "host_hashes", nullptr};
2114

NEW
2115
    PyObject* agg_ssts_obj = nullptr;
×
NEW
2116
    PyObject* sys_ssts_obj = nullptr;
×
NEW
2117
    const char* scratch_dir = nullptr;
×
NEW
2118
    const char* meta_index_path = nullptr;
×
NEW
2119
    Py_ssize_t batch_size = 10000;
×
NEW
2120
    double time_granularity = 1.0;
×
NEW
2121
    double time_resolution = 1000000.0;
×
NEW
2122
    const char* query_str = nullptr;
×
NEW
2123
    PyObject* group_by_obj = nullptr;
×
NEW
2124
    int shard_begin_i = 0;
×
NEW
2125
    int shard_end_i = DFT_NUM_SHARDS;
×
NEW
2126
    PyObject* runtime_obj = nullptr;
×
NEW
2127
    PyObject* file_hashes_obj = nullptr;
×
NEW
2128
    PyObject* host_hashes_obj = nullptr;
×
2129

NEW
2130
    if (!PyArg_ParseTupleAndKeywords(
×
2131
            args, kwds, "OOss|nddzOiiOOO", (char**)kwlist, &agg_ssts_obj,
2132
            &sys_ssts_obj, &scratch_dir, &meta_index_path, &batch_size,
2133
            &time_granularity, &time_resolution, &query_str, &group_by_obj,
2134
            &shard_begin_i, &shard_end_i, &runtime_obj, &file_hashes_obj,
2135
            &host_hashes_obj)) {
NEW
2136
        return nullptr;
×
2137
    }
2138

NEW
2139
    if (shard_begin_i < 0 || shard_end_i > DFT_NUM_SHARDS ||
×
NEW
2140
        shard_begin_i >= shard_end_i) {
×
NEW
2141
        PyErr_Format(PyExc_ValueError,
×
2142
                     "shard range [%d, %d) invalid (must be within [0, %d))",
2143
                     shard_begin_i, shard_end_i, (int)DFT_NUM_SHARDS);
NEW
2144
        return nullptr;
×
2145
    }
2146

NEW
2147
    std::vector<std::string> agg_ssts;
×
NEW
2148
    std::vector<std::string> sys_ssts;
×
NEW
2149
    if (!collect_string_list(agg_ssts_obj, "agg_ssts", agg_ssts))
×
NEW
2150
        return nullptr;
×
NEW
2151
    if (!collect_string_list(sys_ssts_obj, "sys_ssts", sys_ssts))
×
NEW
2152
        return nullptr;
×
2153

NEW
2154
    std::unordered_map<std::string, std::string> preloaded_file_hashes;
×
NEW
2155
    std::unordered_map<std::string, std::string> preloaded_host_hashes;
×
NEW
2156
    const bool hashes_preloaded =
×
NEW
2157
        (file_hashes_obj && file_hashes_obj != Py_None) ||
×
NEW
2158
        (host_hashes_obj && host_hashes_obj != Py_None);
×
NEW
2159
    if (!collect_string_string_dict(file_hashes_obj, "file_hashes",
×
2160
                                    preloaded_file_hashes))
NEW
2161
        return nullptr;
×
NEW
2162
    if (!collect_string_string_dict(host_hashes_obj, "host_hashes",
×
2163
                                    preloaded_host_hashes))
NEW
2164
        return nullptr;
×
2165

NEW
2166
    auto query_opt = parse_query_arg(query_str);
×
NEW
2167
    if (!query_opt && PyErr_Occurred()) return nullptr;
×
2168

NEW
2169
    GroupByConfig group_by_cfg;
×
NEW
2170
    if (!parse_group_by_arg(group_by_obj, group_by_cfg)) return nullptr;
×
NEW
2171
    const GroupByConfig* group_by_ptr =
×
NEW
2172
        group_by_cfg.mask != 0 ? &group_by_cfg : nullptr;
×
2173

NEW
2174
    Runtime* rt = nullptr;
×
NEW
2175
    if (runtime_obj && runtime_obj != Py_None) {
×
NEW
2176
        if (!PyObject_TypeCheck(runtime_obj, &RuntimeType)) {
×
NEW
2177
            PyErr_SetString(PyExc_TypeError,
×
2178
                            "runtime must be a Runtime instance or None");
NEW
2179
            return nullptr;
×
2180
        }
NEW
2181
        rt = ((RuntimeObject*)runtime_obj)->runtime.get();
×
2182
    } else {
NEW
2183
        rt = get_default_runtime();
×
2184
    }
2185

NEW
2186
    PyObject* result_dict = PyDict_New();
×
NEW
2187
    if (!result_dict) return nullptr;
×
NEW
2188
    PyObject* events_list = PyList_New(0);
×
NEW
2189
    PyObject* profiles_list = PyList_New(0);
×
NEW
2190
    PyObject* system_list = PyList_New(0);
×
NEW
2191
    if (!events_list || !profiles_list || !system_list) {
×
NEW
2192
        Py_XDECREF(events_list);
×
NEW
2193
        Py_XDECREF(profiles_list);
×
NEW
2194
        Py_XDECREF(system_list);
×
2195
        Py_DECREF(result_dict);
×
NEW
2196
        return nullptr;
×
2197
    }
2198

NEW
2199
    std::string error_msg;
×
NEW
2200
    std::vector<ArrowExportResult> events_results, profiles_results,
×
NEW
2201
        system_results;
×
NEW
2202
    std::string scratch_index_path = std::string(scratch_dir) + "/.dftindex";
×
NEW
2203
    std::string meta_index_path_str(meta_index_path);
×
2204

NEW
2205
    Py_BEGIN_ALLOW_THREADS try {
×
2206
        namespace rcf = dftracer::utils::rocksdb::cf;
2207
        using clock = std::chrono::steady_clock;
NEW
2208
        auto ms = [](clock::time_point a, clock::time_point b) -> long long {
×
NEW
2209
            return std::chrono::duration_cast<std::chrono::milliseconds>(b - a)
×
NEW
2210
                .count();
×
2211
        };
2212

NEW
2213
        auto t_start = clock::now();
×
2214
        dftracer::utils::utilities::indexer::IndexDatabase scratch_db(
×
2215
            scratch_index_path);
×
NEW
2216
        auto t_scratch_open = clock::now();
×
2217

NEW
2218
        auto raw_db = scratch_db.db();
×
NEW
2219
        for (const auto& p : agg_ssts) {
×
NEW
2220
            auto st = raw_db->ingest_external_files(rcf::AGGREGATION, {p},
×
2221
                                                    /*ingest_behind=*/false);
×
NEW
2222
            if (!st.ok()) {
×
2223
                error_msg =
NEW
2224
                    "ingest AGGREGATION sst '" + p + "': " + st.ToString();
×
NEW
2225
                break;
×
2226
            }
NEW
2227
        }
×
NEW
2228
        if (error_msg.empty()) {
×
NEW
2229
            for (const auto& p : sys_ssts) {
×
NEW
2230
                auto st = raw_db->ingest_external_files(
×
NEW
2231
                    rcf::SYSTEM_METRICS, {p}, /*ingest_behind=*/false);
×
NEW
2232
                if (!st.ok()) {
×
NEW
2233
                    error_msg = "ingest SYSTEM_METRICS sst '" + p +
×
NEW
2234
                                "': " + st.ToString();
×
NEW
2235
                    break;
×
2236
                }
NEW
2237
            }
×
2238
        }
NEW
2239
        auto t_ingest = clock::now();
×
2240

NEW
2241
        if (error_msg.empty()) {
×
2242
            auto agg =
NEW
2243
                std::make_unique<EventAggregator>(raw_db, /*cfg_hash=*/0);
×
2244

2245
            // If the caller passed pre-loaded hash tables, skip opening
2246
            // the meta DB on lustre. When many dask workers run
2247
            // scan_aggregation_manifest in parallel, loading the hash
2248
            // tables N times from the same file is significant lustre
2249
            // metadata pressure; loading once on the coordinator and
2250
            // passing them in eliminates the redundant reads.
NEW
2251
            std::unordered_map<std::string, std::string> loaded_file_hashes;
×
NEW
2252
            std::unordered_map<std::string, std::string> loaded_host_hashes;
×
2253
            std::unique_ptr<dftracer::utils::utilities::indexer::IndexDatabase>
NEW
2254
                meta_db;
×
NEW
2255
            if (!hashes_preloaded) {
×
2256
                meta_db = std::make_unique<
×
2257
                    dftracer::utils::utilities::indexer::IndexDatabase>(
2258
                    meta_index_path_str, dftracer::utils::rocksdb::
2259
                                             RocksDatabase::OpenMode::ReadOnly);
×
NEW
2260
                loaded_file_hashes = meta_db->query_hash_table(
×
2261
                    dftracer::utils::utilities::indexer::IndexDatabase::
2262
                        HashType::FILE);
NEW
2263
                loaded_host_hashes = meta_db->query_hash_table(
×
2264
                    dftracer::utils::utilities::indexer::IndexDatabase::
2265
                        HashType::HOST);
2266
            }
NEW
2267
            const auto& file_hashes =
×
2268
                hashes_preloaded ? preloaded_file_hashes : loaded_file_hashes;
×
NEW
2269
            const auto& host_hashes =
×
2270
                hashes_preloaded ? preloaded_host_hashes : loaded_host_hashes;
×
NEW
2271
            auto t_hash_tables = clock::now();
×
2272

NEW
2273
            auto time_bounds = agg->query_time_bounds();
×
NEW
2274
            std::uint64_t time_origin =
×
NEW
2275
                time_bounds.valid ? time_bounds.min_time_bucket : 0;
×
2276

NEW
2277
            DfanalyzerContext ctx;
×
NEW
2278
            ctx.file_hashes = &file_hashes;
×
NEW
2279
            ctx.host_hashes = &host_hashes;
×
NEW
2280
            ctx.query_filter = query_opt ? &*query_opt : nullptr;
×
NEW
2281
            ctx.time_origin = time_origin;
×
NEW
2282
            ctx.time_resolution = time_resolution;
×
NEW
2283
            ctx.time_granularity = time_granularity;
×
2284

NEW
2285
            std::vector<DfanalyzerScanOutput> outputs;
×
NEW
2286
            parallel_shard_scan_range<DfanalyzerScanOutput>(
×
2287
                rt, static_cast<std::uint16_t>(shard_begin_i),
2288
                static_cast<std::uint16_t>(shard_end_i),
NEW
2289
                [&](std::uint16_t sb, std::uint16_t se) {
×
NEW
2290
                    DfanalyzerScanInput input;
×
NEW
2291
                    input.agg = agg.get();
×
NEW
2292
                    input.ctx = &ctx;
×
NEW
2293
                    input.type_filter = std::nullopt;
×
NEW
2294
                    input.batch_size = batch_size;
×
NEW
2295
                    input.shard_begin = sb;
×
NEW
2296
                    input.shard_end = se;
×
NEW
2297
                    input.group_by = group_by_ptr;
×
NEW
2298
                    return scan_dfanalyzer_shards(input);
×
2299
                },
2300
                outputs);
NEW
2301
            auto t_scan = clock::now();
×
2302

NEW
2303
            for (auto& out : outputs) {
×
NEW
2304
                for (auto& r : out.events)
×
NEW
2305
                    events_results.push_back(std::move(r));
×
NEW
2306
                for (auto& r : out.profiles)
×
NEW
2307
                    profiles_results.push_back(std::move(r));
×
NEW
2308
                for (auto& r : out.system)
×
NEW
2309
                    system_results.push_back(std::move(r));
×
2310
            }
2311

NEW
2312
            std::fprintf(
×
2313
                stderr,
2314
                "[scan_aggregation_manifest] n_agg=%zu n_sys=%zu "
2315
                "scratch_open=%lldms ingest=%lldms hash_tables=%lldms "
2316
                "scan=%lldms\n",
2317
                agg_ssts.size(), sys_ssts.size(), ms(t_start, t_scratch_open),
×
2318
                ms(t_scratch_open, t_ingest), ms(t_ingest, t_hash_tables),
×
2319
                ms(t_hash_tables, t_scan));
×
NEW
2320
            std::fflush(stderr);
×
NEW
2321
        }
×
NEW
2322
    } catch (const std::exception& e) {
×
NEW
2323
        error_msg = e.what();
×
NEW
2324
    }
×
NEW
2325
    Py_END_ALLOW_THREADS
×
2326

NEW
2327
        if (!error_msg.empty()) {
×
2328
        Py_DECREF(events_list);
×
2329
        Py_DECREF(profiles_list);
×
2330
        Py_DECREF(system_list);
×
2331
        Py_DECREF(result_dict);
×
NEW
2332
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
NEW
2333
        return nullptr;
×
2334
    }
2335

NEW
2336
    append_results_to_list(events_list, events_results);
×
NEW
2337
    append_results_to_list(profiles_list, profiles_results);
×
NEW
2338
    append_results_to_list(system_list, system_results);
×
2339

NEW
2340
    PyDict_SetItemString(result_dict, "events", events_list);
×
NEW
2341
    PyDict_SetItemString(result_dict, "profiles", profiles_list);
×
NEW
2342
    PyDict_SetItemString(result_dict, "system", system_list);
×
2343
    Py_DECREF(events_list);
×
2344
    Py_DECREF(profiles_list);
×
2345
    Py_DECREF(system_list);
×
2346

NEW
2347
    return result_dict;
×
NEW
2348
}
×
2349

2350
// Module-level functions added by init_indexer() in Arrow-enabled builds.
//
// The docstring's first line is a text signature. CPython only extracts
// __text_signature__ when that line ends with ")\n--\n\n", so the return type
// is described in the body instead of as a "-> dict" suffix.
static PyMethodDef BatchIndexerModuleMethods[] = {
    {"scan_aggregation_manifest", (PyCFunction)scan_aggregation_manifest_fn,
     METH_VARARGS | METH_KEYWORDS,
     "scan_aggregation_manifest(agg_ssts, sys_ssts, scratch_dir, "
     "meta_index_path, batch_size=10000, time_granularity=1.0, "
     "time_resolution=1e6, query=None, group_by=None, shard_begin=0, "
     "shard_end=4096, runtime=None, file_hashes=None, host_hashes=None)\n"
     "--\n\n"
     "Scan a worker's slice of the distributed aggregation manifest.\n\n"
     "Ingests agg_ssts + sys_ssts into a scratch IndexDatabase at "
     "scratch_dir (caller owns the directory lifecycle) and runs the "
     "dfanalyzer aggregation scan over [shard_begin, shard_end). "
     "meta_index_path is the unified .dftindex used to resolve file / "
     "host hashes. file_hashes / host_hashes (dict[str, str]) optionally "
     "supply pre-loaded hash tables; pass both to skip reading them from "
     "meta_index_path.\n\n"
     "Returns:\n"
     "    dict with 'events', 'profiles', 'system' lists of Arrow capsules "
     "(same shape as Indexer.iter_arrow_dfanalyzer_all).\n"},
    {nullptr, nullptr, 0, nullptr}};
2366
#endif
2367

2368
// Instance methods of dftracer_utils_ext.Indexer.
//
// Every docstring opens with a text signature terminated by ")\n--\n\n".
// The aggregation / Arrow entries are compiled in only when
// DFTRACER_UTILS_ENABLE_ARROW is defined.
static PyMethodDef Indexer_methods[] = {
    // ---- checkpoints and index lifecycle --------------------------------
    {"get_checkpoint_indexer", (PyCFunction)Indexer_get_checkpoint_indexer,
     METH_VARARGS,
     "get_checkpoint_indexer(file_path)\n"
     "--\n\n"
     "Get a checkpoint indexer for a specific file.\n\n"
     "Args:\n"
     "    file_path: Path to the trace file (.pfw/.pfw.gz)\n\n"
     "Returns:\n"
     "    Indexer instance for checkpoint-level operations.\n"},
    {"resolve", (PyCFunction)Indexer_resolve, METH_NOARGS,
     "resolve()\n"
     "--\n\n"
     "Check what files exist vs need indexing.\n\n"
     "Returns:\n"
     "    dict with 'total_files', 'ready', 'needs_work', 'index_path'\n"},
    {"build", (PyCFunction)Indexer_build, METH_NOARGS,
     "build()\n"
     "--\n\n"
     "Build all missing index tiers based on require_* flags.\n"},
    {"ensure_indexed", (PyCFunction)Indexer_ensure_indexed, METH_NOARGS,
     "ensure_indexed()\n"
     "--\n\n"
     "Resolve and build if needed.\n\n"
     "Returns:\n"
     "    dict with index status after building.\n"},

    // ---- metadata lookups -----------------------------------------------
    {"get_hash_table", (PyCFunction)Indexer_get_hash_table, METH_VARARGS,
     "get_hash_table(type)\n"
     "--\n\n"
     "Query hash table mappings.\n\n"
     "Args:\n"
     "    type: 'file', 'host', 'string', or 'proc'\n\n"
     "Returns:\n"
     "    dict mapping hash values to resolved names.\n"},
    {"query_file_pids", (PyCFunction)Indexer_query_file_pids, METH_VARARGS,
     "query_file_pids(file_id)\n"
     "--\n\n"
     "Query PIDs observed in a specific file.\n\n"
     "Args:\n"
     "    file_id: Integer file ID from index.\n\n"
     "Returns:\n"
     "    set of PIDs.\n"},
    {"query_all_file_pids", (PyCFunction)Indexer_query_all_file_pids,
     METH_NOARGS,
     "query_all_file_pids()\n"
     "--\n\n"
     "Query PIDs for all indexed files.\n\n"
     "Returns:\n"
     "    dict mapping file_id to set of PIDs.\n"},
    {"query_file_info", (PyCFunction)Indexer_query_file_info, METH_NOARGS,
     "query_file_info()\n"
     "--\n\n"
     "Query file ID to path mapping and per-file PIDs in one call.\n\n"
     "Returns:\n"
     "    tuple of (dict[int, str], dict[int, set[int]]).\n"},

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // ---- aggregation export as Arrow batches ----------------------------
    {"iter_aggregation", (PyCFunction)Indexer_iter_aggregation,
     METH_VARARGS | METH_KEYWORDS,
     "iter_aggregation(type='events', batch_size=10000)\n"
     "--\n\n"
     "Iterate over aggregation data as Arrow batches.\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    {"iter_arrow_dfanalyzer", (PyCFunction)Indexer_iter_arrow_dfanalyzer,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer(type='events', batch_size=10000, "
     "time_granularity=1.0, time_resolution=1e6, query=None)\n"
     "--\n\n"
     "Iterate over aggregation data as dfanalyzer-compatible Arrow batches.\n\n"
     "Output schema matches dfanalyzer expectations with resolved hashes,\n"
     "normalized time_range, and computed columns (proc_name, io_cat).\n\n"
     "Args:\n"
     "    type: 'events', 'profiles', or 'system'\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string (e.g., \"pid == 1234\")\n\n"
     "Returns:\n"
     "    Iterator over Arrow batch capsules.\n"},
    {"iter_arrow_dfanalyzer_all",
     (PyCFunction)Indexer_iter_arrow_dfanalyzer_all,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow_dfanalyzer_all(batch_size=10000, time_granularity=1.0, "
     "time_resolution=1e6, query=None, group_by=None)\n"
     "--\n\n"
     "Iterate over all aggregation types in a single scan.\n\n"
     "Returns a dict with 'events', 'profiles', 'system' keys, each containing\n"
     "a list of Arrow batch capsules. This is ~3x faster than calling\n"
     "iter_arrow_dfanalyzer separately for each type.\n\n"
     "When group_by is provided, the scan collapses dimensions during aggregation\n"
     "and emits a reduced schema containing only the requested columns plus\n"
     "aggregated metrics (count, time, size, time_sq, size_sq, time_min,\n"
     "time_max, size_min, size_max, time_call_min, time_call_max, size_call_min,\n"
     "size_call_max, time_start, time_end). Supported group_by columns: cat,\n"
     "func_name, pid, tid, file_hash, host_hash, file_name, host_name, proc_name,\n"
     "io_cat, acc_pat, time_range.\n\n"
     "Args:\n"
     "    batch_size: Number of entries per batch (default 10000)\n"
     "    time_granularity: Bucket width in seconds (default 1.0)\n"
     "    time_resolution: Microseconds per output time unit (default 1e6)\n"
     "    query: Optional query filter string\n"
     "    group_by: Optional list of columns to group by; enables coarse\n"
     "        in-scan aggregation (default None = full granularity)\n\n"
     "Returns:\n"
     "    dict with 'events', 'profiles', 'system' lists of Arrow capsules.\n"},
#endif

    // Sentinel: terminates the table.
    {nullptr, nullptr, 0, nullptr}};
2483

2484
// Indexer exposes no computed attributes; the table holds only the
// terminating sentinel {name, get, set, doc, closure}.
static PyGetSetDef Indexer_getsetters[] = {
    {nullptr, nullptr, nullptr, nullptr, nullptr}};
2485

2486
// Static type object for dftracer_utils_ext.Indexer.
//
// Slots are initialized positionally. The trailing comment on each line
// names the PyTypeObject field it fills.
PyTypeObject IndexerType = {
    PyVarObject_HEAD_INIT(nullptr, 0) "dftracer_utils_ext.Indexer",  // tp_name
    sizeof(IndexerObject),        // tp_basicsize
    0,                            // tp_itemsize
    (destructor)Indexer_dealloc,  // tp_dealloc
    0,                            // tp_vectorcall_offset (tp_print pre-3.8)
    0,                            // tp_getattr
    0,                            // tp_setattr
    0,                            // tp_as_async
    0,                            // tp_repr
    0,                            // tp_as_number
    0,                            // tp_as_sequence
    0,                            // tp_as_mapping
    0,                            // tp_hash
    0,                            // tp_call
    0,                            // tp_str
    0,                            // tp_getattro
    0,                            // tp_setattro
    0,                            // tp_as_buffer
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,  // tp_flags
    // tp_doc. CPython derives __text_signature__ only when the doc starts
    // with the type's short name ("Indexer") immediately followed by "(".
    // With any other prefix, the "--" marker line shows up verbatim in
    // __doc__ and help().
    "Indexer(directory='', files=None, index_dir='',\n"
    "        require_checkpoint=True, require_bloom=True,\n"
    "        require_manifest=True, require_aggregation=False,\n"
    "        time_interval_ms=5000.0, group_keys=None,\n"
    "        custom_metric_fields=None, compute_percentiles=False,\n"
    "        parallelism=0, force_rebuild=False, runtime=None)\n"
    "--\n\n"
    "Indexer with tiered index building.\n\n"
    "At least one of 'directory' or 'files' must be provided.\n"
    "- directory: scan for .pfw/.pfw.gz files\n"
    "- files: list of specific file paths\n\n"
    "Supports:\n"
    "- Tier 1: Checkpoints (require_checkpoint)\n"
    "- Tier 2: Bloom filters (require_bloom), Manifests (require_manifest)\n"
    "- Tier 3: Aggregation (require_aggregation + config params)\n",
    0,                   // tp_traverse
    0,                   // tp_clear
    0,                   // tp_richcompare
    0,                   // tp_weaklistoffset
    0,                   // tp_iter
    0,                   // tp_iternext
    Indexer_methods,     // tp_methods
    0,                   // tp_members
    Indexer_getsetters,  // tp_getset
    0,                   // tp_base
    0,                   // tp_dict
    0,                   // tp_descr_get
    0,                   // tp_descr_set
    0,                   // tp_dictoffset
    (initproc)Indexer_init,  // tp_init
    0,                   // tp_alloc
    Indexer_new,         // tp_new
};
2539

2540
// Register the Indexer type on module `m` and, in Arrow-enabled builds, the
// module-level BatchIndexerModuleMethods functions.
// Returns 0 on success, or -1 with a Python exception set.
int init_indexer(PyObject* m) {
    if (PyType_Ready(&IndexerType) < 0) {
        return -1;
    }

    // PyModule_AddObject steals the reference only when it succeeds, so the
    // reference taken here must be released by hand on failure.
    PyObject* type_obj = (PyObject*)&IndexerType;
    Py_INCREF(type_obj);
    const int add_rc = PyModule_AddObject(m, "Indexer", type_obj);
    if (add_rc < 0) {
        Py_DECREF(type_obj);
        return -1;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    const int funcs_rc = PyModule_AddFunctions(m, BatchIndexerModuleMethods);
    if (funcs_rc < 0) {
        return -1;
    }
#endif

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc