• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27052412546

06 Jun 2026 04:20AM UTC coverage: 50.862% (+1.0%) from 49.905%
27052412546

Pull #73

github

web-flow
Merge 734572730 into 88a3c8457
Pull Request #73: add portable dependencies wheel support

31801 of 79859 branches covered (39.82%)

Branch coverage included in aggregate %.

32491 of 46545 relevant lines covered (69.81%)

9947.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.0
/src/dftracer/utils/python/utilities/reorganization_planner.cpp
1
#include <dftracer/utils/core/runtime.h>
2
#include <dftracer/utils/core/tasks/coro_scope.h>
3
#include <dftracer/utils/core/utilities/behaviors/behavior_chain.h>
4
#include <dftracer/utils/core/utilities/utility_executor.h>
5
#include <dftracer/utils/python/runtime.h>
6
#include <dftracer/utils/python/utilities/reorganization_planner.h>
7
#include <dftracer/utils/utilities/composites/dft/reorganize/reorganization_planner.h>
8

9
#include <string>
10
#include <vector>
11

12
using dftracer::utils::CoroScope;
13
using dftracer::utils::Runtime;
14
using dftracer::utils::utilities::behaviors::BehaviorChain;
15
using dftracer::utils::utilities::behaviors::UtilityExecutor;
16
namespace tags = dftracer::utils::utilities::tags;
17
using namespace dftracer::utils::utilities::composites::dft::reorganize;
18

19
static Runtime *get_runtime(ReorganizationPlannerObject *self) {
4✔
20
    if (self->runtime_obj)
4!
21
        return ((RuntimeObject *)self->runtime_obj)->runtime.get();
×
22
    return get_default_runtime();
4✔
23
}
4✔
24

25
static void ReorganizationPlanner_dealloc(ReorganizationPlannerObject *self) {
4✔
26
    Py_XDECREF(self->runtime_obj);
4✔
27
    Py_TYPE(self)->tp_free((PyObject *)self);
4✔
28
}
4✔
29

30
static PyObject *ReorganizationPlanner_new(PyTypeObject *type, PyObject *args,
4✔
31
                                           PyObject *kwds) {
32
    ReorganizationPlannerObject *self;
33
    self = (ReorganizationPlannerObject *)type->tp_alloc(type, 0);
4✔
34
    if (self != NULL) {
4!
35
        self->runtime_obj = NULL;
4✔
36
    }
4✔
37
    return (PyObject *)self;
4✔
38
}
39

40
static int ReorganizationPlanner_init(ReorganizationPlannerObject *self,
4✔
41
                                      PyObject *args, PyObject *kwds) {
42
    static const char *kwlist[] = {"runtime", NULL};
43
    PyObject *runtime_arg = NULL;
4✔
44

45
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
4!
46
                                     &runtime_arg)) {
47
        return -1;
×
48
    }
49

50
    if (runtime_arg && runtime_arg != Py_None) {
4!
51
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
52
            Py_INCREF(runtime_arg);
×
53
            self->runtime_obj = runtime_arg;
×
54
        } else {
×
55
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
56
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
57
                self->runtime_obj = native;
×
58
            } else {
×
59
                Py_XDECREF(native);
×
60
                PyErr_SetString(PyExc_TypeError,
×
61
                                "runtime must be a Runtime instance or None");
62
                return -1;
×
63
            }
64
        }
65
    }
×
66

67
    return 0;
4✔
68
}
4✔
69

70
static PyObject *ReorganizationPlanner_plan(ReorganizationPlannerObject *self,
4✔
71
                                            PyObject *args, PyObject *kwds) {
72
    using dftracer::utils::coro::CoroTask;
73

74
    static const char *kwlist[] = {"source_files", "groups", "index_dir", NULL};
75
    PyObject *source_files_obj;
76
    PyObject *groups_obj = Py_None;
4✔
77
    const char *index_dir = "";
4✔
78

79
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|Os", (char **)kwlist,
4!
80
                                     &source_files_obj, &groups_obj,
81
                                     &index_dir))
82
        return NULL;
×
83

84
    if (!PyList_Check(source_files_obj)) {
4!
85
        PyErr_SetString(PyExc_TypeError, "source_files must be a list");
×
86
        return NULL;
×
87
    }
88

89
    Py_ssize_t nfiles = PyList_Size(source_files_obj);
4✔
90
    std::vector<std::string> files;
4✔
91
    files.reserve(static_cast<std::size_t>(nfiles));
4!
92
    for (Py_ssize_t i = 0; i < nfiles; i++) {
8✔
93
        PyObject *item = PyList_GetItem(source_files_obj, i);
4!
94
        const char *s = PyUnicode_AsUTF8(item);
4!
95
        if (!s) return NULL;
4!
96
        files.emplace_back(s);
4!
97
    }
4✔
98

99
    std::vector<PredicateGroup> groups;
4✔
100
    if (groups_obj && groups_obj != Py_None) {
4!
101
        if (!PyList_Check(groups_obj)) {
4!
102
            PyErr_SetString(PyExc_TypeError,
×
103
                            "groups must be a list of dicts or None");
104
            return NULL;
×
105
        }
106
        Py_ssize_t n = PyList_Size(groups_obj);
4!
107
        groups.reserve(static_cast<std::size_t>(n));
4!
108
        for (Py_ssize_t i = 0; i < n; i++) {
8✔
109
            PyObject *item = PyList_GetItem(groups_obj, i);
4!
110
            PredicateGroup g;
4✔
111
            PyObject *name = PyDict_GetItemString(item, "name");
4!
112
            PyObject *pred = PyDict_GetItemString(item, "query");
4!
113
            if (name) {
4!
114
                const char *ns = PyUnicode_AsUTF8(name);
4!
115
                if (!ns) return NULL;
4!
116
                g.name = ns;
4!
117
            }
4✔
118
            if (pred) {
4!
119
                const char *ps = PyUnicode_AsUTF8(pred);
4!
120
                if (!ps) return NULL;
4!
121
                g.query = ps;
4!
122
            }
4✔
123
            groups.push_back(std::move(g));
4!
124
        }
4!
125
    }
4✔
126

127
    ReorganizationPlannerInput input;
4✔
128
    input.source_files = std::move(files);
4✔
129
    input.groups = std::move(groups);
4✔
130
    input.index_dir = index_dir;
4!
131

132
    ExtractionPlan plan;
4✔
133
    auto *plan_p = &plan;
4✔
134
    ReorganizationPlannerInput input_copy = input;
4!
135
    std::string error_msg;
4✔
136

137
    Py_BEGIN_ALLOW_THREADS try {
4!
138
        Runtime *rt = get_runtime(self);
4!
139
        auto task = run_coro_scope(
4!
140
            rt->executor(),
4!
141
            [plan_p, input_copy](CoroScope &scope) -> CoroTask<void> {
32!
142
                auto planner = std::make_shared<ReorganizationPlannerUtility>();
12!
143
                UtilityExecutor<ReorganizationPlannerInput, ExtractionPlan,
12✔
144
                                tags::NeedsContext>
145
                    exec(planner, BehaviorChain<ReorganizationPlannerInput,
12!
146
                                                ExtractionPlan>{});
147
                *plan_p = co_await exec.execute_with_context(scope, input_copy);
20!
148
            });
12!
149
        rt->submit(std::move(task), "reorganization-planner").wait();
4!
150
    } catch (const std::exception &e) {
4!
151
        error_msg = e.what();
×
152
    }
×
153
    Py_END_ALLOW_THREADS
4!
154

155
        if (!error_msg.empty()) {
4!
156
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
157
        return NULL;
×
158
    }
159

160
    // groups list
161
    PyObject *py_groups =
4✔
162
        PyList_New(static_cast<Py_ssize_t>(plan.groups.size()));
4!
163
    if (!py_groups) return NULL;
4!
164
    for (std::size_t i = 0; i < plan.groups.size(); i++) {
12✔
165
        PyObject *g = PyDict_New();
8!
166
        if (!g) {
8!
167
            Py_DECREF(py_groups);
×
168
            return NULL;
×
169
        }
170
        PyDict_SetItemString(g, "name",
8!
171
                             PyUnicode_FromString(plan.groups[i].name.c_str()));
8!
172
        PyDict_SetItemString(
8!
173
            g, "query", PyUnicode_FromString(plan.groups[i].query.c_str()));
8!
174
        PyList_SetItem(py_groups, static_cast<Py_ssize_t>(i), g);
8!
175
    }
8✔
176

177
    // source_files list
178
    PyObject *py_sources =
4✔
179
        PyList_New(static_cast<Py_ssize_t>(plan.source_files.size()));
4!
180
    if (!py_sources) {
4!
181
        Py_DECREF(py_groups);
×
182
        return NULL;
×
183
    }
184
    for (std::size_t i = 0; i < plan.source_files.size(); i++) {
8✔
185
        const auto &sf = plan.source_files[i];
4✔
186
        PyObject *entry = PyDict_New();
4!
187
        if (!entry) {
4!
188
            Py_DECREF(py_groups);
×
189
            Py_DECREF(py_sources);
×
190
            return NULL;
×
191
        }
192
        PyDict_SetItemString(entry, "file_path",
4!
193
                             PyUnicode_FromString(sf.file_path.c_str()));
4!
194
        PyDict_SetItemString(entry, "index_path",
4!
195
                             PyUnicode_FromString(sf.index_path.c_str()));
4!
196
        PyDict_SetItemString(entry, "num_checkpoints",
4!
197
                             PyLong_FromSize_t(sf.num_checkpoints));
4!
198
        PyDict_SetItemString(entry, "uncompressed_size",
4!
199
                             PyLong_FromUnsignedLongLong(sf.uncompressed_size));
4!
200
        PyDict_SetItemString(entry, "checkpoint_size",
4!
201
                             PyLong_FromUnsignedLongLong(sf.checkpoint_size));
4!
202
        PyList_SetItem(py_sources, static_cast<Py_ssize_t>(i), entry);
4!
203
    }
4✔
204

205
    // tasks list
206
    PyObject *py_tasks = PyList_New(static_cast<Py_ssize_t>(plan.tasks.size()));
4!
207
    if (!py_tasks) {
4!
208
        Py_DECREF(py_groups);
×
209
        Py_DECREF(py_sources);
×
210
        return NULL;
×
211
    }
212
    for (std::size_t i = 0; i < plan.tasks.size(); i++) {
10✔
213
        const auto &t = plan.tasks[i];
6✔
214
        PyObject *entry = PyDict_New();
6!
215
        if (!entry) {
6!
216
            Py_DECREF(py_groups);
×
217
            Py_DECREF(py_sources);
×
218
            Py_DECREF(py_tasks);
×
219
            return NULL;
×
220
        }
221
        PyDict_SetItemString(entry, "source_file_idx",
6!
222
                             PyLong_FromSize_t(t.source_file_idx));
6!
223
        PyDict_SetItemString(entry, "checkpoint_idx",
6!
224
                             PyLong_FromUnsignedLongLong(t.checkpoint_idx));
6!
225
        PyDict_SetItemString(entry, "target_group",
6!
226
                             PyUnicode_FromString(t.target_group.c_str()));
6!
227
        PyDict_SetItemString(entry, "start_byte",
6!
228
                             PyLong_FromUnsignedLongLong(t.start_byte));
6!
229
        PyDict_SetItemString(entry, "end_byte",
6!
230
                             PyLong_FromUnsignedLongLong(t.end_byte));
6!
231
        PyList_SetItem(py_tasks, static_cast<Py_ssize_t>(i), entry);
6!
232
    }
6✔
233

234
    PyObject *result = PyDict_New();
4!
235
    if (!result) {
4!
236
        Py_DECREF(py_groups);
×
237
        Py_DECREF(py_sources);
×
238
        Py_DECREF(py_tasks);
×
239
        return NULL;
×
240
    }
241
    PyDict_SetItemString(result, "groups", py_groups);
4!
242
    Py_DECREF(py_groups);
4!
243
    PyDict_SetItemString(result, "source_files", py_sources);
4!
244
    Py_DECREF(py_sources);
4!
245
    PyDict_SetItemString(result, "tasks", py_tasks);
4!
246
    Py_DECREF(py_tasks);
4!
247
    PyDict_SetItemString(result, "total_events",
4!
248
                         PyLong_FromSize_t(plan.total_events));
4!
249
    return result;
4✔
250
}
4✔
251

252
static PyObject *ReorganizationPlanner_call(PyObject *self, PyObject *args,
1✔
253
                                            PyObject *kwds) {
254
    return ReorganizationPlanner_plan((ReorganizationPlannerObject *)self, args,
2✔
255
                                      kwds);
1✔
256
}
257

258
static PyMethodDef ReorganizationPlanner_methods[] = {
259
    {"process", (PyCFunction)ReorganizationPlanner_plan,
260
     METH_VARARGS | METH_KEYWORDS,
261
     "process(source_files, groups=None, index_dir='')\n"
262
     "--\n"
263
     "\n"
264
     "Build a reorganization plan for trace files.\n"
265
     "\n"
266
     "Args:\n"
267
     "    source_files (list[str]): Paths to source trace files.\n"
268
     "    groups (list[dict] or None): Predicate group definitions\n"
269
     "        (default None).\n"
270
     "    index_dir (str): Directory for .dftindex stores (default '').\n"
271
     "\n"
272
     "Returns:\n"
273
     "    dict: Extraction plan.\n"},
274
    {NULL} /* Sentinel */
275
};
276

277
PyTypeObject ReorganizationPlannerType = {
278
    PyVarObject_HEAD_INIT(
279
        NULL,
280
        0) "dftracer_utils_ext.ReorganizationPlannerUtility", /* tp_name */
281
    sizeof(ReorganizationPlannerObject),                      /* tp_basicsize */
282
    0,                                                        /* tp_itemsize */
283
    (destructor)ReorganizationPlanner_dealloc,                /* tp_dealloc */
284
    0,                                        /* tp_vectorcall_offset */
285
    0,                                        /* tp_getattr */
286
    0,                                        /* tp_setattr */
287
    0,                                        /* tp_as_async */
288
    0,                                        /* tp_repr */
289
    0,                                        /* tp_as_number */
290
    0,                                        /* tp_as_sequence */
291
    0,                                        /* tp_as_mapping */
292
    0,                                        /* tp_hash */
293
    ReorganizationPlanner_call,               /* tp_call */
294
    0,                                        /* tp_str */
295
    0,                                        /* tp_getattro */
296
    0,                                        /* tp_setattro */
297
    0,                                        /* tp_as_buffer */
298
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
299
    "ReorganizationPlannerUtility(runtime: Runtime | None = None)\n"
300
    "--\n"
301
    "\n"
302
    "Plan semantic reorganization of trace files into predicate groups.\n"
303
    "\n"
304
    "Args:\n"
305
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
306
    "        If None, uses the default global Runtime.\n", /* tp_doc */
307
    0,                                                     /* tp_traverse */
308
    0,                                                     /* tp_clear */
309
    0,                                                     /* tp_richcompare */
310
    0,                                    /* tp_weaklistoffset */
311
    0,                                    /* tp_iter */
312
    0,                                    /* tp_iternext */
313
    ReorganizationPlanner_methods,        /* tp_methods */
314
    0,                                    /* tp_members */
315
    0,                                    /* tp_getset */
316
    0,                                    /* tp_base */
317
    0,                                    /* tp_dict */
318
    0,                                    /* tp_descr_get */
319
    0,                                    /* tp_descr_set */
320
    0,                                    /* tp_dictoffset */
321
    (initproc)ReorganizationPlanner_init, /* tp_init */
322
    0,                                    /* tp_alloc */
323
    ReorganizationPlanner_new,            /* tp_new */
324
};
325

326
int init_reorganization_planner(PyObject *m) {
1✔
327
    if (PyType_Ready(&ReorganizationPlannerType) < 0) return -1;
1!
328

329
    Py_INCREF(&ReorganizationPlannerType);
1✔
330
    if (PyModule_AddObject(m, "ReorganizationPlannerUtility",
2!
331
                           (PyObject *)&ReorganizationPlannerType) < 0) {
1✔
332
        Py_DECREF(&ReorganizationPlannerType);
×
333
        Py_DECREF(m);
×
334
        return -1;
×
335
    }
336

337
    return 0;
1✔
338
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc