• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27052412546

06 Jun 2026 04:20AM UTC coverage: 50.862% (+1.0%) from 49.905%
27052412546

Pull #73

github

web-flow
Merge 734572730 into 88a3c8457
Pull Request #73: add portable dependencies wheel support

31801 of 79859 branches covered (39.82%)

Branch coverage included in aggregate %.

32491 of 46545 relevant lines covered (69.81%)

9947.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.75
/src/dftracer/utils/python/arrow_parallel_reader.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
3

4
#define PY_SSIZE_T_CLEAN
5
#include <Python.h>
6
#include <dftracer/utils/python/arrow_parallel_reader.h>
7
#include <dftracer/utils/python/runtime.h>
8
#include <dftracer/utils/python/trace_reader_iterator.h>
9
#include <dftracer/utils/utilities/common/arrow/parallel_reader.h>
10

11
#include <string>
12
#include <vector>
13

14
namespace dftracer::utils::python {
15

16
using utilities::common::arrow::ArrowExportResult;
17
using utilities::common::arrow::read_arrow_files_parallel;
18

19
static PyObject* py_read_arrow_files_parallel(PyObject* /*self*/,
×
20
                                              PyObject* args,
21
                                              PyObject* kwargs) {
22
    static const char* kwlist[] = {"paths", "runtime", nullptr};
23
    PyObject* paths_obj = nullptr;
×
24
    PyObject* runtime_obj = nullptr;
×
25

26
    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O",
×
27
                                     const_cast<char**>(kwlist), &paths_obj,
28
                                     &runtime_obj)) {
29
        return nullptr;
×
30
    }
31

32
    // Convert paths to vector<string>
33
    if (!PyList_Check(paths_obj)) {
×
34
        PyErr_SetString(PyExc_TypeError, "paths must be a list of strings");
×
35
        return nullptr;
×
36
    }
37

38
    Py_ssize_t n = PyList_Size(paths_obj);
×
39
    std::vector<std::string> paths;
×
40
    paths.reserve(n);
×
41

42
    for (Py_ssize_t i = 0; i < n; ++i) {
×
43
        PyObject* item = PyList_GetItem(paths_obj, i);
×
44
        if (!PyUnicode_Check(item)) {
×
45
            PyErr_SetString(PyExc_TypeError, "all paths must be strings");
×
46
            return nullptr;
×
47
        }
48
        paths.push_back(PyUnicode_AsUTF8(item));
×
49
    }
×
50

51
    // Get runtime
52
    Runtime* runtime = nullptr;
×
53
    if (runtime_obj && runtime_obj != Py_None) {
×
54
        if (!PyObject_TypeCheck(runtime_obj, &RuntimeType)) {
×
55
            PyErr_SetString(PyExc_TypeError,
×
56
                            "runtime must be a Runtime object");
57
            return nullptr;
×
58
        }
59
        runtime = ((RuntimeObject*)runtime_obj)->runtime.get();
×
60
    } else {
×
61
        runtime = get_default_runtime();
×
62
    }
63

64
    // Call C++ parallel reader (releases GIL during file I/O)
65
    utilities::common::arrow::ParallelReadResult result;
×
66
    bool had_error = false;
×
67
    std::string error_msg;
×
68

69
    Py_BEGIN_ALLOW_THREADS try {
×
70
        auto task = read_arrow_files_parallel(std::move(paths));
×
71
        result = runtime->submit(std::move(task), "read_arrow_files").get();
×
72
    } catch (const std::exception& e) {
×
73
        had_error = true;
×
74
        error_msg = e.what();
×
75
    } catch (...) {
×
76
        had_error = true;
×
77
        error_msg = "Unknown error in read_arrow_files";
×
78
    }
×
79
    Py_END_ALLOW_THREADS
×
80

81
        if (had_error) {
×
82
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
83
        return nullptr;
×
84
    }
85

86
    // Build Python result dict
87
    PyObject* file_results_list = PyList_New(result.file_results.size());
×
88
    if (!file_results_list) return nullptr;
×
89

90
    for (std::size_t i = 0; i < result.file_results.size(); ++i) {
×
91
        const auto& fr = result.file_results[i];
×
92
        PyObject* fr_dict = PyDict_New();
×
93
        if (!fr_dict) {
×
94
            Py_DECREF(file_results_list);
×
95
            return nullptr;
×
96
        }
97

98
        // path
99
        PyObject* path_str = PyUnicode_FromString(fr.path.c_str());
×
100
        PyDict_SetItemString(fr_dict, "path", path_str);
×
101
        Py_DECREF(path_str);
×
102

103
        // success
104
        PyDict_SetItemString(fr_dict, "success",
×
105
                             fr.success ? Py_True : Py_False);
×
106

107
        // error
108
        if (!fr.error.empty()) {
×
109
            PyObject* err_str = PyUnicode_FromString(fr.error.c_str());
×
110
            PyDict_SetItemString(fr_dict, "error", err_str);
×
111
            Py_DECREF(err_str);
×
112
        } else {
×
113
            Py_INCREF(Py_None);
×
114
            PyDict_SetItemString(fr_dict, "error", Py_None);
×
115
        }
116

117
        // total_rows
118
        PyObject* rows = PyLong_FromLongLong(fr.total_rows);
×
119
        PyDict_SetItemString(fr_dict, "total_rows", rows);
×
120
        Py_DECREF(rows);
×
121

122
        // batches - list of ArrowBatchCapsule objects
123
        PyObject* batches_list = PyList_New(fr.batches->size());
×
124
        if (!batches_list) {
×
125
            Py_DECREF(fr_dict);
×
126
            Py_DECREF(file_results_list);
×
127
            return nullptr;
×
128
        }
129

130
        for (std::size_t j = 0; j < fr.batches->size(); ++j) {
×
131
            ArrowBatchCapsuleObject* capsule =
×
132
                (ArrowBatchCapsuleObject*)ArrowBatchCapsuleType.tp_alloc(
×
133
                    &ArrowBatchCapsuleType, 0);
134
            if (!capsule) {
×
135
                Py_DECREF(batches_list);
×
136
                Py_DECREF(fr_dict);
×
137
                Py_DECREF(file_results_list);
×
138
                return nullptr;
×
139
            }
140
            // Move the batch into the capsule
141
            capsule->result =
×
142
                new ArrowExportResult(std::move((*fr.batches)[j]));
×
143
            PyList_SetItem(batches_list, j, (PyObject*)capsule);
×
144
        }
×
145

146
        PyDict_SetItemString(fr_dict, "batches", batches_list);
×
147
        Py_DECREF(batches_list);
×
148

149
        PyList_SetItem(file_results_list, i, fr_dict);
×
150
    }
×
151

152
    // Build final result dict
153
    PyObject* result_dict = PyDict_New();
×
154
    if (!result_dict) {
×
155
        Py_DECREF(file_results_list);
×
156
        return nullptr;
×
157
    }
158

159
    PyDict_SetItemString(result_dict, "file_results", file_results_list);
×
160
    Py_DECREF(file_results_list);
×
161

162
    PyObject* total_rows = PyLong_FromLongLong(result.total_rows);
×
163
    PyDict_SetItemString(result_dict, "total_rows", total_rows);
×
164
    Py_DECREF(total_rows);
×
165

166
    PyObject* total_batches = PyLong_FromLongLong(result.total_batches);
×
167
    PyDict_SetItemString(result_dict, "total_batches", total_batches);
×
168
    Py_DECREF(total_batches);
×
169

170
    PyObject* files_read = PyLong_FromSize_t(result.files_read);
×
171
    PyDict_SetItemString(result_dict, "files_read", files_read);
×
172
    Py_DECREF(files_read);
×
173

174
    PyObject* files_failed = PyLong_FromSize_t(result.files_failed);
×
175
    PyDict_SetItemString(result_dict, "files_failed", files_failed);
×
176
    Py_DECREF(files_failed);
×
177

178
    return result_dict;
×
179
}
×
180

181
// Docstring for read_arrow_files_parallel, kept as a named constant so the
// method table below stays compact.
static constexpr char kReadArrowFilesParallelDoc[] =
    "Read multiple Arrow IPC files in parallel using the Runtime.\n\n"
    "Args:\n"
    "    paths: List of file paths to read.\n"
    "    runtime: Optional Runtime object. Uses default if not provided.\n\n"
    "Returns:\n"
    "    dict with:\n"
    "        - file_results: List of per-file results, each with:\n"
    "            - path: File path\n"
    "            - success: True if read succeeded\n"
    "            - error: Error message if failed, else None\n"
    "            - total_rows: Number of rows in file\n"
    "            - batches: List of ArrowBatch objects\n"
    "        - total_rows: Total rows across all files\n"
    "        - total_batches: Total batches across all files\n"
    "        - files_read: Number of files read successfully\n"
    "        - files_failed: Number of files that failed";

// Method table registered on the module by init_arrow_parallel_reader().
// The function takes keyword arguments, so its pointer is routed through
// void(*)(void) before being stored as the generic PyCFunction type.
static PyMethodDef arrow_parallel_reader_methods[] = {
    {"read_arrow_files_parallel",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(py_read_arrow_files_parallel)),
     METH_VARARGS | METH_KEYWORDS, kReadArrowFilesParallelDoc},
    {nullptr, nullptr, 0, nullptr}  // sentinel
};
201

202
/// Register the functions in arrow_parallel_reader_methods on module `m`.
///
/// @param m Module object to extend.
/// @return 0 on success, -1 if PyModule_AddFunctions reported an error.
int init_arrow_parallel_reader(PyObject* m) {
    const bool failed = PyModule_AddFunctions(m, arrow_parallel_reader_methods) < 0;
    return failed ? -1 : 0;
}
1✔
209

210
}  // namespace dftracer::utils::python
211

212
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc