• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23531027933

25 Mar 2026 08:05AM UTC coverage: 48.592% (-1.5%) from 50.098%
23531027933

Pull #57

github

web-flow
Merge d1070e289 into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18900 of 49456 branches covered (38.22%)

Branch coverage included in aggregate %.

1604 of 1954 new or added lines in 25 files covered. (82.09%)

3407 existing lines in 135 files now uncovered.

18487 of 27485 relevant lines covered (67.26%)

240991.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.34
/src/dftracer/utils/python/utilities/aggregator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/coro/task.h>
3
#include <dftracer/utils/core/runtime.h>
4
#include <dftracer/utils/python/arrow_helpers.h>
5
#include <dftracer/utils/python/runtime.h>
6
#include <dftracer/utils/python/trace_reader_iterator.h>
7
#include <dftracer/utils/python/utilities/aggregator.h>
8
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_utility.h>
9

10
#include <string>
11
#include <vector>
12

13
using dftracer::utils::Runtime;
14
using dftracer::utils::coro::CoroTask;
15
using namespace dftracer::utils::utilities::composites::dft::aggregators;
16

17
using dftracer::utils::python::wrap_arrow_result;
18
using dftracer::utils::python::wrap_arrow_table;
19

20
#ifdef DFTRACER_UTILS_ENABLE_ARROW
21
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
22
#endif
23

24
// Pick the Runtime that the pipeline should be submitted to.
//
// If a Runtime was bound through __init__'s `runtime` argument, the native
// runtime owned by that RuntimeObject is used; otherwise this falls back to
// get_default_runtime(). The returned pointer is not owned by the caller.
static Runtime *get_runtime(AggregatorObject *self) {
    PyObject *bound = self->runtime_obj;
    if (bound == NULL) {
        return get_default_runtime();
    }
    RuntimeObject *wrapper = reinterpret_cast<RuntimeObject *>(bound);
    return wrapper->runtime.get();
}
29

30
// tp_dealloc: drop the (possibly NULL) reference to the bound runtime, then
// hand the object's memory back through its type's tp_free slot.
static void Aggregator_dealloc(AggregatorObject *self) {
    PyObject *held_runtime = self->runtime_obj;
    Py_XDECREF(held_runtime);

    PyTypeObject *tp = Py_TYPE(self);
    tp->tp_free(reinterpret_cast<PyObject *>(self));
}
34

35
// tp_new: allocate an AggregatorUtility with no runtime bound yet.
// `args` and `kwds` are unused here; argument handling happens in tp_init.
// Returns a new reference, or NULL if allocation failed.
static PyObject *Aggregator_new(PyTypeObject *type, PyObject *args,
                                PyObject *kwds) {
    PyObject *obj = type->tp_alloc(type, 0);
    if (obj == NULL) {
        return NULL;
    }
    // A NULL runtime_obj makes get_runtime() fall back to the default runtime.
    reinterpret_cast<AggregatorObject *>(obj)->runtime_obj = NULL;
    return obj;
}
43

44
static int Aggregator_init(AggregatorObject *self, PyObject *args,
9✔
45
                           PyObject *kwds) {
46
    static const char *kwlist[] = {"runtime", NULL};
47
    PyObject *runtime_arg = NULL;
9✔
48

49
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
9!
50
                                     &runtime_arg)) {
51
        return -1;
×
52
    }
53

54
    if (runtime_arg && runtime_arg != Py_None) {
9!
55
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
56
            Py_INCREF(runtime_arg);
×
57
            self->runtime_obj = runtime_arg;
×
UNCOV
58
        } else {
×
59
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
60
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
61
                self->runtime_obj = native;
×
UNCOV
62
            } else {
×
63
                Py_XDECREF(native);
×
64
                PyErr_SetString(PyExc_TypeError,
×
65
                                "runtime must be a Runtime instance or None");
66
                return -1;
×
67
            }
68
        }
UNCOV
69
    }
×
70

71
    return 0;
9✔
72
}
9✔
73

74
// ---------------------------------------------------------------------------
75
// Helpers
76
// ---------------------------------------------------------------------------
77

78
static int parse_aggregator_args(PyObject *args, PyObject *kwds,
9✔
79
                                 AggregatorInput &input) {
80
    static const char *kwlist[] = {
81
        "directory",     "time_interval", "group_keys",       "categories",
82
        "names",         "index_dir",     "checkpoint_size",  "force_rebuild",
83
        "chunk_size_mb", "batch_size_mb", "event_batch_size", NULL};
84

85
    const char *directory = NULL;
9✔
86
    double time_interval = 5.0;
9✔
87
    PyObject *group_keys_obj = Py_None;
9✔
88
    PyObject *categories_obj = Py_None;
9✔
89
    PyObject *names_obj = Py_None;
9✔
90
    const char *index_dir = "";
9✔
91
    Py_ssize_t checkpoint_size = 32 * 1024 * 1024;
9✔
92
    int force_rebuild = 0;
9✔
93
    Py_ssize_t chunk_size_mb = 64;
9✔
94
    Py_ssize_t batch_size_mb = 4;
9✔
95
    Py_ssize_t event_batch_size = 10000;
9✔
96

97
    if (!PyArg_ParseTupleAndKeywords(
9!
98
            args, kwds, "s|dOOOsnpnnn", (char **)kwlist, &directory,
9✔
99
            &time_interval, &group_keys_obj, &categories_obj, &names_obj,
100
            &index_dir, &checkpoint_size, &force_rebuild, &chunk_size_mb,
101
            &batch_size_mb, &event_batch_size))
102
        return -1;
×
103

104
    input.directory = directory;
9✔
105
    input.config.time_interval_us =
9✔
106
        static_cast<std::uint64_t>(time_interval * 1000000.0);
9✔
107
    input.index_dir = index_dir;
9✔
108
    input.checkpoint_size = static_cast<std::size_t>(checkpoint_size);
9✔
109
    input.force_rebuild = force_rebuild != 0;
9✔
110
    input.chunk_size_mb = static_cast<std::size_t>(chunk_size_mb);
9✔
111
    input.batch_size_mb = static_cast<std::size_t>(batch_size_mb);
9✔
112
    input.event_batch_size = static_cast<std::size_t>(event_batch_size);
9✔
113

114
    if (group_keys_obj && group_keys_obj != Py_None) {
9!
115
        if (!PyList_Check(group_keys_obj)) {
×
116
            PyErr_SetString(PyExc_TypeError,
×
117
                            "group_keys must be a list of str");
118
            return -1;
×
119
        }
120
        Py_ssize_t n = PyList_Size(group_keys_obj);
×
121
        for (Py_ssize_t i = 0; i < n; i++) {
×
122
            const char *s = PyUnicode_AsUTF8(PyList_GetItem(group_keys_obj, i));
×
123
            if (!s) return -1;
×
124
            input.config.extra_group_keys.emplace_back(s);
×
UNCOV
125
        }
×
UNCOV
126
    }
×
127

128
    // categories and names filtering is now handled via the query DSL.
129
    // The categories_obj and names_obj parameters are accepted but ignored
130
    // for backward compatibility. Use the query parameter instead.
131

132
    return 0;
9✔
133
}
9✔
134

135
static int run_aggregator_pipeline(AggregatorObject *self,
9✔
136
                                   const AggregatorInput &input,
137
                                   std::vector<AggregationBatch> &batches,
138
                                   std::string &error_msg) {
139
    auto *bp = &batches;
9✔
140
    AggregatorInput input_copy = input;
9✔
141

142
    Py_BEGIN_ALLOW_THREADS try {
9!
143
        Runtime *rt = get_runtime(self);
9!
144
        auto task = [bp, input_copy]() -> CoroTask<void> {
126!
145
            AggregatorUtility util;
9!
146
            auto gen = util.process(input_copy);
9!
147
            while (auto batch = co_await gen.next()) {
72!
148
                bp->push_back(std::move(*batch));
9!
149
            }
18✔
150
        };
117!
151
        rt->submit(task(), "aggregator").get();
9!
152
    } catch (const std::exception &e) {
9!
153
        error_msg = e.what();
×
154
    }
×
155
    Py_END_ALLOW_THREADS
9!
156

157
        return error_msg.empty()
9✔
158
        ? 0
159
        : -1;
160
}
9✔
161

162
#ifdef DFTRACER_UTILS_ENABLE_ARROW
163

164
#endif  // DFTRACER_UTILS_ENABLE_ARROW
165

166
// ---------------------------------------------------------------------------
167
// process() — returns ArrowTable (materialized)
168
// ---------------------------------------------------------------------------
169

170
static PyObject *Aggregator_process(AggregatorObject *self, PyObject *args,
8✔
171
                                    PyObject *kwds) {
172
    AggregatorInput input;
8✔
173
    if (parse_aggregator_args(args, kwds, input) < 0) return NULL;
8!
174

175
    std::vector<AggregationBatch> batches;
8✔
176
    std::string error_msg;
8✔
177
    if (run_aggregator_pipeline(self, input, batches, error_msg) < 0) {
8!
178
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
179
        return NULL;
×
180
    }
181

182
#ifdef DFTRACER_UTILS_ENABLE_ARROW
183
    PyObject *batch_list = PyList_New(0);
8!
184
    if (!batch_list) return NULL;
8!
185

186
    for (const auto &batch : batches) {
16✔
187
        if (batch.entries.empty()) continue;
8✔
188

189
        auto arrow_result = batch.to_arrow();
7!
190
        if (!arrow_result.valid()) continue;
7!
191

192
        PyObject *cap = wrap_arrow_result(std::move(arrow_result));
7!
193
        if (!cap) {
7!
UNCOV
194
            Py_DECREF(batch_list);
×
195
            return NULL;
×
196
        }
197
        int rc = PyList_Append(batch_list, cap);
7!
198
        Py_DECREF(cap);
7!
199
        if (rc < 0) {
7!
UNCOV
200
            Py_DECREF(batch_list);
×
201
            return NULL;
×
202
        }
203
    }
7!
204

205
    return wrap_arrow_table(batch_list);
8!
206
#else
207
    PyErr_SetString(PyExc_RuntimeError,
208
                    "dftracer-utils was built without Arrow support");
209
    return NULL;
210
#endif
211
}
8✔
212

213
// ---------------------------------------------------------------------------
214
// iter_arrow() — returns list iterator of ArrowBatch capsules
215
// ---------------------------------------------------------------------------
216

217
static PyObject *Aggregator_iter_arrow(AggregatorObject *self, PyObject *args,
1✔
218
                                       PyObject *kwds) {
219
    AggregatorInput input;
1✔
220
    if (parse_aggregator_args(args, kwds, input) < 0) return NULL;
1!
221

222
    std::vector<AggregationBatch> batches;
1✔
223
    std::string error_msg;
1✔
224
    if (run_aggregator_pipeline(self, input, batches, error_msg) < 0) {
1!
225
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
226
        return NULL;
×
227
    }
228

229
#ifdef DFTRACER_UTILS_ENABLE_ARROW
230
    PyObject *batch_list = PyList_New(0);
1!
231
    if (!batch_list) return NULL;
1!
232

233
    for (const auto &batch : batches) {
2✔
234
        if (batch.entries.empty()) continue;
1!
235

236
        auto arrow_result = batch.to_arrow();
1!
237
        if (!arrow_result.valid()) continue;
1!
238

239
        PyObject *cap = wrap_arrow_result(std::move(arrow_result));
1!
240
        if (!cap) {
1!
UNCOV
241
            Py_DECREF(batch_list);
×
242
            return NULL;
×
243
        }
244

245
        int rc = PyList_Append(batch_list, cap);
1!
246
        Py_DECREF(cap);
1!
247
        if (rc < 0) {
1!
UNCOV
248
            Py_DECREF(batch_list);
×
249
            return NULL;
×
250
        }
251
    }
1!
252

253
    PyObject *it = PyObject_GetIter(batch_list);
1!
254
    Py_DECREF(batch_list);
1!
255
    return it;
1✔
256
#else
257
    PyErr_SetString(PyExc_RuntimeError,
258
                    "dftracer-utils was built without Arrow support");
259
    return NULL;
260
#endif
261
}
1✔
262

263
// tp_call slot: `instance(...)` is shorthand for `instance.process(...)`.
// It takes the same arguments and returns the same result.
static PyObject *Aggregator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    AggregatorObject *agg = reinterpret_cast<AggregatorObject *>(self);
    return Aggregator_process(agg, args, kwds);
}
267

268
// Python-visible methods of AggregatorUtility. Both accept the same
// arguments and run the whole aggregation pipeline before returning.
static PyMethodDef Aggregator_methods[] = {
    {"process", (PyCFunction)Aggregator_process, METH_VARARGS | METH_KEYWORDS,
     "process(directory, time_interval=5.0, group_keys=None,\n"
     "        categories=None, names=None, index_dir='',\n"
     "        checkpoint_size=33554432, force_rebuild=False,\n"
     "        chunk_size_mb=64, batch_size_mb=4, event_batch_size=10000)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, return materialized ArrowTable.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval (float): Time bucket in seconds (default 5.0).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Accepted for backward\n"
     "        compatibility; currently ignored (default None).\n"
     "    names (list[str] or None): Accepted for backward\n"
     "        compatibility; currently ignored (default None).\n"
     "    index_dir (str): Index sidecar directory (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    chunk_size_mb (int): Target chunk size in MB (default 64).\n"
     "    batch_size_mb (int): Batch read size in MB (default 4).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "\n"
     "Returns:\n"
     "    ArrowTable: Aggregated results.\n"},
    {"iter_arrow", (PyCFunction)Aggregator_iter_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "iter_arrow(directory, time_interval=5.0, group_keys=None,\n"
     "           categories=None, names=None, index_dir='',\n"
     "           checkpoint_size=33554432, force_rebuild=False,\n"
     "           chunk_size_mb=64, batch_size_mb=4, event_batch_size=10000)\n"
     "--\n"
     "\n"
     "Run aggregation pipeline, return an iterator over Arrow batches.\n"
     "All batches are computed before the iterator is returned.\n"
     "\n"
     "Args:\n"
     "    directory (str): Directory containing .pfw/.pfw.gz files.\n"
     "    time_interval (float): Time bucket in seconds (default 5.0).\n"
     "    group_keys (list[str] or None): Extra grouping dims (default None).\n"
     "    categories (list[str] or None): Accepted for backward\n"
     "        compatibility; currently ignored (default None).\n"
     "    names (list[str] or None): Accepted for backward\n"
     "        compatibility; currently ignored (default None).\n"
     "    index_dir (str): Index sidecar directory (default '').\n"
     "    checkpoint_size (int): Checkpoint size (default 33554432).\n"
     "    force_rebuild (bool): Force index rebuild (default False).\n"
     "    chunk_size_mb (int): Target chunk size in MB (default 64).\n"
     "    batch_size_mb (int): Batch read size in MB (default 4).\n"
     "    event_batch_size (int): Entries per batch (default 10000).\n"
     "\n"
     "Returns:\n"
     "    Iterator[ArrowBatch]: Arrow record batches.\n"},
    {NULL, NULL, 0, NULL} /* sentinel */
};
319

320
// Type object for dftracer_utils_ext.AggregatorUtility.
// Slots are initialized positionally, so their order must follow the
// PyTypeObject layout exactly; each value is labelled with its slot name.
PyTypeObject AggregatorType = {
    PyVarObject_HEAD_INIT(
        NULL, 0) "dftracer_utils_ext.AggregatorUtility", /* tp_name */
    sizeof(AggregatorObject),                            /* tp_basicsize */
    0,                                                   /* tp_itemsize */
    (destructor)Aggregator_dealloc,                      /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Aggregator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    "AggregatorUtility(runtime: Runtime | None = None)\n"
    "--\n\n"
    "High-level aggregation pipeline for DFTracer trace files.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "process(directory, time_interval=5.0, ...) -> ArrowTable\n"
    "    Run aggregation and return a materialized Arrow table.\n\n"
    "iter_arrow(directory, time_interval=5.0, ...) -> Iterator[ArrowBatch]\n"
    "    Run aggregation and return an iterator over the Arrow batches\n"
    "    (all batches are computed before the iterator is returned).\n",
                                                       /* tp_doc */
    0,                                                 /* tp_traverse */
    0,                                                 /* tp_clear */
    0,                                                 /* tp_richcompare */
    0,                                                 /* tp_weaklistoffset */
    0,                                                 /* tp_iter */
    0,                                                 /* tp_iternext */
    Aggregator_methods,                                /* tp_methods */
    0,                                                 /* tp_members */
    0,                                                 /* tp_getset */
    0,                                                 /* tp_base */
    0,                                                 /* tp_dict */
    0,                                                 /* tp_descr_get */
    0,                                                 /* tp_descr_set */
    0,                                                 /* tp_dictoffset */
    (initproc)Aggregator_init,                         /* tp_init */
    0,                                                 /* tp_alloc */
    Aggregator_new,                                    /* tp_new */
};
369

370
// Register the AggregatorUtility type on module `m`.
//
// `m` is borrowed: this function never releases the caller's reference to
// the module, so the caller remains responsible for cleaning it up if
// initialization fails. Releasing it here as well would risk a double
// decref in the caller's error path.
//
// Returns 0 on success, -1 with a Python exception set on failure.
int init_aggregator(PyObject *m) {
    if (PyType_Ready(&AggregatorType) < 0) return -1;

    // PyModule_AddObject steals a reference only on success, so take one for
    // it up front and give it back if registration fails.
    Py_INCREF(&AggregatorType);
    if (PyModule_AddObject(m, "AggregatorUtility",
                           (PyObject *)&AggregatorType) < 0) {
        Py_DECREF(&AggregatorType);
        return -1;
    }

    return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc