• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28286012595

27 Jun 2026 10:04AM UTC coverage: 51.056% (-1.3%) from 52.356%
28286012595

Pull #79

github

web-flow
Merge 6c6535a19 into 8eb383f39
Pull Request #79: Add Valgrind memory checking (C++, Python, MPI) and fix the bugs it found

32079 of 80165 branches covered (40.02%)

Branch coverage included in aggregate %.

129 of 149 new or added lines in 11 files covered. (86.58%)

5116 existing lines in 181 files now uncovered.

32739 of 46790 relevant lines covered (69.97%)

9929.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.0
/src/dftracer/utils/python/utilities/reorganization_planner.cpp
1
#include <dftracer/utils/core/runtime.h>
2
#include <dftracer/utils/core/tasks/coro_scope.h>
3
#include <dftracer/utils/core/utilities/behaviors/behavior_chain.h>
4
#include <dftracer/utils/core/utilities/utility_executor.h>
5
#include <dftracer/utils/python/py_dict_helpers.h>
6
#include <dftracer/utils/python/runtime.h>
7
#include <dftracer/utils/python/utilities/reorganization_planner.h>
8
#include <dftracer/utils/utilities/composites/dft/reorganize/reorganization_planner.h>
9

10
#include <string>
11
#include <vector>
12

13
using dftracer::utils::CoroScope;
14
using dftracer::utils::Runtime;
15
using dftracer::utils::utilities::behaviors::BehaviorChain;
16
using dftracer::utils::utilities::behaviors::UtilityExecutor;
17
namespace tags = dftracer::utils::utilities::tags;
18
using namespace dftracer::utils::utilities::composites::dft::reorganize;
19

20
static Runtime *get_runtime(ReorganizationPlannerObject *self) {
4✔
21
    if (self->runtime_obj)
4!
22
        return ((RuntimeObject *)self->runtime_obj)->runtime.get();
×
23
    return get_default_runtime();
4✔
24
}
4✔
25

26
static void ReorganizationPlanner_dealloc(ReorganizationPlannerObject *self) {
4✔
27
    Py_XDECREF(self->runtime_obj);
4✔
28
    Py_TYPE(self)->tp_free((PyObject *)self);
4✔
29
}
4✔
30

31
static PyObject *ReorganizationPlanner_new(PyTypeObject *type, PyObject *args,
4✔
32
                                           PyObject *kwds) {
33
    ReorganizationPlannerObject *self;
34
    self = (ReorganizationPlannerObject *)type->tp_alloc(type, 0);
4✔
35
    if (self != NULL) {
4!
36
        self->runtime_obj = NULL;
4✔
37
    }
4✔
38
    return (PyObject *)self;
4✔
39
}
40

41
static int ReorganizationPlanner_init(ReorganizationPlannerObject *self,
4✔
42
                                      PyObject *args, PyObject *kwds) {
43
    static const char *kwlist[] = {"runtime", NULL};
44
    PyObject *runtime_arg = NULL;
4✔
45

46
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
4!
47
                                     &runtime_arg)) {
48
        return -1;
×
49
    }
50

51
    if (runtime_arg && runtime_arg != Py_None) {
4!
52
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
53
            Py_INCREF(runtime_arg);
×
54
            self->runtime_obj = runtime_arg;
×
UNCOV
55
        } else {
×
56
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
57
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
58
                self->runtime_obj = native;
×
UNCOV
59
            } else {
×
60
                Py_XDECREF(native);
×
61
                PyErr_SetString(PyExc_TypeError,
×
62
                                "runtime must be a Runtime instance or None");
63
                return -1;
×
64
            }
65
        }
UNCOV
66
    }
×
67

68
    return 0;
4✔
69
}
4✔
70

71
static PyObject *ReorganizationPlanner_plan(ReorganizationPlannerObject *self,
4✔
72
                                            PyObject *args, PyObject *kwds) {
73
    using dftracer::utils::coro::CoroTask;
74

75
    static const char *kwlist[] = {"source_files", "groups", "index_dir", NULL};
76
    PyObject *source_files_obj;
77
    PyObject *groups_obj = Py_None;
4✔
78
    const char *index_dir = "";
4✔
79

80
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|Os", (char **)kwlist,
4!
81
                                     &source_files_obj, &groups_obj,
82
                                     &index_dir))
83
        return NULL;
×
84

85
    if (!PyList_Check(source_files_obj)) {
4!
86
        PyErr_SetString(PyExc_TypeError, "source_files must be a list");
×
87
        return NULL;
×
88
    }
89

90
    Py_ssize_t nfiles = PyList_Size(source_files_obj);
4✔
91
    std::vector<std::string> files;
4✔
92
    files.reserve(static_cast<std::size_t>(nfiles));
4!
93
    for (Py_ssize_t i = 0; i < nfiles; i++) {
8✔
94
        PyObject *item = PyList_GetItem(source_files_obj, i);
4!
95
        const char *s = PyUnicode_AsUTF8(item);
4!
96
        if (!s) return NULL;
4!
97
        files.emplace_back(s);
4!
98
    }
4✔
99

100
    std::vector<PredicateGroup> groups;
4✔
101
    if (groups_obj && groups_obj != Py_None) {
4!
102
        if (!PyList_Check(groups_obj)) {
4!
103
            PyErr_SetString(PyExc_TypeError,
×
104
                            "groups must be a list of dicts or None");
105
            return NULL;
×
106
        }
107
        Py_ssize_t n = PyList_Size(groups_obj);
4!
108
        groups.reserve(static_cast<std::size_t>(n));
4!
109
        for (Py_ssize_t i = 0; i < n; i++) {
8✔
110
            PyObject *item = PyList_GetItem(groups_obj, i);
4!
111
            PredicateGroup g;
4✔
112
            PyObject *name = PyDict_GetItemString(item, "name");
4!
113
            PyObject *pred = PyDict_GetItemString(item, "query");
4!
114
            if (name) {
4!
115
                const char *ns = PyUnicode_AsUTF8(name);
4!
116
                if (!ns) return NULL;
4!
117
                g.name = ns;
4!
118
            }
4✔
119
            if (pred) {
4!
120
                const char *ps = PyUnicode_AsUTF8(pred);
4!
121
                if (!ps) return NULL;
4!
122
                g.query = ps;
4!
123
            }
4✔
124
            groups.push_back(std::move(g));
4!
125
        }
4!
126
    }
4✔
127

128
    ReorganizationPlannerInput input;
4✔
129
    input.source_files = std::move(files);
4✔
130
    input.groups = std::move(groups);
4✔
131
    input.index_dir = index_dir;
4!
132

133
    ExtractionPlan plan;
4✔
134
    auto *plan_p = &plan;
4✔
135
    ReorganizationPlannerInput input_copy = input;
4!
136
    std::string error_msg;
4✔
137

138
    Py_BEGIN_ALLOW_THREADS try {
4!
139
        Runtime *rt = get_runtime(self);
4!
140
        auto task = run_coro_scope(
4!
141
            rt->executor(),
4!
142
            [plan_p, input_copy](CoroScope &scope) -> CoroTask<void> {
32!
143
                auto planner = std::make_shared<ReorganizationPlannerUtility>();
12!
144
                UtilityExecutor<ReorganizationPlannerInput, ExtractionPlan,
12✔
145
                                tags::NeedsContext>
146
                    exec(planner, BehaviorChain<ReorganizationPlannerInput,
12!
147
                                                ExtractionPlan>{});
148
                *plan_p = co_await exec.execute_with_context(scope, input_copy);
20!
149
            });
12!
150
        rt->submit(std::move(task), "reorganization-planner").wait();
4!
151
    } catch (const std::exception &e) {
4!
152
        error_msg = e.what();
×
153
    }
×
154
    Py_END_ALLOW_THREADS
4!
155

156
        if (!error_msg.empty()) {
4!
157
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
158
        return NULL;
×
159
    }
160

161
    // groups list
162
    PyObject *py_groups =
4✔
163
        PyList_New(static_cast<Py_ssize_t>(plan.groups.size()));
4!
164
    if (!py_groups) return NULL;
4!
165
    for (std::size_t i = 0; i < plan.groups.size(); i++) {
12✔
166
        PyObject *g = PyDict_New();
8!
167
        if (!g) {
8!
UNCOV
168
            Py_DECREF(py_groups);
×
169
            return NULL;
×
170
        }
171
        dict_set_steal(g, "name",
8!
172
                       PyUnicode_FromString(plan.groups[i].name.c_str()));
8!
173
        dict_set_steal(g, "query",
8!
174
                       PyUnicode_FromString(plan.groups[i].query.c_str()));
8!
175
        PyList_SetItem(py_groups, static_cast<Py_ssize_t>(i), g);
8!
176
    }
8✔
177

178
    // source_files list
179
    PyObject *py_sources =
4✔
180
        PyList_New(static_cast<Py_ssize_t>(plan.source_files.size()));
4!
181
    if (!py_sources) {
4!
UNCOV
182
        Py_DECREF(py_groups);
×
183
        return NULL;
×
184
    }
185
    for (std::size_t i = 0; i < plan.source_files.size(); i++) {
8✔
186
        const auto &sf = plan.source_files[i];
4✔
187
        PyObject *entry = PyDict_New();
4!
188
        if (!entry) {
4!
UNCOV
189
            Py_DECREF(py_groups);
×
UNCOV
190
            Py_DECREF(py_sources);
×
191
            return NULL;
×
192
        }
193
        dict_set_steal(entry, "file_path",
4!
194
                       PyUnicode_FromString(sf.file_path.c_str()));
4!
195
        dict_set_steal(entry, "index_path",
4!
196
                       PyUnicode_FromString(sf.index_path.c_str()));
4!
197
        dict_set_steal(entry, "num_checkpoints",
4!
198
                       PyLong_FromSize_t(sf.num_checkpoints));
4!
199
        dict_set_steal(entry, "uncompressed_size",
4!
200
                       PyLong_FromUnsignedLongLong(sf.uncompressed_size));
4!
201
        dict_set_steal(entry, "checkpoint_size",
4!
202
                       PyLong_FromUnsignedLongLong(sf.checkpoint_size));
4!
203
        PyList_SetItem(py_sources, static_cast<Py_ssize_t>(i), entry);
4!
204
    }
4✔
205

206
    // tasks list
207
    PyObject *py_tasks = PyList_New(static_cast<Py_ssize_t>(plan.tasks.size()));
4!
208
    if (!py_tasks) {
4!
UNCOV
209
        Py_DECREF(py_groups);
×
UNCOV
210
        Py_DECREF(py_sources);
×
211
        return NULL;
×
212
    }
213
    for (std::size_t i = 0; i < plan.tasks.size(); i++) {
10✔
214
        const auto &t = plan.tasks[i];
6✔
215
        PyObject *entry = PyDict_New();
6!
216
        if (!entry) {
6!
UNCOV
217
            Py_DECREF(py_groups);
×
UNCOV
218
            Py_DECREF(py_sources);
×
UNCOV
219
            Py_DECREF(py_tasks);
×
220
            return NULL;
×
221
        }
222
        dict_set_steal(entry, "source_file_idx",
6!
223
                       PyLong_FromSize_t(t.source_file_idx));
6!
224
        dict_set_steal(entry, "checkpoint_idx",
6!
225
                       PyLong_FromUnsignedLongLong(t.checkpoint_idx));
6!
226
        dict_set_steal(entry, "target_group",
6!
227
                       PyUnicode_FromString(t.target_group.c_str()));
6!
228
        dict_set_steal(entry, "start_byte",
6!
229
                       PyLong_FromUnsignedLongLong(t.start_byte));
6!
230
        dict_set_steal(entry, "end_byte",
6!
231
                       PyLong_FromUnsignedLongLong(t.end_byte));
6!
232
        PyList_SetItem(py_tasks, static_cast<Py_ssize_t>(i), entry);
6!
233
    }
6✔
234

235
    PyObject *result = PyDict_New();
4!
236
    if (!result) {
4!
UNCOV
237
        Py_DECREF(py_groups);
×
UNCOV
238
        Py_DECREF(py_sources);
×
UNCOV
239
        Py_DECREF(py_tasks);
×
240
        return NULL;
×
241
    }
242
    PyDict_SetItemString(result, "groups", py_groups);
4!
243
    Py_DECREF(py_groups);
4!
244
    PyDict_SetItemString(result, "source_files", py_sources);
4!
245
    Py_DECREF(py_sources);
4!
246
    PyDict_SetItemString(result, "tasks", py_tasks);
4!
247
    Py_DECREF(py_tasks);
4!
248
    dict_set_steal(result, "total_events",
4!
249
                   PyLong_FromSize_t(plan.total_events));
4!
250
    return result;
4✔
251
}
4✔
252

253
static PyObject *ReorganizationPlanner_call(PyObject *self, PyObject *args,
1✔
254
                                            PyObject *kwds) {
255
    return ReorganizationPlanner_plan((ReorganizationPlannerObject *)self, args,
2✔
256
                                      kwds);
1✔
257
}
258

259
static PyMethodDef ReorganizationPlanner_methods[] = {
260
    {"process", (PyCFunction)ReorganizationPlanner_plan,
261
     METH_VARARGS | METH_KEYWORDS,
262
     "process(source_files, groups=None, index_dir='')\n"
263
     "--\n"
264
     "\n"
265
     "Build a reorganization plan for trace files.\n"
266
     "\n"
267
     "Args:\n"
268
     "    source_files (list[str]): Paths to source trace files.\n"
269
     "    groups (list[dict] or None): Predicate group definitions\n"
270
     "        (default None).\n"
271
     "    index_dir (str): Directory for .dftindex stores (default '').\n"
272
     "\n"
273
     "Returns:\n"
274
     "    dict: Extraction plan.\n"},
275
    {NULL} /* Sentinel */
276
};
277

278
PyTypeObject ReorganizationPlannerType = {
279
    PyVarObject_HEAD_INIT(
280
        NULL,
281
        0) "dftracer_utils_ext.ReorganizationPlannerUtility", /* tp_name */
282
    sizeof(ReorganizationPlannerObject),                      /* tp_basicsize */
283
    0,                                                        /* tp_itemsize */
284
    (destructor)ReorganizationPlanner_dealloc,                /* tp_dealloc */
285
    0,                                        /* tp_vectorcall_offset */
286
    0,                                        /* tp_getattr */
287
    0,                                        /* tp_setattr */
288
    0,                                        /* tp_as_async */
289
    0,                                        /* tp_repr */
290
    0,                                        /* tp_as_number */
291
    0,                                        /* tp_as_sequence */
292
    0,                                        /* tp_as_mapping */
293
    0,                                        /* tp_hash */
294
    ReorganizationPlanner_call,               /* tp_call */
295
    0,                                        /* tp_str */
296
    0,                                        /* tp_getattro */
297
    0,                                        /* tp_setattro */
298
    0,                                        /* tp_as_buffer */
299
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
300
    "ReorganizationPlannerUtility(runtime: Runtime | None = None)\n"
301
    "--\n"
302
    "\n"
303
    "Plan semantic reorganization of trace files into predicate groups.\n"
304
    "\n"
305
    "Args:\n"
306
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
307
    "        If None, uses the default global Runtime.\n", /* tp_doc */
308
    0,                                                     /* tp_traverse */
309
    0,                                                     /* tp_clear */
310
    0,                                                     /* tp_richcompare */
311
    0,                                    /* tp_weaklistoffset */
312
    0,                                    /* tp_iter */
313
    0,                                    /* tp_iternext */
314
    ReorganizationPlanner_methods,        /* tp_methods */
315
    0,                                    /* tp_members */
316
    0,                                    /* tp_getset */
317
    0,                                    /* tp_base */
318
    0,                                    /* tp_dict */
319
    0,                                    /* tp_descr_get */
320
    0,                                    /* tp_descr_set */
321
    0,                                    /* tp_dictoffset */
322
    (initproc)ReorganizationPlanner_init, /* tp_init */
323
    0,                                    /* tp_alloc */
324
    ReorganizationPlanner_new,            /* tp_new */
325
};
326

327
int init_reorganization_planner(PyObject *m) {
1✔
328
    if (PyType_Ready(&ReorganizationPlannerType) < 0) return -1;
1!
329

330
    Py_INCREF(&ReorganizationPlannerType);
1✔
331
    if (PyModule_AddObject(m, "ReorganizationPlannerUtility",
2!
332
                           (PyObject *)&ReorganizationPlannerType) < 0) {
1✔
UNCOV
333
        Py_DECREF(&ReorganizationPlannerType);
×
UNCOV
334
        Py_DECREF(m);
×
335
        return -1;
×
336
    }
337

338
    return 0;
1✔
339
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc