• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28521653886

01 Jul 2026 01:36PM UTC coverage: 50.92% (-1.4%) from 52.278%
28521653886

Pull #83

github

web-flow
Merge 9bdedb1e9 into 2efed6649
Pull Request #83: refactor and improve code QoL

31893 of 80049 branches covered (39.84%)

Branch coverage included in aggregate %.

789 of 1613 new or added lines in 87 files covered. (48.92%)

5007 existing lines in 181 files now uncovered.

32812 of 47024 relevant lines covered (69.78%)

9905.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.74
/src/dftracer/utils/python/utilities/aggregator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/config.h>
3
#include <dftracer/utils/core/common/constants.h>
4
#include <dftracer/utils/core/common/memory_budget.h>
5
#include <dftracer/utils/core/coro/task.h>
6
#include <dftracer/utils/core/runtime.h>
7
#include <dftracer/utils/python/arrow_helpers.h>
8
#include <dftracer/utils/python/py_dict_helpers.h>
9
#include <dftracer/utils/python/py_runtime_mixin.h>
10
#include <dftracer/utils/python/py_type_helpers.h>
11
#include <dftracer/utils/python/runtime.h>
12
#include <dftracer/utils/python/trace_reader_iterator.h>
13
#include <dftracer/utils/python/utilities/aggregator.h>
14
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_utility.h>
15

16
#ifdef DFTRACER_UTILS_ENABLE_ARROW
17
#include <dftracer/utils/python/batch_byte_size.h>
18
#include <dftracer/utils/python/streaming_iterator.h>
19
#endif
20
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
21
#include <dftracer/utils/utilities/common/arrow/partition_router.h>
22
#include <dftracer/utils/utilities/common/arrow/partition_writer.h>
23
#include <dftracer/utils/utilities/common/query/query.h>
24
#endif
25

26
#include <cctype>
27
#include <memory>
28
#include <optional>
29
#include <string>
30
#include <vector>
31

32
using dftracer::utils::CoroScope;
33
using dftracer::utils::Runtime;
34
using dftracer::utils::coro::CoroTask;
35
using namespace dftracer::utils::utilities::composites::dft::aggregators;
36

37
using dftracer::utils::python::wrap_arrow_result;
38
using dftracer::utils::python::wrap_arrow_table;
39

40
#ifdef DFTRACER_UTILS_ENABLE_ARROW
41
using dftracer::utils::python::ArrowStreamingIteratorObject;
42
using dftracer::utils::python::ArrowStreamingIteratorType;
43
using dftracer::utils::python::StreamingState;
44
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
45
#endif
46
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
47
using dftracer::utils::utilities::common::arrow::IpcCompression;
48
using dftracer::utils::utilities::common::arrow::PartitionWriter;
49
using dftracer::utils::utilities::common::arrow::PartitionWriteStats;
50
using dftracer::utils::utilities::common::query::Query;
51
#endif
52

53
// Look up the Runtime that backs this Aggregator instance; thin wrapper over
// resolve_runtime().
static Runtime *get_runtime(AggregatorObject *self) {
    Runtime *const runtime = resolve_runtime(self);
    return runtime;
}
56

57
static void Aggregator_dealloc(AggregatorObject *self) {
22✔
58
    runtime_backed_dealloc(self);
22✔
59
}
22✔
60

61
// tp_new slot: allocation is delegated to the generic runtime-backed helper,
// instantiated for AggregatorObject.
static PyObject *Aggregator_new(PyTypeObject *subtype, PyObject *args,
                                PyObject *kwargs) {
    PyObject *const instance =
        runtime_backed_new<AggregatorObject>(subtype, args, kwargs);
    return instance;
}
65

66
static int Aggregator_init(AggregatorObject *self, PyObject *args,
22✔
67
                           PyObject *kwds) {
68
    return runtime_backed_init(self, args, kwds);
22✔
69
}
70

71
// ---------------------------------------------------------------------------
72
// Helpers
73
// ---------------------------------------------------------------------------
74

75
static int parse_str_list(PyObject *obj, std::vector<std::string> &out,
36✔
76
                          const char *param_name) {
77
    if (!obj || obj == Py_None) return 0;
36!
78
    if (!PyList_Check(obj)) {
4!
79
        PyErr_Format(PyExc_TypeError, "%s must be a list of str", param_name);
×
80
        return -1;
×
81
    }
82
    Py_ssize_t n = PyList_Size(obj);
4✔
83
    for (Py_ssize_t i = 0; i < n; i++) {
10✔
84
        const char *s = PyUnicode_AsUTF8(PyList_GetItem(obj, i));
6✔
85
        if (!s) return -1;
6!
86
        out.emplace_back(s);
6✔
87
    }
6✔
88
    return 0;
4✔
89
}
36✔
90

91
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
92
// Parse a view query string into an optional Query
93
// Convert the optional `query` keyword argument into a parsed Query.
//
// A null pointer or None stores std::nullopt in `out`. Otherwise the object
// must be a str accepted by Query::from_string(); on a parse failure a
// ValueError carrying the parser's formatted message is raised.
//
// Returns 0 on success, or -1 with a Python exception set.
static int parse_view_query(PyObject *query_obj, std::optional<Query> &out) {
    const bool absent = (query_obj == nullptr) || (query_obj == Py_None);
    if (absent) {
        out = std::nullopt;
        return 0;
    }

    const char *text = PyUnicode_AsUTF8(query_obj);
    if (text == nullptr) {
        return -1;
    }

    auto result = Query::from_string(text);
    if (!result) {
        PyErr_Format(PyExc_ValueError, "Invalid query: %s",
                     result.error().format().c_str());
        return -1;
    }

    out = std::move(*result);
    return 0;
}
109
#endif
110

111
static int parse_aggregator_args(PyObject *args, PyObject *kwds,
18✔
112
                                 AggregatorInput &input,
113
                                 std::size_t *buffer_size_out = nullptr,
114
                                 std::optional<Query> *query_out = nullptr) {
115
    static const char *kwlist[] = {"directory",
116
                                   "time_interval_ms",
117
                                   "group_keys",
118
                                   "categories",
119
                                   "names",
120
                                   "index_dir",
121
                                   "checkpoint_size",
122
                                   "force_rebuild",
123
                                   "parallelism",
124
                                   "event_batch_size",
125
                                   "custom_metric_fields",
126
                                   "compute_percentiles",
127
                                   "buffer_size",
128
                                   "query",
129
                                   NULL};
130

131
    const char *directory = NULL;
18✔
132
    double time_interval_ms = 5000.0;
18✔
133
    PyObject *group_keys_obj = Py_None;
18✔
134
    PyObject *categories_obj = Py_None;
18✔
135
    PyObject *names_obj = Py_None;
18✔
136
    const char *index_dir = "";
18✔
137
    Py_ssize_t checkpoint_size = static_cast<Py_ssize_t>(
18✔
138
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE);
139
    int force_rebuild = 0;
18✔
140
    Py_ssize_t parallelism = 0;
18✔
141
    Py_ssize_t event_batch_size = 10000;
18✔
142
    PyObject *custom_metrics_obj = Py_None;
18✔
143
    int compute_percentiles = 0;
18✔
144
    Py_ssize_t buffer_size = 8;
18✔
145
    PyObject *query_obj = Py_None;
18✔
146

147
    if (!PyArg_ParseTupleAndKeywords(
18!
148
            args, kwds, "s|dOOOsnpnnOpnO", (char **)kwlist, &directory,
18✔
149
            &time_interval_ms, &group_keys_obj, &categories_obj, &names_obj,
150
            &index_dir, &checkpoint_size, &force_rebuild, &parallelism,
151
            &event_batch_size, &custom_metrics_obj, &compute_percentiles,
152
            &buffer_size, &query_obj))
153
        return -1;
×
154

155
    if (buffer_size_out) {
18✔
156
        *buffer_size_out = static_cast<std::size_t>(buffer_size);
3✔
157
    }
3✔
158

159
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
160
    if (query_out) {
18!
161
        if (parse_view_query(query_obj, *query_out) < 0) return -1;
18!
162
    }
18✔
163
#else
164
    (void)query_obj;
165
#endif
166

167
    input.directory = directory;
18✔
168
    input.config.time_interval_us =
18✔
169
        static_cast<std::uint64_t>(time_interval_ms * 1000.0);
18✔
170
    input.index_dir = index_dir;
18✔
171
    input.checkpoint_size = static_cast<std::size_t>(checkpoint_size);
18✔
172
    input.force_rebuild = force_rebuild != 0;
18✔
173
    input.parallelism = static_cast<std::size_t>(parallelism);
18✔
174
    input.event_batch_size = static_cast<std::size_t>(event_batch_size);
18✔
175
    input.config.compute_percentiles = compute_percentiles != 0;
18✔
176

177
    if (parse_str_list(group_keys_obj, input.config.extra_group_keys,
18✔
178
                       "group_keys") < 0)
18!
179
        return -1;
×
180
    if (parse_str_list(custom_metrics_obj, input.config.custom_metric_fields,
18✔
181
                       "custom_metric_fields") < 0)
18!
182
        return -1;
×
183

184
    return 0;
18✔
185
}
18✔
186

187
#ifdef DFTRACER_UTILS_ENABLE_ARROW
188
static bool run_aggregator_pipeline(
15✔
189
    AggregatorObject *self, const AggregatorInput &input,
190
    std::vector<ArrowExportResult> &results,
191
    const std::optional<Query> *query = nullptr) {
192
    auto *rp = &results;
15✔
193
    AggregatorInput input_copy = input;
15✔
194
    std::optional<Query> query_copy;
15✔
195
    if (query) query_copy = *query;
15!
196

197
    return run_blocking([&] {
30!
198
        Runtime *rt = get_runtime(self);
15✔
199
        rt->submit(run_coro_scope(
60!
200
                       rt->executor(),
15✔
201
                       [](CoroScope &scope, std::vector<ArrowExportResult> *out,
199!
202
                          AggregatorInput input,
203
                          std::optional<Query> query) -> CoroTask<void> {
15!
204
                           AggregatorUtility util;
15!
205
                           util.bind_context(scope);
15!
206
                           try {
207
                               auto gen = util.process(input);
15!
208
                               while (auto batch = co_await gen.next()) {
139!
209
                                   if (batch->entries.empty()) continue;
16!
210
                                   AggregationBatch filtered;
16✔
211
                                   if (query) {
16✔
212
                                       filtered = batch->filter(*query);
1!
213
                                       if (filtered.entries.empty()) continue;
1!
214
                                   } else {
1✔
215
                                       filtered = std::move(*batch);
15✔
216
                                   }
217
                                   auto arrow_result = filtered.to_arrow();
16!
218
                                   if (!arrow_result.valid()) continue;
16!
219
                                   out->push_back(std::move(arrow_result));
16!
220
                               }
31!
221
                               util.unbind_context();
15!
222
                           } catch (...) {
77✔
UNCOV
223
                               util.unbind_context();
×
UNCOV
224
                               throw;
×
UNCOV
225
                           }
×
226
                       },
201✔
227
                       rp, std::move(input_copy), std::move(query_copy)),
15✔
228
                   "aggregator")
15!
229
            .get();
15!
230
    });
15✔
231
}
15✔
232
#endif  // DFTRACER_UTILS_ENABLE_ARROW
233

234
#ifdef DFTRACER_UTILS_ENABLE_ARROW
235

236
// Producer coroutine behind Aggregator.iter_arrow().
//
// Runs the aggregation over `input`, optionally filters each non-empty batch
// with `query`, converts it to Arrow, and pushes it into `state` for the
// Python-side iterator to pull. Stops early when the state is cancelled or
// push() reports the consumer is gone.
//
// On normal completion (including early stop) it calls state->complete().
// When an exception propagates, the AggregatorUtility is unbound first (as the
// materializing pipeline does) and the exception is handed to state->fail(),
// so it reaches the consumer instead of being lost.
static CoroTask<void> run_aggregator_stream(
    CoroScope &scope, std::shared_ptr<StreamingState<ArrowExportResult>> state,
    AggregatorInput input, std::optional<Query> query) {
    if (state->cancelled()) {
        state->complete();
        co_return;
    }

    try {
        AggregatorUtility util;
        util.bind_context(scope);
        try {
            auto gen = util.process(input);

            while (auto batch = co_await gen.next()) {
                if (state->cancelled()) break;
                if (batch->entries.empty()) continue;

                AggregationBatch filtered;
                if (query) {
                    filtered = batch->filter(*query);
                    if (filtered.entries.empty()) continue;
                } else {
                    filtered = std::move(*batch);
                }

                auto arrow_result = filtered.to_arrow();
                if (!arrow_result.valid()) continue;

                auto result_bytes =
                    dftracer::utils::python::byte_size(arrow_result);
                // push() returning false means the consumer stopped listening.
                if (!state->push(std::move(arrow_result), result_bytes)) {
                    break;
                }
            }

            // Unbind while the generator is still alive, as before.
            util.unbind_context();
        } catch (...) {
            util.unbind_context();
            throw;
        }
        state->complete();
    } catch (...) {
        state->fail(std::current_exception());
    }
}
279

280
#endif  // DFTRACER_UTILS_ENABLE_ARROW
281

282
// ---------------------------------------------------------------------------
283
// process() - returns ArrowTable (materialized)
284
// ---------------------------------------------------------------------------
285

286
// Aggregator.process(): run the whole aggregation synchronously and return
// the batches as one materialized Arrow table (via wrap_arrow_table()).
//
// Arguments are parsed by parse_aggregator_args(). Returns NULL with a Python
// exception set on argument errors, or on pipeline or wrapping failures. In
// builds without Arrow it always raises RuntimeError.
static PyObject *Aggregator_process(AggregatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    AggregatorInput input;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    std::optional<Query> query;
    const int parse_status =
        parse_aggregator_args(args, kwds, input, nullptr, &query);
#else
    const int parse_status = parse_aggregator_args(args, kwds, input);
#endif
    if (parse_status < 0) {
        return NULL;
    }

#ifndef DFTRACER_UTILS_ENABLE_ARROW
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#else
    std::vector<ArrowExportResult> exported;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    const bool pipeline_ok =
        run_aggregator_pipeline(self, input, exported, &query);
#else
    const bool pipeline_ok = run_aggregator_pipeline(self, input, exported);
#endif
    if (!pipeline_ok) {
        return NULL;
    }

    PyObject *batches = PyList_New(0);
    if (batches == NULL) {
        return NULL;
    }
    for (ArrowExportResult &item : exported) {
        PyObject *capsule = wrap_arrow_result(std::move(item));
        // A failed wrap counts as a failed append; the list owns its own ref.
        const int append_status =
            (capsule != NULL) ? PyList_Append(batches, capsule) : -1;
        Py_XDECREF(capsule);
        if (append_status < 0) {
            Py_DECREF(batches);
            return NULL;
        }
    }
    return wrap_arrow_table(batches);
#endif
}
331

332
// ---------------------------------------------------------------------------
333
// iter_arrow() - returns true streaming iterator
334
// ---------------------------------------------------------------------------
335

336
// Aggregator.iter_arrow(): start the aggregation in the background and return
// an iterator that yields Arrow batches as they are produced.
//
// Accepts the same arguments as process() (parsed by parse_aggregator_args).
// The producer coroutine run_aggregator_stream() is submitted to this object's
// Runtime without waiting. The returned iterator pulls from the shared
// StreamingState.
//
// Returns a new reference to an ArrowStreamingIterator. Returns NULL with a
// Python exception set on argument errors, or when the producer cannot be
// started. In builds without Arrow it always raises RuntimeError.
static PyObject *Aggregator_iter_arrow(AggregatorObject *self, PyObject *args,
                                       PyObject *kwds) {
    AggregatorInput input;
    std::size_t buffer_size = 8;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    std::optional<Query> query;
    if (parse_aggregator_args(args, kwds, input, &buffer_size, &query) < 0)
        return NULL;
#else
    if (parse_aggregator_args(args, kwds, input, &buffer_size) < 0) return NULL;
#endif

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // NOTE(review): `buffer_size` is parsed but never used; the state below is
    // sized only by compute_memory_budget(0). Confirm whether it should be
    // forwarded to StreamingState.
    auto state = std::make_shared<StreamingState<ArrowExportResult>>(
        dftracer::utils::compute_memory_budget(0));

    ArrowStreamingIteratorObject *iter_obj =
        (ArrowStreamingIteratorObject *)ArrowStreamingIteratorType.tp_new(
            &ArrowStreamingIteratorType, NULL, NULL);
    if (!iter_obj) {
        return NULL;
    }

    iter_obj->cpp_state->state = state;
    iter_obj->cpp_state->pull_next =
        [state]() -> std::optional<ArrowExportResult> { return state->pull(); };
    iter_obj->cpp_state->get_error = [state]() -> std::exception_ptr {
        return state->error();
    };
    iter_obj->cpp_state->cancel = [state]() { state->cancel(); };

    Runtime *rt = get_runtime(self);
    AggregatorInput input_copy = input;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    std::optional<Query> query_copy = std::move(query);
#endif

    // A C++ exception must not leave the GIL-released region: the GIL would
    // never be re-acquired and the exception would unwind into CPython.
    // Capture it here and turn it into a Python error afterwards.
    bool submit_failed = false;
    std::string submit_error;
    Py_BEGIN_ALLOW_THREADS
    try {
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
        rt->submit(run_coro_scope(rt->executor(), run_aggregator_stream, state,
                                  std::move(input_copy), std::move(query_copy)),
                   "aggregator_stream");
#else
        rt->submit(run_coro_scope(rt->executor(), run_aggregator_stream, state,
                                  std::move(input_copy), std::nullopt),
                   "aggregator_stream");
#endif
    } catch (const std::exception &e) {
        submit_failed = true;
        submit_error = e.what();
    } catch (...) {
        submit_failed = true;
    }
    Py_END_ALLOW_THREADS

    if (submit_failed) {
        // No producer is running, so the iterator would never yield; drop it.
        Py_DECREF((PyObject *)iter_obj);
        PyErr_SetString(PyExc_RuntimeError,
                        submit_error.empty()
                            ? "failed to start aggregator stream"
                            : submit_error.c_str());
        return NULL;
    }

    return (PyObject *)iter_obj;
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
390

391
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
392

393
struct AggregatorViewDef {
394
    std::string name;
395
    std::optional<Query> query;
396
};
397

398
struct AggregatorWriteArrowResult {
4✔
399
    std::unordered_map<std::string, PartitionWriteStats> view_stats;
400
    int64_t total_rows = 0;
4✔
401
    int64_t total_bytes = 0;
4✔
402
    std::string error;
403
};
404

405
// Coroutine behind Aggregator.write_arrow(). It runs the aggregation over
// `input` and streams every non-empty batch into one PartitionWriter per view.
//
// Parameters:
//   scope             Scope the AggregatorUtility is bound to while running.
//   out               Receives per-view stats and totals. Failures are
//                     reported through out->error rather than by throwing.
//   output_path       Output root. With a single view named "all" (the
//                     default) files go directly here; otherwise each view
//                     writes to output_path + "/" + view.name.
//   views             Views to write. An empty list means one unfiltered
//                     view named "all".
//   chunk_size_bytes  Forwarded to PartitionWriter::open().
//   compression       Forwarded to PartitionWriter::open().
//
// NOTE(review): on an open/write failure or exception the writers are not
// explicitly closed; confirm PartitionWriter's destructor cleans up.
static CoroTask<void> run_aggregator_write_arrow(
    CoroScope &scope, AggregatorWriteArrowResult *out, AggregatorInput input,
    std::string output_path, std::vector<AggregatorViewDef> views,
    int64_t chunk_size_bytes, IpcCompression compression) {
    try {
        if (views.empty()) {
            views.push_back({"all", std::nullopt});
        }

        // Open one writer per view.
        std::vector<PartitionWriter> writers(views.size());
        for (std::size_t i = 0; i < views.size(); ++i) {
            const bool write_to_root =
                views.size() == 1 && views[i].name == "all";
            std::string view_path =
                write_to_root ? output_path : output_path + "/" + views[i].name;
            int rc = co_await writers[i].open(view_path, chunk_size_bytes,
                                              compression);
            if (rc != 0) {
                out->error = "Failed to open writer for view: " + views[i].name;
                co_return;
            }
        }

        AggregatorUtility util;
        util.bind_context(scope);
        try {
            auto gen = util.process(input);
            while (auto batch = co_await gen.next()) {
                if (batch->entries.empty()) continue;

                const std::size_t last_view = views.size() - 1;
                for (std::size_t i = 0; i < views.size(); ++i) {
                    AggregationBatch filtered_batch;
                    if (views[i].query) {
                        filtered_batch = batch->filter(*views[i].query);
                        if (filtered_batch.entries.empty()) continue;
                    } else if (i == last_view) {
                        // Last consumer of this batch: move instead of copying
                        // (the common single "all" view never copies).
                        filtered_batch = std::move(*batch);
                    } else {
                        filtered_batch = *batch;
                    }

                    auto arrow_result = filtered_batch.to_arrow();
                    if (!arrow_result.valid()) continue;

                    int rc = co_await writers[i].write_batch(arrow_result);
                    if (rc != 0) {
                        util.unbind_context();
                        out->error =
                            "Failed to write batch for view: " + views[i].name;
                        co_return;
                    }
                }
            }
            util.unbind_context();
        } catch (...) {
            // Release the scope binding before the error is recorded below,
            // as the materializing pipeline does.
            util.unbind_context();
            throw;
        }

        // Close writers and collect stats.
        for (std::size_t i = 0; i < views.size(); ++i) {
            auto stats = co_await writers[i].close();
            auto &slot = out->view_stats[views[i].name];
            slot = std::move(stats);
            out->total_rows += slot.total_rows;
            out->total_bytes += slot.total_uncompressed_bytes;
        }
    } catch (const std::exception &e) {
        // An empty message would be indistinguishable from success.
        const char *what = e.what();
        out->error = (what && *what) ? what : "unknown error in write_arrow";
    } catch (...) {
        out->error = "unknown error in write_arrow";
    }
}
474

475
static PyObject *Aggregator_write_arrow(AggregatorObject *self, PyObject *args,
4✔
476
                                        PyObject *kwds) {
477
    static const char *kwlist[] = {"directory",
478
                                   "path",
479
                                   "time_interval_ms",
480
                                   "group_keys",
481
                                   "categories",
482
                                   "names",
483
                                   "index_dir",
484
                                   "checkpoint_size",
485
                                   "force_rebuild",
486
                                   "parallelism",
487
                                   "event_batch_size",
488
                                   "custom_metric_fields",
489
                                   "compute_percentiles",
490
                                   "views",
491
                                   "chunk_size_mb",
492
                                   "compression",
493
                                   NULL};
494

495
    const char *directory = NULL;
4✔
496
    const char *output_path = NULL;
4✔
497
    double time_interval_ms = 5000.0;
4✔
498
    PyObject *group_keys_obj = Py_None;
4✔
499
    PyObject *categories_obj = Py_None;
4✔
500
    PyObject *names_obj = Py_None;
4✔
501
    const char *index_dir = "";
4✔
502
    Py_ssize_t checkpoint_size = static_cast<Py_ssize_t>(
4✔
503
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE);
504
    int force_rebuild = 0;
4✔
505
    Py_ssize_t parallelism = 0;
4✔
506
    Py_ssize_t event_batch_size = 10000;
4✔
507
    PyObject *custom_metrics_obj = Py_None;
4✔
508
    int compute_percentiles = 0;
4✔
509
    PyObject *views_obj = Py_None;
4✔
510
    int chunk_size_mb = 32;
4✔
511
    const char *compression_str = "zstd";
4✔
512

513
    if (!PyArg_ParseTupleAndKeywords(
4!
514
            args, kwds, "ss|dOOOsnpnnOpOis", (char **)kwlist, &directory,
4✔
515
            &output_path, &time_interval_ms, &group_keys_obj, &categories_obj,
516
            &names_obj, &index_dir, &checkpoint_size, &force_rebuild,
517
            &parallelism, &event_batch_size, &custom_metrics_obj,
518
            &compute_percentiles, &views_obj, &chunk_size_mb, &compression_str))
519
        return NULL;
×
520

521
    // Parse views
522
    std::vector<AggregatorViewDef> views;
4✔
523
    if (views_obj && views_obj != Py_None) {
4!
524
        if (!PyList_Check(views_obj)) {
1!
525
            PyErr_SetString(PyExc_TypeError,
×
526
                            "views must be a list of dicts with 'name' and "
527
                            "optional 'query' keys");
528
            return NULL;
×
529
        }
530
        Py_ssize_t n = PyList_Size(views_obj);
1!
531
        for (Py_ssize_t i = 0; i < n; i++) {
3✔
532
            PyObject *item = PyList_GetItem(views_obj, i);
2!
533
            if (!PyDict_Check(item)) {
2!
534
                PyErr_SetString(PyExc_TypeError,
×
535
                                "each view must be a dict with 'name' key");
536
                return NULL;
×
537
            }
538
            AggregatorViewDef view;
2✔
539
            PyObject *name_obj = PyDict_GetItemString(item, "name");
2!
540
            if (!name_obj) {
2!
541
                PyErr_SetString(PyExc_ValueError,
×
542
                                "each view must have a 'name' key");
543
                return NULL;
×
544
            }
545
            const char *name_str = PyUnicode_AsUTF8(name_obj);
2!
546
            if (!name_str) return NULL;
2!
547
            view.name = name_str;
2!
548

549
            PyObject *query_obj = PyDict_GetItemString(item, "query");
2!
550
            if (query_obj && query_obj != Py_None) {
2!
551
                const char *query_str = PyUnicode_AsUTF8(query_obj);
2!
552
                if (!query_str) return NULL;
2!
553
                auto parsed = Query::from_string(query_str);
2!
554
                if (!parsed) {
2!
555
                    PyErr_Format(PyExc_ValueError,
×
UNCOV
556
                                 "Invalid query for view '%s': %s", name_str,
×
557
                                 parsed.error().format().c_str());
×
558
                    return NULL;
×
559
                }
560
                view.query = std::move(*parsed);
2!
561
            }
2!
562
            views.push_back(std::move(view));
2!
563
        }
2!
564
    }
1✔
565

566
    // Parse compression
567
    IpcCompression compression = IpcCompression::ZSTD;
4✔
568
    if (compression_str) {
4!
569
        std::string comp_lower(compression_str);
4!
570
        for (auto &c : comp_lower) c = std::tolower(c);
20!
571
        if (comp_lower == "none") {
4✔
572
            compression = IpcCompression::NONE;
1✔
573
        } else if (comp_lower == "zstd") {
4!
574
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
575
            compression = IpcCompression::ZSTD;
3✔
576
#else
577
            PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
578
            return NULL;
579
#endif
580
        } else {
3✔
581
            PyErr_Format(PyExc_ValueError,
×
582
                         "Unknown compression: %s (use 'none' or 'zstd')",
UNCOV
583
                         compression_str);
×
584
            return NULL;
×
585
        }
586
    }
4!
587

588
    int64_t chunk_size_bytes =
4✔
589
        static_cast<int64_t>(chunk_size_mb) * 1024 * 1024;
4✔
590

591
    // Parse group_keys
592
    std::vector<std::string> group_keys;
4✔
593
    if (group_keys_obj && group_keys_obj != Py_None) {
4!
594
        if (!PyList_Check(group_keys_obj)) {
×
595
            PyErr_SetString(PyExc_TypeError,
×
596
                            "group_keys must be a list of str");
597
            return NULL;
×
598
        }
599
        Py_ssize_t n = PyList_Size(group_keys_obj);
×
600
        for (Py_ssize_t i = 0; i < n; i++) {
×
601
            const char *s = PyUnicode_AsUTF8(PyList_GetItem(group_keys_obj, i));
×
602
            if (!s) return NULL;
×
603
            group_keys.emplace_back(s);
×
UNCOV
604
        }
×
UNCOV
605
    }
×
606

607
    // Parse custom_metric_fields
608
    std::vector<std::string> custom_metrics;
4✔
609
    if (custom_metrics_obj && custom_metrics_obj != Py_None) {
4!
610
        if (!PyList_Check(custom_metrics_obj)) {
×
611
            PyErr_SetString(PyExc_TypeError,
×
612
                            "custom_metric_fields must be a list of str");
613
            return NULL;
×
614
        }
615
        Py_ssize_t n = PyList_Size(custom_metrics_obj);
×
616
        for (Py_ssize_t i = 0; i < n; i++) {
×
UNCOV
617
            const char *s =
×
618
                PyUnicode_AsUTF8(PyList_GetItem(custom_metrics_obj, i));
×
619
            if (!s) return NULL;
×
620
            custom_metrics.emplace_back(s);
×
UNCOV
621
        }
×
UNCOV
622
    }
×
623

624
    AggregatorInput input;
4!
625
    input.directory = directory;
4!
626
    input.config.time_interval_us =
4✔
627
        static_cast<std::uint64_t>(time_interval_ms * 1000.0);
4✔
628
    input.config.extra_group_keys = std::move(group_keys);
4✔
629
    input.config.custom_metric_fields = std::move(custom_metrics);
4✔
630
    input.config.compute_percentiles = compute_percentiles != 0;
4✔
631
    input.index_dir = index_dir;
4!
632
    input.checkpoint_size = static_cast<std::size_t>(checkpoint_size);
4✔
633
    input.force_rebuild = force_rebuild != 0;
4✔
634
    input.parallelism = static_cast<std::size_t>(parallelism);
4✔
635
    input.event_batch_size = static_cast<std::size_t>(event_batch_size);
4✔
636

637
    std::string output_path_str(output_path);
4!
638
    AggregatorWriteArrowResult result;
4✔
639
    auto *rp = &result;
4✔
640
    std::string error_msg;
4✔
641

642
    Py_BEGIN_ALLOW_THREADS try {
4!
643
        Runtime *rt = get_runtime(self);
4!
644
        rt->submit(
12!
645
              run_coro_scope(rt->executor(), run_aggregator_write_arrow, rp,
4!
646
                             std::move(input), output_path_str,
4!
647
                             std::move(views), chunk_size_bytes, compression),
4✔
648
              "aggregator_write_arrow")
4!
649
            .get();
4!
650
    } catch (const std::exception &e) {
4!
651
        error_msg = e.what();
×
652
    }
×
653
    Py_END_ALLOW_THREADS
4!
654

655
        if (!error_msg.empty()) {
4!
656
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
657
        return NULL;
×
658
    }
659

660
    if (!result.error.empty()) {
4!
661
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
662
        return NULL;
×
663
    }
664

665
    // Build result dict
666
    PyObject *dict = PyDict_New();
4!
667
    if (!dict) return NULL;
4!
668

669
    PyObject *views_dict = PyDict_New();
4!
670
    if (!views_dict) {
4!
UNCOV
671
        Py_DECREF(dict);
×
672
        return NULL;
×
673
    }
674

675
    for (const auto &[view_name, view_stats] : result.view_stats) {
9!
676
        PyObject *view_dict = PyDict_New();
5!
677
        if (!view_dict) {
5!
UNCOV
678
            Py_DECREF(views_dict);
×
UNCOV
679
            Py_DECREF(dict);
×
680
            return NULL;
×
681
        }
682

683
        PyObject *files_list = PyList_New(0);
5!
684
        if (!files_list) {
5!
UNCOV
685
            Py_DECREF(view_dict);
×
UNCOV
686
            Py_DECREF(views_dict);
×
UNCOV
687
            Py_DECREF(dict);
×
688
            return NULL;
×
689
        }
690

691
        for (const auto &f : view_stats.files) {
10✔
692
            PyObject *file_str = PyUnicode_FromString(f.c_str());
5!
693
            if (!file_str || PyList_Append(files_list, file_str) < 0) {
5!
694
                Py_XDECREF(file_str);
×
UNCOV
695
                Py_DECREF(files_list);
×
UNCOV
696
                Py_DECREF(view_dict);
×
UNCOV
697
                Py_DECREF(views_dict);
×
UNCOV
698
                Py_DECREF(dict);
×
699
                return NULL;
×
700
            }
701
            Py_DECREF(file_str);
5!
702
        }
703

704
        PyDict_SetItemString(view_dict, "files", files_list);
5!
705
        dict_set_steal(view_dict, "rows",
5!
706
                       PyLong_FromLongLong(view_stats.total_rows));
5!
707
        dict_set_steal(
5!
708
            view_dict, "bytes",
5✔
709
            PyLong_FromLongLong(view_stats.total_uncompressed_bytes));
5!
710
        Py_DECREF(files_list);
5!
711

712
        PyObject *key = PyUnicode_FromString(view_name.c_str());
5!
713
        PyDict_SetItem(views_dict, key, view_dict);
5!
714
        Py_DECREF(key);
5!
715
        Py_DECREF(view_dict);
5!
716
    }
717

718
    PyDict_SetItemString(dict, "views", views_dict);
4!
719
    dict_set_steal(dict, "total_rows", PyLong_FromLongLong(result.total_rows));
4!
720
    dict_set_steal(dict, "total_bytes",
4!
721
                   PyLong_FromLongLong(result.total_bytes));
4!
722
    Py_DECREF(views_dict);
4!
723

724
    return dict;
4✔
725
}
4✔
726

727
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
728

729
// tp_call slot: calling an Aggregator instance is shorthand for
// instance.process(...), with identical arguments and result.
static PyObject *Aggregator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    auto *aggregator = reinterpret_cast<AggregatorObject *>(self);
    return Aggregator_process(aggregator, args, kwds);
}
733

734
// Method table for AggregatorUtility.
//
// Docstrings that start with "name(...)\n--\n\n" are parsed by CPython into
// `__text_signature__`; the text between the parentheses must therefore be a
// valid Python parameter list, or inspect.signature()/help() lose it.
static PyMethodDef Aggregator_methods[] = {
    {"process", (PyCFunction)Aggregator_process, METH_VARARGS | METH_KEYWORDS,
     "process(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "        categories=None, names=None, index_dir='',\n"
     "        checkpoint_size=33554432, force_rebuild=False,\n"
     "        parallelism=0, event_batch_size=10000,\n"
     "        custom_metric_fields=None, compute_percentiles=False)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, return materialized ArrowTable.\n"
     "\n"
     "Uses parallel, RocksDB-backed, fused indexing and aggregation.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds (default "
     "5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Directory for .dftindex stores (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    parallelism (int): Number of parallel workers (0 = all cores).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "    custom_metric_fields (list[str] or None): Extra numeric args\n"
     "        fields to aggregate into ``*_total``/``*_min``/``*_max``/\n"
     "        ``*_mean``/``*_std`` columns (default None).\n"
     "    compute_percentiles (bool): Enable percentile sketch collection\n"
     "        during aggregation (default False).\n"
     "\n"
     "Returns:\n"
     "    ArrowTable: Aggregated results.\n"},
    {"iter_arrow", (PyCFunction)Aggregator_iter_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "           categories=None, names=None, index_dir='',\n"
     "           checkpoint_size=33554432, force_rebuild=False,\n"
     "           parallelism=0, event_batch_size=10000,\n"
     "           custom_metric_fields=None, compute_percentiles=False,\n"
     "           buffer_size=8)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, stream Arrow batches.\n"
     "\n"
     "Returns immediately with a streaming iterator. Batches are produced\n"
     "in the background with a bounded buffer. GIL is released while waiting\n"
     "for the next batch, allowing other Python threads to run.\n"
     "\n"
     "Uses parallel, RocksDB-backed, fused indexing and aggregation.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds (default "
     "5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Directory for .dftindex stores (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    parallelism (int): Number of parallel workers (0 = all cores).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "    custom_metric_fields (list[str] or None): Extra numeric args\n"
     "        fields to aggregate into ``*_total``/``*_min``/``*_max``/\n"
     "        ``*_mean``/``*_std`` columns (default None).\n"
     "    compute_percentiles (bool): Enable percentile sketch collection\n"
     "        during aggregation (default False).\n"
     "    buffer_size (int): Max batches to buffer (default 8).\n"
     "\n"
     "Returns:\n"
     "    _ArrowStreamingIterator: Streaming iterator yielding Arrow record\n"
     "        batches. Supports cancel() to stop early.\n"},
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    // The signature line below is informal (it elides parameters with
    // "..."), so it deliberately has NO "--" end-of-signature marker:
    // with the marker CPython would store it as __text_signature__, which
    // inspect cannot parse ("..." is not a parameter), and help() would hide
    // it entirely. Without the marker it stays visible in __doc__.
    {"write_arrow", (PyCFunction)Aggregator_write_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "write_arrow(directory, path, time_interval_ms=5000.0, ..., views=None,\n"
     "            chunk_size_mb=32, compression='zstd')\n"
     "\n"
     "Run aggregation and write results to Arrow IPC files with optional "
     "views.\n"
     "\n"
     "Views allow filtering aggregated entries before writing. Each view\n"
     "writes to a separate subdirectory. Query syntax supports: cat, name,\n"
     "pid, tid, hhash, fhash, time_bucket, extra group keys, and aggregation\n"
     "metrics (count, dur_total, dur_min, dur_max, size_total, etc.).\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    path (str): Output directory for Arrow files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds.\n"
     "    group_keys (list[str] or None): Extra grouping dims.\n"
     "    categories (list[str] or None): Category filter.\n"
     "    names (list[str] or None): Name filter.\n"
     "    index_dir (str): Directory for .dftindex stores.\n"
     "    checkpoint_size (int): Checkpoint size.\n"
     "    force_rebuild (bool): Force index rebuild.\n"
     "    parallelism (int): Number of parallel workers.\n"
     "    event_batch_size (int): Entries per batch.\n"
     "    custom_metric_fields (list[str] or None): Extra numeric fields.\n"
     "    compute_percentiles (bool): Enable percentile collection.\n"
     "    views (list[dict] or None): View definitions, each with 'name' and\n"
     "        optional 'query' keys. If None, writes all entries to path.\n"
     "        Example: [{'name': 'io', 'query': 'cat == \"POSIX\"'}]\n"
     "    chunk_size_mb (int): Max uncompressed MB per file (default 32).\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "\n"
     "Returns:\n"
     "    dict: Statistics with 'views' (per-view stats), 'total_rows',\n"
     "        'total_bytes'. Each view has 'files', 'rows', 'bytes'.\n"},
#endif
    // Sentinel terminating the table (all fields zero/null).
    {nullptr, nullptr, 0, nullptr}};
846

847
// Python type object for `dftracer_utils_ext.AggregatorUtility`.
//
// Instances are callable (tp_call -> Aggregator_call, which forwards to
// Aggregator_process), and expose the methods in Aggregator_methods.
//
// tp_doc note: the leading "AggregatorUtility(...)\n--\n\n" becomes the type's
// __text_signature__. inspect's parser for builtin signatures rejects
// annotations, so the signature is kept annotation-free (the parameter type
// is documented under "Args:" instead); otherwise inspect.signature() raises
// and help() drops the signature.
PyTypeObject AggregatorType = {
    PyVarObject_HEAD_INIT(
        NULL, 0) "dftracer_utils_ext.AggregatorUtility", /* tp_name */
    sizeof(AggregatorObject),                            /* tp_basicsize */
    0,                                                   /* tp_itemsize */
    (destructor)Aggregator_dealloc,                      /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Aggregator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    "AggregatorUtility(runtime=None)\n"
    "--\n\n"
    "High-level aggregation pipeline for DFTracer trace files.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "process(directory, time_interval_ms=5000.0, ...) -> ArrowTable\n"
    "    Run aggregation and return a materialized Arrow table.\n\n"
    "iter_arrow(directory, time_interval_ms=5000.0, ...) -> "
    "Iterator[ArrowBatch]\n"
    "    Run aggregation and stream Arrow batches.\n"
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    // Only advertise write_arrow when it is compiled into the method table.
    "\n"
    "write_arrow(directory, path, time_interval_ms=5000.0, ...) -> dict\n"
    "    Run aggregation and write Arrow IPC files, optionally split into "
    "views.\n"
#endif
    ,                                                  /* tp_doc */
    0,                                                 /* tp_traverse */
    0,                                                 /* tp_clear */
    0,                                                 /* tp_richcompare */
    0,                                                 /* tp_weaklistoffset */
    0,                                                 /* tp_iter */
    0,                                                 /* tp_iternext */
    Aggregator_methods,                                /* tp_methods */
    0,                                                 /* tp_members */
    0,                                                 /* tp_getset */
    0,                                                 /* tp_base */
    0,                                                 /* tp_dict */
    0,                                                 /* tp_descr_get */
    0,                                                 /* tp_descr_set */
    0,                                                 /* tp_dictoffset */
    (initproc)Aggregator_init,                         /* tp_init */
    0,                                                 /* tp_alloc */
    Aggregator_new,                                    /* tp_new */
};
897

898
/// Register the AggregatorUtility type on extension module `m`.
///
/// @param m  The extension module object being initialized.
/// @return 0 on success; -1 if register_type reports failure (a negative
///         result), in which case the error from register_type is propagated.
int init_aggregator(PyObject *m) {
    const int status = register_type(m, &AggregatorType, "AggregatorUtility");
    return (status < 0) ? -1 : 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc