• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23934877458

03 Apr 2026 05:15AM UTC coverage: 51.182% (-0.3%) from 51.498%
23934877458

Pull #63

github

web-flow
Merge 946c3e0ba into 773a62661
Pull Request #63: feat(aggregator): support profile/system counter aggregation and custom metric Arrow output

23378 of 58949 branches covered (39.66%)

Branch coverage included in aggregate %.

328 of 568 new or added lines in 7 files covered. (57.75%)

73 existing lines in 6 files now uncovered.

20276 of 26343 relevant lines covered (76.97%)

13072.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.19
/src/dftracer/utils/python/utilities/aggregator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/coro/task.h>
3
#include <dftracer/utils/core/runtime.h>
4
#include <dftracer/utils/python/arrow_helpers.h>
5
#include <dftracer/utils/python/runtime.h>
6
#include <dftracer/utils/python/trace_reader_iterator.h>
7
#include <dftracer/utils/python/utilities/aggregator.h>
8
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_utility.h>
9

10
#include <string>
11
#include <vector>
12

13
using dftracer::utils::Runtime;
14
using dftracer::utils::coro::CoroTask;
15
using namespace dftracer::utils::utilities::composites::dft::aggregators;
16

17
using dftracer::utils::python::wrap_arrow_result;
18
using dftracer::utils::python::wrap_arrow_table;
19

20
#ifdef DFTRACER_UTILS_ENABLE_ARROW
21
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
22
#endif
23

24
// Resolve the Runtime this aggregator runs on: the Runtime object explicitly
// bound at construction time, or the process-wide default when none was given.
static Runtime *get_runtime(AggregatorObject *self) {
    if (!self->runtime_obj) {
        return get_default_runtime();
    }
    RuntimeObject *bound = (RuntimeObject *)self->runtime_obj;
    return bound->runtime.get();
}
11✔
29

30
// tp_dealloc: release the optionally bound Runtime object, then free the
// instance through the type's allocator.
static void Aggregator_dealloc(AggregatorObject *self) {
    // Drop the strong reference taken in Aggregator_init (may be NULL).
    Py_XDECREF(self->runtime_obj);
    Py_TYPE(self)->tp_free((PyObject *)self);
}
22✔
34

35
// tp_new: allocate a fresh AggregatorObject with no Runtime bound yet;
// binding (if any) happens later in Aggregator_init.
static PyObject *Aggregator_new(PyTypeObject *type, PyObject *args,
                                PyObject *kwds) {
    AggregatorObject *obj = (AggregatorObject *)type->tp_alloc(type, 0);
    if (obj == NULL) {
        return NULL;  // tp_alloc already set a MemoryError.
    }
    obj->runtime_obj = NULL;
    return (PyObject *)obj;
}
43

44
static int Aggregator_init(AggregatorObject *self, PyObject *args,
22✔
45
                           PyObject *kwds) {
46
    static const char *kwlist[] = {"runtime", NULL};
47
    PyObject *runtime_arg = NULL;
22✔
48

49
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
22!
50
                                     &runtime_arg)) {
51
        return -1;
×
52
    }
53

54
    if (runtime_arg && runtime_arg != Py_None) {
22!
55
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
56
            Py_INCREF(runtime_arg);
×
57
            self->runtime_obj = runtime_arg;
×
58
        } else {
59
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
60
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
61
                self->runtime_obj = native;
×
62
            } else {
63
                Py_XDECREF(native);
×
64
                PyErr_SetString(PyExc_TypeError,
×
65
                                "runtime must be a Runtime instance or None");
66
                return -1;
×
67
            }
68
        }
69
    }
70

71
    return 0;
22✔
72
}
11✔
73

74
// ---------------------------------------------------------------------------
75
// Helpers
76
// ---------------------------------------------------------------------------
77

78
static int parse_str_list(PyObject *obj, std::vector<std::string> &out,
44✔
79
                          const char *param_name) {
80
    if (!obj || obj == Py_None) return 0;
44!
81
    if (!PyList_Check(obj)) {
4✔
NEW
82
        PyErr_Format(PyExc_TypeError, "%s must be a list of str", param_name);
×
NEW
83
        return -1;
×
84
    }
85
    Py_ssize_t n = PyList_Size(obj);
4✔
86
    for (Py_ssize_t i = 0; i < n; i++) {
8✔
87
        const char *s = PyUnicode_AsUTF8(PyList_GetItem(obj, i));
4!
88
        if (!s) return -1;
4!
89
        out.emplace_back(s);
4!
90
    }
2✔
91
    return 0;
4✔
92
}
22✔
93

94
static int parse_aggregator_args(PyObject *args, PyObject *kwds,
22✔
95
                                 AggregatorInput &input) {
96
    static const char *kwlist[] = {"directory",
97
                                   "time_interval_ms",
98
                                   "group_keys",
99
                                   "categories",
100
                                   "names",
101
                                   "index_dir",
102
                                   "checkpoint_size",
103
                                   "force_rebuild",
104
                                   "chunk_size_mb",
105
                                   "batch_size_mb",
106
                                   "event_batch_size",
107
                                   "custom_metric_fields",
108
                                   "compute_percentiles",
109
                                   NULL};
110

111
    const char *directory = NULL;
22✔
112
    double time_interval_ms = 5000.0;
22✔
113
    PyObject *group_keys_obj = Py_None;
22✔
114
    PyObject *categories_obj = Py_None;
22✔
115
    PyObject *names_obj = Py_None;
22✔
116
    const char *index_dir = "";
22✔
117
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;
22✔
118
    int force_rebuild = 0;
22✔
119
    Py_ssize_t chunk_size_mb = 64;
22✔
120
    Py_ssize_t batch_size_mb = 4;
22✔
121
    Py_ssize_t event_batch_size = 10000;
22✔
122
    PyObject *custom_metrics_obj = Py_None;
22✔
123
    int compute_percentiles = 0;
22✔
124

125
    if (!PyArg_ParseTupleAndKeywords(
22!
126
            args, kwds, "s|dOOOsnpnnnOp", (char **)kwlist, &directory,
11✔
127
            &time_interval_ms, &group_keys_obj, &categories_obj, &names_obj,
128
            &index_dir, &checkpoint_size, &force_rebuild, &chunk_size_mb,
129
            &batch_size_mb, &event_batch_size, &custom_metrics_obj,
130
            &compute_percentiles))
UNCOV
131
        return -1;
×
132

133
    input.directory = directory;
22!
134
    input.config.time_interval_us =
22✔
135
        static_cast<std::uint64_t>(time_interval_ms * 1000.0);
22✔
136
    input.index_dir = index_dir;
22!
137
    input.checkpoint_size = static_cast<std::size_t>(checkpoint_size);
22✔
138
    input.force_rebuild = force_rebuild != 0;
22✔
139
    input.chunk_size_mb = static_cast<std::size_t>(chunk_size_mb);
22✔
140
    input.batch_size_mb = static_cast<std::size_t>(batch_size_mb);
22✔
141
    input.event_batch_size = static_cast<std::size_t>(event_batch_size);
22✔
142
    input.config.compute_percentiles = compute_percentiles != 0;
22✔
143

144
    if (parse_str_list(group_keys_obj, input.config.extra_group_keys,
33!
145
                       "group_keys") < 0)
22!
NEW
146
        return -1;
×
147
    if (parse_str_list(custom_metrics_obj, input.config.custom_metric_fields,
33!
148
                       "custom_metric_fields") < 0)
22!
NEW
149
        return -1;
×
150

151
    return 0;
22✔
152
}
11✔
153

154
// Run the aggregation pipeline synchronously and collect every emitted
// AggregationBatch into `batches`.
//
// The GIL is released for the duration of the run; any std::exception thrown
// by the pipeline is captured into `error_msg` (no Python error is set here —
// callers translate it). Returns 0 on success, -1 if an exception occurred.
static int run_aggregator_pipeline(AggregatorObject *self,
                                   const AggregatorInput &input,
                                   std::vector<AggregationBatch> &batches,
                                   std::string &error_msg) {
    // Raw pointer so the coroutine lambda below can capture it by value;
    // `batches` outlives the blocking .get() call.
    auto *bp = &batches;
    // Copy the input so the coroutine owns its data even if it runs on a
    // different thread than the caller.
    AggregatorInput input_copy = input;

    Py_BEGIN_ALLOW_THREADS try {
        Runtime *rt = get_runtime(self);
        auto task = [bp, input_copy]() -> CoroTask<void> {
            AggregatorUtility util;
            auto gen = util.process(input_copy);
            // Drain the async generator, moving each batch into the output.
            while (auto batch = co_await gen.next()) {
                bp->push_back(std::move(*batch));
            }
        };
        // Block until the submitted coroutine finishes on the runtime.
        rt->submit(task(), "aggregator").get();
    } catch (const std::exception &e) {
        error_msg = e.what();
    }
    Py_END_ALLOW_THREADS

        return error_msg.empty()
        ? 0
        : -1;
}
22✔
180

181
#ifdef DFTRACER_UTILS_ENABLE_ARROW
182

183
#endif  // DFTRACER_UTILS_ENABLE_ARROW
184

185
// ---------------------------------------------------------------------------
186
// process() — returns ArrowTable (materialized)
187
// ---------------------------------------------------------------------------
188

189
// process(...): run the aggregation pipeline to completion and return a
// materialized Arrow table built from one capsule per non-empty batch.
// Raises RuntimeError if the pipeline threw or Arrow support is compiled out.
static PyObject *Aggregator_process(AggregatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    AggregatorInput input;
    if (parse_aggregator_args(args, kwds, input) < 0) return NULL;

    std::vector<AggregationBatch> batches;
    std::string error_msg;
    if (run_aggregator_pipeline(self, input, batches, error_msg) < 0) {
        // Translate the captured C++ exception message into a RuntimeError.
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    PyObject *batch_list = PyList_New(0);
    if (!batch_list) return NULL;

    for (const auto &batch : batches) {
        // Skip empty batches and batches whose Arrow export failed.
        if (batch.entries.empty()) continue;

        auto arrow_result = batch.to_arrow();
        if (!arrow_result.valid()) continue;

        PyObject *cap = wrap_arrow_result(std::move(arrow_result));
        if (!cap) {
            Py_DECREF(batch_list);
            return NULL;
        }
        // PyList_Append takes its own reference; drop ours either way.
        int rc = PyList_Append(batch_list, cap);
        Py_DECREF(cap);
        if (rc < 0) {
            Py_DECREF(batch_list);
            return NULL;
        }
    }

    // NOTE(review): assumes wrap_arrow_table takes ownership of (or at least
    // releases) batch_list on both success and failure — confirm against
    // arrow_helpers.h, otherwise the list leaks on failure.
    return wrap_arrow_table(batch_list);
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
18✔
231

232
// ---------------------------------------------------------------------------
233
// iter_arrow() — returns list iterator of ArrowBatch capsules
234
// ---------------------------------------------------------------------------
235

236
// iter_arrow(...): run the full aggregation pipeline, then hand back an
// iterator yielding one Arrow capsule per non-empty batch.
// Raises RuntimeError if the pipeline threw or Arrow support is compiled out.
static PyObject *Aggregator_iter_arrow(AggregatorObject *self, PyObject *args,
                                       PyObject *kwds) {
    AggregatorInput input;
    if (parse_aggregator_args(args, kwds, input) < 0) return NULL;

    std::vector<AggregationBatch> batches;
    std::string error_msg;
    if (run_aggregator_pipeline(self, input, batches, error_msg) < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    PyObject *capsules = PyList_New(0);
    if (capsules == NULL) return NULL;

    for (const auto &agg_batch : batches) {
        // Empty batches and failed Arrow exports are silently skipped.
        if (agg_batch.entries.empty()) continue;

        auto exported = agg_batch.to_arrow();
        if (!exported.valid()) continue;

        PyObject *capsule = wrap_arrow_result(std::move(exported));
        if (capsule == NULL) {
            Py_DECREF(capsules);
            return NULL;
        }

        const int append_rc = PyList_Append(capsules, capsule);
        Py_DECREF(capsule);  // the list holds (or failed to take) its own ref
        if (append_rc < 0) {
            Py_DECREF(capsules);
            return NULL;
        }
    }

    // Return an iterator over the list; the list itself is released here and
    // kept alive by the iterator.
    PyObject *iterator = PyObject_GetIter(capsules);
    Py_DECREF(capsules);
    return iterator;
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
4✔
281

282
// tp_call: calling the aggregator instance directly is shorthand for
// .process(...) with the same arguments.
static PyObject *Aggregator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    return Aggregator_process((AggregatorObject *)self, args, kwds);
}
286

287
// Method table for AggregatorUtility. The C string literals below become the
// Python-visible docstrings ("--\n" enables signature introspection), so their
// content is runtime behavior and must stay in sync with parse_aggregator_args.
static PyMethodDef Aggregator_methods[] = {
    {"process", (PyCFunction)Aggregator_process, METH_VARARGS | METH_KEYWORDS,
     "process(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "        categories=None, names=None, index_dir='',\n"
     "        checkpoint_size=33554432, force_rebuild=False,\n"
     "        chunk_size_mb=64, batch_size_mb=4, event_batch_size=10000,\n"
     "        custom_metric_fields=None, compute_percentiles=False)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, return materialized ArrowTable.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds (default "
     "5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Index sidecar directory (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    chunk_size_mb (int): Target chunk size in MB (default 64).\n"
     "    batch_size_mb (int): Batch read size in MB (default 4).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "    custom_metric_fields (list[str] or None): Extra numeric args\n"
     "        fields to aggregate into *_total/*_min/*_max/*_mean/*_std\n"
     "        columns (default None).\n"
     "    compute_percentiles (bool): Enable percentile sketch collection\n"
     "        during aggregation (default False).\n"
     "\n"
     "Returns:\n"
     "    ArrowTable: Aggregated results.\n"},
    {"iter_arrow", (PyCFunction)Aggregator_iter_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "           categories=None, names=None, index_dir='',\n"
     "           checkpoint_size=33554432, force_rebuild=False,\n"
     "           chunk_size_mb=64, batch_size_mb=4, event_batch_size=10000,\n"
     "           custom_metric_fields=None, compute_percentiles=False)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, stream Arrow batches.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds (default "
     "5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Index sidecar directory (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    chunk_size_mb (int): Target chunk size in MB (default 64).\n"
     "    batch_size_mb (int): Batch read size in MB (default 4).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "    custom_metric_fields (list[str] or None): Extra numeric args\n"
     "        fields to aggregate into *_total/*_min/*_max/*_mean/*_std\n"
     "        columns (default None).\n"
     "    compute_percentiles (bool): Enable percentile sketch collection\n"
     "        during aggregation (default False).\n"
     "\n"
     "Returns:\n"
     "    Iterator[ArrowBatch]: Arrow record batches.\n"},
    {NULL}};  // sentinel
352

353
// Type object for dftracer_utils_ext.AggregatorUtility.
// Py_TPFLAGS_BASETYPE allows Python-level subclassing; instances are also
// callable (tp_call forwards to process()).
PyTypeObject AggregatorType = {
    PyVarObject_HEAD_INIT(
        NULL, 0) "dftracer_utils_ext.AggregatorUtility", /* tp_name */
    sizeof(AggregatorObject),                            /* tp_basicsize */
    0,                                                   /* tp_itemsize */
    (destructor)Aggregator_dealloc,                      /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Aggregator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    "AggregatorUtility(runtime: Runtime | None = None)\n"
    "--\n\n"
    "High-level aggregation pipeline for DFTracer trace files.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "process(directory, time_interval_ms=5000.0, ...) -> ArrowTable\n"
    "    Run aggregation and return a materialized Arrow table.\n\n"
    "iter_arrow(directory, time_interval_ms=5000.0, ...) -> "
    "Iterator[ArrowBatch]\n"
    "    Run aggregation and stream Arrow batches.\n", /* tp_doc */
    0,                                                 /* tp_traverse */
    0,                                                 /* tp_clear */
    0,                                                 /* tp_richcompare */
    0,                                                 /* tp_weaklistoffset */
    0,                                                 /* tp_iter */
    0,                                                 /* tp_iternext */
    Aggregator_methods,                                /* tp_methods */
    0,                                                 /* tp_members */
    0,                                                 /* tp_getset */
    0,                                                 /* tp_base */
    0,                                                 /* tp_dict */
    0,                                                 /* tp_descr_get */
    0,                                                 /* tp_descr_set */
    0,                                                 /* tp_dictoffset */
    (initproc)Aggregator_init,                         /* tp_init */
    0,                                                 /* tp_alloc */
    Aggregator_new,                                    /* tp_new */
};
403

404
// Register the AggregatorUtility type on module `m`.
// Returns 0 on success, -1 with a Python exception set on failure.
int init_aggregator(PyObject *m) {
    if (PyType_Ready(&AggregatorType) < 0) return -1;

    // PyModule_AddObject steals a reference only on success, so take one up
    // front and give it back on failure.
    Py_INCREF(&AggregatorType);
    if (PyModule_AddObject(m, "AggregatorUtility",
                           (PyObject *)&AggregatorType) < 0) {
        Py_DECREF(&AggregatorType);
        // NOTE(review): `m` is normally a borrowed reference owned by the
        // caller; decrefing it here risks a double-release if the caller
        // also cleans up on failure — confirm against the module init path.
        Py_DECREF(m);
        return -1;
    }

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc