• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23730905027

30 Mar 2026 06:22AM UTC coverage: 51.451% (+0.4%) from 51.022%
23730905027

push

github

rayandrew
chore(docs)!: regenerate C++ API reference pages from Doxygen XML

- Add generate_api_index.py script for automated API doc generation
- Rename core_common.rst to core_infrastructure.rst
- Update all API reference pages with current class/function signatures
- Add doxygen group annotations to public headers

BREAKING CHANGE: API reference page structure reorganized

23019 of 57787 branches covered (39.83%)

Branch coverage included in aggregate %.

20057 of 25936 relevant lines covered (77.33%)

13268.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.52
/src/dftracer/utils/python/utilities/aggregator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/coro/task.h>
3
#include <dftracer/utils/core/runtime.h>
4
#include <dftracer/utils/python/arrow_helpers.h>
5
#include <dftracer/utils/python/runtime.h>
6
#include <dftracer/utils/python/trace_reader_iterator.h>
7
#include <dftracer/utils/python/utilities/aggregator.h>
8
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_utility.h>
9

10
#include <string>
11
#include <vector>
12

13
using dftracer::utils::Runtime;
14
using dftracer::utils::coro::CoroTask;
15
using namespace dftracer::utils::utilities::composites::dft::aggregators;
16

17
using dftracer::utils::python::wrap_arrow_result;
18
using dftracer::utils::python::wrap_arrow_table;
19

20
#ifdef DFTRACER_UTILS_ENABLE_ARROW
21
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
22
#endif
23

24
// Resolve the Runtime this aggregator executes on: the Runtime explicitly
// bound at construction time if any, otherwise the process-wide default.
static Runtime *get_runtime(AggregatorObject *self) {
    if (!self->runtime_obj) {
        return get_default_runtime();
    }
    return ((RuntimeObject *)self->runtime_obj)->runtime.get();
}
9✔
29

30
// tp_dealloc: release the (optional) strong reference to the bound Runtime
// object, then free the instance storage itself.
static void Aggregator_dealloc(AggregatorObject *self) {
    Py_CLEAR(self->runtime_obj);
    Py_TYPE(self)->tp_free((PyObject *)self);
}
18✔
34

35
// tp_new: allocate the instance and null out its fields so that
// Aggregator_dealloc is safe even when tp_init never runs or fails.
static PyObject *Aggregator_new(PyTypeObject *type, PyObject *args,
                                PyObject *kwds) {
    (void)args;
    (void)kwds;
    AggregatorObject *self = (AggregatorObject *)type->tp_alloc(type, 0);
    if (!self) {
        return NULL;  // tp_alloc already set MemoryError
    }
    self->runtime_obj = NULL;
    return (PyObject *)self;
}
43

44
static int Aggregator_init(AggregatorObject *self, PyObject *args,
18✔
45
                           PyObject *kwds) {
46
    static const char *kwlist[] = {"runtime", NULL};
47
    PyObject *runtime_arg = NULL;
18✔
48

49
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
18!
50
                                     &runtime_arg)) {
51
        return -1;
×
52
    }
53

54
    if (runtime_arg && runtime_arg != Py_None) {
18!
55
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
56
            Py_INCREF(runtime_arg);
×
57
            self->runtime_obj = runtime_arg;
×
58
        } else {
59
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
60
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
61
                self->runtime_obj = native;
×
62
            } else {
63
                Py_XDECREF(native);
×
64
                PyErr_SetString(PyExc_TypeError,
×
65
                                "runtime must be a Runtime instance or None");
66
                return -1;
×
67
            }
68
        }
69
    }
70

71
    return 0;
18✔
72
}
9✔
73

74
// ---------------------------------------------------------------------------
75
// Helpers
76
// ---------------------------------------------------------------------------
77

78
static int parse_aggregator_args(PyObject *args, PyObject *kwds,
18✔
79
                                 AggregatorInput &input) {
80
    static const char *kwlist[] = {"directory",
81
                                   "time_interval_ms",
82
                                   "group_keys",
83
                                   "categories",
84
                                   "names",
85
                                   "index_dir",
86
                                   "checkpoint_size",
87
                                   "force_rebuild",
88
                                   "chunk_size_mb",
89
                                   "batch_size_mb",
90
                                   "event_batch_size",
91
                                   NULL};
92

93
    const char *directory = NULL;
18✔
94
    double time_interval_ms = 5000.0;
18✔
95
    PyObject *group_keys_obj = Py_None;
18✔
96
    PyObject *categories_obj = Py_None;
18✔
97
    PyObject *names_obj = Py_None;
18✔
98
    const char *index_dir = "";
18✔
99
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;
18✔
100
    int force_rebuild = 0;
18✔
101
    Py_ssize_t chunk_size_mb = 64;
18✔
102
    Py_ssize_t batch_size_mb = 4;
18✔
103
    Py_ssize_t event_batch_size = 10000;
18✔
104

105
    if (!PyArg_ParseTupleAndKeywords(
18!
106
            args, kwds, "s|dOOOsnpnnn", (char **)kwlist, &directory,
9✔
107
            &time_interval_ms, &group_keys_obj, &categories_obj, &names_obj,
108
            &index_dir, &checkpoint_size, &force_rebuild, &chunk_size_mb,
109
            &batch_size_mb, &event_batch_size))
110
        return -1;
×
111

112
    input.directory = directory;
18!
113
    input.config.time_interval_us =
18✔
114
        static_cast<std::uint64_t>(time_interval_ms * 1000.0);
18✔
115
    input.index_dir = index_dir;
18!
116
    input.checkpoint_size = static_cast<std::size_t>(checkpoint_size);
18✔
117
    input.force_rebuild = force_rebuild != 0;
18✔
118
    input.chunk_size_mb = static_cast<std::size_t>(chunk_size_mb);
18✔
119
    input.batch_size_mb = static_cast<std::size_t>(batch_size_mb);
18✔
120
    input.event_batch_size = static_cast<std::size_t>(event_batch_size);
18✔
121

122
    if (group_keys_obj && group_keys_obj != Py_None) {
18!
123
        if (!PyList_Check(group_keys_obj)) {
×
124
            PyErr_SetString(PyExc_TypeError,
×
125
                            "group_keys must be a list of str");
126
            return -1;
×
127
        }
128
        Py_ssize_t n = PyList_Size(group_keys_obj);
×
129
        for (Py_ssize_t i = 0; i < n; i++) {
×
130
            const char *s = PyUnicode_AsUTF8(PyList_GetItem(group_keys_obj, i));
×
131
            if (!s) return -1;
×
132
            input.config.extra_group_keys.emplace_back(s);
×
133
        }
134
    }
135

136
    // categories and names filtering is now handled via the query DSL.
137
    // The categories_obj and names_obj parameters are accepted but ignored
138
    // for backward compatibility. Use the query parameter instead.
139

140
    return 0;
18✔
141
}
9✔
142

143
// Run the aggregation pipeline for `input`, appending every produced
// AggregationBatch to `batches`.
//
// The GIL is released for the duration of the run; the coroutine executes
// on the aggregator's Runtime (explicit or default) while this thread
// blocks on the returned future. On failure the exception text is copied
// into `error_msg` — no Python exception is set here, the caller does that.
//
// Returns 0 on success, -1 on error.
static int run_aggregator_pipeline(AggregatorObject *self,
                                   const AggregatorInput &input,
                                   std::vector<AggregationBatch> &batches,
                                   std::string &error_msg) {
    // Capture a raw pointer to the output vector and a by-value copy of
    // the input: the lambda is moved into the runtime, so it must not
    // hold references tied to this frame's parameters.
    auto *bp = &batches;
    AggregatorInput input_copy = input;

    Py_BEGIN_ALLOW_THREADS try {
        Runtime *rt = get_runtime(self);
        auto task = [bp, input_copy]() -> CoroTask<void> {
            AggregatorUtility util;
            auto gen = util.process(input_copy);
            // Drain the async generator; each yielded batch is moved
            // into the caller-owned vector.
            while (auto batch = co_await gen.next()) {
                bp->push_back(std::move(*batch));
            }
        };
        // .get() blocks until the coroutine completes, so `batches` is
        // fully populated (or the exception propagates) before returning.
        rt->submit(task(), "aggregator").get();
    } catch (const std::exception &e) {
        error_msg = e.what();
    }
    Py_END_ALLOW_THREADS

        return error_msg.empty()
        ? 0
        : -1;
}
18✔
169

170
#ifdef DFTRACER_UTILS_ENABLE_ARROW
171

172
#endif  // DFTRACER_UTILS_ENABLE_ARROW
173

174
// ---------------------------------------------------------------------------
175
// process() — returns ArrowTable (materialized)
176
// ---------------------------------------------------------------------------
177

178
// process(directory, ...) -> ArrowTable
//
// Validates arguments, runs the aggregation pipeline synchronously (GIL
// released while the runtime works), converts every non-empty batch to an
// Arrow record batch, and wraps the list as a materialized ArrowTable.
// Returns NULL with a Python exception set on failure.
static PyObject *Aggregator_process(AggregatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    AggregatorInput input;
    if (parse_aggregator_args(args, kwds, input) < 0) return NULL;

#ifndef DFTRACER_UTILS_ENABLE_ARROW
    // Fail fast: without Arrow there is no way to return results, so do
    // not waste time running the (potentially expensive) pipeline first.
    (void)self;
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#else
    std::vector<AggregationBatch> batches;
    std::string error_msg;
    if (run_aggregator_pipeline(self, input, batches, error_msg) < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    PyObject *batch_list = PyList_New(0);
    if (!batch_list) return NULL;

    for (const auto &batch : batches) {
        // Empty or unconvertible batches are skipped, not treated as errors.
        if (batch.entries.empty()) continue;

        auto arrow_result = batch.to_arrow();
        if (!arrow_result.valid()) continue;

        PyObject *cap = wrap_arrow_result(std::move(arrow_result));
        if (!cap) {
            Py_DECREF(batch_list);
            return NULL;
        }
        int rc = PyList_Append(batch_list, cap);
        Py_DECREF(cap);  // PyList_Append took its own reference.
        if (rc < 0) {
            Py_DECREF(batch_list);
            return NULL;
        }
    }

    return wrap_arrow_table(batch_list);
#endif
}
16✔
220

221
// ---------------------------------------------------------------------------
222
// iter_arrow() — returns list iterator of ArrowBatch capsules
223
// ---------------------------------------------------------------------------
224

225
// iter_arrow(directory, ...) -> Iterator[ArrowBatch]
//
// Same pipeline as process(), but the converted batches are returned via
// a Python iterator instead of a materialized ArrowTable.
// Returns NULL with a Python exception set on failure.
static PyObject *Aggregator_iter_arrow(AggregatorObject *self, PyObject *args,
                                       PyObject *kwds) {
    AggregatorInput input;
    if (parse_aggregator_args(args, kwds, input) < 0) return NULL;

#ifndef DFTRACER_UTILS_ENABLE_ARROW
    // Fail fast: without Arrow there is no way to return results, so do
    // not waste time running the (potentially expensive) pipeline first.
    (void)self;
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#else
    std::vector<AggregationBatch> batches;
    std::string error_msg;
    if (run_aggregator_pipeline(self, input, batches, error_msg) < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    PyObject *batch_list = PyList_New(0);
    if (!batch_list) return NULL;

    for (const auto &batch : batches) {
        // Empty or unconvertible batches are skipped, not treated as errors.
        if (batch.entries.empty()) continue;

        auto arrow_result = batch.to_arrow();
        if (!arrow_result.valid()) continue;

        PyObject *cap = wrap_arrow_result(std::move(arrow_result));
        if (!cap) {
            Py_DECREF(batch_list);
            return NULL;
        }

        int rc = PyList_Append(batch_list, cap);
        Py_DECREF(cap);  // PyList_Append took its own reference.
        if (rc < 0) {
            Py_DECREF(batch_list);
            return NULL;
        }
    }

    PyObject *it = PyObject_GetIter(batch_list);
    Py_DECREF(batch_list);  // the iterator holds its own reference
    return it;
#endif
}
2✔
270

271
// tp_call: calling the object directly is sugar for .process(...).
static PyObject *Aggregator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    AggregatorObject *agg = (AggregatorObject *)self;
    return Aggregator_process(agg, args, kwds);
}
275

276
// Method table for AggregatorUtility. Both entries take keyword arguments
// decoded by parse_aggregator_args(); docstrings follow the signature-line
// convention ("--\n") so inspect.signature can parse them.
static PyMethodDef Aggregator_methods[] = {
    // Materialized aggregation: returns an ArrowTable.
    {"process", (PyCFunction)Aggregator_process, METH_VARARGS | METH_KEYWORDS,
     "process(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "        categories=None, names=None, index_dir='',\n"
     "        checkpoint_size=33554432, force_rebuild=False,\n"
     "        chunk_size_mb=64, batch_size_mb=4, event_batch_size=10000)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, return materialized ArrowTable.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds (default "
     "5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Index sidecar directory (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    chunk_size_mb (int): Target chunk size in MB (default 64).\n"
     "    batch_size_mb (int): Batch read size in MB (default 4).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "\n"
     "Returns:\n"
     "    ArrowTable: Aggregated results.\n"},
    // Streaming aggregation: returns an iterator of ArrowBatch capsules.
    {"iter_arrow", (PyCFunction)Aggregator_iter_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow(directory, time_interval_ms=5000.0, group_keys=None,\n"
     "           categories=None, names=None, index_dir='',\n"
     "           checkpoint_size=33554432, force_rebuild=False,\n"
     "           chunk_size_mb=64, batch_size_mb=4, event_batch_size=10000)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, stream Arrow batches.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval_ms (float): Time bucket in milliseconds (default "
     "5000).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Category filter (default None).\n"
     "    names (list[str] or None): Name filter (default None).\n"
     "    index_dir (str): Index sidecar directory (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    chunk_size_mb (int): Target chunk size in MB (default 64).\n"
     "    batch_size_mb (int): Batch read size in MB (default 4).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "\n"
     "Returns:\n"
     "    Iterator[ArrowBatch]: Arrow record batches.\n"},
    {NULL}};  // sentinel
329

330
// Static type object for dftracer_utils_ext.AggregatorUtility.
// Slots are positional (C89-style init, required because designated
// initializers on PyTypeObject are not portable across compilers here);
// each is annotated with its slot name. Py_TPFLAGS_BASETYPE allows
// Python-level subclassing; tp_call makes instances directly callable
// (delegates to process()).
PyTypeObject AggregatorType = {
    PyVarObject_HEAD_INIT(
        NULL, 0) "dftracer_utils_ext.AggregatorUtility", /* tp_name */
    sizeof(AggregatorObject),                            /* tp_basicsize */
    0,                                                   /* tp_itemsize */
    (destructor)Aggregator_dealloc,                      /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Aggregator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    "AggregatorUtility(runtime: Runtime | None = None)\n"
    "--\n\n"
    "High-level aggregation pipeline for DFTracer trace files.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "process(directory, time_interval_ms=5000.0, ...) -> ArrowTable\n"
    "    Run aggregation and return a materialized Arrow table.\n\n"
    "iter_arrow(directory, time_interval_ms=5000.0, ...) -> "
    "Iterator[ArrowBatch]\n"
    "    Run aggregation and stream Arrow batches.\n", /* tp_doc */
    0,                                                 /* tp_traverse */
    0,                                                 /* tp_clear */
    0,                                                 /* tp_richcompare */
    0,                                                 /* tp_weaklistoffset */
    0,                                                 /* tp_iter */
    0,                                                 /* tp_iternext */
    Aggregator_methods,                                /* tp_methods */
    0,                                                 /* tp_members */
    0,                                                 /* tp_getset */
    0,                                                 /* tp_base */
    0,                                                 /* tp_dict */
    0,                                                 /* tp_descr_get */
    0,                                                 /* tp_descr_set */
    0,                                                 /* tp_dictoffset */
    (initproc)Aggregator_init,                         /* tp_init */
    0,                                                 /* tp_alloc */
    Aggregator_new,                                    /* tp_new */
};
380

381
// Register the AggregatorUtility type on module `m`.
//
// `m` is a borrowed reference owned by the caller; this function must not
// decref it on failure (the previous version erroneously did, which would
// destroy the caller's module object on an add failure).
//
// Returns 0 on success, -1 with a Python exception set on failure.
int init_aggregator(PyObject *m) {
    if (PyType_Ready(&AggregatorType) < 0) return -1;

    Py_INCREF(&AggregatorType);
    if (PyModule_AddObject(m, "AggregatorUtility",
                           (PyObject *)&AggregatorType) < 0) {
        // PyModule_AddObject steals the reference only on success, so the
        // INCREF above must be undone here.
        Py_DECREF(&AggregatorType);
        return -1;
    }

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc