• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.37
/src/dftracer/utils/python/utilities/aggregator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/config.h>
3
#include <dftracer/utils/core/common/constants.h>
4
#include <dftracer/utils/core/common/memory_budget.h>
5
#include <dftracer/utils/core/coro/task.h>
6
#include <dftracer/utils/core/runtime.h>
7
#include <dftracer/utils/python/arrow_helpers.h>
8
#include <dftracer/utils/python/py_dict_helpers.h>
9
#include <dftracer/utils/python/py_list_helpers.h>
10
#include <dftracer/utils/python/py_runtime_mixin.h>
11
#include <dftracer/utils/python/py_type_helpers.h>
12
#include <dftracer/utils/python/runtime.h>
13
#include <dftracer/utils/python/trace_reader_iterator.h>
14
#include <dftracer/utils/python/utilities/aggregator.h>
15
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_utility.h>
16

17
#ifdef DFTRACER_UTILS_ENABLE_ARROW
18
#include <dftracer/utils/python/batch_byte_size.h>
19
#include <dftracer/utils/python/streaming_iterator.h>
20
#endif
21
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
22
#include <dftracer/utils/utilities/common/arrow/partition_router.h>
23
#include <dftracer/utils/utilities/common/arrow/partition_writer.h>
24
#include <dftracer/utils/utilities/common/query/query.h>
25
#endif
26

27
#include <cctype>
28
#include <memory>
29
#include <optional>
30
#include <string>
31
#include <vector>
32

33
using dftracer::utils::CoroScope;
34
using dftracer::utils::Runtime;
35
using dftracer::utils::coro::CoroTask;
36
using namespace dftracer::utils::utilities::composites::dft::aggregators;
37

38
using dftracer::utils::python::wrap_arrow_result;
39
using dftracer::utils::python::wrap_arrow_table;
40

41
#ifdef DFTRACER_UTILS_ENABLE_ARROW
42
using dftracer::utils::python::ArrowStreamingIteratorObject;
43
using dftracer::utils::python::ArrowStreamingIteratorType;
44
using dftracer::utils::python::StreamingState;
45
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
46
#endif
47
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
48
using dftracer::utils::utilities::common::arrow::IpcCompression;
49
using dftracer::utils::utilities::common::arrow::PartitionWriter;
50
using dftracer::utils::utilities::common::arrow::PartitionWriteStats;
51
using dftracer::utils::utilities::common::query::Query;
52
#endif
53

54
DFTRACER_UTILS_RUNTIME_BACKED_SLOTS(Aggregator, AggregatorObject)
132✔
55

56
// ---------------------------------------------------------------------------
57
// Helpers
58
// ---------------------------------------------------------------------------
59

60
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
61
// Parse a view query string into an optional Query
62
static int parse_view_query(PyObject *query_obj, std::optional<Query> &out) {
36✔
63
    if (!query_obj || query_obj == Py_None) {
36!
64
        out = std::nullopt;
32✔
65
        return 0;
32✔
66
    }
67
    const char *query_str = PyUnicode_AsUTF8(query_obj);
4!
68
    if (!query_str) return -1;
4✔
69
    auto parsed = Query::from_string(query_str);
4!
70
    if (!parsed) {
4!
71
        PyErr_Format(PyExc_ValueError, "Invalid query: %s",
×
72
                     parsed.error().format().c_str());
×
73
        return -1;
×
74
    }
75
    out = std::move(*parsed);
4!
76
    return 0;
4✔
77
}
20✔
78
#endif
79

80
static int parse_aggregator_args(PyObject *args, PyObject *kwds,
36✔
81
                                 AggregatorInput &input,
82
                                 std::size_t *buffer_size_out = nullptr,
83
                                 std::optional<Query> *query_out = nullptr) {
84
    static const char *kwlist[] = {"directory",
85
                                   "time_interval_ms",
86
                                   "group_keys",
87
                                   "categories",
88
                                   "names",
89
                                   "index_dir",
90
                                   "checkpoint_size",
91
                                   "force_rebuild",
92
                                   "parallelism",
93
                                   "event_batch_size",
94
                                   "custom_metric_fields",
95
                                   "compute_percentiles",
96
                                   "buffer_size",
97
                                   "query",
98
                                   NULL};
99

100
    const char *directory = NULL;
36✔
101
    double time_interval_ms = 5000.0;
36✔
102
    PyObject *group_keys_obj = Py_None;
36✔
103
    PyObject *categories_obj = Py_None;
36✔
104
    PyObject *names_obj = Py_None;
36✔
105
    const char *index_dir = "";
36✔
106
    Py_ssize_t checkpoint_size = static_cast<Py_ssize_t>(
36✔
107
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE);
108
    int force_rebuild = 0;
36✔
109
    Py_ssize_t parallelism = 0;
36✔
110
    Py_ssize_t event_batch_size = 10000;
36✔
111
    PyObject *custom_metrics_obj = Py_None;
36✔
112
    int compute_percentiles = 0;
36✔
113
    Py_ssize_t buffer_size = 8;
36✔
114
    PyObject *query_obj = Py_None;
36✔
115

116
    if (!PyArg_ParseTupleAndKeywords(
36!
117
            args, kwds, "s|dOOOsnpnnOpnO", (char **)kwlist, &directory,
18✔
118
            &time_interval_ms, &group_keys_obj, &categories_obj, &names_obj,
119
            &index_dir, &checkpoint_size, &force_rebuild, &parallelism,
120
            &event_batch_size, &custom_metrics_obj, &compute_percentiles,
121
            &buffer_size, &query_obj))
122
        return -1;
×
123

124
    if (buffer_size_out) {
36✔
125
        *buffer_size_out = static_cast<std::size_t>(buffer_size);
6✔
126
    }
3✔
127

128
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
129
    if (query_out) {
36✔
130
        if (parse_view_query(query_obj, *query_out) < 0) return -1;
36!
131
    }
18✔
132
#else
133
    (void)query_obj;
134
#endif
135

136
    input.directory = directory;
36!
137
    input.config.time_interval_us =
36✔
138
        static_cast<std::uint64_t>(time_interval_ms * 1000.0);
36✔
139
    input.index_dir = index_dir;
36!
140
    input.checkpoint_size = static_cast<std::size_t>(checkpoint_size);
36✔
141
    input.force_rebuild = force_rebuild != 0;
36✔
142
    input.parallelism = static_cast<std::size_t>(parallelism);
36✔
143
    input.event_batch_size = static_cast<std::size_t>(event_batch_size);
36✔
144
    input.config.compute_percentiles = compute_percentiles != 0;
36✔
145

146
    if (!parse_str_list(group_keys_obj, "group_keys",
54!
147
                        input.config.extra_group_keys))
36!
148
        return -1;
×
149
    if (!parse_str_list(custom_metrics_obj, "custom_metric_fields",
54!
150
                        input.config.custom_metric_fields))
36!
151
        return -1;
×
152

153
    return 0;
36✔
154
}
18✔
155

156
#ifdef DFTRACER_UTILS_ENABLE_ARROW
157
static bool run_aggregator_pipeline(
30✔
158
    AggregatorObject *self, const AggregatorInput &input,
159
    std::vector<ArrowExportResult> &results,
160
    const std::optional<Query> *query = nullptr) {
161
    auto *rp = &results;
30✔
162
    AggregatorInput input_copy = input;
30!
163
    std::optional<Query> query_copy;
30✔
164
    if (query) query_copy = *query;
30!
165

166
    return run_blocking([&] {
45!
167
        Runtime *rt = resolve_runtime(self);
30✔
168
        rt->submit(run_coro_scope(
135!
169
                       rt->executor(),
15✔
170
                       [](CoroScope &scope, std::vector<ArrowExportResult> *out,
214!
171
                          AggregatorInput input,
172
                          std::optional<Query> query) -> CoroTask<void> {
15!
173
                           AggregatorUtility util;
15!
174
                           util.bind_context(scope);
15!
175
                           try {
176
                               auto gen = util.process(input);
15!
177
                               while (auto batch = co_await gen.next()) {
139!
178
                                   if (batch->entries.empty()) continue;
16!
179
                                   AggregationBatch filtered;
16✔
180
                                   if (query) {
16✔
181
                                       filtered = batch->filter(*query);
1!
182
                                       if (filtered.entries.empty()) continue;
1!
183
                                   } else {
1✔
184
                                       filtered = std::move(*batch);
15✔
185
                                   }
186
                                   auto arrow_result = filtered.to_arrow();
16!
187
                                   if (!arrow_result.valid()) continue;
16!
188
                                   out->push_back(std::move(arrow_result));
16!
189
                               }
31!
190
                               util.unbind_context();
15!
191
                           } catch (...) {
77✔
192
                               util.unbind_context();
×
193
                               throw;
194
                           }
×
195
                       },
231!
196
                       rp, std::move(input_copy), std::move(query_copy)),
60✔
197
                   "aggregator")
15!
198
            .get();
30!
199
    });
60✔
200
}
30✔
201
#endif  // DFTRACER_UTILS_ENABLE_ARROW
202

203
#ifdef DFTRACER_UTILS_ENABLE_ARROW
204

205
/// Producer coroutine for iter_arrow(): pushes each exported batch into
/// the shared streaming state until the generator is exhausted, the
/// consumer cancels, or push() reports that it cannot accept more.
///
/// Any exception is handed to the state via fail() instead of propagating.
static CoroTask<void> run_aggregator_stream(
    CoroScope &scope, std::shared_ptr<StreamingState<ArrowExportResult>> state,
    AggregatorInput input, std::optional<Query> query) {
    // Cancelled before any work started: just mark the stream finished.
    if (state->cancelled()) {
        state->complete();
        co_return;
    }

    try {
        AggregatorUtility util;
        util.bind_context(scope);
        auto gen = util.process(input);

        while (auto chunk = co_await gen.next()) {
            if (state->cancelled()) break;
            if (chunk->entries.empty()) continue;

            AggregationBatch selected;
            if (!query) {
                selected = std::move(*chunk);
            } else {
                selected = chunk->filter(*query);
                if (selected.entries.empty()) continue;
            }

            auto exported = selected.to_arrow();
            if (!exported.valid()) continue;

            // Size is measured before the result is moved into the state.
            auto exported_bytes = dftracer::utils::python::byte_size(exported);
            if (!state->push(std::move(exported), exported_bytes)) {
                break;
            }
        }

        util.unbind_context();
        state->complete();
    } catch (...) {
        state->fail(std::current_exception());
    }
}
38!
248

249
#endif  // DFTRACER_UTILS_ENABLE_ARROW
250

251
// ---------------------------------------------------------------------------
252
// process() - returns ArrowTable (materialized)
253
// ---------------------------------------------------------------------------
254

255
/// Python entry point for Aggregator.process(): runs the whole pipeline
/// and returns the collected batches wrapped by wrap_arrow_table().
///
/// Returns NULL with a Python exception set on failure, or when the
/// extension was built without Arrow support.
static PyObject *Aggregator_process(AggregatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    AggregatorInput input;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    std::optional<Query> query;
    const int parse_rc =
        parse_aggregator_args(args, kwds, input, nullptr, &query);
#else
    const int parse_rc = parse_aggregator_args(args, kwds, input);
#endif
    if (parse_rc < 0) {
        return NULL;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    std::vector<ArrowExportResult> results;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    const bool pipeline_ok =
        run_aggregator_pipeline(self, input, results, &query);
#else
    const bool pipeline_ok = run_aggregator_pipeline(self, input, results);
#endif
    if (!pipeline_ok) {
        return NULL;
    }

    PyObject *batch_list = PyList_New(0);
    if (batch_list == NULL) {
        return NULL;
    }

    for (auto &result : results) {
        PyObject *wrapped = wrap_arrow_result(std::move(result));
        // The list takes its own reference, so ours is always released.
        const int rc = wrapped ? PyList_Append(batch_list, wrapped) : -1;
        Py_XDECREF(wrapped);
        if (rc < 0) {
            Py_DECREF(batch_list);
            return NULL;
        }
    }

    return wrap_arrow_table(batch_list);
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
30✔
300

301
// ---------------------------------------------------------------------------
302
// iter_arrow() - returns true streaming iterator
303
// ---------------------------------------------------------------------------
304

305
/// Python entry point for Aggregator.iter_arrow(): starts the streaming
/// aggregation task and returns an iterator object that pulls its results.
///
/// Returns NULL with a Python exception set on failure, or when the
/// extension was built without Arrow support. C++ exceptions raised while
/// setting up or submitting the task are reported as RuntimeError rather
/// than being allowed to unwind through the CPython call frame.
static PyObject *Aggregator_iter_arrow(AggregatorObject *self, PyObject *args,
                                       PyObject *kwds) {
    AggregatorInput input;
    std::size_t buffer_size = 8;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    std::optional<Query> query;
    if (parse_aggregator_args(args, kwds, input, &buffer_size, &query) < 0)
        return NULL;
#else
    if (parse_aggregator_args(args, kwds, input, &buffer_size) < 0) return NULL;
#endif
    // NOTE(review): buffer_size is parsed but not consulted below; the
    // streaming state is sized from compute_memory_budget(0) instead.
    (void)buffer_size;

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    ArrowStreamingIteratorObject *iter_obj = nullptr;
    try {
        auto state = std::make_shared<StreamingState<ArrowExportResult>>(
            dftracer::utils::compute_memory_budget(0));

        iter_obj =
            (ArrowStreamingIteratorObject *)ArrowStreamingIteratorType.tp_new(
                &ArrowStreamingIteratorType, NULL, NULL);
        if (!iter_obj) {
            return NULL;
        }

        // Each callback keeps the shared state alive through its capture.
        iter_obj->cpp_state->state = state;
        iter_obj->cpp_state->pull_next =
            [state]() -> std::optional<ArrowExportResult> {
            return state->pull();
        };
        iter_obj->cpp_state->get_error = [state]() -> std::exception_ptr {
            return state->error();
        };
        iter_obj->cpp_state->cancel = [state]() { state->cancel(); };

        Runtime *rt = resolve_runtime(self);

        // Capture any exception while the GIL is released and only rethrow
        // once Py_END_ALLOW_THREADS has restored the thread state; Python
        // APIs must not be touched before that.
        std::exception_ptr submit_error;
        Py_BEGIN_ALLOW_THREADS
        try {
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
            rt->submit(
                run_coro_scope(rt->executor(), run_aggregator_stream, state,
                               std::move(input), std::move(query)),
                "aggregator_stream");
#else
            rt->submit(
                run_coro_scope(rt->executor(), run_aggregator_stream, state,
                               std::move(input), std::nullopt),
                "aggregator_stream");
#endif
        } catch (...) {
            submit_error = std::current_exception();
        }
        Py_END_ALLOW_THREADS

        if (submit_error) {
            std::rethrow_exception(submit_error);
        }
        return (PyObject *)iter_obj;
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "failed to start aggregator stream");
    }

    // Failure path: release the iterator if it was already created.
    Py_XDECREF((PyObject *)iter_obj);
    return NULL;
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
6✔
359

360
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
361

362
struct AggregatorViewDef {
363
    std::string name;
364
    std::optional<Query> query;
365
};
366

367
struct AggregatorWriteArrowResult {
4✔
368
    std::unordered_map<std::string, PartitionWriteStats> view_stats;
369
    int64_t total_rows = 0;
4✔
370
    int64_t total_bytes = 0;
4✔
371
    std::string error;
372
};
373

374
/// Coroutine behind write_arrow(): opens one PartitionWriter per view,
/// streams every aggregation batch to each view (filtered by that view's
/// query when it has one), then closes the writers and records their stats.
///
/// Failures are reported through `out->error`; std::exception is caught
/// here, other exception types propagate to the caller.
///
/// @param scope             Coroutine scope the utility is bound to.
/// @param out               Result sink; must outlive the coroutine.
/// @param input             Aggregation input.
/// @param output_path       Base output path; a single view named "all"
///                          writes directly to it, any other view writes
///                          to `output_path/<view name>`.
/// @param views             Requested views; empty means one "all" view.
/// @param chunk_size_bytes  Passed through to PartitionWriter::open().
/// @param compression       Passed through to PartitionWriter::open().
///
/// NOTE(review): on an open/write failure the coroutine returns without
/// calling close() on writers that were already opened — confirm that
/// PartitionWriter cleans up on destruction.
static CoroTask<void> run_aggregator_write_arrow(
    CoroScope &scope, AggregatorWriteArrowResult *out, AggregatorInput input,
    std::string output_path, std::vector<AggregatorViewDef> views,
    int64_t chunk_size_bytes, IpcCompression compression) {
    try {
        // If no views specified, create a default "all" view
        if (views.empty()) {
            views.push_back({"all", std::nullopt});
        }

        // Open a writer for each view
        std::vector<PartitionWriter> writers(views.size());
        for (std::size_t i = 0; i < views.size(); ++i) {
            std::string view_path = output_path;
            if (views.size() > 1 || views[i].name != "all") {
                view_path = output_path + "/" + views[i].name;
            }
            int rc = co_await writers[i].open(view_path, chunk_size_bytes,
                                              compression);
            if (rc != 0) {
                out->error = "Failed to open writer for view: " + views[i].name;
                co_return;
            }
        }

        AggregatorUtility util;
        util.bind_context(scope);
        auto gen = util.process(input);

        // `views` is non-empty here, so this index is always valid.
        const std::size_t last_view = views.size() - 1;

        while (auto batch = co_await gen.next()) {
            if (batch->entries.empty()) continue;

            // Write to each view (with optional filtering)
            for (std::size_t i = 0; i < views.size(); ++i) {
                AggregationBatch filtered_batch;
                if (views[i].query) {
                    filtered_batch = batch->filter(*views[i].query);
                    if (filtered_batch.entries.empty()) continue;
                } else if (i == last_view) {
                    // No later view reads this batch, so hand it over
                    // instead of deep-copying it. This covers the common
                    // single-view case.
                    filtered_batch = std::move(*batch);
                } else {
                    filtered_batch = *batch;
                }

                auto arrow_result = filtered_batch.to_arrow();
                if (!arrow_result.valid()) continue;

                int rc = co_await writers[i].write_batch(arrow_result);
                if (rc != 0) {
                    util.unbind_context();
                    out->error =
                        "Failed to write batch for view: " + views[i].name;
                    co_return;
                }
            }
        }

        util.unbind_context();

        // Close writers and collect stats
        for (std::size_t i = 0; i < views.size(); ++i) {
            auto stats = co_await writers[i].close();
            // Look the entry up once and reuse the reference.
            auto &slot = out->view_stats[views[i].name];
            slot = std::move(stats);
            out->total_rows += slot.total_rows;
            out->total_bytes += slot.total_uncompressed_bytes;
        }
    } catch (const std::exception &e) {
        out->error = e.what();
    }
}
102!
443

444
static PyObject *Aggregator_write_arrow(AggregatorObject *self, PyObject *args,
8✔
445
                                        PyObject *kwds) {
446
    static const char *kwlist[] = {"directory",
447
                                   "path",
448
                                   "time_interval_ms",
449
                                   "group_keys",
450
                                   "categories",
451
                                   "names",
452
                                   "index_dir",
453
                                   "checkpoint_size",
454
                                   "force_rebuild",
455
                                   "parallelism",
456
                                   "event_batch_size",
457
                                   "custom_metric_fields",
458
                                   "compute_percentiles",
459
                                   "views",
460
                                   "chunk_size_mb",
461
                                   "compression",
462
                                   NULL};
463

464
    const char *directory = NULL;
8✔
465
    const char *output_path = NULL;
8✔
466
    double time_interval_ms = 5000.0;
8✔
467
    PyObject *group_keys_obj = Py_None;
8✔
468
    PyObject *categories_obj = Py_None;
8✔
469
    PyObject *names_obj = Py_None;
8✔
470
    const char *index_dir = "";
8✔
471
    Py_ssize_t checkpoint_size = static_cast<Py_ssize_t>(
8✔
472
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE);
473
    int force_rebuild = 0;
8✔
474
    Py_ssize_t parallelism = 0;
8✔
475
    Py_ssize_t event_batch_size = 10000;
8✔
476
    PyObject *custom_metrics_obj = Py_None;
8✔
477
    int compute_percentiles = 0;
8✔
478
    PyObject *views_obj = Py_None;
8✔
479
    int chunk_size_mb = 32;
8✔
480
    const char *compression_str = "zstd";
8✔
481

482
    if (!PyArg_ParseTupleAndKeywords(
8!
483
            args, kwds, "ss|dOOOsnpnnOpOis", (char **)kwlist, &directory,
4✔
484
            &output_path, &time_interval_ms, &group_keys_obj, &categories_obj,
485
            &names_obj, &index_dir, &checkpoint_size, &force_rebuild,
486
            &parallelism, &event_batch_size, &custom_metrics_obj,
487
            &compute_percentiles, &views_obj, &chunk_size_mb, &compression_str))
488
        return NULL;
×
489

490
    // Parse views
491
    std::vector<AggregatorViewDef> views;
8✔
492
    if (views_obj && views_obj != Py_None) {
8!
493
        if (!PyList_Check(views_obj)) {
2!
494
            PyErr_SetString(PyExc_TypeError,
×
495
                            "views must be a list of dicts with 'name' and "
496
                            "optional 'query' keys");
497
            return NULL;
×
498
        }
499
        Py_ssize_t n = PyList_Size(views_obj);
2!
500
        for (Py_ssize_t i = 0; i < n; i++) {
6✔
501
            PyObject *item = PyList_GetItem(views_obj, i);
4!
502
            if (!PyDict_Check(item)) {
4!
503
                PyErr_SetString(PyExc_TypeError,
×
504
                                "each view must be a dict with 'name' key");
505
                return NULL;
×
506
            }
507
            AggregatorViewDef view;
4✔
508
            PyObject *name_obj = PyDict_GetItemString(item, "name");
4!
509
            if (!name_obj) {
4!
510
                PyErr_SetString(PyExc_ValueError,
×
511
                                "each view must have a 'name' key");
512
                return NULL;
×
513
            }
514
            const char *name_str = PyUnicode_AsUTF8(name_obj);
4!
515
            if (!name_str) return NULL;
4✔
516
            view.name = name_str;
4!
517

518
            PyObject *query_obj = PyDict_GetItemString(item, "query");
4!
519
            if (query_obj && query_obj != Py_None) {
4!
520
                const char *query_str = PyUnicode_AsUTF8(query_obj);
4!
521
                if (!query_str) return NULL;
4✔
522
                auto parsed = Query::from_string(query_str);
4!
523
                if (!parsed) {
4!
524
                    PyErr_Format(PyExc_ValueError,
×
525
                                 "Invalid query for view '%s': %s", name_str,
526
                                 parsed.error().format().c_str());
×
527
                    return NULL;
×
528
                }
529
                view.query = std::move(*parsed);
4!
530
            }
4✔
531
            views.push_back(std::move(view));
4!
532
        }
4✔
533
    }
1✔
534

535
    // Parse compression
536
    IpcCompression compression = IpcCompression::ZSTD;
8✔
537
    if (compression_str) {
8!
538
        std::string comp_lower(compression_str);
8!
539
        for (auto &c : comp_lower) c = std::tolower(c);
40!
540
        if (comp_lower == "none") {
8✔
541
            compression = IpcCompression::NONE;
2✔
542
        } else if (comp_lower == "zstd") {
7✔
543
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
544
            compression = IpcCompression::ZSTD;
6✔
545
#else
546
            PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
547
            return NULL;
548
#endif
549
        } else {
3✔
550
            PyErr_Format(PyExc_ValueError,
×
551
                         "Unknown compression: %s (use 'none' or 'zstd')",
552
                         compression_str);
553
            return NULL;
×
554
        }
555
    }
8✔
556

557
    int64_t chunk_size_bytes =
8✔
558
        static_cast<int64_t>(chunk_size_mb) * 1024 * 1024;
8✔
559

560
    // Parse group_keys
561
    std::vector<std::string> group_keys;
8✔
562
    if (group_keys_obj && group_keys_obj != Py_None) {
8!
563
        if (!PyList_Check(group_keys_obj)) {
×
564
            PyErr_SetString(PyExc_TypeError,
×
565
                            "group_keys must be a list of str");
566
            return NULL;
×
567
        }
568
        Py_ssize_t n = PyList_Size(group_keys_obj);
×
569
        for (Py_ssize_t i = 0; i < n; i++) {
×
570
            const char *s = PyUnicode_AsUTF8(PyList_GetItem(group_keys_obj, i));
×
571
            if (!s) return NULL;
×
572
            group_keys.emplace_back(s);
×
573
        }
574
    }
575

576
    // Parse custom_metric_fields
577
    std::vector<std::string> custom_metrics;
8✔
578
    if (custom_metrics_obj && custom_metrics_obj != Py_None) {
8!
579
        if (!PyList_Check(custom_metrics_obj)) {
×
580
            PyErr_SetString(PyExc_TypeError,
×
581
                            "custom_metric_fields must be a list of str");
582
            return NULL;
×
583
        }
584
        Py_ssize_t n = PyList_Size(custom_metrics_obj);
×
585
        for (Py_ssize_t i = 0; i < n; i++) {
×
586
            const char *s =
587
                PyUnicode_AsUTF8(PyList_GetItem(custom_metrics_obj, i));
×
588
            if (!s) return NULL;
×
589
            custom_metrics.emplace_back(s);
×
590
        }
591
    }
592

593
    AggregatorInput input;
8!
594
    input.directory = directory;
8!
595
    input.config.time_interval_us =
8✔
596
        static_cast<std::uint64_t>(time_interval_ms * 1000.0);
8✔
597
    input.config.extra_group_keys = std::move(group_keys);
8✔
598
    input.config.custom_metric_fields = std::move(custom_metrics);
8✔
599
    input.config.compute_percentiles = compute_percentiles != 0;
8✔
600
    input.index_dir = index_dir;
8!
601
    input.checkpoint_size = static_cast<std::size_t>(checkpoint_size);
8✔
602
    input.force_rebuild = force_rebuild != 0;
8✔
603
    input.parallelism = static_cast<std::size_t>(parallelism);
8✔
604
    input.event_batch_size = static_cast<std::size_t>(event_batch_size);
8✔
605

606
    std::string output_path_str(output_path);
8!
607
    AggregatorWriteArrowResult result;
8✔
608
    auto *rp = &result;
8✔
609

610
    if (!run_blocking([&] {
12!
611
            Runtime *rt = resolve_runtime(self);
8✔
612
            rt->submit(run_coro_scope(
32!
613
                           rt->executor(), run_aggregator_write_arrow, rp,
8✔
614
                           std::move(input), output_path_str, std::move(views),
8!
615
                           chunk_size_bytes, compression),
8✔
616
                       "aggregator_write_arrow")
4!
617
                .get();
8!
618
        })) {
8✔
619
        return NULL;
×
620
    }
621

622
    if (!result.error.empty()) {
8!
623
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
624
        return NULL;
×
625
    }
626

627
    // Build result dict
628
    PyObject *dict = PyDict_New();
8!
629
    if (!dict) return NULL;
8✔
630

631
    PyObject *views_dict = PyDict_New();
8!
632
    if (!views_dict) {
8!
633
        Py_DECREF(dict);
×
634
        return NULL;
×
635
    }
636

637
    for (const auto &[view_name, view_stats] : result.view_stats) {
18!
638
        PyObject *view_dict = PyDict_New();
10!
639
        if (!view_dict) {
10!
640
            Py_DECREF(views_dict);
×
641
            Py_DECREF(dict);
×
642
            return NULL;
×
643
        }
644

645
        PyObject *files_list = PyList_New(0);
10!
646
        if (!files_list) {
10!
647
            Py_DECREF(view_dict);
×
648
            Py_DECREF(views_dict);
×
649
            Py_DECREF(dict);
×
650
            return NULL;
×
651
        }
652

653
        for (const auto &f : view_stats.files) {
20✔
654
            PyObject *file_str = PyUnicode_FromString(f.c_str());
10!
655
            if (!file_str || PyList_Append(files_list, file_str) < 0) {
10!
656
                Py_XDECREF(file_str);
×
657
                Py_DECREF(files_list);
×
658
                Py_DECREF(view_dict);
×
659
                Py_DECREF(views_dict);
×
660
                Py_DECREF(dict);
×
661
                return NULL;
×
662
            }
663
            Py_DECREF(file_str);
5!
664
        }
665

666
        PyDict_SetItemString(view_dict, "files", files_list);
10!
667
        dict_set_steal(view_dict, "rows",
10!
668
                       PyLong_FromLongLong(view_stats.total_rows));
10!
669
        dict_set_steal(
10!
670
            view_dict, "bytes",
5✔
671
            PyLong_FromLongLong(view_stats.total_uncompressed_bytes));
10!
672
        Py_DECREF(files_list);
5!
673

674
        PyObject *key = PyUnicode_FromString(view_name.c_str());
10!
675
        PyDict_SetItem(views_dict, key, view_dict);
10!
676
        Py_DECREF(key);
5!
677
        Py_DECREF(view_dict);
5!
678
    }
679

680
    PyDict_SetItemString(dict, "views", views_dict);
8!
681
    dict_set_steal(dict, "total_rows", PyLong_FromLongLong(result.total_rows));
8!
682
    dict_set_steal(dict, "total_bytes",
8!
683
                   PyLong_FromLongLong(result.total_bytes));
8!
684
    Py_DECREF(views_dict);
4!
685

686
    return dict;
8✔
687
}
8✔
688

689
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
690

691
/// tp_call-style entry point: calling the object forwards to process()
/// with the same arguments.
static PyObject *Aggregator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    auto *aggregator = (AggregatorObject *)self;
    return Aggregator_process(aggregator, args, kwds);
}
695

696
// Method table for the aggregator type.
//
// Every handler accepts positional and keyword arguments
// (METH_VARARGS | METH_KEYWORDS), so its real signature has three parameters
// while the PyMethodDef slot is typed as the two-parameter PyCFunction. The
// handlers are therefore cast through ``void (*)(void)`` first, the form the
// CPython documentation uses for keyword-taking functions; this keeps the
// conversion explicit and avoids -Wcast-function-type diagnostics.
//
// ``write_arrow`` is only registered when DFTRACER_UTILS_ENABLE_ARROW_IPC is
// defined. The table ends with a fully spelled-out null sentinel entry.
static PyMethodDef Aggregator_methods[] = {
    {"process",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(Aggregator_process)),
     METH_VARARGS | METH_KEYWORDS,
     "process(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "        categories=None, names=None, index_dir='',\n"
     "        checkpoint_size=33554432, force_rebuild=False,\n"
     "        parallelism=0, event_batch_size=10000,\n"
     "        custom_metric_fields=None, compute_percentiles=False)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, return materialized ArrowTable.\n"
     "\n"
     "Uses parallel, RocksDB-backed, fused indexing and aggregation.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds (default "
     "5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Directory for .dftindex stores (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    parallelism (int): Number of parallel workers (0 = all cores).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "    custom_metric_fields (list[str] or None): Extra numeric args\n"
     "        fields to aggregate into ``*_total``/``*_min``/``*_max``/\n"
     "        ``*_mean``/``*_std`` columns (default None).\n"
     "    compute_percentiles (bool): Enable percentile sketch collection\n"
     "        during aggregation (default False).\n"
     "\n"
     "Returns:\n"
     "    ArrowTable: Aggregated results.\n"},
    {"iter_arrow",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(Aggregator_iter_arrow)),
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "           categories=None, names=None, index_dir='',\n"
     "           checkpoint_size=33554432, force_rebuild=False,\n"
     "           parallelism=0, event_batch_size=10000,\n"
     "           custom_metric_fields=None, compute_percentiles=False,\n"
     "           buffer_size=8)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, stream Arrow batches.\n"
     "\n"
     "Returns immediately with a streaming iterator. Batches are produced\n"
     "in the background with a bounded buffer. GIL is released while waiting\n"
     "for the next batch, allowing other Python threads to run.\n"
     "\n"
     "Uses parallel, RocksDB-backed, fused indexing and aggregation.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds (default "
     "5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Directory for .dftindex stores (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    parallelism (int): Number of parallel workers (0 = all cores).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "    custom_metric_fields (list[str] or None): Extra numeric args\n"
     "        fields to aggregate into ``*_total``/``*_min``/``*_max``/\n"
     "        ``*_mean``/``*_std`` columns (default None).\n"
     "    compute_percentiles (bool): Enable percentile sketch collection\n"
     "        during aggregation (default False).\n"
     "    buffer_size (int): Max batches to buffer (default 8).\n"
     "\n"
     "Returns:\n"
     "    _ArrowStreamingIterator: Streaming iterator yielding Arrow record\n"
     "        batches. Supports cancel() to stop early.\n"},
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    {"write_arrow",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(Aggregator_write_arrow)),
     METH_VARARGS | METH_KEYWORDS,
     "write_arrow(directory, path, time_interval_ms=5000.0, ..., views=None)\n"
     "--\n"
     "\n"
     "Run aggregation and write results to Arrow IPC files with optional "
     "views.\n"
     "\n"
     "Views allow filtering aggregated entries before writing. Each view\n"
     "writes to a separate subdirectory. Query syntax supports: cat, name,\n"
     "pid, tid, hhash, fhash, time_bucket, extra group keys, and aggregation\n"
     "metrics (count, dur_total, dur_min, dur_max, size_total, etc.).\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    path (str): Output directory for Arrow files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds.\n"
     "    group_keys (list[str] or None): Extra grouping dims.\n"
     "    categories (list[str] or None): Category filter.\n"
     "    names (list[str] or None): Name filter.\n"
     "    index_dir (str): Directory for .dftindex stores.\n"
     "    checkpoint_size (int): Checkpoint size.\n"
     "    force_rebuild (bool): Force index rebuild.\n"
     "    parallelism (int): Number of parallel workers.\n"
     "    event_batch_size (int): Entries per batch.\n"
     "    custom_metric_fields (list[str] or None): Extra numeric fields.\n"
     "    compute_percentiles (bool): Enable percentile collection.\n"
     "    views (list[dict] or None): View definitions, each with 'name' and\n"
     "        optional 'query' keys. If None, writes all entries to path.\n"
     "        Example: [{'name': 'io', 'query': 'cat == \"POSIX\"'}]\n"
     "    chunk_size_mb (int): Max uncompressed MB per file (default 32).\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "\n"
     "Returns:\n"
     "    dict: Statistics with 'views' (per-view stats), 'total_rows',\n"
     "        'total_bytes'. Each view has 'files', 'rows', 'bytes'.\n"},
#endif
    // Sentinel: every field is given explicitly so no member is left to
    // implicit initialization.
    {nullptr, nullptr, 0, nullptr}};
808

809
// Python type object exposed as ``dftracer_utils_ext.AggregatorUtility``.
//
// Slots are filled positionally, so the order below must follow the
// PyTypeObject layout. Instances are callable (tp_call -> Aggregator_call),
// the type may be subclassed (Py_TPFLAGS_BASETYPE), and its methods come from
// Aggregator_methods. Unused pointer slots are nullptr; offset/size slots
// stay 0.
PyTypeObject AggregatorType = {
    PyVarObject_HEAD_INIT(nullptr, 0)                // object header
    "dftracer_utils_ext.AggregatorUtility",          // tp_name
    sizeof(AggregatorObject),                        // tp_basicsize
    0,                                               // tp_itemsize
    reinterpret_cast<destructor>(Aggregator_dealloc),  // tp_dealloc
    0,                                               // tp_vectorcall_offset
    nullptr,                                         // tp_getattr
    nullptr,                                         // tp_setattr
    nullptr,                                         // tp_as_async
    nullptr,                                         // tp_repr
    nullptr,                                         // tp_as_number
    nullptr,                                         // tp_as_sequence
    nullptr,                                         // tp_as_mapping
    nullptr,                                         // tp_hash
    Aggregator_call,                                 // tp_call
    nullptr,                                         // tp_str
    nullptr,                                         // tp_getattro
    nullptr,                                         // tp_setattro
    nullptr,                                         // tp_as_buffer
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,        // tp_flags
    // tp_doc
    "AggregatorUtility(runtime: Runtime | None = None)\n"
    "--\n\n"
    "High-level aggregation pipeline for DFTracer trace files.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "process(directory, time_interval_ms=5000.0, ...) -> ArrowTable\n"
    "    Run aggregation and return a materialized Arrow table.\n\n"
    "iter_arrow(directory, time_interval_ms=5000.0, ...) -> "
    "Iterator[ArrowBatch]\n"
    "    Run aggregation and stream Arrow batches.\n",
    nullptr,                                         // tp_traverse
    nullptr,                                         // tp_clear
    nullptr,                                         // tp_richcompare
    0,                                               // tp_weaklistoffset
    nullptr,                                         // tp_iter
    nullptr,                                         // tp_iternext
    Aggregator_methods,                              // tp_methods
    nullptr,                                         // tp_members
    nullptr,                                         // tp_getset
    nullptr,                                         // tp_base
    nullptr,                                         // tp_dict
    nullptr,                                         // tp_descr_get
    nullptr,                                         // tp_descr_set
    0,                                               // tp_dictoffset
    reinterpret_cast<initproc>(Aggregator_init),     // tp_init
    nullptr,                                         // tp_alloc
    Aggregator_new,                                  // tp_new
};
859

860
// Registers AggregatorType with module ``m`` under the name
// "AggregatorUtility" via register_type.
//
// Returns -1 when register_type reports a negative status, 0 otherwise.
int init_aggregator(PyObject *m) {
    return register_type(m, &AggregatorType, "AggregatorUtility") < 0 ? -1 : 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc