• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.49
/src/dftracer/utils/python/arrow_parallel_reader.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
3

4
#define PY_SSIZE_T_CLEAN
5
#include <Python.h>
6
#include <dftracer/utils/python/arrow_parallel_reader.h>
7
#include <dftracer/utils/python/py_dict_helpers.h>
8
#include <dftracer/utils/python/py_list_helpers.h>
9
#include <dftracer/utils/python/py_runtime_mixin.h>
10
#include <dftracer/utils/python/runtime.h>
11
#include <dftracer/utils/python/trace_reader_iterator.h>
12
#include <dftracer/utils/utilities/common/arrow/parallel_reader.h>
13

14
#include <string>
15
#include <vector>
16

17
namespace dftracer::utils::python {
18

19
using utilities::common::arrow::ArrowExportResult;
20
using utilities::common::arrow::read_arrow_files_parallel;
21

22
/**
 * Python entry point for `read_arrow_files_parallel(paths, runtime=None)`.
 *
 * Parses the arguments, submits the parallel read to the selected Runtime and
 * converts the C++ result into a Python dict with the keys `file_results`,
 * `total_rows`, `total_batches`, `files_read` and `files_failed`. Each entry
 * of `file_results` is a dict with `path`, `success`, `error` (None when the
 * error string is empty), `total_rows` and `batches` (a list of
 * ArrowBatchCapsule objects that take ownership of the moved-in batches).
 *
 * @param args    Positional arguments tuple (borrowed).
 * @param kwargs  Keyword arguments dict, may be null (borrowed).
 * @return New reference to the result dict, or nullptr with a Python
 *         exception set on failure.
 */
static PyObject* py_read_arrow_files_parallel(PyObject* /*self*/,
                                              PyObject* args,
                                              PyObject* kwargs) {
    static const char* kwlist[] = {"paths", "runtime", nullptr};
    PyObject* paths_obj = nullptr;
    PyObject* runtime_obj = nullptr;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O",
                                     const_cast<char**>(kwlist), &paths_obj,
                                     &runtime_obj)) {
        return nullptr;
    }

    // Convert paths to vector<string>
    std::vector<std::string> paths;
    if (!parse_str_list(paths_obj, "paths", paths)) return nullptr;

    // Pick the caller-supplied runtime, falling back to the default one.
    Runtime* runtime = nullptr;
    if (runtime_obj && runtime_obj != Py_None) {
        if (!PyObject_TypeCheck(runtime_obj, &RuntimeType)) {
            PyErr_SetString(PyExc_TypeError,
                            "runtime must be a Runtime object");
            return nullptr;
        }
        runtime = reinterpret_cast<RuntimeObject*>(runtime_obj)->runtime.get();
    } else {
        runtime = get_default_runtime();
    }
    if (!runtime) {
        // Without this guard the lambda below would dereference a null
        // pointer. Keep any exception a helper may already have raised.
        if (!PyErr_Occurred()) {
            PyErr_SetString(PyExc_RuntimeError, "runtime is not available");
        }
        return nullptr;
    }

    // Call C++ parallel reader (releases GIL during file I/O)
    utilities::common::arrow::ParallelReadResult result;
    if (!run_blocking_r(
            [&] {
                auto task = read_arrow_files_parallel(std::move(paths));
                return runtime->submit(std::move(task), "read_arrow_files")
                    .get();
            },
            result)) {
        return nullptr;
    }

    // Build Python result dict
    PyObject* file_results_list =
        PyList_New(static_cast<Py_ssize_t>(result.file_results.size()));
    if (!file_results_list) return nullptr;

    // Single cleanup path for failures inside the per-file loop: releases the
    // objects still owned locally (either may be null) plus the outer list.
    const auto fail = [file_results_list](PyObject* fr_dict,
                                          PyObject* batches_list) -> PyObject* {
        Py_XDECREF(batches_list);
        Py_XDECREF(fr_dict);
        Py_DECREF(file_results_list);
        return nullptr;
    };

    for (std::size_t i = 0; i < result.file_results.size(); ++i) {
        const auto& fr = result.file_results[i];
        PyObject* fr_dict = PyDict_New();
        if (!fr_dict) return fail(nullptr, nullptr);

        dict_set_str(fr_dict, "path", fr.path.c_str());
        dict_set_bool(fr_dict, "success", fr.success);

        if (!fr.error.empty()) {
            dict_set_str(fr_dict, "error", fr.error.c_str());
        } else if (PyDict_SetItemString(fr_dict, "error", Py_None) < 0) {
            // PyDict_SetItemString takes its own reference to the value, so
            // Py_None must not be INCREF'ed beforehand.
            return fail(fr_dict, nullptr);
        }

        dict_set_i64(fr_dict, "total_rows", fr.total_rows);

        // batches - list of ArrowBatchCapsule objects. A missing batch
        // container is reported as an empty list instead of dereferencing
        // a null pointer.
        const std::size_t batch_count = fr.batches ? fr.batches->size() : 0;
        PyObject* batches_list =
            PyList_New(static_cast<Py_ssize_t>(batch_count));
        if (!batches_list) return fail(fr_dict, nullptr);

        for (std::size_t j = 0; j < batch_count; ++j) {
            auto* capsule = reinterpret_cast<ArrowBatchCapsuleObject*>(
                ArrowBatchCapsuleType.tp_alloc(&ArrowBatchCapsuleType, 0));
            if (!capsule) return fail(fr_dict, batches_list);

            // Move the batch into the capsule
            capsule->result =
                new ArrowExportResult(std::move((*fr.batches)[j]));

            // PyList_SetItem steals the capsule reference, even on failure.
            if (PyList_SetItem(batches_list, static_cast<Py_ssize_t>(j),
                               reinterpret_cast<PyObject*>(capsule)) < 0) {
                return fail(fr_dict, batches_list);
            }
        }

        if (PyDict_SetItemString(fr_dict, "batches", batches_list) < 0) {
            return fail(fr_dict, batches_list);
        }
        Py_DECREF(batches_list);

        // PyList_SetItem steals fr_dict, even on failure.
        if (PyList_SetItem(file_results_list, static_cast<Py_ssize_t>(i),
                           fr_dict) < 0) {
            return fail(nullptr, nullptr);
        }
    }

    // Build final result dict
    PyObject* result_dict = PyDict_New();
    if (!result_dict) {
        Py_DECREF(file_results_list);
        return nullptr;
    }

    const int set_status =
        PyDict_SetItemString(result_dict, "file_results", file_results_list);
    Py_DECREF(file_results_list);  // the dict holds its own reference on success
    if (set_status < 0) {
        Py_DECREF(result_dict);
        return nullptr;
    }

    dict_set_i64(result_dict, "total_rows", result.total_rows);
    dict_set_i64(result_dict, "total_batches", result.total_batches);
    dict_set_size(result_dict, "files_read", result.files_read);
    dict_set_size(result_dict, "files_failed", result.files_failed);

    return result_dict;
}
×
135

136
// Method table registered on the extension module by
// init_arrow_parallel_reader(). The entry point takes (self, args, kwargs),
// so it is cast to PyCFunction through void (*)(void): this is the cast
// recommended for METH_KEYWORDS functions and avoids -Wcast-function-type
// warnings from a direct conversion between mismatched function types.
static PyMethodDef arrow_parallel_reader_methods[] = {
    {"read_arrow_files_parallel",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(py_read_arrow_files_parallel)),
     METH_VARARGS | METH_KEYWORDS,
     "Read multiple Arrow IPC files in parallel using the Runtime.\n\n"
     "Args:\n"
     "    paths: List of file paths to read.\n"
     "    runtime: Optional Runtime object. Uses default if not provided.\n\n"
     "Returns:\n"
     "    dict with:\n"
     "        - file_results: List of per-file results, each with:\n"
     "            - path: File path\n"
     "            - success: True if read succeeded\n"
     "            - error: Error message if failed, else None\n"
     "            - total_rows: Number of rows in file\n"
     "            - batches: List of ArrowBatch objects\n"
     "        - total_rows: Total rows across all files\n"
     "        - total_batches: Total batches across all files\n"
     "        - files_read: Number of files read successfully\n"
     "        - files_failed: Number of files that failed"},
    // Sentinel entry terminating the table.
    {nullptr, nullptr, 0, nullptr}};
156

157
/**
 * Registers the functions listed in arrow_parallel_reader_methods on module
 * `m`.
 *
 * @param m  Module object to extend.
 * @return 0 on success, -1 if PyModule_AddFunctions reported an error.
 */
int init_arrow_parallel_reader(PyObject* m) {
    const bool registered =
        PyModule_AddFunctions(m, arrow_parallel_reader_methods) >= 0;
    return registered ? 0 : -1;
}
1✔
164

165
}  // namespace dftracer::utils::python
166

167
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc