• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26043728131

18 May 2026 03:37PM UTC coverage: 51.706% (-0.4%) from 52.076%
26043728131

push

github

hariharan-devarajan
feat(perf): performance improvements for parallel reading, indexing, and aggregation

Indexer
- Streaming parse-and-emit worker pipeline with bounded memory usage
- Concurrent SST artifact ingestion with staging support
- Gzip member slicing for parallel indexing
- Lazy decoding for compressed value counts
- Bypass DOM wrapper for indexer hot path (simdjson on_demand)
- Decoupled write workers from parse workers
- --rebuild-summaries flag and optimized root summary rebuild

Aggregator / MPI
- Task-based DAG execution for aggregator pipeline
- Shared staging for multi-node artifact relocation
- Per-node thread scaling to avoid oversubscription
- Unified distributed aggregation tracking, removed manifest consolidation
- Deterministic aggregation and intra-file parallelism

Trace reader / query
- Compiled predicate evaluation for AND-of-EQ queries
- Uniform-match shortcut for AND-of-EQ queries
- Line-range support for work items and checkpoint processing
- Optimized chunk pruning and checkpoint handling

Replay
- Pipelined replay with coroutines and channels
- JsonParser-based trace processing
- Optimized string handling and i/o buffering

Organize / writer / dft
- Parallel slice creation and merging in organize visitor
- Inline indexer in organize
- Gzip member tracking in writer
- Coroutine-based event dispatcher with extracted parse logic
- Batch flushing in organize visitor

Arrow / call_tree
- Optimized arrow conversion
- Arrow IPC support and improved save/load in call_tree

Build / infrastructure
- zlib-ng option, system simdjson fallback
- cgroup v1/v2 memory limit detection
- Auto-computed per-file memory estimates and batch sizes
- CI: perf branch trigger, formatting

Docs
- Rewritten indexer and trace reader API references

35907 of 90345 branches covered (39.74%)

Branch coverage included in aggregate %.

16869 of 21880 new or added lines in 137 files covered. (77.1%)

273 existing lines in 39 files now uncovered.

32021 of 41028 relevant lines covered (78.05%)

13164.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.21
/src/dftracer/utils/python/arrow_parallel_reader.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
3

4
#define PY_SSIZE_T_CLEAN
5
#include <Python.h>
6
#include <dftracer/utils/python/arrow_parallel_reader.h>
7
#include <dftracer/utils/python/runtime.h>
8
#include <dftracer/utils/python/trace_reader_iterator.h>
9
#include <dftracer/utils/utilities/common/arrow/parallel_reader.h>
10

11
#include <string>
12
#include <vector>
13

14
namespace dftracer::utils::python {
15

16
using utilities::common::arrow::ArrowExportResult;
17
using utilities::common::arrow::read_arrow_files_parallel;
18

NEW
19
static PyObject* py_read_arrow_files_parallel(PyObject* /*self*/,
×
20
                                              PyObject* args,
21
                                              PyObject* kwargs) {
22
    static const char* kwlist[] = {"paths", "runtime", nullptr};
NEW
23
    PyObject* paths_obj = nullptr;
×
NEW
24
    PyObject* runtime_obj = nullptr;
×
25

NEW
26
    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O",
×
27
                                     const_cast<char**>(kwlist), &paths_obj,
28
                                     &runtime_obj)) {
NEW
29
        return nullptr;
×
30
    }
31

32
    // Convert paths to vector<string>
NEW
33
    if (!PyList_Check(paths_obj)) {
×
NEW
34
        PyErr_SetString(PyExc_TypeError, "paths must be a list of strings");
×
NEW
35
        return nullptr;
×
36
    }
37

NEW
38
    Py_ssize_t n = PyList_Size(paths_obj);
×
NEW
39
    std::vector<std::string> paths;
×
NEW
40
    paths.reserve(n);
×
41

NEW
42
    for (Py_ssize_t i = 0; i < n; ++i) {
×
NEW
43
        PyObject* item = PyList_GetItem(paths_obj, i);
×
NEW
44
        if (!PyUnicode_Check(item)) {
×
NEW
45
            PyErr_SetString(PyExc_TypeError, "all paths must be strings");
×
NEW
46
            return nullptr;
×
47
        }
NEW
48
        paths.push_back(PyUnicode_AsUTF8(item));
×
49
    }
50

51
    // Get runtime
NEW
52
    Runtime* runtime = nullptr;
×
NEW
53
    if (runtime_obj && runtime_obj != Py_None) {
×
NEW
54
        if (!PyObject_TypeCheck(runtime_obj, &RuntimeType)) {
×
NEW
55
            PyErr_SetString(PyExc_TypeError,
×
56
                            "runtime must be a Runtime object");
NEW
57
            return nullptr;
×
58
        }
NEW
59
        runtime = ((RuntimeObject*)runtime_obj)->runtime.get();
×
60
    } else {
NEW
61
        runtime = get_default_runtime();
×
62
    }
63

64
    // Call C++ parallel reader (releases GIL during file I/O)
NEW
65
    utilities::common::arrow::ParallelReadResult result;
×
NEW
66
    bool had_error = false;
×
NEW
67
    std::string error_msg;
×
68

NEW
69
    Py_BEGIN_ALLOW_THREADS try {
×
NEW
70
        auto task = read_arrow_files_parallel(std::move(paths));
×
NEW
71
        result = runtime->submit(std::move(task), "read_arrow_files").get();
×
NEW
72
    } catch (const std::exception& e) {
×
NEW
73
        had_error = true;
×
NEW
74
        error_msg = e.what();
×
NEW
75
    } catch (...) {
×
NEW
76
        had_error = true;
×
NEW
77
        error_msg = "Unknown error in read_arrow_files";
×
NEW
78
    }
×
NEW
79
    Py_END_ALLOW_THREADS
×
80

NEW
81
        if (had_error) {
×
NEW
82
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
NEW
83
        return nullptr;
×
84
    }
85

86
    // Build Python result dict
NEW
87
    PyObject* file_results_list = PyList_New(result.file_results.size());
×
NEW
88
    if (!file_results_list) return nullptr;
×
89

NEW
90
    for (std::size_t i = 0; i < result.file_results.size(); ++i) {
×
NEW
91
        const auto& fr = result.file_results[i];
×
NEW
92
        PyObject* fr_dict = PyDict_New();
×
NEW
93
        if (!fr_dict) {
×
94
            Py_DECREF(file_results_list);
×
NEW
95
            return nullptr;
×
96
        }
97

98
        // path
NEW
99
        PyObject* path_str = PyUnicode_FromString(fr.path.c_str());
×
NEW
100
        PyDict_SetItemString(fr_dict, "path", path_str);
×
101
        Py_DECREF(path_str);
×
102

103
        // success
NEW
104
        PyDict_SetItemString(fr_dict, "success",
×
NEW
105
                             fr.success ? Py_True : Py_False);
×
106

107
        // error
NEW
108
        if (!fr.error.empty()) {
×
NEW
109
            PyObject* err_str = PyUnicode_FromString(fr.error.c_str());
×
NEW
110
            PyDict_SetItemString(fr_dict, "error", err_str);
×
111
            Py_DECREF(err_str);
×
112
        } else {
113
            Py_INCREF(Py_None);
×
NEW
114
            PyDict_SetItemString(fr_dict, "error", Py_None);
×
115
        }
116

117
        // total_rows
NEW
118
        PyObject* rows = PyLong_FromLongLong(fr.total_rows);
×
NEW
119
        PyDict_SetItemString(fr_dict, "total_rows", rows);
×
120
        Py_DECREF(rows);
×
121

122
        // batches - list of ArrowBatchCapsule objects
NEW
123
        PyObject* batches_list = PyList_New(fr.batches->size());
×
NEW
124
        if (!batches_list) {
×
125
            Py_DECREF(fr_dict);
×
126
            Py_DECREF(file_results_list);
×
NEW
127
            return nullptr;
×
128
        }
129

NEW
130
        for (std::size_t j = 0; j < fr.batches->size(); ++j) {
×
131
            ArrowBatchCapsuleObject* capsule =
NEW
132
                (ArrowBatchCapsuleObject*)ArrowBatchCapsuleType.tp_alloc(
×
133
                    &ArrowBatchCapsuleType, 0);
NEW
134
            if (!capsule) {
×
135
                Py_DECREF(batches_list);
×
136
                Py_DECREF(fr_dict);
×
137
                Py_DECREF(file_results_list);
×
NEW
138
                return nullptr;
×
139
            }
140
            // Move the batch into the capsule
NEW
141
            capsule->result =
×
NEW
142
                new ArrowExportResult(std::move((*fr.batches)[j]));
×
NEW
143
            PyList_SetItem(batches_list, j, (PyObject*)capsule);
×
144
        }
145

NEW
146
        PyDict_SetItemString(fr_dict, "batches", batches_list);
×
147
        Py_DECREF(batches_list);
×
148

NEW
149
        PyList_SetItem(file_results_list, i, fr_dict);
×
150
    }
151

152
    // Build final result dict
NEW
153
    PyObject* result_dict = PyDict_New();
×
NEW
154
    if (!result_dict) {
×
155
        Py_DECREF(file_results_list);
×
NEW
156
        return nullptr;
×
157
    }
158

NEW
159
    PyDict_SetItemString(result_dict, "file_results", file_results_list);
×
160
    Py_DECREF(file_results_list);
×
161

NEW
162
    PyObject* total_rows = PyLong_FromLongLong(result.total_rows);
×
NEW
163
    PyDict_SetItemString(result_dict, "total_rows", total_rows);
×
164
    Py_DECREF(total_rows);
×
165

NEW
166
    PyObject* total_batches = PyLong_FromLongLong(result.total_batches);
×
NEW
167
    PyDict_SetItemString(result_dict, "total_batches", total_batches);
×
168
    Py_DECREF(total_batches);
×
169

NEW
170
    PyObject* files_read = PyLong_FromSize_t(result.files_read);
×
NEW
171
    PyDict_SetItemString(result_dict, "files_read", files_read);
×
172
    Py_DECREF(files_read);
×
173

NEW
174
    PyObject* files_failed = PyLong_FromSize_t(result.files_failed);
×
NEW
175
    PyDict_SetItemString(result_dict, "files_failed", files_failed);
×
176
    Py_DECREF(files_failed);
×
177

NEW
178
    return result_dict;
×
NEW
179
}
×
180

181
// Method table registered by init_arrow_parallel_reader(). It exposes a
// single Python-callable function that accepts positional and keyword
// arguments; the table ends with an all-null sentinel entry.
static PyMethodDef arrow_parallel_reader_methods[] = {
    {
        "read_arrow_files_parallel",
        reinterpret_cast<PyCFunction>(py_read_arrow_files_parallel),
        METH_VARARGS | METH_KEYWORDS,
        "Read multiple Arrow IPC files in parallel using the Runtime.\n\n"
        "Args:\n"
        "    paths: List of file paths to read.\n"
        "    runtime: Optional Runtime object. Uses default if not provided.\n\n"
        "Returns:\n"
        "    dict with:\n"
        "        - file_results: List of per-file results, each with:\n"
        "            - path: File path\n"
        "            - success: True if read succeeded\n"
        "            - error: Error message if failed, else None\n"
        "            - total_rows: Number of rows in file\n"
        "            - batches: List of ArrowBatch objects\n"
        "        - total_rows: Total rows across all files\n"
        "        - total_batches: Total batches across all files\n"
        "        - files_read: Number of files read successfully\n"
        "        - files_failed: Number of files that failed",
    },
    // Sentinel
    {nullptr, nullptr, 0, nullptr},
};
201

202
// Registers the functions listed in arrow_parallel_reader_methods on the
// module object `m`.
//
// Returns 0 on success, or -1 if PyModule_AddFunctions reports failure.
int init_arrow_parallel_reader(PyObject* m) {
    const bool registered =
        PyModule_AddFunctions(m, arrow_parallel_reader_methods) >= 0;
    return registered ? 0 : -1;
}
1✔
209

210
}  // namespace dftracer::utils::python
211

212
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc