• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26932094552

04 Jun 2026 05:08AM UTC coverage: 49.905% (-2.3%) from 52.184%
26932094552

push

github

hariharan-devarajan
chore(utils): add portable to_chars_double fallback for macOS and update zstd handling

- Introduce to_chars_double wrapper that falls back to snprintf on macOS < 13.3
- Force CPM-built zstd on Apple to avoid deployment target mismatches
- Update version patch to 8

16076 of 43875 branches covered (36.64%)

Branch coverage included in aggregate %.

0 of 3 new or added lines in 1 file covered. (0.0%)

660 existing lines in 103 files now uncovered.

21461 of 31342 relevant lines covered (68.47%)

13056.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.61
/src/dftracer/utils/python/schema_reconcile.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#ifdef DFTRACER_UTILS_ENABLE_ARROW
3

4
#include <dftracer/utils/python/schema_reconcile.h>
5

6
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <cstring>
9

10
namespace dftracer::utils::python {
11

12
namespace {
13

14
// Null-tolerant C-string equality. Identical pointers (including two nulls)
// compare equal, a null never equals a non-null string, and otherwise the
// contents are compared with strcmp.
bool cstr_eq(const char *a, const char *b) {
    const bool same_pointer = (a == b);
    if (same_pointer) return true;

    const bool both_present = (a != nullptr) && (b != nullptr);
    return both_present && std::strcmp(a, b) == 0;
}
19

20
// Unknown formats fall back to NA so we can still emit a safe null column.
21
ArrowType type_from_format(const ArrowSchema *s) {
36✔
22
    if (!s || !s->format) return NANOARROW_TYPE_NA;
36!
23
    const char *f = s->format;
36✔
24
    if (cstr_eq(f, "n")) return NANOARROW_TYPE_NA;
36!
25
    if (cstr_eq(f, "b")) return NANOARROW_TYPE_BOOL;
36!
26
    if (cstr_eq(f, "c")) return NANOARROW_TYPE_INT8;
36!
27
    if (cstr_eq(f, "s")) return NANOARROW_TYPE_INT16;
36!
28
    if (cstr_eq(f, "i")) return NANOARROW_TYPE_INT32;
36!
29
    if (cstr_eq(f, "l")) return NANOARROW_TYPE_INT64;
36✔
30
    if (cstr_eq(f, "C")) return NANOARROW_TYPE_UINT8;
24!
31
    if (cstr_eq(f, "S")) return NANOARROW_TYPE_UINT16;
24!
32
    if (cstr_eq(f, "I")) return NANOARROW_TYPE_UINT32;
24!
33
    if (cstr_eq(f, "L")) return NANOARROW_TYPE_UINT64;
24!
34
    if (cstr_eq(f, "f")) return NANOARROW_TYPE_FLOAT;
24!
35
    if (cstr_eq(f, "g")) return NANOARROW_TYPE_DOUBLE;
24✔
36
    if (cstr_eq(f, "u")) return NANOARROW_TYPE_STRING;
12!
37
    if (cstr_eq(f, "z")) return NANOARROW_TYPE_BINARY;
×
38
    if (cstr_eq(f, "U")) return NANOARROW_TYPE_LARGE_STRING;
×
39
    if (cstr_eq(f, "Z")) return NANOARROW_TYPE_LARGE_BINARY;
×
40
    return NANOARROW_TYPE_NA;
×
41
}
42

43
int build_null_array(const ArrowSchema *child_schema, int64_t length,
36✔
44
                     ArrowArray *out) {
45
    ArrowError err;
46
    ArrowErrorInit(&err);
36✔
47
    ArrowType t = type_from_format(child_schema);
36✔
48
    if (ArrowArrayInitFromType(out, t) != NANOARROW_OK) return -1;
36!
49
    if (ArrowArrayStartAppending(out) != NANOARROW_OK) return -1;
36!
50
    if (ArrowArrayAppendNull(out, length) != NANOARROW_OK) return -1;
36!
51
    if (ArrowArrayFinishBuildingDefault(out, &err) != NANOARROW_OK) return -1;
36!
52
    return 0;
36✔
53
}
54

55
// Append `in` to `out`, escaped for use inside a JSON string literal.
// Quote, backslash, newline, carriage return and tab get their two-character
// escapes; any other byte below 0x20 becomes a \u00XX escape; every remaining
// byte is copied through unchanged. The surrounding quotes are not added.
void json_escape(std::string_view in, std::string &out) {
    for (const char ch : in) {
        if (ch == '"') {
            out += "\\\"";
        } else if (ch == '\\') {
            out += "\\\\";
        } else if (ch == '\n') {
            out += "\\n";
        } else if (ch == '\r') {
            out += "\\r";
        } else if (ch == '\t') {
            out += "\\t";
        } else if (const auto code = static_cast<unsigned char>(ch);
                   code < 0x20) {
            // "\u" + four hex digits + terminating NUL fits in 8 bytes.
            char hex[8];
            std::snprintf(hex, sizeof(hex), "\\u%04x",
                          static_cast<int>(code));
            out += hex;
        } else {
            out += ch;
        }
    }
}
×
86

87
void append_json_scalar(const ArrowSchema *child_schema,
×
88
                        const ArrowArray *child_array, int64_t row,
89
                        std::string &out) {
90
    if (!child_schema || !child_array) {
×
91
        out.append("null");
×
92
        return;
×
93
    }
94
    ArrowArrayView view;
95
    ArrowArrayViewInitFromType(&view, type_from_format(child_schema));
×
96
    ArrowError err;
97
    ArrowErrorInit(&err);
×
98
    if (ArrowArrayViewSetArray(&view, child_array, &err) != NANOARROW_OK) {
×
99
        out.append("null");
×
100
        ArrowArrayViewReset(&view);
×
101
        return;
×
102
    }
103
    if (ArrowArrayViewIsNull(&view, row)) {
×
104
        out.append("null");
×
105
        ArrowArrayViewReset(&view);
×
106
        return;
×
107
    }
108
    ArrowType t = type_from_format(child_schema);
×
109
    switch (t) {
×
UNCOV
110
        case NANOARROW_TYPE_BOOL:
×
111
            out.append(ArrowArrayViewGetIntUnsafe(&view, row) ? "true"
×
112
                                                              : "false");
113
            break;
×
UNCOV
114
        case NANOARROW_TYPE_INT8:
×
115
        case NANOARROW_TYPE_INT16:
116
        case NANOARROW_TYPE_INT32:
117
        case NANOARROW_TYPE_INT64: {
118
            char buf[32];
119
            std::snprintf(
×
120
                buf, sizeof(buf), "%lld",
121
                static_cast<long long>(ArrowArrayViewGetIntUnsafe(&view, row)));
×
122
            out.append(buf);
×
123
            break;
×
124
        }
UNCOV
125
        case NANOARROW_TYPE_UINT8:
×
126
        case NANOARROW_TYPE_UINT16:
127
        case NANOARROW_TYPE_UINT32:
128
        case NANOARROW_TYPE_UINT64: {
129
            char buf[32];
130
            std::snprintf(buf, sizeof(buf), "%llu",
×
131
                          static_cast<unsigned long long>(
132
                              ArrowArrayViewGetUIntUnsafe(&view, row)));
×
133
            out.append(buf);
×
134
            break;
×
135
        }
UNCOV
136
        case NANOARROW_TYPE_FLOAT:
×
137
        case NANOARROW_TYPE_DOUBLE: {
138
            char buf[32];
139
            std::snprintf(buf, sizeof(buf), "%g",
×
140
                          ArrowArrayViewGetDoubleUnsafe(&view, row));
141
            out.append(buf);
×
142
            break;
×
143
        }
UNCOV
144
        case NANOARROW_TYPE_STRING:
×
145
        case NANOARROW_TYPE_LARGE_STRING: {
146
            auto sv = ArrowArrayViewGetStringUnsafe(&view, row);
×
147
            out.push_back('"');
×
148
            json_escape(std::string_view(sv.data, sv.size_bytes), out);
×
149
            out.push_back('"');
×
150
            break;
×
151
        }
UNCOV
152
        default:
×
153
            out.append("null");
×
154
    }
155
    ArrowArrayViewReset(&view);
×
156
}
157

158
}  // namespace
159

160
// Defaulted constructor: members are initialised as the class declaration
// specifies; no extra work is done here.
SchemaReconciler::SchemaReconciler() = default;
25✔
161

162
bool SchemaReconciler::merge(const ArrowSchema *incoming) {
65✔
163
    if (finalized_ || !incoming) return false;
65!
164
    bool added = false;
65✔
165
    for (int64_t i = 0; i < incoming->n_children; ++i) {
586✔
166
        const ArrowSchema *child = incoming->children[i];
521✔
167
        if (!child || !child->name) continue;
859!
168
        std::string name(child->name);
521!
169
        if (name == EXTRA_COLUMN_NAME) continue;  // reserved
521!
170
        if (name_to_idx_.count(name)) continue;
521!
171
        nanoarrow::UniqueSchema copy;
183✔
172
        if (ArrowSchemaDeepCopy(child, copy.get()) != NANOARROW_OK) {
183!
173
            last_error_ = "schema deep-copy failed while merging";
×
174
            return added;
×
175
        }
176
        int64_t idx = static_cast<int64_t>(names_.size());
183✔
177
        names_.push_back(name);
183!
178
        child_schemas_.push_back(std::move(copy));
183!
179
        name_to_idx_.emplace(std::move(name), idx);
183!
180
        added = true;
183✔
181
    }
521!
182
    return added;
65✔
183
}
184

185
int SchemaReconciler::finalize() {
24✔
186
    if (finalized_) return 0;
24!
187
    int64_t n = static_cast<int64_t>(child_schemas_.size()) + 1;
24✔
188
    ArrowSchemaInit(locked_schema_.get());
24✔
189
    if (ArrowSchemaSetTypeStruct(locked_schema_.get(), n) != NANOARROW_OK) {
24!
190
        last_error_ = "failed to initialize union struct schema";
×
191
        return -1;
×
192
    }
193
    for (size_t i = 0; i < child_schemas_.size(); ++i) {
207✔
194
        nanoarrow::UniqueSchema tmp;
183✔
195
        if (ArrowSchemaDeepCopy(child_schemas_[i].get(), tmp.get()) !=
183!
196
            NANOARROW_OK) {
197
            last_error_ = "failed to deep-copy union child";
×
198
            return -1;
×
199
        }
200
        ArrowSchemaMove(tmp.get(), locked_schema_->children[i]);
183✔
201
    }
183!
202
    ArrowSchema *extra = locked_schema_->children[child_schemas_.size()];
24✔
203
    if (ArrowSchemaSetType(extra, NANOARROW_TYPE_STRING) != NANOARROW_OK) {
24!
204
        last_error_ = "failed to set _extra column type";
×
205
        return -1;
×
206
    }
207
    if (ArrowSchemaSetName(extra, EXTRA_COLUMN_NAME) != NANOARROW_OK) {
24!
208
        last_error_ = "failed to name _extra column";
×
209
        return -1;
×
210
    }
211
    finalized_ = true;
24✔
212
    return 0;
24✔
213
}
214

215
// Write a deep copy of the locked schema into `out`.
//
// Returns 0 on success. Returns -1 and sets last_error_ when called before
// finalize(), when `out` is null, or when the deep copy fails; `out` is not
// written to in any failure case because the copy is staged in a temporary.
int SchemaReconciler::copy_schema(ArrowSchema *out) const {
    if (!finalized_) {
        last_error_ = "copy_schema called before finalize";
        return -1;
    }
    if (!out) {
        last_error_ = "copy_schema called with null output schema";
        return -1;
    }
    nanoarrow::UniqueSchema tmp;
    if (ArrowSchemaDeepCopy(locked_schema_.get(), tmp.get()) != NANOARROW_OK) {
        last_error_ = "failed to deep-copy locked schema";
        return -1;
    }
    ArrowSchemaMove(tmp.get(), out);
    return 0;
}
24✔
228

229
int SchemaReconciler::reconcile(const ArrowSchema *in_schema,
63✔
230
                                ArrowArray *in_array, ArrowArray *out) const {
231
    if (!finalized_) {
63!
232
        last_error_ = "reconcile called before finalize";
×
233
        return -1;
×
234
    }
235
    if (!in_schema || !in_array || !out) return -1;
63!
236

237
    int64_t num_rows = in_array->length;
63✔
238

239
    // Initialize out as a struct matching the locked schema. This allocates
240
    // children of the right types; we'll populate them below.
241
    ArrowError err;
242
    ArrowErrorInit(&err);
63✔
243
    if (ArrowArrayInitFromSchema(out, locked_schema_.get(), &err) !=
63!
244
        NANOARROW_OK) {
245
        last_error_ = "ArrowArrayInitFromSchema failed for reconciled array";
×
246
        return -1;
×
247
    }
248

249
    // Build: input-name -> input-child-index
250
    std::unordered_map<std::string, int64_t> in_idx;
63✔
251
    in_idx.reserve(static_cast<size_t>(in_schema->n_children));
63!
252
    for (int64_t i = 0; i < in_schema->n_children; ++i) {
568✔
253
        const ArrowSchema *c = in_schema->children[i];
505✔
254
        if (c && c->name) in_idx.emplace(c->name, i);
505!
255
    }
256

257
    // For each known union column (all except the final _extra), try to take
258
    // it from the input batch. If missing, null-pad.
259
    int64_t n_known = num_known_columns();
63✔
260
    for (int64_t i = 0; i < n_known; ++i) {
604✔
261
        const std::string &name = names_[static_cast<size_t>(i)];
541✔
262
        auto it = in_idx.find(name);
541!
263
        if (it != in_idx.end()) {
541✔
264
            // Release the pre-initialized placeholder child and move the
265
            // input child into its slot (zero copy; release of the input
266
            // goes null after the move).
267
            ArrowArray *slot = out->children[i];
505✔
268
            if (slot->release) slot->release(slot);
505!
269
            ArrowArrayMove(in_array->children[it->second], slot);
505✔
270
        } else {
271
            ArrowArray *slot = out->children[i];
36✔
272
            if (slot->release) slot->release(slot);
36!
273
            if (build_null_array(locked_schema_->children[i], num_rows, slot) !=
36!
274
                0) {
275
                last_error_ = "failed to build null column for missing field";
×
276
                return -1;
×
277
            }
278
        }
279
    }
280

281
    // Find input children whose names aren't in the union: these feed _extra.
282
    std::vector<int64_t> unknown_in;
63✔
283
    for (int64_t i = 0; i < in_schema->n_children; ++i) {
568✔
284
        const ArrowSchema *c = in_schema->children[i];
505✔
285
        if (!c || !c->name) continue;
505!
286
        if (!name_to_idx_.count(c->name)) unknown_in.push_back(i);
505!
287
    }
288

289
    // Build the _extra column. Fast path: no unknowns -> all nulls.
290
    ArrowArray *extra_slot = out->children[n_known];
63✔
291
    if (extra_slot->release) extra_slot->release(extra_slot);
63!
292
    if (unknown_in.empty()) {
63!
293
        if (ArrowArrayInitFromType(extra_slot, NANOARROW_TYPE_STRING) !=
63!
294
            NANOARROW_OK) {
295
            last_error_ = "failed to init null _extra column";
×
296
            return -1;
×
297
        }
298
        if (ArrowArrayStartAppending(extra_slot) != NANOARROW_OK ||
63!
299
            ArrowArrayAppendNull(extra_slot, num_rows) != NANOARROW_OK ||
126!
300
            ArrowArrayFinishBuildingDefault(extra_slot, &err) != NANOARROW_OK) {
63!
301
            last_error_ = "failed to append nulls to _extra";
×
302
            return -1;
×
303
        }
304
    } else {
305
        // Slow path: JSON-encode unknown fields per row.
306
        if (ArrowArrayInitFromType(extra_slot, NANOARROW_TYPE_STRING) !=
×
307
            NANOARROW_OK) {
308
            last_error_ = "failed to init string _extra column";
×
309
            return -1;
×
310
        }
311
        if (ArrowArrayStartAppending(extra_slot) != NANOARROW_OK) {
×
312
            last_error_ = "failed to start appending to _extra";
×
313
            return -1;
×
314
        }
315
        std::string buf;
×
316
        for (int64_t row = 0; row < num_rows; ++row) {
×
317
            buf.clear();
×
318
            buf.push_back('{');
×
319
            bool first = true;
×
320
            for (int64_t u : unknown_in) {
×
321
                const ArrowSchema *cs = in_schema->children[u];
×
322
                const ArrowArray *ca = in_array->children[u];
×
323
                if (!cs || !ca || !cs->name) continue;
×
324
                if (!first) buf.push_back(',');
×
325
                first = false;
×
326
                buf.push_back('"');
×
327
                json_escape(cs->name, buf);
×
328
                buf.append("\":");
×
329
                append_json_scalar(cs, ca, row, buf);
×
330
            }
331
            buf.push_back('}');
×
332
            ArrowStringView sv{buf.data(), static_cast<int64_t>(buf.size())};
×
333
            if (ArrowArrayAppendString(extra_slot, sv) != NANOARROW_OK) {
×
334
                last_error_ = "failed to append _extra row";
×
335
                return -1;
×
336
            }
337
        }
338
        if (ArrowArrayFinishBuildingDefault(extra_slot, &err) != NANOARROW_OK) {
×
339
            last_error_ = "failed to finish _extra column";
×
340
            return -1;
×
341
        }
342
    }
×
343

344
    out->length = num_rows;
63✔
345
    out->null_count = 0;
63✔
346
    return 0;
63✔
347
}
63✔
348

349
}  // namespace dftracer::utils::python
350

351
#endif  // DFTRACER_UTILS_ENABLE_ARROW
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc