• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28286012595

27 Jun 2026 10:04AM UTC coverage: 51.056% (-1.3%) from 52.356%
28286012595

Pull #79

github

web-flow
Merge 6c6535a19 into 8eb383f39
Pull Request #79: Add Valgrind memory checking (C++, Python, MPI) and fix the bugs it found

32079 of 80165 branches covered (40.02%)

Branch coverage included in aggregate %.

129 of 149 new or added lines in 11 files covered. (86.58%)

5116 existing lines in 181 files now uncovered.

32739 of 46790 relevant lines covered (69.97%)

9929.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.35
/src/dftracer/utils/python/utilities/aggregator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/config.h>
3
#include <dftracer/utils/core/common/memory_budget.h>
4
#include <dftracer/utils/core/coro/task.h>
5
#include <dftracer/utils/core/runtime.h>
6
#include <dftracer/utils/python/arrow_helpers.h>
7
#include <dftracer/utils/python/py_dict_helpers.h>
8
#include <dftracer/utils/python/runtime.h>
9
#include <dftracer/utils/python/trace_reader_iterator.h>
10
#include <dftracer/utils/python/utilities/aggregator.h>
11
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_utility.h>
12

13
#ifdef DFTRACER_UTILS_ENABLE_ARROW
14
#include <dftracer/utils/python/batch_byte_size.h>
15
#include <dftracer/utils/python/streaming_iterator.h>
16
#endif
17
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
18
#include <dftracer/utils/utilities/common/arrow/partition_router.h>
19
#include <dftracer/utils/utilities/common/arrow/partition_writer.h>
20
#include <dftracer/utils/utilities/common/query/query.h>
21
#endif
22

23
#include <cctype>
24
#include <memory>
25
#include <optional>
26
#include <string>
27
#include <vector>
28

29
using dftracer::utils::CoroScope;
30
using dftracer::utils::Runtime;
31
using dftracer::utils::coro::CoroTask;
32
using namespace dftracer::utils::utilities::composites::dft::aggregators;
33

34
using dftracer::utils::python::wrap_arrow_result;
35
using dftracer::utils::python::wrap_arrow_table;
36

37
#ifdef DFTRACER_UTILS_ENABLE_ARROW
38
using dftracer::utils::python::ArrowStreamingIteratorObject;
39
using dftracer::utils::python::ArrowStreamingIteratorType;
40
using dftracer::utils::python::StreamingState;
41
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
42
#endif
43
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
44
using dftracer::utils::utilities::common::arrow::IpcCompression;
45
using dftracer::utils::utilities::common::arrow::PartitionWriter;
46
using dftracer::utils::utilities::common::arrow::PartitionWriteStats;
47
using dftracer::utils::utilities::common::query::Query;
48
#endif
49

50
// Resolve the Runtime this Aggregator submits work to: the runtime bound via
// __init__(runtime=...) when one is present, otherwise the runtime returned by
// get_default_runtime().
static Runtime *get_runtime(AggregatorObject *self) {
    PyObject *bound = self->runtime_obj;
    if (bound == NULL) {
        return get_default_runtime();
    }
    auto *runtime_wrapper = reinterpret_cast<RuntimeObject *>(bound);
    return runtime_wrapper->runtime.get();
}
55

56
// tp_dealloc: drop the (possibly NULL) reference to the bound runtime object,
// then hand the memory back through the type's tp_free slot.
static void Aggregator_dealloc(AggregatorObject *self) {
    PyObject *bound_runtime = self->runtime_obj;
    self->runtime_obj = NULL;
    Py_XDECREF(bound_runtime);

    PyTypeObject *own_type = Py_TYPE(self);
    own_type->tp_free(reinterpret_cast<PyObject *>(self));
}
60

61
// tp_new: allocate an Aggregator that has no runtime bound yet. Argument
// handling lives in Aggregator_init, so args/kwds are not inspected here.
static PyObject *Aggregator_new(PyTypeObject *type, PyObject *args,
                                PyObject *kwds) {
    (void)args;
    (void)kwds;
    PyObject *obj = type->tp_alloc(type, 0);
    if (obj != NULL) {
        reinterpret_cast<AggregatorObject *>(obj)->runtime_obj = NULL;
    }
    return obj;
}
69

70
static int Aggregator_init(AggregatorObject *self, PyObject *args,
22✔
71
                           PyObject *kwds) {
72
    static const char *kwlist[] = {"runtime", NULL};
73
    PyObject *runtime_arg = NULL;
22✔
74

75
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
22!
76
                                     &runtime_arg)) {
77
        return -1;
×
78
    }
79

80
    if (runtime_arg && runtime_arg != Py_None) {
22!
81
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
82
            Py_INCREF(runtime_arg);
×
83
            self->runtime_obj = runtime_arg;
×
UNCOV
84
        } else {
×
85
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
86
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
87
                self->runtime_obj = native;
×
UNCOV
88
            } else {
×
89
                Py_XDECREF(native);
×
90
                PyErr_SetString(PyExc_TypeError,
×
91
                                "runtime must be a Runtime instance or None");
92
                return -1;
×
93
            }
94
        }
UNCOV
95
    }
×
96

97
    return 0;
22✔
98
}
22✔
99

100
// ---------------------------------------------------------------------------
101
// Helpers
102
// ---------------------------------------------------------------------------
103

104
static int parse_str_list(PyObject *obj, std::vector<std::string> &out,
36✔
105
                          const char *param_name) {
106
    if (!obj || obj == Py_None) return 0;
36!
107
    if (!PyList_Check(obj)) {
4!
108
        PyErr_Format(PyExc_TypeError, "%s must be a list of str", param_name);
×
109
        return -1;
×
110
    }
111
    Py_ssize_t n = PyList_Size(obj);
4✔
112
    for (Py_ssize_t i = 0; i < n; i++) {
10✔
113
        const char *s = PyUnicode_AsUTF8(PyList_GetItem(obj, i));
6✔
114
        if (!s) return -1;
6!
115
        out.emplace_back(s);
6✔
116
    }
6✔
117
    return 0;
4✔
118
}
36✔
119

120
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
121
// Parse a view query string into an optional Query
122
static int parse_view_query(PyObject *query_obj, std::optional<Query> &out) {
18✔
123
    if (!query_obj || query_obj == Py_None) {
18!
124
        out = std::nullopt;
16✔
125
        return 0;
16✔
126
    }
127
    const char *query_str = PyUnicode_AsUTF8(query_obj);
2✔
128
    if (!query_str) return -1;
2!
129
    auto parsed = Query::from_string(query_str);
2✔
130
    if (!parsed) {
2!
131
        PyErr_Format(PyExc_ValueError, "Invalid query: %s",
×
132
                     parsed.error().format().c_str());
×
133
        return -1;
×
134
    }
135
    out = std::move(*parsed);
2!
136
    return 0;
2✔
137
}
18✔
138
#endif
139

140
static int parse_aggregator_args(PyObject *args, PyObject *kwds,
18✔
141
                                 AggregatorInput &input,
142
                                 std::size_t *buffer_size_out = nullptr,
143
                                 std::optional<Query> *query_out = nullptr) {
144
    static const char *kwlist[] = {"directory",
145
                                   "time_interval_ms",
146
                                   "group_keys",
147
                                   "categories",
148
                                   "names",
149
                                   "index_dir",
150
                                   "checkpoint_size",
151
                                   "force_rebuild",
152
                                   "parallelism",
153
                                   "event_batch_size",
154
                                   "custom_metric_fields",
155
                                   "compute_percentiles",
156
                                   "buffer_size",
157
                                   "query",
158
                                   NULL};
159

160
    const char *directory = NULL;
18✔
161
    double time_interval_ms = 5000.0;
18✔
162
    PyObject *group_keys_obj = Py_None;
18✔
163
    PyObject *categories_obj = Py_None;
18✔
164
    PyObject *names_obj = Py_None;
18✔
165
    const char *index_dir = "";
18✔
166
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;
18✔
167
    int force_rebuild = 0;
18✔
168
    Py_ssize_t parallelism = 0;
18✔
169
    Py_ssize_t event_batch_size = 10000;
18✔
170
    PyObject *custom_metrics_obj = Py_None;
18✔
171
    int compute_percentiles = 0;
18✔
172
    Py_ssize_t buffer_size = 8;
18✔
173
    PyObject *query_obj = Py_None;
18✔
174

175
    if (!PyArg_ParseTupleAndKeywords(
18!
176
            args, kwds, "s|dOOOsnpnnOpnO", (char **)kwlist, &directory,
18✔
177
            &time_interval_ms, &group_keys_obj, &categories_obj, &names_obj,
178
            &index_dir, &checkpoint_size, &force_rebuild, &parallelism,
179
            &event_batch_size, &custom_metrics_obj, &compute_percentiles,
180
            &buffer_size, &query_obj))
181
        return -1;
×
182

183
    if (buffer_size_out) {
18✔
184
        *buffer_size_out = static_cast<std::size_t>(buffer_size);
3✔
185
    }
3✔
186

187
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
188
    if (query_out) {
18!
189
        if (parse_view_query(query_obj, *query_out) < 0) return -1;
18!
190
    }
18✔
191
#else
192
    (void)query_obj;
193
#endif
194

195
    input.directory = directory;
18✔
196
    input.config.time_interval_us =
18✔
197
        static_cast<std::uint64_t>(time_interval_ms * 1000.0);
18✔
198
    input.index_dir = index_dir;
18✔
199
    input.checkpoint_size = static_cast<std::size_t>(checkpoint_size);
18✔
200
    input.force_rebuild = force_rebuild != 0;
18✔
201
    input.parallelism = static_cast<std::size_t>(parallelism);
18✔
202
    input.event_batch_size = static_cast<std::size_t>(event_batch_size);
18✔
203
    input.config.compute_percentiles = compute_percentiles != 0;
18✔
204

205
    if (parse_str_list(group_keys_obj, input.config.extra_group_keys,
36!
206
                       "group_keys") < 0)
18✔
207
        return -1;
×
208
    if (parse_str_list(custom_metrics_obj, input.config.custom_metric_fields,
36!
209
                       "custom_metric_fields") < 0)
18✔
210
        return -1;
×
211

212
    return 0;
18✔
213
}
18✔
214

215
#ifdef DFTRACER_UTILS_ENABLE_ARROW
216
static int run_aggregator_pipeline(
15✔
217
    AggregatorObject *self, const AggregatorInput &input,
218
    std::vector<ArrowExportResult> &results, std::string &error_msg,
219
    const std::optional<Query> *query = nullptr) {
220
    auto *rp = &results;
15✔
221
    AggregatorInput input_copy = input;
15✔
222
    std::optional<Query> query_copy;
15✔
223
    if (query) query_copy = *query;
15!
224

225
    Py_BEGIN_ALLOW_THREADS try {
15!
226
        Runtime *rt = get_runtime(self);
15!
227
        rt->submit(run_coro_scope(
45!
228
                       rt->executor(),
15!
229
                       [](CoroScope &scope, std::vector<ArrowExportResult> *out,
199!
230
                          AggregatorInput input,
231
                          std::optional<Query> query) -> CoroTask<void> {
15!
232
                           AggregatorUtility util;
15!
233
                           util.bind_context(scope);
15!
234
                           try {
235
                               auto gen = util.process(input);
15!
236
                               while (auto batch = co_await gen.next()) {
139!
237
                                   if (batch->entries.empty()) continue;
16!
238
                                   AggregationBatch filtered;
16✔
239
                                   if (query) {
16✔
240
                                       filtered = batch->filter(*query);
1!
241
                                       if (filtered.entries.empty()) continue;
1!
242
                                   } else {
1✔
243
                                       filtered = std::move(*batch);
15✔
244
                                   }
245
                                   auto arrow_result = filtered.to_arrow();
16!
246
                                   if (!arrow_result.valid()) continue;
16!
247
                                   out->push_back(std::move(arrow_result));
16!
248
                               }
31!
249
                               util.unbind_context();
15!
250
                           } catch (...) {
77✔
UNCOV
251
                               util.unbind_context();
×
UNCOV
252
                               throw;
×
UNCOV
253
                           }
×
254
                       },
201✔
255
                       rp, std::move(input_copy), std::move(query_copy)),
15✔
256
                   "aggregator")
15!
257
            .get();
15!
258
    } catch (const std::exception &e) {
15!
259
        error_msg = e.what();
×
260
    }
×
261
    Py_END_ALLOW_THREADS
15!
262

263
        return error_msg.empty()
15✔
264
        ? 0
265
        : -1;
266
}
15✔
267
#endif  // DFTRACER_UTILS_ENABLE_ARROW
268

269
#ifdef DFTRACER_UTILS_ENABLE_ARROW
270

271
// Producer coroutine behind Aggregator.iter_arrow(). It runs the aggregation
// pipeline and pushes each non-empty (optionally query-filtered) batch into
// `state` as an Arrow export.
//
// The stream is terminated once: state->complete() on normal end or
// cancellation, and state->fail() with the in-flight exception otherwise.
// Production stops early when the consumer cancels or when state->push()
// returns false. The utility's context binding is released on both the
// success and the failure path, matching run_aggregator_pipeline.
static CoroTask<void> run_aggregator_stream(
    CoroScope &scope, std::shared_ptr<StreamingState<ArrowExportResult>> state,
    AggregatorInput input, std::optional<Query> query) {
    if (state->cancelled()) {
        state->complete();
        co_return;
    }

    try {
        AggregatorUtility util;
        util.bind_context(scope);
        try {
            auto gen = util.process(input);

            while (auto batch = co_await gen.next()) {
                if (state->cancelled()) break;
                if (batch->entries.empty()) continue;

                AggregationBatch filtered;
                if (query) {
                    filtered = batch->filter(*query);
                    if (filtered.entries.empty()) continue;
                } else {
                    filtered = std::move(*batch);
                }

                auto arrow_result = filtered.to_arrow();
                if (!arrow_result.valid()) continue;

                auto result_bytes =
                    dftracer::utils::python::byte_size(arrow_result);
                if (!state->push(std::move(arrow_result), result_bytes)) {
                    break;
                }
            }

            util.unbind_context();
        } catch (...) {
            util.unbind_context();
            throw;
        }
        state->complete();
    } catch (...) {
        state->fail(std::current_exception());
    }
}
314

315
#endif  // DFTRACER_UTILS_ENABLE_ARROW
316

317
// ---------------------------------------------------------------------------
318
// process() - returns ArrowTable (materialized)
319
// ---------------------------------------------------------------------------
320

321
// Aggregator.process(...) -> ArrowTable
//
// Parses the shared aggregator keyword arguments, then runs the pipeline to
// completion with the GIL released. Each resulting Arrow batch is wrapped via
// wrap_arrow_result and the list is passed to wrap_arrow_table. Raises
// RuntimeError when the pipeline fails or when the module was built without
// Arrow support.
static PyObject *Aggregator_process(AggregatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    AggregatorInput input;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    std::optional<Query> query;
    const int parse_rc =
        parse_aggregator_args(args, kwds, input, nullptr, &query);
#else
    const int parse_rc = parse_aggregator_args(args, kwds, input);
#endif
    if (parse_rc < 0) {
        return NULL;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    std::vector<ArrowExportResult> batches;
    std::string failure;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    const int run_rc =
        run_aggregator_pipeline(self, input, batches, failure, &query);
#else
    const int run_rc = run_aggregator_pipeline(self, input, batches, failure);
#endif
    if (run_rc < 0) {
        PyErr_SetString(PyExc_RuntimeError, failure.c_str());
        return NULL;
    }

    PyObject *capsules = PyList_New(0);
    if (capsules == NULL) {
        return NULL;
    }
    for (std::size_t idx = 0; idx < batches.size(); ++idx) {
        PyObject *capsule = wrap_arrow_result(std::move(batches[idx]));
        bool appended = false;
        if (capsule != NULL) {
            appended = PyList_Append(capsules, capsule) == 0;
            Py_DECREF(capsule);
        }
        if (!appended) {
            Py_DECREF(capsules);
            return NULL;
        }
    }
    return wrap_arrow_table(capsules);
#else
    (void)self;
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
368

369
// ---------------------------------------------------------------------------
370
// iter_arrow() - returns true streaming iterator
371
// ---------------------------------------------------------------------------
372

373
// Aggregator.iter_arrow(...) -> streaming iterator of Arrow batches.
//
// Parses the shared aggregator keyword arguments and creates a streaming
// iterator wired to a shared StreamingState. It then submits
// run_aggregator_stream to the runtime (with the GIL released) to produce into
// that state. If submission throws, the exception is caught before the GIL is
// re-acquired, the iterator is released and RuntimeError is raised.
// NOTE(review): `buffer_size` is parsed and validated but the streaming state
// is sized by compute_memory_budget(0). Confirm whether it should bound the
// queue.
static PyObject *Aggregator_iter_arrow(AggregatorObject *self, PyObject *args,
                                       PyObject *kwds) {
    AggregatorInput input;
    std::size_t buffer_size = 8;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    std::optional<Query> query;
    if (parse_aggregator_args(args, kwds, input, &buffer_size, &query) < 0)
        return NULL;
#else
    if (parse_aggregator_args(args, kwds, input, &buffer_size) < 0) return NULL;
#endif

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    auto state = std::make_shared<StreamingState<ArrowExportResult>>(
        dftracer::utils::compute_memory_budget(0));

    ArrowStreamingIteratorObject *iter_obj =
        (ArrowStreamingIteratorObject *)ArrowStreamingIteratorType.tp_new(
            &ArrowStreamingIteratorType, NULL, NULL);
    if (!iter_obj) {
        return NULL;
    }

    iter_obj->cpp_state->state = state;
    iter_obj->cpp_state->pull_next =
        [state]() -> std::optional<ArrowExportResult> { return state->pull(); };
    iter_obj->cpp_state->get_error = [state]() -> std::exception_ptr {
        return state->error();
    };
    iter_obj->cpp_state->cancel = [state]() { state->cancel(); };

    Runtime *rt = get_runtime(self);
    AggregatorInput input_copy = input;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    std::optional<Query> query_copy = std::move(query);
#endif

    bool submit_failed = false;
    std::string submit_error;
    Py_BEGIN_ALLOW_THREADS
    try {
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
        rt->submit(
            run_coro_scope(rt->executor(), run_aggregator_stream, state,
                           std::move(input_copy), std::move(query_copy)),
            "aggregator_stream");
#else
        rt->submit(run_coro_scope(rt->executor(), run_aggregator_stream, state,
                                  std::move(input_copy), std::nullopt),
                   "aggregator_stream");
#endif
    } catch (const std::exception &e) {
        submit_failed = true;
        submit_error = e.what();
    } catch (...) {
        submit_failed = true;
    }
    Py_END_ALLOW_THREADS

    if (submit_failed) {
        Py_DECREF((PyObject *)iter_obj);
        if (submit_error.empty()) {
            submit_error = "failed to start aggregator stream";
        }
        PyErr_SetString(PyExc_RuntimeError, submit_error.c_str());
        return NULL;
    }

    return (PyObject *)iter_obj;
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
427

428
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
429

430
struct AggregatorViewDef {
431
    std::string name;
432
    std::optional<Query> query;
433
};
434

435
struct AggregatorWriteArrowResult {
4✔
436
    std::unordered_map<std::string, PartitionWriteStats> view_stats;
437
    int64_t total_rows = 0;
4✔
438
    int64_t total_bytes = 0;
4✔
439
    std::string error;
440
};
441

442
// Coroutine behind Aggregator.write_arrow(). It runs the aggregation pipeline
// and writes each batch through one PartitionWriter per view.
//
// With no views, a single unfiltered view named "all" is written directly to
// `output_path`; otherwise each view goes to `output_path/<name>`. Views with
// a query only receive matching rows. After the pipeline finishes, every
// writer is closed and its statistics are accumulated into `out`.
//
// Failures are reported through `out->error`, which is always non-empty on
// failure; this coroutine does not throw.
// NOTE(review): on an open/write failure, writers that were already opened are
// not explicitly closed here. Confirm PartitionWriter cleans up on
// destruction.
static CoroTask<void> run_aggregator_write_arrow(
    CoroScope &scope, AggregatorWriteArrowResult *out, AggregatorInput input,
    std::string output_path, std::vector<AggregatorViewDef> views,
    int64_t chunk_size_bytes, IpcCompression compression) {
    try {
        // If no views specified, create a default "all" view
        if (views.empty()) {
            views.push_back({"all", std::nullopt});
        }
        const std::size_t view_count = views.size();

        // Open a writer for each view
        std::vector<PartitionWriter> writers(view_count);
        for (std::size_t i = 0; i < view_count; ++i) {
            std::string view_path = output_path;
            if (view_count > 1 || views[i].name != "all") {
                view_path = output_path + "/" + views[i].name;
            }
            int rc = co_await writers[i].open(view_path, chunk_size_bytes,
                                              compression);
            if (rc != 0) {
                out->error = "Failed to open writer for view: " + views[i].name;
                co_return;
            }
        }

        AggregatorUtility util;
        util.bind_context(scope);
        auto gen = util.process(input);

        while (auto batch = co_await gen.next()) {
            if (batch->entries.empty()) continue;

            // Write to each view (with optional filtering)
            for (std::size_t i = 0; i < view_count; ++i) {
                AggregationBatch filtered_batch;
                if (views[i].query) {
                    filtered_batch = batch->filter(*views[i].query);
                    if (filtered_batch.entries.empty()) continue;
                } else if (i + 1 == view_count) {
                    // Last view reading this batch: take it instead of copying.
                    filtered_batch = std::move(*batch);
                } else {
                    filtered_batch = *batch;
                }

                auto arrow_result = filtered_batch.to_arrow();
                if (!arrow_result.valid()) continue;

                int rc = co_await writers[i].write_batch(arrow_result);
                if (rc != 0) {
                    util.unbind_context();
                    out->error =
                        "Failed to write batch for view: " + views[i].name;
                    co_return;
                }
            }
        }

        util.unbind_context();

        // Close writers and collect stats (one map lookup per view).
        for (std::size_t i = 0; i < view_count; ++i) {
            auto stats = co_await writers[i].close();
            PartitionWriteStats &slot = out->view_stats[views[i].name];
            slot = std::move(stats);
            out->total_rows += slot.total_rows;
            out->total_bytes += slot.total_uncompressed_bytes;
        }
    } catch (const std::exception &e) {
        out->error = e.what();
        if (out->error.empty()) {
            out->error = "aggregator write_arrow failed";
        }
    } catch (...) {
        out->error = "aggregator write_arrow failed with an unknown exception";
    }
}
511

512
static PyObject *Aggregator_write_arrow(AggregatorObject *self, PyObject *args,
4✔
513
                                        PyObject *kwds) {
514
    static const char *kwlist[] = {"directory",
515
                                   "path",
516
                                   "time_interval_ms",
517
                                   "group_keys",
518
                                   "categories",
519
                                   "names",
520
                                   "index_dir",
521
                                   "checkpoint_size",
522
                                   "force_rebuild",
523
                                   "parallelism",
524
                                   "event_batch_size",
525
                                   "custom_metric_fields",
526
                                   "compute_percentiles",
527
                                   "views",
528
                                   "chunk_size_mb",
529
                                   "compression",
530
                                   NULL};
531

532
    const char *directory = NULL;
4✔
533
    const char *output_path = NULL;
4✔
534
    double time_interval_ms = 5000.0;
4✔
535
    PyObject *group_keys_obj = Py_None;
4✔
536
    PyObject *categories_obj = Py_None;
4✔
537
    PyObject *names_obj = Py_None;
4✔
538
    const char *index_dir = "";
4✔
539
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;
4✔
540
    int force_rebuild = 0;
4✔
541
    Py_ssize_t parallelism = 0;
4✔
542
    Py_ssize_t event_batch_size = 10000;
4✔
543
    PyObject *custom_metrics_obj = Py_None;
4✔
544
    int compute_percentiles = 0;
4✔
545
    PyObject *views_obj = Py_None;
4✔
546
    int chunk_size_mb = 32;
4✔
547
    const char *compression_str = "zstd";
4✔
548

549
    if (!PyArg_ParseTupleAndKeywords(
4!
550
            args, kwds, "ss|dOOOsnpnnOpOis", (char **)kwlist, &directory,
4✔
551
            &output_path, &time_interval_ms, &group_keys_obj, &categories_obj,
552
            &names_obj, &index_dir, &checkpoint_size, &force_rebuild,
553
            &parallelism, &event_batch_size, &custom_metrics_obj,
554
            &compute_percentiles, &views_obj, &chunk_size_mb, &compression_str))
555
        return NULL;
×
556

557
    // Parse views
558
    std::vector<AggregatorViewDef> views;
4✔
559
    if (views_obj && views_obj != Py_None) {
4!
560
        if (!PyList_Check(views_obj)) {
1!
561
            PyErr_SetString(PyExc_TypeError,
×
562
                            "views must be a list of dicts with 'name' and "
563
                            "optional 'query' keys");
564
            return NULL;
×
565
        }
566
        Py_ssize_t n = PyList_Size(views_obj);
1!
567
        for (Py_ssize_t i = 0; i < n; i++) {
3✔
568
            PyObject *item = PyList_GetItem(views_obj, i);
2!
569
            if (!PyDict_Check(item)) {
2!
570
                PyErr_SetString(PyExc_TypeError,
×
571
                                "each view must be a dict with 'name' key");
572
                return NULL;
×
573
            }
574
            AggregatorViewDef view;
2✔
575
            PyObject *name_obj = PyDict_GetItemString(item, "name");
2!
576
            if (!name_obj) {
2!
577
                PyErr_SetString(PyExc_ValueError,
×
578
                                "each view must have a 'name' key");
579
                return NULL;
×
580
            }
581
            const char *name_str = PyUnicode_AsUTF8(name_obj);
2!
582
            if (!name_str) return NULL;
2!
583
            view.name = name_str;
2!
584

585
            PyObject *query_obj = PyDict_GetItemString(item, "query");
2!
586
            if (query_obj && query_obj != Py_None) {
2!
587
                const char *query_str = PyUnicode_AsUTF8(query_obj);
2!
588
                if (!query_str) return NULL;
2!
589
                auto parsed = Query::from_string(query_str);
2!
590
                if (!parsed) {
2!
591
                    PyErr_Format(PyExc_ValueError,
×
UNCOV
592
                                 "Invalid query for view '%s': %s", name_str,
×
593
                                 parsed.error().format().c_str());
×
594
                    return NULL;
×
595
                }
596
                view.query = std::move(*parsed);
2!
597
            }
2!
598
            views.push_back(std::move(view));
2!
599
        }
2!
600
    }
1✔
601

602
    // Parse compression
603
    IpcCompression compression = IpcCompression::ZSTD;
4✔
604
    if (compression_str) {
4!
605
        std::string comp_lower(compression_str);
4!
606
        for (auto &c : comp_lower) c = std::tolower(c);
20!
607
        if (comp_lower == "none") {
4✔
608
            compression = IpcCompression::NONE;
1✔
609
        } else if (comp_lower == "zstd") {
4!
610
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
611
            compression = IpcCompression::ZSTD;
3✔
612
#else
613
            PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
614
            return NULL;
615
#endif
616
        } else {
3✔
617
            PyErr_Format(PyExc_ValueError,
×
618
                         "Unknown compression: %s (use 'none' or 'zstd')",
UNCOV
619
                         compression_str);
×
620
            return NULL;
×
621
        }
622
    }
4!
623

624
    int64_t chunk_size_bytes =
4✔
625
        static_cast<int64_t>(chunk_size_mb) * 1024 * 1024;
4✔
626

627
    // Parse group_keys
628
    std::vector<std::string> group_keys;
4✔
629
    if (group_keys_obj && group_keys_obj != Py_None) {
4!
630
        if (!PyList_Check(group_keys_obj)) {
×
631
            PyErr_SetString(PyExc_TypeError,
×
632
                            "group_keys must be a list of str");
633
            return NULL;
×
634
        }
635
        Py_ssize_t n = PyList_Size(group_keys_obj);
×
636
        for (Py_ssize_t i = 0; i < n; i++) {
×
637
            const char *s = PyUnicode_AsUTF8(PyList_GetItem(group_keys_obj, i));
×
638
            if (!s) return NULL;
×
639
            group_keys.emplace_back(s);
×
UNCOV
640
        }
×
UNCOV
641
    }
×
642

643
    // Parse custom_metric_fields
644
    std::vector<std::string> custom_metrics;
4✔
645
    if (custom_metrics_obj && custom_metrics_obj != Py_None) {
4!
646
        if (!PyList_Check(custom_metrics_obj)) {
×
647
            PyErr_SetString(PyExc_TypeError,
×
648
                            "custom_metric_fields must be a list of str");
649
            return NULL;
×
650
        }
651
        Py_ssize_t n = PyList_Size(custom_metrics_obj);
×
652
        for (Py_ssize_t i = 0; i < n; i++) {
×
UNCOV
653
            const char *s =
×
654
                PyUnicode_AsUTF8(PyList_GetItem(custom_metrics_obj, i));
×
655
            if (!s) return NULL;
×
656
            custom_metrics.emplace_back(s);
×
UNCOV
657
        }
×
UNCOV
658
    }
×
659

660
    AggregatorInput input;
4!
661
    input.directory = directory;
4!
662
    input.config.time_interval_us =
4✔
663
        static_cast<std::uint64_t>(time_interval_ms * 1000.0);
4✔
664
    input.config.extra_group_keys = std::move(group_keys);
4✔
665
    input.config.custom_metric_fields = std::move(custom_metrics);
4✔
666
    input.config.compute_percentiles = compute_percentiles != 0;
4✔
667
    input.index_dir = index_dir;
4!
668
    input.checkpoint_size = static_cast<std::size_t>(checkpoint_size);
4✔
669
    input.force_rebuild = force_rebuild != 0;
4✔
670
    input.parallelism = static_cast<std::size_t>(parallelism);
4✔
671
    input.event_batch_size = static_cast<std::size_t>(event_batch_size);
4✔
672

673
    std::string output_path_str(output_path);
4!
674
    AggregatorWriteArrowResult result;
4✔
675
    auto *rp = &result;
4✔
676
    std::string error_msg;
4✔
677

678
    Py_BEGIN_ALLOW_THREADS try {
4!
679
        Runtime *rt = get_runtime(self);
4!
680
        rt->submit(
12!
681
              run_coro_scope(rt->executor(), run_aggregator_write_arrow, rp,
4!
682
                             std::move(input), output_path_str,
4!
683
                             std::move(views), chunk_size_bytes, compression),
4✔
684
              "aggregator_write_arrow")
4!
685
            .get();
4!
686
    } catch (const std::exception &e) {
4!
687
        error_msg = e.what();
×
688
    }
×
689
    Py_END_ALLOW_THREADS
4!
690

691
        if (!error_msg.empty()) {
4!
692
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
693
        return NULL;
×
694
    }
695

696
    if (!result.error.empty()) {
4!
697
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
698
        return NULL;
×
699
    }
700

701
    // Build result dict
702
    PyObject *dict = PyDict_New();
4!
703
    if (!dict) return NULL;
4!
704

705
    PyObject *views_dict = PyDict_New();
4!
706
    if (!views_dict) {
4!
UNCOV
707
        Py_DECREF(dict);
×
708
        return NULL;
×
709
    }
710

711
    for (const auto &[view_name, view_stats] : result.view_stats) {
29!
712
        PyObject *view_dict = PyDict_New();
5!
713
        if (!view_dict) {
5!
UNCOV
714
            Py_DECREF(views_dict);
×
UNCOV
715
            Py_DECREF(dict);
×
716
            return NULL;
×
717
        }
718

719
        PyObject *files_list = PyList_New(0);
5!
720
        if (!files_list) {
5!
UNCOV
721
            Py_DECREF(view_dict);
×
UNCOV
722
            Py_DECREF(views_dict);
×
UNCOV
723
            Py_DECREF(dict);
×
724
            return NULL;
×
725
        }
726

727
        for (const auto &f : view_stats.files) {
10✔
728
            PyObject *file_str = PyUnicode_FromString(f.c_str());
5!
729
            if (!file_str || PyList_Append(files_list, file_str) < 0) {
5!
730
                Py_XDECREF(file_str);
×
UNCOV
731
                Py_DECREF(files_list);
×
UNCOV
732
                Py_DECREF(view_dict);
×
UNCOV
733
                Py_DECREF(views_dict);
×
UNCOV
734
                Py_DECREF(dict);
×
735
                return NULL;
×
736
            }
737
            Py_DECREF(file_str);
5!
738
        }
739

740
        PyDict_SetItemString(view_dict, "files", files_list);
5!
741
        dict_set_steal(view_dict, "rows",
5!
742
                       PyLong_FromLongLong(view_stats.total_rows));
5!
743
        dict_set_steal(
5!
744
            view_dict, "bytes",
5✔
745
            PyLong_FromLongLong(view_stats.total_uncompressed_bytes));
5!
746
        Py_DECREF(files_list);
5!
747

748
        PyObject *key = PyUnicode_FromString(view_name.c_str());
5!
749
        PyDict_SetItem(views_dict, key, view_dict);
5!
750
        Py_DECREF(key);
5!
751
        Py_DECREF(view_dict);
5!
752
    }
753

754
    PyDict_SetItemString(dict, "views", views_dict);
4!
755
    dict_set_steal(dict, "total_rows", PyLong_FromLongLong(result.total_rows));
4!
756
    dict_set_steal(dict, "total_bytes",
4!
757
                   PyLong_FromLongLong(result.total_bytes));
4!
758
    Py_DECREF(views_dict);
4!
759

760
    return dict;
4✔
761
}
4✔
762

763
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
764

765
// tp_call slot: calling an AggregatorUtility instance directly, i.e.
// `agg(directory, ...)`, is shorthand for `agg.process(directory, ...)`.
// The positional/keyword arguments are forwarded unchanged and the result
// (or NULL on error) is returned as-is.
static PyObject *Aggregator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    AggregatorObject *const aggregator =
        reinterpret_cast<AggregatorObject *>(self);
    return Aggregator_process(aggregator, args, kwds);
}
769

770
// Methods exposed on AggregatorUtility instances. Each implementation takes
// (self, args, kwds), so every entry is registered with
// METH_VARARGS | METH_KEYWORDS and cast to the generic PyCFunction type.
// write_arrow is only registered when DFTRACER_UTILS_ENABLE_ARROW_IPC is
// defined. The table is terminated by an all-null sentinel entry.
static PyMethodDef Aggregator_methods[] = {
    {"process", reinterpret_cast<PyCFunction>(Aggregator_process),
     METH_VARARGS | METH_KEYWORDS,
     "process(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "        categories=None, names=None, index_dir='',\n"
     "        checkpoint_size=33554432, force_rebuild=False,\n"
     "        parallelism=0, event_batch_size=10000,\n"
     "        custom_metric_fields=None, compute_percentiles=False)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, return materialized ArrowTable.\n"
     "\n"
     "Uses parallel, RocksDB-backed, fused indexing and aggregation.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds (default "
     "5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Directory for .dftindex stores (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    parallelism (int): Number of parallel workers (0 = all cores).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "    custom_metric_fields (list[str] or None): Extra numeric args\n"
     "        fields to aggregate into ``*_total``/``*_min``/``*_max``/\n"
     "        ``*_mean``/``*_std`` columns (default None).\n"
     "    compute_percentiles (bool): Enable percentile sketch collection\n"
     "        during aggregation (default False).\n"
     "\n"
     "Returns:\n"
     "    ArrowTable: Aggregated results.\n"},
    {"iter_arrow", reinterpret_cast<PyCFunction>(Aggregator_iter_arrow),
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "           categories=None, names=None, index_dir='',\n"
     "           checkpoint_size=33554432, force_rebuild=False,\n"
     "           parallelism=0, event_batch_size=10000,\n"
     "           custom_metric_fields=None, compute_percentiles=False,\n"
     "           buffer_size=8)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, stream Arrow batches.\n"
     "\n"
     "Returns immediately with a streaming iterator. Batches are produced\n"
     "in the background with a bounded buffer. GIL is released while waiting\n"
     "for the next batch, allowing other Python threads to run.\n"
     "\n"
     "Uses parallel, RocksDB-backed, fused indexing and aggregation.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds (default "
     "5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Directory for .dftindex stores (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    parallelism (int): Number of parallel workers (0 = all cores).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "    custom_metric_fields (list[str] or None): Extra numeric args\n"
     "        fields to aggregate into ``*_total``/``*_min``/``*_max``/\n"
     "        ``*_mean``/``*_std`` columns (default None).\n"
     "    compute_percentiles (bool): Enable percentile sketch collection\n"
     "        during aggregation (default False).\n"
     "    buffer_size (int): Max batches to buffer (default 8).\n"
     "\n"
     "Returns:\n"
     "    _ArrowStreamingIterator: Streaming iterator yielding Arrow record\n"
     "        batches. Supports cancel() to stop early.\n"},
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    // NOTE(review): the "..." in the signature header below is not valid
    // Python parameter syntax, so inspect.signature() cannot parse it.
    // Spelling out the real parameters would fix that; TODO confirm their
    // order against the kwlist in Aggregator_write_arrow before changing it.
    {"write_arrow", reinterpret_cast<PyCFunction>(Aggregator_write_arrow),
     METH_VARARGS | METH_KEYWORDS,
     "write_arrow(directory, path, time_interval_ms=5000.0, ..., views=None)\n"
     "--\n"
     "\n"
     "Run aggregation and write results to Arrow IPC files with optional "
     "views.\n"
     "\n"
     "Views allow filtering aggregated entries before writing. Each view\n"
     "writes to a separate subdirectory. Query syntax supports: cat, name,\n"
     "pid, tid, hhash, fhash, time_bucket, extra group keys, and aggregation\n"
     "metrics (count, dur_total, dur_min, dur_max, size_total, etc.).\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    path (str): Output directory for Arrow files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds.\n"
     "    group_keys (list[str] or None): Extra grouping dims.\n"
     "    categories (list[str] or None): Category filter.\n"
     "    names (list[str] or None): Name filter.\n"
     "    index_dir (str): Directory for .dftindex stores.\n"
     "    checkpoint_size (int): Checkpoint size.\n"
     "    force_rebuild (bool): Force index rebuild.\n"
     "    parallelism (int): Number of parallel workers.\n"
     "    event_batch_size (int): Entries per batch.\n"
     "    custom_metric_fields (list[str] or None): Extra numeric fields.\n"
     "    compute_percentiles (bool): Enable percentile collection.\n"
     "    views (list[dict] or None): View definitions, each with 'name' and\n"
     "        optional 'query' keys. If None, writes all entries to path.\n"
     "        Example: [{'name': 'io', 'query': 'cat == \"POSIX\"'}]\n"
     "    chunk_size_mb (int): Max uncompressed MB per file (default 32).\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "\n"
     "Returns:\n"
     "    dict: Statistics with 'views' (per-view stats), 'total_rows',\n"
     "        'total_bytes'. Each view has 'files', 'rows', 'bytes'.\n"},
#endif
    // Sentinel: an entry whose name is null marks the end of the table.
    {nullptr, nullptr, 0, nullptr}};
882

883
// Python type object backing dftracer_utils_ext.AggregatorUtility.
//
// Instances are created by Aggregator_new / Aggregator_init, released by
// Aggregator_dealloc, and are directly callable (tp_call forwards to
// process()). Methods come from Aggregator_methods.
//
// The tp_doc header "AggregatorUtility(runtime=None)\n--\n\n" is the
// class's text signature. It deliberately carries no type annotation:
// inspect's parser for these headers rejects annotations, so the previous
// "runtime: Runtime | None = None" made inspect.signature() raise
// ValueError. The parameter type is documented in the Args section instead.
PyTypeObject AggregatorType = {
    PyVarObject_HEAD_INIT(
        NULL, 0) "dftracer_utils_ext.AggregatorUtility", /* tp_name */
    sizeof(AggregatorObject),                            /* tp_basicsize */
    0,                                                   /* tp_itemsize */
    (destructor)Aggregator_dealloc,                      /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Aggregator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    "AggregatorUtility(runtime=None)\n"
    "--\n\n"
    "High-level aggregation pipeline for DFTracer trace files.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "process(directory, time_interval_ms=5000.0, ...) -> ArrowTable\n"
    "    Run aggregation and return a materialized Arrow table.\n\n"
    "iter_arrow(directory, time_interval_ms=5000.0, ...) -> "
    "Iterator[ArrowBatch]\n"
    "    Run aggregation and stream Arrow batches.\n"
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    // write_arrow is only registered in Aggregator_methods under this
    // macro, so it is only advertised here under the same condition.
    "\n"
    "write_arrow(directory, path, ...) -> dict\n"
    "    Run aggregation and write results to Arrow IPC files.\n"
#endif
    ,                                                  /* tp_doc */
    0,                                                 /* tp_traverse */
    0,                                                 /* tp_clear */
    0,                                                 /* tp_richcompare */
    0,                                                 /* tp_weaklistoffset */
    0,                                                 /* tp_iter */
    0,                                                 /* tp_iternext */
    Aggregator_methods,                                /* tp_methods */
    0,                                                 /* tp_members */
    0,                                                 /* tp_getset */
    0,                                                 /* tp_base */
    0,                                                 /* tp_dict */
    0,                                                 /* tp_descr_get */
    0,                                                 /* tp_descr_set */
    0,                                                 /* tp_dictoffset */
    (initproc)Aggregator_init,                         /* tp_init */
    0,                                                 /* tp_alloc */
    Aggregator_new,                                    /* tp_new */
};
933

934
// Readies the AggregatorUtility type and registers it on module `m` under
// the name "AggregatorUtility".
//
// Returns 0 on success and -1 on failure (the failing CPython call,
// PyType_Ready or PyModule_AddObject, leaves its exception set).
//
// `m` is borrowed: this function never releases the caller's reference to
// the module on any path. Previously the PyModule_AddObject failure path
// called Py_DECREF(m) while the PyType_Ready failure path did not. That left
// the caller unable to tell whether it still owned `m`, and risked a double
// decref if the caller also cleaned up the module on error.
int init_aggregator(PyObject *m) {
    if (PyType_Ready(&AggregatorType) < 0) return -1;

    // PyModule_AddObject steals the reference only when it succeeds, so take
    // one up front and hand it back ourselves if registration fails.
    Py_INCREF(&AggregatorType);
    if (PyModule_AddObject(m, "AggregatorUtility",
                           reinterpret_cast<PyObject *>(&AggregatorType)) <
        0) {
        Py_DECREF(&AggregatorType);
        return -1;
    }

    return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc