• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28521653886

01 Jul 2026 01:36PM UTC coverage: 50.92% (-1.4%) from 52.278%
28521653886

Pull #83

github

web-flow
Merge 9bdedb1e9 into 2efed6649
Pull Request #83: refactor and improve code QoL

31893 of 80049 branches covered (39.84%)

Branch coverage included in aggregate %.

789 of 1613 new or added lines in 87 files covered. (48.92%)

5007 existing lines in 181 files now uncovered.

32812 of 47024 relevant lines covered (69.78%)

9905.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.07
/src/dftracer/utils/python/arrow_parallel_reader.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
3

4
#define PY_SSIZE_T_CLEAN
5
#include <Python.h>
6
#include <dftracer/utils/python/arrow_parallel_reader.h>
7
#include <dftracer/utils/python/py_dict_helpers.h>
8
#include <dftracer/utils/python/runtime.h>
9
#include <dftracer/utils/python/trace_reader_iterator.h>
10
#include <dftracer/utils/utilities/common/arrow/parallel_reader.h>
11

12
#include <string>
13
#include <vector>
14

15
namespace dftracer::utils::python {
16

17
using utilities::common::arrow::ArrowExportResult;
18
using utilities::common::arrow::read_arrow_files_parallel;
19

20
static PyObject* py_read_arrow_files_parallel(PyObject* /*self*/,
×
21
                                              PyObject* args,
22
                                              PyObject* kwargs) {
23
    static const char* kwlist[] = {"paths", "runtime", nullptr};
24
    PyObject* paths_obj = nullptr;
×
25
    PyObject* runtime_obj = nullptr;
×
26

27
    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O",
×
28
                                     const_cast<char**>(kwlist), &paths_obj,
29
                                     &runtime_obj)) {
30
        return nullptr;
×
31
    }
32

33
    // Convert paths to vector<string>
34
    if (!PyList_Check(paths_obj)) {
×
35
        PyErr_SetString(PyExc_TypeError, "paths must be a list of strings");
×
36
        return nullptr;
×
37
    }
38

39
    Py_ssize_t n = PyList_Size(paths_obj);
×
40
    std::vector<std::string> paths;
×
41
    paths.reserve(n);
×
42

43
    for (Py_ssize_t i = 0; i < n; ++i) {
×
44
        PyObject* item = PyList_GetItem(paths_obj, i);
×
45
        if (!PyUnicode_Check(item)) {
×
46
            PyErr_SetString(PyExc_TypeError, "all paths must be strings");
×
47
            return nullptr;
×
48
        }
49
        paths.push_back(PyUnicode_AsUTF8(item));
×
UNCOV
50
    }
×
51

52
    // Get runtime
53
    Runtime* runtime = nullptr;
×
54
    if (runtime_obj && runtime_obj != Py_None) {
×
55
        if (!PyObject_TypeCheck(runtime_obj, &RuntimeType)) {
×
56
            PyErr_SetString(PyExc_TypeError,
×
57
                            "runtime must be a Runtime object");
58
            return nullptr;
×
59
        }
60
        runtime = ((RuntimeObject*)runtime_obj)->runtime.get();
×
UNCOV
61
    } else {
×
62
        runtime = get_default_runtime();
×
63
    }
64

65
    // Call C++ parallel reader (releases GIL during file I/O)
66
    utilities::common::arrow::ParallelReadResult result;
×
67
    bool had_error = false;
×
68
    std::string error_msg;
×
69

70
    Py_BEGIN_ALLOW_THREADS try {
×
71
        auto task = read_arrow_files_parallel(std::move(paths));
×
72
        result = runtime->submit(std::move(task), "read_arrow_files").get();
×
73
    } catch (const std::exception& e) {
×
74
        had_error = true;
×
75
        error_msg = e.what();
×
76
    } catch (...) {
×
77
        had_error = true;
×
78
        error_msg = "Unknown error in read_arrow_files";
×
79
    }
×
80
    Py_END_ALLOW_THREADS
×
81

82
        if (had_error) {
×
83
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
84
        return nullptr;
×
85
    }
86

87
    // Build Python result dict
88
    PyObject* file_results_list = PyList_New(result.file_results.size());
×
89
    if (!file_results_list) return nullptr;
×
90

91
    for (std::size_t i = 0; i < result.file_results.size(); ++i) {
×
92
        const auto& fr = result.file_results[i];
×
93
        PyObject* fr_dict = PyDict_New();
×
94
        if (!fr_dict) {
×
UNCOV
95
            Py_DECREF(file_results_list);
×
96
            return nullptr;
×
97
        }
98

NEW
99
        dict_set_str(fr_dict, "path", fr.path.c_str());
×
NEW
100
        dict_set_bool(fr_dict, "success", fr.success);
×
101

UNCOV
102
        if (!fr.error.empty()) {
×
NEW
103
            dict_set_str(fr_dict, "error", fr.error.c_str());
×
104
        } else {
×
UNCOV
105
            Py_INCREF(Py_None);
×
106
            PyDict_SetItemString(fr_dict, "error", Py_None);
×
107
        }
108

NEW
109
        dict_set_i64(fr_dict, "total_rows", fr.total_rows);
×
110

111
        // batches - list of ArrowBatchCapsule objects
112
        PyObject* batches_list = PyList_New(fr.batches->size());
×
113
        if (!batches_list) {
×
UNCOV
114
            Py_DECREF(fr_dict);
×
UNCOV
115
            Py_DECREF(file_results_list);
×
116
            return nullptr;
×
117
        }
118

119
        for (std::size_t j = 0; j < fr.batches->size(); ++j) {
×
UNCOV
120
            ArrowBatchCapsuleObject* capsule =
×
121
                (ArrowBatchCapsuleObject*)ArrowBatchCapsuleType.tp_alloc(
×
122
                    &ArrowBatchCapsuleType, 0);
123
            if (!capsule) {
×
UNCOV
124
                Py_DECREF(batches_list);
×
UNCOV
125
                Py_DECREF(fr_dict);
×
UNCOV
126
                Py_DECREF(file_results_list);
×
127
                return nullptr;
×
128
            }
129
            // Move the batch into the capsule
130
            capsule->result =
×
131
                new ArrowExportResult(std::move((*fr.batches)[j]));
×
132
            PyList_SetItem(batches_list, j, (PyObject*)capsule);
×
UNCOV
133
        }
×
134

135
        PyDict_SetItemString(fr_dict, "batches", batches_list);
×
UNCOV
136
        Py_DECREF(batches_list);
×
137

138
        PyList_SetItem(file_results_list, i, fr_dict);
×
UNCOV
139
    }
×
140

141
    // Build final result dict
142
    PyObject* result_dict = PyDict_New();
×
143
    if (!result_dict) {
×
UNCOV
144
        Py_DECREF(file_results_list);
×
145
        return nullptr;
×
146
    }
147

148
    PyDict_SetItemString(result_dict, "file_results", file_results_list);
×
UNCOV
149
    Py_DECREF(file_results_list);
×
150

NEW
151
    dict_set_i64(result_dict, "total_rows", result.total_rows);
×
NEW
152
    dict_set_i64(result_dict, "total_batches", result.total_batches);
×
NEW
153
    dict_set_size(result_dict, "files_read", result.files_read);
×
NEW
154
    dict_set_size(result_dict, "files_failed", result.files_failed);
×
155

156
    return result_dict;
×
157
}
×
158

159
// __doc__ text for read_arrow_files_parallel.
static const char kReadArrowFilesParallelDoc[] =
    "Read multiple Arrow IPC files in parallel using the Runtime.\n\n"
    "Args:\n"
    "    paths: List of file paths to read.\n"
    "    runtime: Optional Runtime object. Uses default if not provided.\n\n"
    "Returns:\n"
    "    dict with:\n"
    "        - file_results: List of per-file results, each with:\n"
    "            - path: File path\n"
    "            - success: True if read succeeded\n"
    "            - error: Error message if failed, else None\n"
    "            - total_rows: Number of rows in file\n"
    "            - batches: List of ArrowBatch objects\n"
    "        - total_rows: Total rows across all files\n"
    "        - total_batches: Total batches across all files\n"
    "        - files_read: Number of files read successfully\n"
    "        - files_failed: Number of files that failed";

/// Functions added to the extension module by init_arrow_parallel_reader().
/// The final all-null entry is the sentinel that terminates the table.
static PyMethodDef arrow_parallel_reader_methods[] = {
    {
        "read_arrow_files_parallel",
        // Takes (self, args, kwargs); METH_KEYWORDS tells CPython to call
        // it with that signature despite the PyCFunction cast.
        reinterpret_cast<PyCFunction>(py_read_arrow_files_parallel),
        METH_VARARGS | METH_KEYWORDS,
        kReadArrowFilesParallelDoc,
    },
    {nullptr, nullptr, 0, nullptr},
};
179

180
/// Register the arrow parallel-reader functions on module `m`.
///
/// @param m  The extension module object being initialised.
/// @return 0 on success, -1 if PyModule_AddFunctions reported failure.
int init_arrow_parallel_reader(PyObject* m) {
    const bool registered =
        PyModule_AddFunctions(m, arrow_parallel_reader_methods) >= 0;
    return registered ? 0 : -1;
}
1✔
187

188
}  // namespace dftracer::utils::python
189

190
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc