• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26043728131

18 May 2026 03:37PM UTC coverage: 51.706% (-0.4%) from 52.076%
26043728131

push

github

hariharan-devarajan
feat(perf): performance improvements for parallel reading, indexing, and aggregation

Indexer
- Streaming parse-and-emit worker pipeline with bounded memory usage
- Concurrent SST artifact ingestion with staging support
- Gzip member slicing for parallel indexing
- Lazy decoding for compressed value counts
- Bypass DOM wrapper for indexer hot path (simdjson on_demand)
- Decoupled write workers from parse workers
- --rebuild-summaries flag and optimized root summary rebuild

Aggregator / MPI
- Task-based DAG execution for aggregator pipeline
- Shared staging for multi-node artifact relocation
- Per-node thread scaling to avoid oversubscription
- Unified distributed aggregation tracking, removed manifest consolidation
- Deterministic aggregation and intra-file parallelism

Trace reader / query
- Compiled predicate evaluation for AND-of-EQ queries
- Uniform-match shortcut for AND-of-EQ queries
- Line-range support for work items and checkpoint processing
- Optimized chunk pruning and checkpoint handling

Replay
- Pipelined replay with coroutines and channels
- JsonParser-based trace processing
- Optimized string handling and i/o buffering

Organize / writer / dft
- Parallel slice creation and merging in organize visitor
- Inline indexer in organize
- Gzip member tracking in writer
- Coroutine-based event dispatcher with extracted parse logic
- Batch flushing in organize visitor

Arrow / call_tree
- Optimized arrow conversion
- Arrow IPC support and improved save/load in call_tree

Build / infrastructure
- zlib-ng option, system simdjson fallback
- cgroup v1/v2 memory limit detection
- Auto-computed per-file memory estimates and batch sizes
- CI: perf branch trigger, formatting

Docs
- Rewritten indexer and trace reader API references

35907 of 90345 branches covered (39.74%)

Branch coverage included in aggregate %.

16869 of 21880 new or added lines in 137 files covered. (77.1%)

273 existing lines in 39 files now uncovered.

32021 of 41028 relevant lines covered (78.05%)

13164.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.81
/src/dftracer/utils/python/schema_reconcile.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#ifdef DFTRACER_UTILS_ENABLE_ARROW
3

4
#include <dftracer/utils/python/schema_reconcile.h>
5

6
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
9

10
namespace dftracer::utils::python {
11

12
namespace {
13

14
// Null-safe C-string equality: two null pointers compare equal, a null and a
// non-null pointer never do, and otherwise the characters are compared.
bool cstr_eq(const char *a, const char *b) {
    if (a != b) {
        return a != nullptr && b != nullptr && std::strcmp(a, b) == 0;
    }
    return true;
}
372✔
19

20
// Unknown formats fall back to NA so we can still emit a safe null column.
21
ArrowType type_from_format(const ArrowSchema *s) {
72✔
22
    if (!s || !s->format) return NANOARROW_TYPE_NA;
72!
23
    const char *f = s->format;
72✔
24
    if (cstr_eq(f, "n")) return NANOARROW_TYPE_NA;
72!
25
    if (cstr_eq(f, "b")) return NANOARROW_TYPE_BOOL;
72!
26
    if (cstr_eq(f, "c")) return NANOARROW_TYPE_INT8;
72!
27
    if (cstr_eq(f, "s")) return NANOARROW_TYPE_INT16;
72!
28
    if (cstr_eq(f, "i")) return NANOARROW_TYPE_INT32;
72!
29
    if (cstr_eq(f, "l")) return NANOARROW_TYPE_INT64;
72✔
30
    if (cstr_eq(f, "C")) return NANOARROW_TYPE_UINT8;
48!
31
    if (cstr_eq(f, "S")) return NANOARROW_TYPE_UINT16;
48!
32
    if (cstr_eq(f, "I")) return NANOARROW_TYPE_UINT32;
48!
33
    if (cstr_eq(f, "L")) return NANOARROW_TYPE_UINT64;
48!
34
    if (cstr_eq(f, "f")) return NANOARROW_TYPE_FLOAT;
48!
35
    if (cstr_eq(f, "g")) return NANOARROW_TYPE_DOUBLE;
48✔
36
    if (cstr_eq(f, "u")) return NANOARROW_TYPE_STRING;
24!
NEW
37
    if (cstr_eq(f, "z")) return NANOARROW_TYPE_BINARY;
×
NEW
38
    if (cstr_eq(f, "U")) return NANOARROW_TYPE_LARGE_STRING;
×
NEW
39
    if (cstr_eq(f, "Z")) return NANOARROW_TYPE_LARGE_BINARY;
×
NEW
40
    return NANOARROW_TYPE_NA;
×
41
}
36✔
42

43
int build_null_array(const ArrowSchema *child_schema, int64_t length,
72✔
44
                     ArrowArray *out) {
45
    ArrowError err;
46
    ArrowErrorInit(&err);
72✔
47
    ArrowType t = type_from_format(child_schema);
72✔
48
    if (ArrowArrayInitFromType(out, t) != NANOARROW_OK) return -1;
72!
49
    if (ArrowArrayStartAppending(out) != NANOARROW_OK) return -1;
72!
50
    if (ArrowArrayAppendNull(out, length) != NANOARROW_OK) return -1;
72!
51
    if (ArrowArrayFinishBuildingDefault(out, &err) != NANOARROW_OK) return -1;
72!
52
    return 0;
72✔
53
}
36✔
54

NEW
55
// Appends `in` to `out` with JSON string escaping applied.
// - The double quote and the backslash are backslash-escaped.
// - Newline, carriage return and tab use their short escapes (\n, \r, \t).
// - Every other byte below 0x20 becomes a \u00XX escape with lowercase hex.
// - All remaining bytes, including bytes >= 0x80 of multi-byte UTF-8
//   sequences, are copied verbatim.
// The surrounding quotes are not written here; callers add them.
void json_escape(std::string_view in, std::string &out) {
    for (const char ch : in) {
        const char *short_escape = nullptr;
        if (ch == '"') {
            short_escape = "\\\"";
        } else if (ch == '\\') {
            short_escape = "\\\\";
        } else if (ch == '\n') {
            short_escape = "\\n";
        } else if (ch == '\r') {
            short_escape = "\\r";
        } else if (ch == '\t') {
            short_escape = "\\t";
        }
        if (short_escape != nullptr) {
            out.append(short_escape);
            continue;
        }

        const auto byte = static_cast<unsigned char>(ch);
        if (byte >= 0x20) {
            out.push_back(ch);
            continue;
        }

        // Remaining control characters have no short form in JSON.
        char hex[8];
        std::snprintf(hex, sizeof(hex), "\\u%04x", static_cast<int>(byte));
        out.append(hex);
    }
}
×
86

NEW
87
void append_json_scalar(const ArrowSchema *child_schema,
×
88
                        const ArrowArray *child_array, int64_t row,
89
                        std::string &out) {
NEW
90
    if (!child_schema || !child_array) {
×
NEW
91
        out.append("null");
×
NEW
92
        return;
×
93
    }
94
    ArrowArrayView view;
NEW
95
    ArrowArrayViewInitFromType(&view, type_from_format(child_schema));
×
96
    ArrowError err;
NEW
97
    ArrowErrorInit(&err);
×
NEW
98
    if (ArrowArrayViewSetArray(&view, child_array, &err) != NANOARROW_OK) {
×
NEW
99
        out.append("null");
×
NEW
100
        ArrowArrayViewReset(&view);
×
NEW
101
        return;
×
102
    }
NEW
103
    if (ArrowArrayViewIsNull(&view, row)) {
×
NEW
104
        out.append("null");
×
NEW
105
        ArrowArrayViewReset(&view);
×
NEW
106
        return;
×
107
    }
NEW
108
    ArrowType t = type_from_format(child_schema);
×
NEW
109
    switch (t) {
×
110
        case NANOARROW_TYPE_BOOL:
NEW
111
            out.append(ArrowArrayViewGetIntUnsafe(&view, row) ? "true"
×
112
                                                              : "false");
NEW
113
            break;
×
114
        case NANOARROW_TYPE_INT8:
115
        case NANOARROW_TYPE_INT16:
116
        case NANOARROW_TYPE_INT32:
117
        case NANOARROW_TYPE_INT64: {
118
            char buf[32];
NEW
119
            std::snprintf(
×
120
                buf, sizeof(buf), "%lld",
NEW
121
                static_cast<long long>(ArrowArrayViewGetIntUnsafe(&view, row)));
×
NEW
122
            out.append(buf);
×
NEW
123
            break;
×
124
        }
125
        case NANOARROW_TYPE_UINT8:
126
        case NANOARROW_TYPE_UINT16:
127
        case NANOARROW_TYPE_UINT32:
128
        case NANOARROW_TYPE_UINT64: {
129
            char buf[32];
NEW
130
            std::snprintf(buf, sizeof(buf), "%llu",
×
131
                          static_cast<unsigned long long>(
NEW
132
                              ArrowArrayViewGetUIntUnsafe(&view, row)));
×
NEW
133
            out.append(buf);
×
NEW
134
            break;
×
135
        }
136
        case NANOARROW_TYPE_FLOAT:
137
        case NANOARROW_TYPE_DOUBLE: {
138
            char buf[32];
NEW
139
            std::snprintf(buf, sizeof(buf), "%g",
×
140
                          ArrowArrayViewGetDoubleUnsafe(&view, row));
NEW
141
            out.append(buf);
×
NEW
142
            break;
×
143
        }
144
        case NANOARROW_TYPE_STRING:
145
        case NANOARROW_TYPE_LARGE_STRING: {
NEW
146
            auto sv = ArrowArrayViewGetStringUnsafe(&view, row);
×
NEW
147
            out.push_back('"');
×
NEW
148
            json_escape(std::string_view(sv.data, sv.size_bytes), out);
×
NEW
149
            out.push_back('"');
×
NEW
150
            break;
×
151
        }
152
        default:
NEW
153
            out.append("null");
×
154
    }
NEW
155
    ArrowArrayViewReset(&view);
×
156
}
157

158
}  // namespace
159

160
// Out-of-line defaulted constructor: every member starts from its own default
// initializer as declared in the class definition (not visible in this file).
SchemaReconciler::SchemaReconciler() = default;
75!
161

162
bool SchemaReconciler::merge(const ArrowSchema *incoming) {
130✔
163
    if (finalized_ || !incoming) return false;
130!
164
    bool added = false;
130✔
165
    for (int64_t i = 0; i < incoming->n_children; ++i) {
1,172✔
166
        const ArrowSchema *child = incoming->children[i];
1,042✔
167
        if (!child || !child->name) continue;
1,380!
168
        std::string name(child->name);
1,042!
169
        if (name == EXTRA_COLUMN_NAME) continue;  // reserved
1,042!
170
        if (name_to_idx_.count(name)) continue;
1,042!
171
        nanoarrow::UniqueSchema copy;
366!
172
        if (ArrowSchemaDeepCopy(child, copy.get()) != NANOARROW_OK) {
366!
NEW
173
            last_error_ = "schema deep-copy failed while merging";
×
NEW
174
            return added;
×
175
        }
176
        int64_t idx = static_cast<int64_t>(names_.size());
366✔
177
        names_.push_back(name);
366!
178
        child_schemas_.push_back(std::move(copy));
366!
179
        name_to_idx_.emplace(std::move(name), idx);
366!
180
        added = true;
366✔
181
    }
1,042!
182
    return added;
130✔
183
}
65✔
184

185
int SchemaReconciler::finalize() {
48✔
186
    if (finalized_) return 0;
48!
187
    int64_t n = static_cast<int64_t>(child_schemas_.size()) + 1;
48✔
188
    ArrowSchemaInit(locked_schema_.get());
48✔
189
    if (ArrowSchemaSetTypeStruct(locked_schema_.get(), n) != NANOARROW_OK) {
48!
NEW
190
        last_error_ = "failed to initialize union struct schema";
×
NEW
191
        return -1;
×
192
    }
193
    for (size_t i = 0; i < child_schemas_.size(); ++i) {
414✔
194
        nanoarrow::UniqueSchema tmp;
366✔
195
        if (ArrowSchemaDeepCopy(child_schemas_[i].get(), tmp.get()) !=
366!
196
            NANOARROW_OK) {
NEW
197
            last_error_ = "failed to deep-copy union child";
×
NEW
198
            return -1;
×
199
        }
200
        ArrowSchemaMove(tmp.get(), locked_schema_->children[i]);
366!
201
    }
366!
202
    ArrowSchema *extra = locked_schema_->children[child_schemas_.size()];
48✔
203
    if (ArrowSchemaSetType(extra, NANOARROW_TYPE_STRING) != NANOARROW_OK) {
48!
NEW
204
        last_error_ = "failed to set _extra column type";
×
NEW
205
        return -1;
×
206
    }
207
    if (ArrowSchemaSetName(extra, EXTRA_COLUMN_NAME) != NANOARROW_OK) {
48✔
NEW
208
        last_error_ = "failed to name _extra column";
×
NEW
209
        return -1;
×
210
    }
211
    finalized_ = true;
48✔
212
    return 0;
48✔
213
}
24✔
214

215
// Writes an independent deep copy of the locked union schema into `out`,
// transferring ownership to the caller.
// `out` is overwritten by ArrowSchemaMove, not released first, so it should
// not hold a live schema.
// Returns 0 on success. Returns -1 with last_error_ set if finalize() has not
// succeeded yet, if `out` is null, or if the deep copy fails.
int SchemaReconciler::copy_schema(ArrowSchema *out) const {
    if (!finalized_) {
        last_error_ = "copy_schema called before finalize";
        return -1;
    }
    if (!out) {
        last_error_ = "copy_schema called with a null output schema";
        return -1;
    }
    nanoarrow::UniqueSchema tmp;
    if (ArrowSchemaDeepCopy(locked_schema_.get(), tmp.get()) != NANOARROW_OK) {
        last_error_ = "failed to deep-copy locked schema";
        return -1;
    }
    ArrowSchemaMove(tmp.get(), out);
    return 0;
}
48✔
228

229
int SchemaReconciler::reconcile(const ArrowSchema *in_schema,
126✔
230
                                ArrowArray *in_array, ArrowArray *out) const {
231
    if (!finalized_) {
126!
NEW
232
        last_error_ = "reconcile called before finalize";
×
NEW
233
        return -1;
×
234
    }
235
    if (!in_schema || !in_array || !out) return -1;
126!
236

237
    int64_t num_rows = in_array->length;
126✔
238

239
    // Initialize out as a struct matching the locked schema. This allocates
240
    // children of the right types; we'll populate them below.
241
    ArrowError err;
242
    ArrowErrorInit(&err);
126✔
243
    if (ArrowArrayInitFromSchema(out, locked_schema_.get(), &err) !=
126!
244
        NANOARROW_OK) {
NEW
245
        last_error_ = "ArrowArrayInitFromSchema failed for reconciled array";
×
NEW
246
        return -1;
×
247
    }
248

249
    // Build: input-name -> input-child-index
250
    std::unordered_map<std::string, int64_t> in_idx;
126✔
251
    in_idx.reserve(static_cast<size_t>(in_schema->n_children));
126!
252
    for (int64_t i = 0; i < in_schema->n_children; ++i) {
1,136✔
253
        const ArrowSchema *c = in_schema->children[i];
1,010✔
254
        if (c && c->name) in_idx.emplace(c->name, i);
1,010!
255
    }
505✔
256

257
    // For each known union column (all except the final _extra), try to take
258
    // it from the input batch. If missing, null-pad.
259
    int64_t n_known = num_known_columns();
126!
260
    for (int64_t i = 0; i < n_known; ++i) {
1,208✔
261
        const std::string &name = names_[static_cast<size_t>(i)];
1,082✔
262
        auto it = in_idx.find(name);
1,082!
263
        if (it != in_idx.end()) {
1,082✔
264
            // Release the pre-initialized placeholder child and move the
265
            // input child into its slot (zero copy; release of the input
266
            // goes null after the move).
267
            ArrowArray *slot = out->children[i];
1,010✔
268
            if (slot->release) slot->release(slot);
1,010!
269
            ArrowArrayMove(in_array->children[it->second], slot);
1,010!
270
        } else {
505✔
271
            ArrowArray *slot = out->children[i];
72✔
272
            if (slot->release) slot->release(slot);
72!
273
            if (build_null_array(locked_schema_->children[i], num_rows, slot) !=
72!
274
                0) {
NEW
275
                last_error_ = "failed to build null column for missing field";
×
NEW
276
                return -1;
×
277
            }
278
        }
279
    }
541✔
280

281
    // Find input children whose names aren't in the union: these feed _extra.
282
    std::vector<int64_t> unknown_in;
126✔
283
    for (int64_t i = 0; i < in_schema->n_children; ++i) {
1,136✔
284
        const ArrowSchema *c = in_schema->children[i];
1,010✔
285
        if (!c || !c->name) continue;
1,010!
286
        if (!name_to_idx_.count(c->name)) unknown_in.push_back(i);
1,010!
287
    }
505✔
288

289
    // Build the _extra column. Fast path: no unknowns -> all nulls.
290
    ArrowArray *extra_slot = out->children[n_known];
126✔
291
    if (extra_slot->release) extra_slot->release(extra_slot);
126!
292
    if (unknown_in.empty()) {
126!
293
        if (ArrowArrayInitFromType(extra_slot, NANOARROW_TYPE_STRING) !=
126!
294
            NANOARROW_OK) {
NEW
295
            last_error_ = "failed to init null _extra column";
×
NEW
296
            return -1;
×
297
        }
298
        if (ArrowArrayStartAppending(extra_slot) != NANOARROW_OK ||
189!
299
            ArrowArrayAppendNull(extra_slot, num_rows) != NANOARROW_OK ||
189!
300
            ArrowArrayFinishBuildingDefault(extra_slot, &err) != NANOARROW_OK) {
126!
NEW
301
            last_error_ = "failed to append nulls to _extra";
×
NEW
302
            return -1;
×
303
        }
304
    } else {
63✔
305
        // Slow path: JSON-encode unknown fields per row.
NEW
306
        if (ArrowArrayInitFromType(extra_slot, NANOARROW_TYPE_STRING) !=
×
307
            NANOARROW_OK) {
NEW
308
            last_error_ = "failed to init string _extra column";
×
NEW
309
            return -1;
×
310
        }
NEW
311
        if (ArrowArrayStartAppending(extra_slot) != NANOARROW_OK) {
×
NEW
312
            last_error_ = "failed to start appending to _extra";
×
NEW
313
            return -1;
×
314
        }
NEW
315
        std::string buf;
×
NEW
316
        for (int64_t row = 0; row < num_rows; ++row) {
×
NEW
317
            buf.clear();
×
NEW
318
            buf.push_back('{');
×
NEW
319
            bool first = true;
×
NEW
320
            for (int64_t u : unknown_in) {
×
NEW
321
                const ArrowSchema *cs = in_schema->children[u];
×
NEW
322
                const ArrowArray *ca = in_array->children[u];
×
NEW
323
                if (!cs || !ca || !cs->name) continue;
×
NEW
324
                if (!first) buf.push_back(',');
×
NEW
325
                first = false;
×
NEW
326
                buf.push_back('"');
×
NEW
327
                json_escape(cs->name, buf);
×
NEW
328
                buf.append("\":");
×
NEW
329
                append_json_scalar(cs, ca, row, buf);
×
330
            }
NEW
331
            buf.push_back('}');
×
NEW
332
            ArrowStringView sv{buf.data(), static_cast<int64_t>(buf.size())};
×
NEW
333
            if (ArrowArrayAppendString(extra_slot, sv) != NANOARROW_OK) {
×
NEW
334
                last_error_ = "failed to append _extra row";
×
NEW
335
                return -1;
×
336
            }
337
        }
NEW
338
        if (ArrowArrayFinishBuildingDefault(extra_slot, &err) != NANOARROW_OK) {
×
NEW
339
            last_error_ = "failed to finish _extra column";
×
NEW
340
            return -1;
×
341
        }
NEW
342
    }
×
343

344
    out->length = num_rows;
126✔
345
    out->null_count = 0;
126✔
346
    return 0;
126✔
347
}
126✔
348

349
}  // namespace dftracer::utils::python
350

351
#endif  // DFTRACER_UTILS_ENABLE_ARROW
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc