• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28398085302

29 Jun 2026 07:43PM UTC coverage: 50.067% (-2.2%) from 52.278%
28398085302

Pull #83

github

web-flow
Merge 9a615085e into 2efed6649
Pull Request #83: refactor and improve code QoL

16342 of 44293 branches covered (36.9%)

Branch coverage included in aggregate %.

613 of 1132 new or added lines in 52 files covered. (54.15%)

687 existing lines in 116 files now uncovered.

21698 of 31685 relevant lines covered (68.48%)

12958.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.07
/src/dftracer/utils/python/runtime.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/python/py_dict_helpers.h>
4
#include <dftracer/utils/python/py_type_helpers.h>
5
#include <dftracer/utils/python/runtime.h>
6

7
#include <chrono>
8
#include <memory>
9

10
static std::shared_ptr<dftracer::utils::Runtime> g_default_runtime;
11

12
// Returns a borrowed pointer to the file-level default runtime.
//
// When g_default_runtime is empty it is first populated with a Runtime
// constructed from the single argument 0. The pointer is owned by
// g_default_runtime; it is not transferred to the caller.
dftracer::utils::Runtime *get_default_runtime() {
    if (g_default_runtime) {
        return g_default_runtime.get();
    }
    g_default_runtime = std::make_shared<dftracer::utils::Runtime>(0);
    return g_default_runtime.get();
}
18

19
// tp_dealloc slot: releases this wrapper's reference to the runtime and then
// hands the object's storage back through the type's tp_free.
static void Runtime_dealloc(RuntimeObject *self) {
    PyTypeObject *const cls = Py_TYPE(self);
    self->runtime.reset();
    cls->tp_free(reinterpret_cast<PyObject *>(self));
}
79✔
23

24
// tp_new slot: allocates the wrapper via tp_alloc and constructs its
// shared_ptr member in place as an empty pointer.
//
// `args` and `kwds` are not inspected here. Returns the new object, or NULL
// when tp_alloc fails.
static PyObject *Runtime_new(PyTypeObject *type, PyObject *args,
                             PyObject *kwds) {
    PyObject *raw = type->tp_alloc(type, 0);
    if (raw == nullptr) {
        return nullptr;
    }

    // tp_alloc does not run C++ constructors, so the member is built with
    // placement new before anything reads it.
    auto *wrapper = reinterpret_cast<RuntimeObject *>(raw);
    new (&wrapper->runtime) std::shared_ptr<dftracer::utils::Runtime>(nullptr);
    return raw;
}
33

34
// tp_init slot for ``Runtime(threads=0, io_threads=0)``.
//
// Parses two optional integers, rejects negative values, copies them into an
// ExecutorConfig and installs a newly constructed runtime on the wrapper
// (replacing whatever the wrapper held before).
//
// Returns 0 on success. Returns -1 with a Python exception set otherwise:
// ValueError for a negative count, RuntimeError when construction throws.
static int Runtime_init(RuntimeObject *self, PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"threads", "io_threads", NULL};
    Py_ssize_t threads = 0;
    Py_ssize_t io_threads = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nn",
                                     const_cast<char **>(kwlist), &threads,
                                     &io_threads)) {
        return -1;
    }

    if (threads < 0) {
        PyErr_SetString(PyExc_ValueError, "threads must be >= 0");
        return -1;
    }
    if (io_threads < 0) {
        PyErr_SetString(PyExc_ValueError, "io_threads must be >= 0");
        return -1;
    }

    try {
        dftracer::utils::ExecutorConfig config;
        config.num_threads = static_cast<std::size_t>(threads);
        config.io_pool_size = static_cast<std::size_t>(io_threads);
        self->runtime =
            std::make_shared<dftracer::utils::Runtime>(config, true);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return -1;
    } catch (...) {
        // Exceptions not derived from std::exception must be stopped here as
        // well: the caller is the interpreter's C code, which cannot unwind
        // a C++ exception.
        PyErr_SetString(PyExc_RuntimeError,
                        "unknown C++ exception while creating Runtime");
        return -1;
    }

    return 0;
}
66

67
// Python method ``Runtime.shutdown()``.
//
// Calls shutdown() on the wrapped runtime with the GIL released.
// Returns None on success. Raises RuntimeError when the wrapper holds no
// runtime or when shutdown() throws.
static PyObject *Runtime_shutdown(RuntimeObject *self,
                                  PyObject *Py_UNUSED(ignored)) {
    if (!self->runtime) {
        PyErr_SetString(PyExc_RuntimeError, "Runtime not initialized");
        return NULL;
    }

    // Hold a local reference: while the GIL is released another Python
    // thread may run __init__ on this object again and replace
    // self->runtime, which would otherwise destroy the runtime mid-call.
    const auto runtime = self->runtime;

    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    // No C++ exception may leave this region: the GIL must be re-acquired by
    // Py_END_ALLOW_THREADS before any Python API is touched.
    try {
        runtime->shutdown();
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
76

77
// Forwards to dict_set_size and reports success as a bool
// (true when that call returns 0).
static bool set_size(PyObject *d, const char *key, std::size_t val) {
    const auto status = dict_set_size(d, key, val);
    return status == 0;
}
80

81
// Forwards to dict_set_f64 and reports success as a bool
// (true when that call returns 0).
static bool set_double(PyObject *d, const char *key, double val) {
    const auto status = dict_set_f64(d, key, val);
    return status == 0;
}
84

85
// Forwards the string's C representation to dict_set_str and reports
// success as a bool (true when that call returns 0).
static bool set_str(PyObject *d, const char *key, const std::string &val) {
    const char *text = val.c_str();
    const auto status = dict_set_str(d, key, text);
    return status == 0;
}
88

89
// Forwards to dict_set_bool and reports success as a bool
// (true when that call returns 0).
static bool set_bool(PyObject *d, const char *key, bool val) {
    const auto status = dict_set_bool(d, key, val);
    return status == 0;
}
92

93
// Converts one TaskProgress node, and recursively its children, into a
// Python dict.
//
// Keys written: name, state, queued_duration_ms, execution_duration_ms,
// total_subtasks, completed_subtasks, progress_pct, location, children
// (a list of dicts of the same shape).
//
// Returns a new reference, or NULL on failure; every partially built object
// is released before returning NULL.
static PyObject *build_task_progress(const dftracer::utils::TaskProgress &tp) {
    PyObject *node = PyDict_New();
    if (node == nullptr) {
        return nullptr;
    }

    // Short-circuit evaluation stops at the first setter that fails.
    const bool scalars_ok =
        set_str(node, "name", tp.name) && set_str(node, "state", tp.state) &&
        set_double(node, "queued_duration_ms", tp.queued_duration_ms) &&
        set_double(node, "execution_duration_ms", tp.execution_duration_ms) &&
        set_size(node, "total_subtasks", tp.total_subtasks) &&
        set_size(node, "completed_subtasks", tp.completed_subtasks) &&
        set_double(node, "progress_pct", tp.progress_percentage) &&
        set_str(node, "location", tp.location);
    if (!scalars_ok) {
        Py_DECREF(node);
        return nullptr;
    }

    PyObject *kids = PyList_New(static_cast<Py_ssize_t>(tp.children.size()));
    if (kids == nullptr) {
        Py_DECREF(node);
        return nullptr;
    }

    std::size_t pos = 0;
    while (pos < tp.children.size()) {
        PyObject *converted = build_task_progress(tp.children[pos]);
        if (converted == nullptr) {
            Py_DECREF(kids);
            Py_DECREF(node);
            return nullptr;
        }
        // PyList_SET_ITEM takes over the reference to `converted`.
        PyList_SET_ITEM(kids, static_cast<Py_ssize_t>(pos), converted);
        ++pos;
    }

    // The dict takes its own reference on success, so the local one is
    // dropped on both outcomes.
    const int rc = PyDict_SetItemString(node, "children", kids);
    Py_DECREF(kids);
    if (rc < 0) {
        Py_DECREF(node);
        return nullptr;
    }
    return node;
}
131

132
static PyObject *Runtime_get_progress(RuntimeObject *self,
9✔
133
                                      PyObject *Py_UNUSED(ignored)) {
134
    if (!self->runtime) {
9!
135
        PyErr_SetString(PyExc_RuntimeError, "Runtime not initialized");
×
136
        return NULL;
×
137
    }
138

139
    dftracer::utils::ExecutorProgress prog;
9✔
140
    Py_BEGIN_ALLOW_THREADS prog = self->runtime->get_progress();
9!
141
    Py_END_ALLOW_THREADS
9!
142

143
        PyObject *d = PyDict_New();
9!
144
    if (!d) return NULL;
9!
145

146
    if (!set_size(d, "total", prog.total_tasks_submitted) ||
9!
147
        !set_size(d, "completed", prog.tasks_completed) ||
9!
148
        !set_size(d, "running", prog.tasks_running) ||
9!
149
        !set_size(d, "queued", prog.tasks_queued) ||
27!
150
        !set_size(d, "failed", prog.tasks_failed)) {
9!
151
        Py_DECREF(d);
152
        return NULL;
×
153
    }
154

155
    // Workers
156
    PyObject *workers =
157
        PyList_New(static_cast<Py_ssize_t>(prog.workers.size()));
9!
158
    if (!workers) {
9!
159
        Py_DECREF(d);
160
        return NULL;
×
161
    }
162
    for (std::size_t i = 0; i < prog.workers.size(); ++i) {
17✔
163
        const auto &w = prog.workers[i];
8✔
164
        PyObject *wd = PyDict_New();
8!
165
        if (!wd || !set_size(wd, "id", w.worker_id) ||
8!
166
            !set_bool(wd, "idle", w.is_idle) ||
8!
167
            !set_str(wd, "task", w.current_task_name) ||
24!
168
            !set_size(wd, "queue_depth", w.local_queue_depth)) {
8!
169
            Py_XDECREF(wd);
×
170
            Py_DECREF(workers);
171
            Py_DECREF(d);
172
            return NULL;
×
173
        }
174
        PyList_SET_ITEM(workers, static_cast<Py_ssize_t>(i), wd);
8✔
175
    }
176
    if (PyDict_SetItemString(d, "workers", workers) < 0) {
9!
177
        Py_DECREF(workers);
178
        Py_DECREF(d);
179
        return NULL;
×
180
    }
181
    Py_DECREF(workers);
182

183
    // Tasks
184
    PyObject *tasks =
185
        PyList_New(static_cast<Py_ssize_t>(prog.root_tasks.size()));
9!
186
    if (!tasks) {
9!
187
        Py_DECREF(d);
188
        return NULL;
×
189
    }
190
    for (std::size_t i = 0; i < prog.root_tasks.size(); ++i) {
16✔
191
        PyObject *tp = build_task_progress(prog.root_tasks[i]);
7!
192
        if (!tp) {
7!
193
            Py_DECREF(tasks);
194
            Py_DECREF(d);
195
            return NULL;
×
196
        }
197
        PyList_SET_ITEM(tasks, static_cast<Py_ssize_t>(i), tp);
7✔
198
    }
199
    if (PyDict_SetItemString(d, "tasks", tasks) < 0) {
9!
200
        Py_DECREF(tasks);
201
        Py_DECREF(d);
202
        return NULL;
×
203
    }
204
    Py_DECREF(tasks);
205

206
    // Errors
207
    PyObject *errors =
208
        PyList_New(static_cast<Py_ssize_t>(prog.recent_errors.size()));
9!
209
    if (!errors) {
9!
210
        Py_DECREF(d);
211
        return NULL;
×
212
    }
213
    for (std::size_t i = 0; i < prog.recent_errors.size(); ++i) {
9!
214
        const auto &[tid, msg] = prog.recent_errors[i];
×
215
        PyObject *ed = PyDict_New();
×
216
        if (!ed || !set_size(ed, "task_id", static_cast<std::size_t>(tid)) ||
×
217
            !set_str(ed, "message", msg)) {
×
218
            Py_XDECREF(ed);
×
219
            Py_DECREF(errors);
220
            Py_DECREF(d);
221
            return NULL;
×
222
        }
223
        PyList_SET_ITEM(errors, static_cast<Py_ssize_t>(i), ed);
×
224
    }
225
    if (PyDict_SetItemString(d, "errors", errors) < 0) {
9!
226
        Py_DECREF(errors);
227
        Py_DECREF(d);
228
        return NULL;
×
229
    }
230
    Py_DECREF(errors);
231

232
    return d;
9✔
233
}
9✔
234

235
// Python method ``Runtime.is_responsive()``.
//
// Queries is_responsive() on the wrapped runtime with the GIL released and
// returns the answer as a Python bool. Raises RuntimeError when the wrapper
// holds no runtime or when the query throws.
static PyObject *Runtime_is_responsive(RuntimeObject *self,
                                       PyObject *Py_UNUSED(ignored)) {
    if (!self->runtime) {
        PyErr_SetString(PyExc_RuntimeError, "Runtime not initialized");
        return NULL;
    }

    // Hold a local reference: while the GIL is released another Python
    // thread may run __init__ on this object again and replace
    // self->runtime.
    const auto runtime = self->runtime;

    bool resp = false;
    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    // No C++ exception may leave this region: the GIL must be re-acquired by
    // Py_END_ALLOW_THREADS before any Python API is touched.
    try {
        resp = runtime->is_responsive();
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    return PyBool_FromLong(resp ? 1 : 0);
}
245

246
// Python method ``Runtime.set_timeout(global_ms=0)``.
//
// Passes the value, as std::chrono::milliseconds, to set_global_timeout() on
// the wrapped runtime with the GIL released. Returns None. Raises
// RuntimeError when the wrapper holds no runtime or when the call throws.
//
// NOTE(review): negative values are forwarded unchanged; confirm whether the
// runtime expects them to be rejected here.
static PyObject *Runtime_set_timeout(RuntimeObject *self, PyObject *args,
                                     PyObject *kwds) {
    static const char *kwlist[] = {"global_ms", NULL};
    Py_ssize_t ms = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|n",
                                     const_cast<char **>(kwlist), &ms)) {
        return NULL;
    }

    if (!self->runtime) {
        PyErr_SetString(PyExc_RuntimeError, "Runtime not initialized");
        return NULL;
    }

    // Hold a local reference: while the GIL is released another Python
    // thread may run __init__ on this object again and replace
    // self->runtime.
    const auto runtime = self->runtime;
    const std::chrono::milliseconds timeout(ms);

    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    // No C++ exception may leave this region: the GIL must be re-acquired by
    // Py_END_ALLOW_THREADS before any Python API is touched.
    try {
        runtime->set_global_timeout(timeout);
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
264

265
// Python method ``Runtime.set_default_task_timeout(ms=0)``.
//
// Passes the value, as std::chrono::milliseconds, to
// set_default_task_timeout() on the wrapped runtime with the GIL released.
// Returns None. Raises RuntimeError when the wrapper holds no runtime or
// when the call throws.
//
// NOTE(review): negative values are forwarded unchanged; confirm whether the
// runtime expects them to be rejected here.
static PyObject *Runtime_set_default_task_timeout(RuntimeObject *self,
                                                  PyObject *args,
                                                  PyObject *kwds) {
    static const char *kwlist[] = {"ms", NULL};
    Py_ssize_t ms = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|n",
                                     const_cast<char **>(kwlist), &ms)) {
        return NULL;
    }

    if (!self->runtime) {
        PyErr_SetString(PyExc_RuntimeError, "Runtime not initialized");
        return NULL;
    }

    // Hold a local reference: while the GIL is released another Python
    // thread may run __init__ on this object again and replace
    // self->runtime.
    const auto runtime = self->runtime;
    const std::chrono::milliseconds timeout(ms);

    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    // No C++ exception may leave this region: the GIL must be re-acquired by
    // Py_END_ALLOW_THREADS before any Python API is touched.
    try {
        runtime->set_default_task_timeout(timeout);
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
284

285
// Python method ``Runtime.wait_all()``.
//
// Calls wait_all() on the wrapped runtime with the GIL released.
// Returns None on success. Raises RuntimeError when the wrapper holds no
// runtime or when wait_all() throws (the exception's what() becomes the
// message).
static PyObject *Runtime_wait_all(RuntimeObject *self,
                                  PyObject *Py_UNUSED(ignored)) {
    if (!self->runtime) {
        PyErr_SetString(PyExc_RuntimeError, "Runtime not initialized");
        return NULL;
    }

    // Hold a local reference: while the GIL is released another Python
    // thread may run __init__ on this object again and replace
    // self->runtime.
    const auto runtime = self->runtime;

    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    // The try block sits INSIDE the allow-threads region. If an exception
    // were allowed to jump past Py_END_ALLOW_THREADS, the GIL would never be
    // re-acquired and the handler would call PyErr_SetString without it.
    try {
        runtime->wait_all();
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
300

301
// Python method ``Runtime.__enter__()``: returns the object itself as a new
// reference.
static PyObject *Runtime_enter(RuntimeObject *self,
                               PyObject *Py_UNUSED(ignored)) {
    PyObject *me = reinterpret_cast<PyObject *>(self);
    Py_INCREF(me);
    return me;
}
306

307
// Python method ``Runtime.__exit__(*args)``.
//
// When the wrapper holds a runtime, calls shutdown() on it with the GIL
// released; a wrapper without a runtime is left alone. `args` (the
// exception triple passed by the with-statement) is not inspected.
//
// Returns None, so an exception raised inside the with-block is not
// suppressed. Raises RuntimeError when shutdown() throws.
static PyObject *Runtime_exit(RuntimeObject *self, PyObject *args) {
    // Hold a local reference: while the GIL is released another Python
    // thread may run __init__ on this object again and replace
    // self->runtime.
    const auto runtime = self->runtime;
    if (!runtime) {
        Py_RETURN_NONE;
    }

    bool failed = false;
    std::string error;
    Py_BEGIN_ALLOW_THREADS
    // No C++ exception may leave this region: the GIL must be re-acquired by
    // Py_END_ALLOW_THREADS before any Python API is touched.
    try {
        runtime->shutdown();
    } catch (const std::exception &e) {
        failed = true;
        error = e.what();
    } catch (...) {
        failed = true;
        error = "unknown C++ exception";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error.c_str());
        return NULL;
    }
    Py_RETURN_NONE;
}
314

315
// Getter for the ``threads`` attribute: returns the wrapped runtime's
// threads() value as a Python int. `closure` is unused.
// Raises RuntimeError when the wrapper holds no runtime.
static PyObject *Runtime_get_threads(RuntimeObject *self, void *closure) {
    if (self->runtime) {
        const std::size_t count = self->runtime->threads();
        return PyLong_FromSize_t(count);
    }
    PyErr_SetString(PyExc_RuntimeError, "Runtime not initialized");
    return nullptr;
}
322

323
// Module function ``get_default_runtime()``.
//
// Makes sure the file-level default runtime exists (creating it through
// get_default_runtime() if needed) and returns a newly allocated Runtime
// wrapper that shares ownership of it. Each call allocates a fresh wrapper
// object.
//
// Raises RuntimeError when the default runtime cannot be created.
static PyObject *get_default_runtime_py(PyObject *Py_UNUSED(module),
                                        PyObject *Py_UNUSED(ignored)) {
    try {
        dftracer::utils::Runtime *rt = get_default_runtime();
        if (!rt) {
            PyErr_SetString(PyExc_RuntimeError,
                            "Failed to create default runtime");
            return NULL;
        }
    } catch (const std::exception &e) {
        // Lazy construction can throw; the exception must not unwind into
        // the interpreter's C frames.
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError, "Failed to create default runtime");
        return NULL;
    }

    RuntimeObject *obj = (RuntimeObject *)RuntimeType.tp_alloc(&RuntimeType, 0);
    if (!obj) return NULL;

    // tp_alloc does not run C++ constructors, so the member is built with
    // placement new as a copy of the shared default.
    new (&obj->runtime)
        std::shared_ptr<dftracer::utils::Runtime>(g_default_runtime);
    return (PyObject *)obj;
}
338

339
// Module function ``set_default_runtime(runtime)``.
//
// Passing None empties the file-level default; passing a Runtime instance
// makes the default share that instance's runtime. Any other argument raises
// TypeError. Returns None.
static PyObject *set_default_runtime_py(PyObject *Py_UNUSED(module),
                                        PyObject *args) {
    PyObject *candidate;
    if (!PyArg_ParseTuple(args, "O", &candidate)) {
        return nullptr;
    }

    if (candidate != Py_None) {
        if (!PyObject_TypeCheck(candidate, &RuntimeType)) {
            PyErr_SetString(PyExc_TypeError, "Expected Runtime or None");
            return nullptr;
        }
        auto *wrapper = reinterpret_cast<RuntimeObject *>(candidate);
        g_default_runtime = wrapper->runtime;
    } else {
        g_default_runtime.reset();
    }

    Py_RETURN_NONE;
}
357

358
// Method table for the Runtime type: maps each Python method name to its C
// implementation, calling convention and docstring. Terminated by an
// all-zero sentinel entry.
static PyMethodDef Runtime_methods[] = {
    {"shutdown", (PyCFunction)Runtime_shutdown, METH_NOARGS,
     "shutdown()\n"
     "--\n"
     "\n"
     "Shut down the runtime.\n"},
    // The docstring lists every key Runtime_get_progress writes, including
    // the three list-valued ones.
    {"get_progress", (PyCFunction)Runtime_get_progress, METH_NOARGS,
     "Return progress dict with keys: total, completed, running,\n"
     "queued, failed, workers, tasks, errors."},
    {"is_responsive", (PyCFunction)Runtime_is_responsive, METH_NOARGS,
     "Return True if the runtime is making progress."},
    {"set_timeout", (PyCFunction)Runtime_set_timeout,
     METH_VARARGS | METH_KEYWORDS,
     "Set global timeout in milliseconds.\n"
     "\n"
     "Args:\n"
     "    global_ms (int): Timeout in milliseconds (0 = no timeout).\n"},
    {"set_default_task_timeout", (PyCFunction)Runtime_set_default_task_timeout,
     METH_VARARGS | METH_KEYWORDS,
     "Set default per-task timeout in milliseconds.\n"
     "\n"
     "Args:\n"
     "    ms (int): Timeout in milliseconds (0 = no timeout).\n"},
    {"wait_all", (PyCFunction)Runtime_wait_all, METH_NOARGS,
     "Wait for all outstanding submitted tasks to complete."},
    {"__enter__", (PyCFunction)Runtime_enter, METH_NOARGS,
     "Enter context manager."},
    {"__exit__", (PyCFunction)Runtime_exit, METH_VARARGS,
     "Exit context manager (calls shutdown)."},
    {NULL}};
388

389
// Getter for the ``io_threads`` attribute: returns the wrapped runtime's
// io_threads() value as a Python int. `closure` is unused.
// Raises RuntimeError when the wrapper holds no runtime.
static PyObject *Runtime_get_io_threads(RuntimeObject *self, void *closure) {
    if (self->runtime) {
        const std::size_t count = self->runtime->io_threads();
        return PyLong_FromSize_t(count);
    }
    PyErr_SetString(PyExc_RuntimeError, "Runtime not initialized");
    return nullptr;
}
396

397
// Attribute table for the Runtime type. Both entries have a getter and no
// setter. Terminated by a zeroed sentinel entry.
static PyGetSetDef Runtime_getsetters[] = {
    {"threads", reinterpret_cast<getter>(Runtime_get_threads), nullptr,
     "Number of worker threads", nullptr},
    {"io_threads", reinterpret_cast<getter>(Runtime_get_io_threads), nullptr,
     "Number of I/O threads", nullptr},
    {nullptr}};
403

404
// Static type object for ``dftracer_utils_ext.Runtime``.
//
// Slots are filled positionally. Only dealloc, flags, doc, methods, getset,
// init and new are populated; every other slot is left empty.
PyTypeObject RuntimeType = {
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext.Runtime",
    /* tp_basicsize */ sizeof(RuntimeObject),
    /* tp_itemsize */ 0,
    /* tp_dealloc */ reinterpret_cast<destructor>(Runtime_dealloc),
    /* tp_vectorcall_offset */ 0,
    /* tp_getattr */ nullptr,
    /* tp_setattr */ nullptr,
    /* tp_as_async */ nullptr,
    /* tp_repr */ nullptr,
    /* tp_as_number */ nullptr,
    /* tp_as_sequence */ nullptr,
    /* tp_as_mapping */ nullptr,
    /* tp_hash */ nullptr,
    /* tp_call */ nullptr,
    /* tp_str */ nullptr,
    /* tp_getattro */ nullptr,
    /* tp_setattro */ nullptr,
    /* tp_as_buffer */ nullptr,
    /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
    /* tp_doc */
    "Runtime(threads: int = 0, io_threads: int = 0)\n"
    "--\n"
    "\n"
    "Coroutine runtime backed by a thread pool.\n"
    "\n"
    "Args:\n"
    "    threads (int): Number of worker threads. 0 (default) uses\n"
    "        the hardware concurrency.\n"
    "    io_threads (int): Number of I/O threads. 0 (default) uses\n"
    "        the hardware concurrency.\n",
    /* tp_traverse */ nullptr,
    /* tp_clear */ nullptr,
    /* tp_richcompare */ nullptr,
    /* tp_weaklistoffset */ 0,
    /* tp_iter */ nullptr,
    /* tp_iternext */ nullptr,
    /* tp_methods */ Runtime_methods,
    /* tp_members */ nullptr,
    /* tp_getset */ Runtime_getsetters,
    /* tp_base */ nullptr,
    /* tp_dict */ nullptr,
    /* tp_descr_get */ nullptr,
    /* tp_descr_set */ nullptr,
    /* tp_dictoffset */ 0,
    /* tp_init */ reinterpret_cast<initproc>(Runtime_init),
    /* tp_alloc */ nullptr,
    /* tp_new */ Runtime_new,
};
452

453
// Module-level function table. init_runtime wraps each entry with
// PyCFunction_New and attaches it to the module via PyModule_AddObject.
455
// Functions exposed at module level (not as Runtime methods).
// Terminated by a zeroed sentinel entry.
static PyMethodDef runtime_module_methods[] = {
    {"get_default_runtime", get_default_runtime_py, METH_NOARGS,
     "Return the module-level default Runtime (lazy-created)."},
    {"set_default_runtime", set_default_runtime_py, METH_VARARGS,
     "Replace the module-level default Runtime (pass None to clear).\n"
     "\n"
     "Args:\n"
     "    runtime (Runtime or None): New default runtime.\n"},
    {nullptr}};
464

465
// Registers the Runtime type and the module-level functions on module `m`.
//
// Each entry of runtime_module_methods is wrapped with PyCFunction_New and
// added to the module under its ml_name. Returns 0 on success and -1 as soon
// as any step fails.
int init_runtime(PyObject *m) {
    if (register_type(m, &RuntimeType, "Runtime") < 0) {
        return -1;
    }

    PyMethodDef *entry = runtime_module_methods;
    while (entry->ml_name) {
        PyObject *callable = PyCFunction_New(entry, NULL);
        if (callable == nullptr) {
            return -1;
        }
        // PyModule_AddObject keeps the reference only on success, so it is
        // released here when the call fails.
        const int added = PyModule_AddObject(m, entry->ml_name, callable);
        if (added < 0) {
            Py_DECREF(callable);
            return -1;
        }
        ++entry;
    }

    return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc