• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26195612357

20 May 2026 11:19PM UTC coverage: 49.859% (-2.3%) from 52.2%
26195612357

push

github

hariharan-devarajan
feat(aggregator): improve system metrics scanning and persistence error handling

16041 of 43831 branches covered (36.6%)

Branch coverage included in aggregate %.

6 of 17 new or added lines in 2 files covered. (35.29%)

1072 existing lines in 104 files now uncovered.

21423 of 31309 relevant lines covered (68.42%)

13054.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.95
/src/dftracer/utils/python/utilities/aggregator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/config.h>
3
#include <dftracer/utils/core/common/memory_budget.h>
4
#include <dftracer/utils/core/coro/task.h>
5
#include <dftracer/utils/core/runtime.h>
6
#include <dftracer/utils/python/arrow_helpers.h>
7
#include <dftracer/utils/python/runtime.h>
8
#include <dftracer/utils/python/trace_reader_iterator.h>
9
#include <dftracer/utils/python/utilities/aggregator.h>
10
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_utility.h>
11

12
#ifdef DFTRACER_UTILS_ENABLE_ARROW
13
#include <dftracer/utils/python/batch_byte_size.h>
14
#include <dftracer/utils/python/streaming_iterator.h>
15
#endif
16
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
17
#include <dftracer/utils/utilities/common/arrow/partition_router.h>
18
#include <dftracer/utils/utilities/common/arrow/partition_writer.h>
19
#include <dftracer/utils/utilities/common/query/query.h>
20
#endif
21

22
#include <cctype>
23
#include <memory>
24
#include <optional>
25
#include <string>
26
#include <vector>
27

28
using dftracer::utils::CoroScope;
29
using dftracer::utils::Runtime;
30
using dftracer::utils::coro::CoroTask;
31
using namespace dftracer::utils::utilities::composites::dft::aggregators;
32

33
using dftracer::utils::python::wrap_arrow_result;
34
using dftracer::utils::python::wrap_arrow_table;
35

36
#ifdef DFTRACER_UTILS_ENABLE_ARROW
37
using dftracer::utils::python::ArrowStreamingIteratorObject;
38
using dftracer::utils::python::ArrowStreamingIteratorType;
39
using dftracer::utils::python::StreamingState;
40
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
41
#endif
42
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
43
using dftracer::utils::utilities::common::arrow::IpcCompression;
44
using dftracer::utils::utilities::common::arrow::PartitionWriter;
45
using dftracer::utils::utilities::common::arrow::PartitionWriteStats;
46
using dftracer::utils::utilities::common::query::Query;
47
#endif
48

49
/// Pick the runtime that work for this aggregator is submitted to.
///
/// When a runtime object is bound to `self`, the Runtime it holds is
/// returned; otherwise the result of `get_default_runtime()` is used.
static Runtime *get_runtime(AggregatorObject *self) {
    auto *const bound = self->runtime_obj;
    if (!bound) {
        return get_default_runtime();
    }
    return ((RuntimeObject *)bound)->runtime.get();
}
54

55
/// tp_dealloc slot: release the reference to the bound runtime object (if
/// one is set), then return the instance's storage through the type's
/// tp_free.
static void Aggregator_dealloc(AggregatorObject *self) {
    Py_XDECREF(self->runtime_obj);

    PyTypeObject *const type = Py_TYPE(self);
    type->tp_free(reinterpret_cast<PyObject *>(self));
}
22✔
59

60
/// tp_new slot: allocate an Aggregator instance with no runtime bound.
///
/// `args` and `kwds` are not inspected here; argument handling happens in
/// Aggregator_init. Returns NULL when tp_alloc fails.
static PyObject *Aggregator_new(PyTypeObject *type, PyObject *args,
                                PyObject *kwds) {
    PyObject *const raw = type->tp_alloc(type, 0);
    if (raw != nullptr) {
        reinterpret_cast<AggregatorObject *>(raw)->runtime_obj = NULL;
    }
    return raw;
}
68

69
static int Aggregator_init(AggregatorObject *self, PyObject *args,
22✔
70
                           PyObject *kwds) {
71
    static const char *kwlist[] = {"runtime", NULL};
72
    PyObject *runtime_arg = NULL;
22✔
73

74
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
22!
75
                                     &runtime_arg)) {
76
        return -1;
×
77
    }
78

79
    if (runtime_arg && runtime_arg != Py_None) {
22!
80
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
81
            Py_INCREF(runtime_arg);
×
82
            self->runtime_obj = runtime_arg;
×
83
        } else {
84
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
85
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
86
                self->runtime_obj = native;
×
87
            } else {
88
                Py_XDECREF(native);
×
89
                PyErr_SetString(PyExc_TypeError,
×
90
                                "runtime must be a Runtime instance or None");
91
                return -1;
×
92
            }
93
        }
94
    }
95

96
    return 0;
22✔
97
}
98

99
// ---------------------------------------------------------------------------
100
// Helpers
101
// ---------------------------------------------------------------------------
102

103
static int parse_str_list(PyObject *obj, std::vector<std::string> &out,
36✔
104
                          const char *param_name) {
105
    if (!obj || obj == Py_None) return 0;
36!
106
    if (!PyList_Check(obj)) {
4!
107
        PyErr_Format(PyExc_TypeError, "%s must be a list of str", param_name);
×
108
        return -1;
×
109
    }
110
    Py_ssize_t n = PyList_Size(obj);
4✔
111
    for (Py_ssize_t i = 0; i < n; i++) {
10✔
112
        const char *s = PyUnicode_AsUTF8(PyList_GetItem(obj, i));
6!
113
        if (!s) return -1;
6!
114
        out.emplace_back(s);
6!
115
    }
116
    return 0;
4✔
117
}
118

119
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
120
// Parse a view query string into an optional Query
121
static int parse_view_query(PyObject *query_obj, std::optional<Query> &out) {
18✔
122
    if (!query_obj || query_obj == Py_None) {
18!
123
        out = std::nullopt;
16✔
124
        return 0;
16✔
125
    }
126
    const char *query_str = PyUnicode_AsUTF8(query_obj);
2!
127
    if (!query_str) return -1;
2!
128
    auto parsed = Query::from_string(query_str);
2!
129
    if (!parsed) {
2!
130
        PyErr_Format(PyExc_ValueError, "Invalid query: %s",
×
131
                     parsed.error().format().c_str());
×
132
        return -1;
×
133
    }
134
    out = std::move(*parsed);
2!
135
    return 0;
2✔
136
}
2✔
137
#endif
138

139
static int parse_aggregator_args(PyObject *args, PyObject *kwds,
18✔
140
                                 AggregatorInput &input,
141
                                 std::size_t *buffer_size_out = nullptr,
142
                                 std::optional<Query> *query_out = nullptr) {
143
    static const char *kwlist[] = {"directory",
144
                                   "time_interval_ms",
145
                                   "group_keys",
146
                                   "categories",
147
                                   "names",
148
                                   "index_dir",
149
                                   "checkpoint_size",
150
                                   "force_rebuild",
151
                                   "parallelism",
152
                                   "event_batch_size",
153
                                   "custom_metric_fields",
154
                                   "compute_percentiles",
155
                                   "buffer_size",
156
                                   "query",
157
                                   NULL};
158

159
    const char *directory = NULL;
18✔
160
    double time_interval_ms = 5000.0;
18✔
161
    PyObject *group_keys_obj = Py_None;
18✔
162
    PyObject *categories_obj = Py_None;
18✔
163
    PyObject *names_obj = Py_None;
18✔
164
    const char *index_dir = "";
18✔
165
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;
18✔
166
    int force_rebuild = 0;
18✔
167
    Py_ssize_t parallelism = 0;
18✔
168
    Py_ssize_t event_batch_size = 10000;
18✔
169
    PyObject *custom_metrics_obj = Py_None;
18✔
170
    int compute_percentiles = 0;
18✔
171
    Py_ssize_t buffer_size = 8;
18✔
172
    PyObject *query_obj = Py_None;
18✔
173

174
    if (!PyArg_ParseTupleAndKeywords(
18!
175
            args, kwds, "s|dOOOsnpnnOpnO", (char **)kwlist, &directory,
176
            &time_interval_ms, &group_keys_obj, &categories_obj, &names_obj,
177
            &index_dir, &checkpoint_size, &force_rebuild, &parallelism,
178
            &event_batch_size, &custom_metrics_obj, &compute_percentiles,
179
            &buffer_size, &query_obj))
180
        return -1;
×
181

182
    if (buffer_size_out) {
18✔
183
        *buffer_size_out = static_cast<std::size_t>(buffer_size);
3✔
184
    }
185

186
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
187
    if (query_out) {
18!
188
        if (parse_view_query(query_obj, *query_out) < 0) return -1;
18!
189
    }
190
#else
191
    (void)query_obj;
192
#endif
193

194
    input.directory = directory;
18!
195
    input.config.time_interval_us =
18✔
196
        static_cast<std::uint64_t>(time_interval_ms * 1000.0);
18✔
197
    input.index_dir = index_dir;
18!
198
    input.checkpoint_size = static_cast<std::size_t>(checkpoint_size);
18✔
199
    input.force_rebuild = force_rebuild != 0;
18✔
200
    input.parallelism = static_cast<std::size_t>(parallelism);
18✔
201
    input.event_batch_size = static_cast<std::size_t>(event_batch_size);
18✔
202
    input.config.compute_percentiles = compute_percentiles != 0;
18✔
203

204
    if (parse_str_list(group_keys_obj, input.config.extra_group_keys,
18!
205
                       "group_keys") < 0)
18!
206
        return -1;
×
207
    if (parse_str_list(custom_metrics_obj, input.config.custom_metric_fields,
18!
208
                       "custom_metric_fields") < 0)
18!
209
        return -1;
×
210

211
    return 0;
18✔
212
}
213

214
#ifdef DFTRACER_UTILS_ENABLE_ARROW
215
static int run_aggregator_pipeline(
15✔
216
    AggregatorObject *self, const AggregatorInput &input,
217
    std::vector<ArrowExportResult> &results, std::string &error_msg,
218
    const std::optional<Query> *query = nullptr) {
219
    auto *rp = &results;
15✔
220
    AggregatorInput input_copy = input;
15!
221
    std::optional<Query> query_copy;
15✔
222
    if (query) query_copy = *query;
15!
223

224
    Py_BEGIN_ALLOW_THREADS try {
15!
225
        Runtime *rt = get_runtime(self);
15!
226
        rt->submit(run_coro_scope(
60!
227
                       rt->executor(),
228
                       [](CoroScope &scope, std::vector<ArrowExportResult> *out,
15!
229
                          AggregatorInput input,
230
                          std::optional<Query> query) -> CoroTask<void> {
231
                           AggregatorUtility util;
232
                           util.bind_context(scope);
233
                           try {
234
                               auto gen = util.process(input);
235
                               while (auto batch = co_await gen.next()) {
236
                                   if (batch->entries.empty()) continue;
237
                                   AggregationBatch filtered;
238
                                   if (query) {
239
                                       filtered = batch->filter(*query);
240
                                       if (filtered.entries.empty()) continue;
241
                                   } else {
242
                                       filtered = std::move(*batch);
243
                                   }
244
                                   auto arrow_result = filtered.to_arrow();
245
                                   if (!arrow_result.valid()) continue;
246
                                   out->push_back(std::move(arrow_result));
247
                               }
248
                               util.unbind_context();
249
                           } catch (...) {
250
                               util.unbind_context();
251
                               throw;
252
                           }
253
                       },
30!
254
                       rp, std::move(input_copy), std::move(query_copy)),
30✔
255
                   "aggregator")
256
            .get();
15!
UNCOV
257
    } catch (const std::exception &e) {
×
258
        error_msg = e.what();
×
259
    }
×
260
    Py_END_ALLOW_THREADS
15!
261

262
        return error_msg.empty()
15✔
263
        ? 0
15!
264
        : -1;
15✔
265
}
15✔
266
#endif  // DFTRACER_UTILS_ENABLE_ARROW
267

268
#ifdef DFTRACER_UTILS_ENABLE_ARROW
269

270
/// Coroutine behind iter_arrow(): runs the aggregation and pushes each
/// exported Arrow batch into the shared streaming state.
///
/// The stream is completed immediately when it was cancelled before work
/// started. While running, the loop stops early when the state reports
/// cancellation or when push() returns false. Empty batches, batches left
/// empty by `query`, and exports that are not valid are skipped. Any
/// exception is handed to the state through fail().
static CoroTask<void> run_aggregator_stream(
    CoroScope &scope, std::shared_ptr<StreamingState<ArrowExportResult>> state,
    AggregatorInput input, std::optional<Query> query) {
    if (state->cancelled()) {
        state->complete();
        co_return;
    }

    try {
        AggregatorUtility util;
        util.bind_context(scope);
        auto gen = util.process(input);

        for (;;) {
            auto batch = co_await gen.next();
            if (!batch) break;
            if (state->cancelled()) break;
            if (batch->entries.empty()) continue;

            AggregationBatch selected;
            if (!query) {
                selected = std::move(*batch);
            } else {
                selected = batch->filter(*query);
                if (selected.entries.empty()) continue;
            }

            auto exported = selected.to_arrow();
            if (!exported.valid()) continue;

            // The size is measured before the result is moved into push().
            auto exported_bytes = dftracer::utils::python::byte_size(exported);
            const bool accepted =
                state->push(std::move(exported), exported_bytes);
            if (!accepted) break;
        }

        util.unbind_context();
        state->complete();
    } catch (...) {
        // Covers std::exception and everything else alike.
        state->fail(std::current_exception());
    }
}
6!
313

314
#endif  // DFTRACER_UTILS_ENABLE_ARROW
315

316
// ---------------------------------------------------------------------------
317
// process() - returns ArrowTable (materialized)
318
// ---------------------------------------------------------------------------
319

320
/// Python method `process(...)`: run the pipeline to completion and return
/// the object produced by wrap_arrow_table() for the collected batches.
///
/// Raises RuntimeError when the pipeline fails, or when the extension was
/// built without Arrow support. Returns NULL with a Python exception set on
/// any failure.
static PyObject *Aggregator_process(AggregatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    AggregatorInput input;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    std::optional<Query> query;
    const int parse_rc =
        parse_aggregator_args(args, kwds, input, nullptr, &query);
#else
    const int parse_rc = parse_aggregator_args(args, kwds, input);
#endif
    if (parse_rc < 0) return NULL;

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    std::vector<ArrowExportResult> results;
    std::string error_msg;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    const int run_rc =
        run_aggregator_pipeline(self, input, results, error_msg, &query);
#else
    const int run_rc = run_aggregator_pipeline(self, input, results, error_msg);
#endif
    if (run_rc < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    PyObject *batch_list = PyList_New(0);
    if (batch_list == nullptr) return NULL;

    for (auto &result : results) {
        // PyList_Append takes its own reference, so ours is dropped either
        // way once the append has been attempted.
        PyObject *cap = wrap_arrow_result(std::move(result));
        const bool appended =
            cap != nullptr && PyList_Append(batch_list, cap) == 0;
        Py_XDECREF(cap);
        if (!appended) {
            Py_DECREF(batch_list);
            return NULL;
        }
    }

    return wrap_arrow_table(batch_list);
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
15✔
367

368
// ---------------------------------------------------------------------------
369
// iter_arrow() - returns true streaming iterator
370
// ---------------------------------------------------------------------------
371

372
/// Python method `iter_arrow(...)`: start the aggregation on the runtime
/// and return a streaming iterator over the exported Arrow batches.
///
/// The iterator's pull/error/cancel callbacks all share one StreamingState
/// with the submitted coroutine. Raises RuntimeError when setup or
/// submission throws, or when the extension was built without Arrow
/// support. Returns NULL with a Python exception set on failure.
static PyObject *Aggregator_iter_arrow(AggregatorObject *self, PyObject *args,
                                       PyObject *kwds) {
    AggregatorInput input;
    std::size_t buffer_size = 8;
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    std::optional<Query> query;
    if (parse_aggregator_args(args, kwds, input, &buffer_size, &query) < 0)
        return NULL;
#else
    if (parse_aggregator_args(args, kwds, input, &buffer_size) < 0) return NULL;
#endif

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // NOTE(review): `buffer_size` is parsed but nothing below consumes it;
    // the state is sized from compute_memory_budget(0) — confirm intent.
    (void)buffer_size;

    ArrowStreamingIteratorObject *iter_obj = nullptr;
    std::shared_ptr<StreamingState<ArrowExportResult>> state;
    Runtime *rt = nullptr;
    std::string error_msg;
    bool failed = false;

    // Phase 1 (GIL held): build the shared state and the iterator object.
    // C++ exceptions must not unwind into the CPython caller.
    try {
        state = std::make_shared<StreamingState<ArrowExportResult>>(
            dftracer::utils::compute_memory_budget(0));

        iter_obj =
            (ArrowStreamingIteratorObject *)ArrowStreamingIteratorType.tp_new(
                &ArrowStreamingIteratorType, NULL, NULL);
        if (!iter_obj) {
            return NULL;
        }

        iter_obj->cpp_state->state = state;
        iter_obj->cpp_state->pull_next =
            [state]() -> std::optional<ArrowExportResult> {
            return state->pull();
        };
        iter_obj->cpp_state->get_error = [state]() -> std::exception_ptr {
            return state->error();
        };
        iter_obj->cpp_state->cancel = [state]() { state->cancel(); };

        rt = get_runtime(self);
    } catch (const std::exception &e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "unknown C++ exception while preparing aggregator stream";
    }

    // Phase 2 (GIL released): hand the coroutine to the runtime. Every
    // exception is caught inside the released region so that
    // Py_END_ALLOW_THREADS always runs and re-acquires the GIL.
    if (!failed) {
        Py_BEGIN_ALLOW_THREADS
        try {
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
            rt->submit(
                run_coro_scope(rt->executor(), run_aggregator_stream, state,
                               std::move(input), std::move(query)),
                "aggregator_stream");
#else
            rt->submit(
                run_coro_scope(rt->executor(), run_aggregator_stream, state,
                               std::move(input), std::nullopt),
                "aggregator_stream");
#endif
        } catch (const std::exception &e) {
            failed = true;
            error_msg = e.what();
        } catch (...) {
            failed = true;
            error_msg =
                "unknown C++ exception while submitting aggregator stream";
        }
        Py_END_ALLOW_THREADS
    }

    if (failed) {
        // Drop the iterator that would otherwise be leaked.
        Py_XDECREF(reinterpret_cast<PyObject *>(iter_obj));
        PyErr_SetString(PyExc_RuntimeError,
                        error_msg.empty() ? "failed to start aggregator stream"
                                          : error_msg.c_str());
        return NULL;
    }

    return (PyObject *)iter_obj;
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
3✔
426

427
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
428

429
struct AggregatorViewDef {
430
    std::string name;
431
    std::optional<Query> query;
432
};
433

434
struct AggregatorWriteArrowResult {
435
    std::unordered_map<std::string, PartitionWriteStats> view_stats;
436
    int64_t total_rows = 0;
437
    int64_t total_bytes = 0;
438
    std::string error;
439
};
440

441
/// Coroutine behind write_arrow(): aggregates `input` and writes every
/// batch to one PartitionWriter per view.
///
/// With no views given, a single view named "all" is written directly to
/// `output_path`. Otherwise each view is written to `output_path/<name>`
/// (a lone view named "all" still goes to `output_path` itself). Failures
/// are reported through `out->error`; on success `out->view_stats` and the
/// totals are filled from the stats returned by each writer's close().
///
/// @param scope             Coroutine scope bound to the aggregator utility.
/// @param out               Result sink; must outlive the coroutine.
/// @param input             Aggregation input.
/// @param output_path       Base output location.
/// @param views             View definitions (name + optional filter).
/// @param chunk_size_bytes  Passed through to PartitionWriter::open.
/// @param compression       Passed through to PartitionWriter::open.
static CoroTask<void> run_aggregator_write_arrow(
    CoroScope &scope, AggregatorWriteArrowResult *out, AggregatorInput input,
    std::string output_path, std::vector<AggregatorViewDef> views,
    int64_t chunk_size_bytes, IpcCompression compression) {
    try {
        // If no views specified, create a default "all" view
        if (views.empty()) {
            views.push_back({"all", std::nullopt});
        }

        // Two views with the same name would open the same path and
        // overwrite each other's entry in view_stats while the totals still
        // counted both, so reject them up front. The view list is expected
        // to be short, hence the simple pairwise scan.
        for (std::size_t i = 1; i < views.size(); ++i) {
            for (std::size_t j = 0; j < i; ++j) {
                if (views[i].name == views[j].name) {
                    out->error = "Duplicate view name: " + views[i].name;
                    co_return;
                }
            }
        }

        // Open a writer for each view
        std::vector<PartitionWriter> writers(views.size());
        for (std::size_t i = 0; i < views.size(); ++i) {
            std::string view_path = output_path;
            if (views.size() > 1 || views[i].name != "all") {
                view_path = output_path + "/" + views[i].name;
            }
            int rc = co_await writers[i].open(view_path, chunk_size_bytes,
                                              compression);
            if (rc != 0) {
                out->error = "Failed to open writer for view: " + views[i].name;
                co_return;
            }
        }

        AggregatorUtility util;
        util.bind_context(scope);
        auto gen = util.process(input);

        while (auto batch = co_await gen.next()) {
            if (batch->entries.empty()) continue;

            // Write to each view (with optional filtering)
            for (std::size_t i = 0; i < views.size(); ++i) {
                AggregationBatch filtered_batch;
                if (views[i].query) {
                    filtered_batch = batch->filter(*views[i].query);
                    if (filtered_batch.entries.empty()) continue;
                } else {
                    // Copied rather than moved: later views still need the
                    // original batch.
                    filtered_batch = *batch;
                }

                auto arrow_result = filtered_batch.to_arrow();
                if (!arrow_result.valid()) continue;

                int rc = co_await writers[i].write_batch(arrow_result);
                if (rc != 0) {
                    util.unbind_context();
                    out->error =
                        "Failed to write batch for view: " + views[i].name;
                    co_return;
                }
            }
        }

        util.unbind_context();

        // Close writers and collect stats
        for (std::size_t i = 0; i < views.size(); ++i) {
            auto stats = co_await writers[i].close();
            // One map lookup per view; the reference is not held across a
            // suspension point.
            auto &slot = out->view_stats[views[i].name];
            slot = std::move(stats);
            out->total_rows += slot.total_rows;
            out->total_bytes += slot.total_uncompressed_bytes;
        }
    } catch (const std::exception &e) {
        out->error = e.what();
    } catch (...) {
        // Keep non-std exceptions from escaping the coroutine; the caller
        // only inspects `out->error`.
        out->error = "unknown C++ exception while writing Arrow output";
    }
}
8!
510

511
static PyObject *Aggregator_write_arrow(AggregatorObject *self, PyObject *args,
4✔
512
                                        PyObject *kwds) {
513
    static const char *kwlist[] = {"directory",
514
                                   "path",
515
                                   "time_interval_ms",
516
                                   "group_keys",
517
                                   "categories",
518
                                   "names",
519
                                   "index_dir",
520
                                   "checkpoint_size",
521
                                   "force_rebuild",
522
                                   "parallelism",
523
                                   "event_batch_size",
524
                                   "custom_metric_fields",
525
                                   "compute_percentiles",
526
                                   "views",
527
                                   "chunk_size_mb",
528
                                   "compression",
529
                                   NULL};
530

531
    const char *directory = NULL;
4✔
532
    const char *output_path = NULL;
4✔
533
    double time_interval_ms = 5000.0;
4✔
534
    PyObject *group_keys_obj = Py_None;
4✔
535
    PyObject *categories_obj = Py_None;
4✔
536
    PyObject *names_obj = Py_None;
4✔
537
    const char *index_dir = "";
4✔
538
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;
4✔
539
    int force_rebuild = 0;
4✔
540
    Py_ssize_t parallelism = 0;
4✔
541
    Py_ssize_t event_batch_size = 10000;
4✔
542
    PyObject *custom_metrics_obj = Py_None;
4✔
543
    int compute_percentiles = 0;
4✔
544
    PyObject *views_obj = Py_None;
4✔
545
    int chunk_size_mb = 32;
4✔
546
    const char *compression_str = "zstd";
4✔
547

548
    if (!PyArg_ParseTupleAndKeywords(
4!
549
            args, kwds, "ss|dOOOsnpnnOpOis", (char **)kwlist, &directory,
550
            &output_path, &time_interval_ms, &group_keys_obj, &categories_obj,
551
            &names_obj, &index_dir, &checkpoint_size, &force_rebuild,
552
            &parallelism, &event_batch_size, &custom_metrics_obj,
553
            &compute_percentiles, &views_obj, &chunk_size_mb, &compression_str))
554
        return NULL;
×
555

556
    // Parse views
557
    std::vector<AggregatorViewDef> views;
4✔
558
    if (views_obj && views_obj != Py_None) {
4!
559
        if (!PyList_Check(views_obj)) {
1!
560
            PyErr_SetString(PyExc_TypeError,
×
561
                            "views must be a list of dicts with 'name' and "
562
                            "optional 'query' keys");
563
            return NULL;
×
564
        }
565
        Py_ssize_t n = PyList_Size(views_obj);
1!
566
        for (Py_ssize_t i = 0; i < n; i++) {
3✔
567
            PyObject *item = PyList_GetItem(views_obj, i);
2!
568
            if (!PyDict_Check(item)) {
2!
569
                PyErr_SetString(PyExc_TypeError,
×
570
                                "each view must be a dict with 'name' key");
571
                return NULL;
×
572
            }
573
            AggregatorViewDef view;
2✔
574
            PyObject *name_obj = PyDict_GetItemString(item, "name");
2!
575
            if (!name_obj) {
2!
576
                PyErr_SetString(PyExc_ValueError,
×
577
                                "each view must have a 'name' key");
578
                return NULL;
×
579
            }
580
            const char *name_str = PyUnicode_AsUTF8(name_obj);
2!
581
            if (!name_str) return NULL;
2!
582
            view.name = name_str;
2!
583

584
            PyObject *query_obj = PyDict_GetItemString(item, "query");
2!
585
            if (query_obj && query_obj != Py_None) {
2!
586
                const char *query_str = PyUnicode_AsUTF8(query_obj);
2!
587
                if (!query_str) return NULL;
2!
588
                auto parsed = Query::from_string(query_str);
2!
589
                if (!parsed) {
2!
590
                    PyErr_Format(PyExc_ValueError,
×
591
                                 "Invalid query for view '%s': %s", name_str,
592
                                 parsed.error().format().c_str());
×
593
                    return NULL;
×
594
                }
595
                view.query = std::move(*parsed);
2!
596
            }
2!
597
            views.push_back(std::move(view));
2!
598
        }
2!
599
    }
600

601
    // Parse compression
602
    IpcCompression compression = IpcCompression::ZSTD;
4✔
603
    if (compression_str) {
4!
604
        std::string comp_lower(compression_str);
4!
605
        for (auto &c : comp_lower) c = std::tolower(c);
20✔
606
        if (comp_lower == "none") {
4✔
607
            compression = IpcCompression::NONE;
1✔
608
        } else if (comp_lower == "zstd") {
3!
609
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
610
            compression = IpcCompression::ZSTD;
3✔
611
#else
612
            PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
613
            return NULL;
614
#endif
615
        } else {
616
            PyErr_Format(PyExc_ValueError,
×
617
                         "Unknown compression: %s (use 'none' or 'zstd')",
618
                         compression_str);
619
            return NULL;
×
620
        }
621
    }
4!
622

623
    int64_t chunk_size_bytes =
4✔
624
        static_cast<int64_t>(chunk_size_mb) * 1024 * 1024;
4✔
625

626
    // Parse group_keys
627
    std::vector<std::string> group_keys;
4✔
628
    if (group_keys_obj && group_keys_obj != Py_None) {
4!
629
        if (!PyList_Check(group_keys_obj)) {
×
630
            PyErr_SetString(PyExc_TypeError,
×
631
                            "group_keys must be a list of str");
632
            return NULL;
×
633
        }
634
        Py_ssize_t n = PyList_Size(group_keys_obj);
×
635
        for (Py_ssize_t i = 0; i < n; i++) {
×
636
            const char *s = PyUnicode_AsUTF8(PyList_GetItem(group_keys_obj, i));
×
637
            if (!s) return NULL;
×
638
            group_keys.emplace_back(s);
×
639
        }
640
    }
641

642
    // Parse custom_metric_fields
643
    std::vector<std::string> custom_metrics;
4✔
644
    if (custom_metrics_obj && custom_metrics_obj != Py_None) {
4!
645
        if (!PyList_Check(custom_metrics_obj)) {
×
646
            PyErr_SetString(PyExc_TypeError,
×
647
                            "custom_metric_fields must be a list of str");
648
            return NULL;
×
649
        }
650
        Py_ssize_t n = PyList_Size(custom_metrics_obj);
×
651
        for (Py_ssize_t i = 0; i < n; i++) {
×
652
            const char *s =
653
                PyUnicode_AsUTF8(PyList_GetItem(custom_metrics_obj, i));
×
654
            if (!s) return NULL;
×
655
            custom_metrics.emplace_back(s);
×
656
        }
657
    }
658

659
    AggregatorInput input;
4!
660
    input.directory = directory;
4!
661
    input.config.time_interval_us =
4✔
662
        static_cast<std::uint64_t>(time_interval_ms * 1000.0);
4✔
663
    input.config.extra_group_keys = std::move(group_keys);
4✔
664
    input.config.custom_metric_fields = std::move(custom_metrics);
4✔
665
    input.config.compute_percentiles = compute_percentiles != 0;
4✔
666
    input.index_dir = index_dir;
4!
667
    input.checkpoint_size = static_cast<std::size_t>(checkpoint_size);
4✔
668
    input.force_rebuild = force_rebuild != 0;
4✔
669
    input.parallelism = static_cast<std::size_t>(parallelism);
4✔
670
    input.event_batch_size = static_cast<std::size_t>(event_batch_size);
4✔
671

672
    std::string output_path_str(output_path);
4!
673
    AggregatorWriteArrowResult result;
4✔
674
    auto *rp = &result;
4✔
675
    std::string error_msg;
4✔
676

677
    Py_BEGIN_ALLOW_THREADS try {
4!
678
        Runtime *rt = get_runtime(self);
4!
679
        rt->submit(
16!
680
              run_coro_scope(rt->executor(), run_aggregator_write_arrow, rp,
12!
681
                             std::move(input), output_path_str,
4✔
682
                             std::move(views), chunk_size_bytes, compression),
4✔
683
              "aggregator_write_arrow")
684
            .get();
4!
UNCOV
685
    } catch (const std::exception &e) {
×
686
        error_msg = e.what();
×
687
    }
×
688
    Py_END_ALLOW_THREADS
4!
689

690
        if (!error_msg.empty()) {
4!
691
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
692
        return NULL;
×
693
    }
694

695
    if (!result.error.empty()) {
4!
696
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
697
        return NULL;
×
698
    }
699

700
    // Build result dict
701
    PyObject *dict = PyDict_New();
4!
702
    if (!dict) return NULL;
4!
703

704
    PyObject *views_dict = PyDict_New();
4!
705
    if (!views_dict) {
4!
706
        Py_DECREF(dict);
707
        return NULL;
×
708
    }
709

710
    for (const auto &[view_name, view_stats] : result.view_stats) {
9✔
711
        PyObject *view_dict = PyDict_New();
5!
712
        if (!view_dict) {
5!
713
            Py_DECREF(views_dict);
714
            Py_DECREF(dict);
715
            return NULL;
×
716
        }
717

718
        PyObject *files_list = PyList_New(0);
5!
719
        if (!files_list) {
5!
720
            Py_DECREF(view_dict);
721
            Py_DECREF(views_dict);
722
            Py_DECREF(dict);
723
            return NULL;
×
724
        }
725

726
        for (const auto &f : view_stats.files) {
10✔
727
            PyObject *file_str = PyUnicode_FromString(f.c_str());
5!
728
            if (!file_str || PyList_Append(files_list, file_str) < 0) {
5!
729
                Py_XDECREF(file_str);
×
730
                Py_DECREF(files_list);
731
                Py_DECREF(view_dict);
732
                Py_DECREF(views_dict);
733
                Py_DECREF(dict);
734
                return NULL;
×
735
            }
736
            Py_DECREF(file_str);
737
        }
738

739
        PyDict_SetItemString(view_dict, "files", files_list);
5!
740
        PyDict_SetItemString(view_dict, "rows",
5!
741
                             PyLong_FromLongLong(view_stats.total_rows));
5!
742
        PyDict_SetItemString(
5!
743
            view_dict, "bytes",
744
            PyLong_FromLongLong(view_stats.total_uncompressed_bytes));
5!
745
        Py_DECREF(files_list);
746

747
        PyObject *key = PyUnicode_FromString(view_name.c_str());
5!
748
        PyDict_SetItem(views_dict, key, view_dict);
5!
749
        Py_DECREF(key);
750
        Py_DECREF(view_dict);
751
    }
752

753
    PyDict_SetItemString(dict, "views", views_dict);
4!
754
    PyDict_SetItemString(dict, "total_rows",
4!
755
                         PyLong_FromLongLong(result.total_rows));
4!
756
    PyDict_SetItemString(dict, "total_bytes",
4!
757
                         PyLong_FromLongLong(result.total_bytes));
4!
758
    Py_DECREF(views_dict);
759

760
    return dict;
4✔
761
}
4✔
762

763
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
764

765
// tp_call handler for AggregatorType: invoking an instance forwards the
// positional and keyword arguments unchanged to Aggregator_process and
// returns whatever that call returns.
static PyObject *Aggregator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    AggregatorObject *aggregator = (AggregatorObject *)self;
    return Aggregator_process(aggregator, args, kwds);
}
769

770
// Method table referenced by AggregatorType.tp_methods.
//
// Every entry accepts positional and keyword arguments. Each docstring opens
// with a signature line closed by ")\n--\n\n", followed by the description,
// an "Args:" section and a "Returns:" section.
static PyMethodDef Aggregator_methods[] = {
    // process: run the pipeline and return one materialized table.
    {"process", reinterpret_cast<PyCFunction>(Aggregator_process),
     METH_VARARGS | METH_KEYWORDS,
     "process(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "        categories=None, names=None, index_dir='',\n"
     "        checkpoint_size=33554432, force_rebuild=False,\n"
     "        parallelism=0, event_batch_size=10000,\n"
     "        custom_metric_fields=None, compute_percentiles=False)\n"
     "--\n\n"
     "Run aggregation pipeline, return materialized ArrowTable.\n\n"
     "Uses parallel, RocksDB-backed, fused indexing and aggregation.\n\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds "
     "(default 5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Directory for .dftindex stores (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    parallelism (int): Number of parallel workers (0 = all cores).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "    custom_metric_fields (list[str] or None): Extra numeric args\n"
     "        fields to aggregate into ``*_total``/``*_min``/``*_max``/\n"
     "        ``*_mean``/``*_std`` columns (default None).\n"
     "    compute_percentiles (bool): Enable percentile sketch collection\n"
     "        during aggregation (default False).\n\n"
     "Returns:\n"
     "    ArrowTable: Aggregated results.\n"},

    // iter_arrow: same arguments as process plus buffer_size; streams batches.
    {"iter_arrow", reinterpret_cast<PyCFunction>(Aggregator_iter_arrow),
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "           categories=None, names=None, index_dir='',\n"
     "           checkpoint_size=33554432, force_rebuild=False,\n"
     "           parallelism=0, event_batch_size=10000,\n"
     "           custom_metric_fields=None, compute_percentiles=False,\n"
     "           buffer_size=8)\n"
     "--\n\n"
     "Run aggregation pipeline, stream Arrow batches.\n\n"
     "Returns immediately with a streaming iterator. Batches are produced\n"
     "in the background with a bounded buffer. GIL is released while waiting\n"
     "for the next batch, allowing other Python threads to run.\n\n"
     "Uses parallel, RocksDB-backed, fused indexing and aggregation.\n\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds "
     "(default 5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Directory for .dftindex stores (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    parallelism (int): Number of parallel workers (0 = all cores).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "    custom_metric_fields (list[str] or None): Extra numeric args\n"
     "        fields to aggregate into ``*_total``/``*_min``/``*_max``/\n"
     "        ``*_mean``/``*_std`` columns (default None).\n"
     "    compute_percentiles (bool): Enable percentile sketch collection\n"
     "        during aggregation (default False).\n"
     "    buffer_size (int): Max batches to buffer (default 8).\n\n"
     "Returns:\n"
     "    _ArrowStreamingIterator: Streaming iterator yielding Arrow record\n"
     "        batches. Supports cancel() to stop early.\n"},

#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    // write_arrow: only compiled in when Arrow IPC support is enabled.
    {"write_arrow", reinterpret_cast<PyCFunction>(Aggregator_write_arrow),
     METH_VARARGS | METH_KEYWORDS,
     "write_arrow(directory, path, time_interval_ms=5000.0, ..., views=None)\n"
     "--\n\n"
     "Run aggregation and write results to Arrow IPC files with "
     "optional views.\n\n"
     "Views allow filtering aggregated entries before writing. Each view\n"
     "writes to a separate subdirectory. Query syntax supports: cat, name,\n"
     "pid, tid, hhash, fhash, time_bucket, extra group keys, and aggregation\n"
     "metrics (count, dur_total, dur_min, dur_max, size_total, etc.).\n\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    path (str): Output directory for Arrow files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds.\n"
     "    group_keys (list[str] or None): Extra grouping dims.\n"
     "    categories (list[str] or None): Category filter.\n"
     "    names (list[str] or None): Name filter.\n"
     "    index_dir (str): Directory for .dftindex stores.\n"
     "    checkpoint_size (int): Checkpoint size.\n"
     "    force_rebuild (bool): Force index rebuild.\n"
     "    parallelism (int): Number of parallel workers.\n"
     "    event_batch_size (int): Entries per batch.\n"
     "    custom_metric_fields (list[str] or None): Extra numeric fields.\n"
     "    compute_percentiles (bool): Enable percentile collection.\n"
     "    views (list[dict] or None): View definitions, each with 'name' and\n"
     "        optional 'query' keys. If None, writes all entries to path.\n"
     "        Example: [{'name': 'io', 'query': 'cat == \"POSIX\"'}]\n"
     "    chunk_size_mb (int): Max uncompressed MB per file (default 32).\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n\n"
     "Returns:\n"
     "    dict: Statistics with 'views' (per-view stats), 'total_rows',\n"
     "        'total_bytes'. Each view has 'files', 'rows', 'bytes'.\n"},
#endif

    // Sentinel entry terminating the table.
    {NULL, NULL, 0, NULL}};
882

883
// Type object registered as "dftracer_utils_ext.AggregatorUtility".
//
// Slots are initialized positionally in the order named by the trailing
// comments; a 0 leaves the slot unset. The populated slots are the
// deallocator, the call handler (Aggregator_call), the doc string, the
// method table, and the init/new pair.
PyTypeObject AggregatorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext.AggregatorUtility",            /* tp_name */
    sizeof(AggregatorObject),                          /* tp_basicsize */
    0,                                                 /* tp_itemsize */
    reinterpret_cast<destructor>(Aggregator_dealloc),  /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Aggregator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    /* tp_doc: constructor signature line, then a summary of the methods */
    "AggregatorUtility(runtime: Runtime | None = None)\n"
    "--\n"
    "\n"
    "High-level aggregation pipeline for DFTracer trace files.\n"
    "\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
    "        If None, uses the default global Runtime.\n"
    "\n"
    "process(directory, time_interval_ms=5000.0, ...) -> ArrowTable\n"
    "    Run aggregation and return a materialized Arrow table.\n"
    "\n"
    "iter_arrow(directory, time_interval_ms=5000.0, ...)"
    " -> Iterator[ArrowBatch]\n"
    "    Run aggregation and stream Arrow batches.\n",
    0,                                                 /* tp_traverse */
    0,                                                 /* tp_clear */
    0,                                                 /* tp_richcompare */
    0,                                                 /* tp_weaklistoffset */
    0,                                                 /* tp_iter */
    0,                                                 /* tp_iternext */
    Aggregator_methods,                                /* tp_methods */
    0,                                                 /* tp_members */
    0,                                                 /* tp_getset */
    0,                                                 /* tp_base */
    0,                                                 /* tp_dict */
    0,                                                 /* tp_descr_get */
    0,                                                 /* tp_descr_set */
    0,                                                 /* tp_dictoffset */
    reinterpret_cast<initproc>(Aggregator_init),       /* tp_init */
    0,                                                 /* tp_alloc */
    Aggregator_new,                                    /* tp_new */
};
933

934
// Finalizes AggregatorType and adds it to module `m` under the attribute
// name "AggregatorUtility".
//
// `m` is treated as a borrowed reference on every path: this function never
// releases it, so the caller keeps sole responsibility for the module object
// whether registration succeeds or fails.
//
// Returns 0 on success and -1 if PyType_Ready or PyModule_AddObject fails.
int init_aggregator(PyObject *m) {
    if (PyType_Ready(&AggregatorType) < 0) return -1;

    // Take the reference that PyModule_AddObject consumes on success; on
    // failure it is not consumed, so hand it back before bailing out.
    Py_INCREF(&AggregatorType);
    if (PyModule_AddObject(m, "AggregatorUtility",
                           (PyObject *)&AggregatorType) < 0) {
        Py_DECREF(&AggregatorType);
        // Deliberately no Py_DECREF(m): the PyType_Ready failure path above
        // does not release the module either, and dropping a reference this
        // function does not own would double-release it if the caller also
        // cleans up after a -1 return.
        return -1;
    }

    return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc