• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27171677342

08 Jun 2026 10:43PM UTC coverage: 51.99% (+0.05%) from 51.937%
27171677342

Pull #77

github

web-flow
Merge 3a1432eec into 8045f0be3
Pull Request #77: chore: bump version to 0.0.10

36972 of 92663 branches covered (39.9%)

Branch coverage included in aggregate %.

33405 of 42703 relevant lines covered (78.23%)

20411.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.47
/src/dftracer/utils/utilities/reader/trace_reader.cpp
1
#include <dftracer/utils/core/common/archive_format.h>
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/core/utils/string.h>
4
#include <dftracer/utils/utilities/common/json/json_value.h>
5
#include <dftracer/utils/utilities/common/query/query.h>
6
#include <dftracer/utils/utilities/composites/dft/indexing/chunk_pruner_utility.h>
7
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
8
#include <dftracer/utils/utilities/fileio/lines/sources/async_plain_file_bytes_generator.h>
9
#include <dftracer/utils/utilities/fileio/lines/sources/async_plain_file_line_generator.h>
10
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>
11
#include <dftracer/utils/utilities/indexer/index_database.h>
12
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
13
#include <dftracer/utils/utilities/indexer/internal/indexer_factory.h>
14
#include <dftracer/utils/utilities/reader/internal/reader.h>
15
#include <dftracer/utils/utilities/reader/internal/reader_factory.h>
16
#include <dftracer/utils/utilities/reader/internal/stream.h>
17
#include <dftracer/utils/utilities/reader/internal/stream_config.h>
18
#include <dftracer/utils/utilities/reader/internal/stream_type.h>
19
#include <dftracer/utils/utilities/reader/trace_reader.h>
20
#include <simdjson.h>
21
#ifdef DFTRACER_UTILS_ENABLE_ARROW
22
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
23
#endif
24

25
#include <algorithm>
26
#include <cstring>
27
#include <optional>
28
#include <span>
29
#include <type_traits>
30
#include <unordered_map>
31

32
namespace dftracer::utils::utilities::reader {
33

34
namespace dft_internal = composites::dft::internal;
35
using common::json::JsonValue;
36
using common::query::Query;
37
using composites::dft::indexing::ChunkPrunerInput;
38
using composites::dft::indexing::ChunkPrunerUtility;
39
using indexer::internal::IndexerFactory;
40

41
namespace {
42

43
// Per-thread simdjson DOM parser used by line_matches_query(). Declared
// thread_local so each thread keeps and reuses its own parser instance and
// concurrent readers never share parser state.
thread_local simdjson::dom::parser tl_parser{};
4✔
44

45
bool line_matches_query(const Query& q, std::string_view content) {
5,420✔
46
    auto result = tl_parser.parse(content.data(), content.size());
5,420✔
47
    if (result.error()) return false;
5,420✔
48
    auto root = result.value_unsafe();
5,382✔
49
    if (!root.is_object()) return false;
5,382✔
50
    JsonValue json(root);
5,382✔
51
    return q.evaluate(json);
5,382!
52
}
2,710✔
53

54
// A span of lines handed to Reader::stream() as `.from(start_line)` /
// `.to(end_line)`. Built from a checkpoint's first/last line numbers, so
// the end is presumably inclusive — confirm against StreamConfig::to().
// Members default to 0 so a default-constructed range is never
// indeterminate; the type stays an aggregate for `{first, last}` init.
struct LineRange {
    std::size_t start_line{0};
    std::size_t end_line{0};
};
58

59
// Cheap byte-level pre-filter derived from a query AST.
//
// `required` holds literal substrings that are expected to appear verbatim in
// any line matching the query. build_prefilter() fills it only for queries
// that are an AND of `field == literal` comparisons, and keeps only literals
// long enough to be worth searching for. For every other query shape it stays
// empty, and may_match() then accepts everything.
//
// This is an optimization only: bytes that pass are still expected to be
// checked against the real query (here or by the caller). Rejections rely on
// the literal being rendered the way the trace writer serialized the field
// (compact `"key":value` form).
struct LinePrefilter {
    std::vector<std::string> required;

    // True when there is nothing to check, i.e. may_match() always passes.
    bool empty() const { return required.empty(); }

    // True iff every required literal occurs somewhere in `bytes`.
    bool may_match(std::string_view bytes) const {
        return std::all_of(
            required.begin(), required.end(), [bytes](const std::string& lit) {
                return ::memmem(bytes.data(), bytes.size(), lit.data(),
                                lit.size()) != nullptr;
            });
    }
};
84

85
// True when `s` contains a byte that a JSON writer would (or may) emit as an
// escape sequence: quote, backslash, control characters, and non-ASCII bytes
// that some writers render as \uXXXX. Such text cannot be searched for
// verbatim in the raw serialized line.
bool literal_needs_json_escape(std::string_view s) {
    return std::any_of(s.begin(), s.end(), [](char c) {
        const auto uc = static_cast<unsigned char>(c);
        return c == '"' || c == '\\' || uc < 0x20 || uc >= 0x80;
    });
}

// Walk a query AST that is an AND of `field == literal` leaves and append, per
// leaf, the substring that leaf requires in a compactly serialized line
// (`"key":"str"`, `"key":123`, `"key":true`).
//
// Returns false, and the caller must then ignore `out`, when any node is not
// an AND or an EQ comparison, or when a leaf cannot be rendered exactly as it
// would appear in the raw line. That covers doubles, and keys or string values
// needing JSON escaping.
//
// Dotted field paths (e.g. `args.fhash`) address a nested object, so only the
// last path segment appears as a key in the raw JSON. Emitting the full
// dotted path would produce a literal that never occurs, and the prefilter
// would then reject every line.
bool collect_and_eq_literals(const common::query::QueryNode& node,
                             std::vector<std::string>& out) {
    return std::visit(
        [&out](const auto& n) -> bool {
            using T = std::decay_t<decltype(n)>;
            if constexpr (std::is_same_v<T, common::query::CompareNode>) {
                if (n.op != common::query::CompareOp::EQ) return false;

                std::string_view key = n.field.path;
                if (const auto dot = key.rfind('.');
                    dot != std::string_view::npos) {
                    key.remove_prefix(dot + 1);
                }
                if (key.empty() || literal_needs_json_escape(key)) return false;

                std::string lit;
                lit.reserve(key.size() + 16);
                lit += '"';
                lit += key;
                lit += "\":";
                const auto& val = n.value.value;
                if (std::holds_alternative<std::string>(val)) {
                    const auto& s = std::get<std::string>(val);
                    // The writer would escape these; the raw text won't match.
                    if (literal_needs_json_escape(s)) return false;
                    lit += '"';
                    lit += s;
                    lit += '"';
                } else if (std::holds_alternative<int64_t>(val)) {
                    lit += std::to_string(std::get<int64_t>(val));
                } else if (std::holds_alternative<uint64_t>(val)) {
                    lit += std::to_string(std::get<uint64_t>(val));
                } else if (std::holds_alternative<bool>(val)) {
                    lit += std::get<bool>(val) ? "true" : "false";
                } else {
                    return false;  // double or other: skip pre-filter
                }
                out.push_back(std::move(lit));
                return true;
            } else if constexpr (std::is_same_v<T, common::query::AndNode>) {
                return collect_and_eq_literals(*n.left, out) &&
                       collect_and_eq_literals(*n.right, out);
            }
            return false;  // OrNode, NotNode, InNode, NotInNode, ...:
                           // conservative skip
        },
        node.data);
}
122

123
// Trim a chunk buffer down to its NDJSON payload. `.pfw.gz` traces wrap their
// lines in `[` ... `]` so the file stays Perfetto-viewable as a JSON array,
// but simdjson iterate_many wants whitespace-separated documents. The
// following are dropped:
//   - leading whitespace, an optional `[`, and whitespace after it;
//   - trailing whitespace, an optional `]`, and whitespace before it.
// Whitespace means space, tab, LF and CR. Chunks without bookends only lose
// surrounding whitespace. The result is always a view into `bytes`.
std::string_view strip_ndjson_bookends(std::string_view bytes) {
    auto skip_leading_ws = [](std::string_view& v) {
        v.remove_prefix(std::min(v.find_first_not_of(" \t\n\r"), v.size()));
    };
    auto skip_trailing_ws = [](std::string_view& v) {
        const std::size_t last = v.find_last_not_of(" \t\n\r");
        v.remove_suffix(last == std::string_view::npos ? v.size()
                                                       : v.size() - last - 1);
    };

    std::string_view view = bytes;

    skip_leading_ws(view);
    if (!view.empty() && view.front() == '[') {
        view.remove_prefix(1);
        skip_leading_ws(view);
    }

    skip_trailing_ws(view);
    if (!view.empty() && view.back() == ']') {
        view.remove_suffix(1);
        skip_trailing_ws(view);
    }
    return view;
}
146

147
// One compiled `field == literal` leaf of an AND-of-EQ query.
//
// Such predicates can be checked directly against a simdjson on-demand
// document instead of going through ValueMap, which costs a wyhash plus a
// per-field std::string allocation per row. More complex queries (OR, NOT,
// IN, ranges) are left to the generic visitor.
struct CompiledEqProbe {
    // Top-level event key to look up: "pid", "args", "name", ...
    std::string top_key;
    // Key inside the `top_key` object; empty for top-level fields.
    std::string nested_key;

    // Selects which typed *_val member below carries the literal.
    enum class Kind { String, Int64, UInt64, Double, Bool };
    Kind kind{Kind::String};

    std::string s_val;
    std::int64_t i64_val{0};
    std::uint64_t u64_val{0};
    double d_val{0.0};
    bool b_val{false};
};
162

163
// Report whether `k` names one of the top-level keys of a dftracer event
// (id, name, cat, pid, tid, ts, dur, ph). Any other bare key in the query DSL
// (e.g. `epoch`, `fhash`) is treated as living under "args". This mirrors the
// convention collect_query_fields relies on when it folds nested object keys
// into the flat ValueMap.
bool is_top_level_event_key(std::string_view k) {
    static constexpr std::string_view kTopLevelKeys[] = {
        "id", "name", "cat", "pid", "tid", "ts", "dur", "ph"};
    for (std::string_view key : kTopLevelKeys) {
        if (key == k) return true;
    }
    return false;
}
171

172
// Walk a CompareNode-with-EQ leaf into a probe. Returns false on
173
// unsupported shapes (more than one '.' or a literal type the simdjson
174
// get_X path can't compare directly).
175
bool compile_eq_leaf(const common::query::CompareNode& n,
14✔
176
                     CompiledEqProbe& out) {
177
    if (n.op != common::query::CompareOp::EQ) return false;
14!
178
    auto dot = n.field.path.find('.');
14✔
179
    if (dot == std::string::npos) {
14✔
180
        if (is_top_level_event_key(n.field.path)) {
14!
181
            out.top_key = n.field.path;
14✔
182
            out.nested_key.clear();
14✔
183
        } else {
7✔
184
            // Bare arg-style key: foo -> args.foo.
185
            out.top_key = "args";
×
186
            out.nested_key = n.field.path;
×
187
        }
188
    } else {
7✔
189
        if (n.field.path.find('.', dot + 1) != std::string::npos) return false;
×
190
        out.top_key = n.field.path.substr(0, dot);
×
191
        out.nested_key = n.field.path.substr(dot + 1);
×
192
    }
193
    return std::visit(
14✔
194
        [&out](auto&& v) -> bool {
21✔
195
            using T = std::decay_t<decltype(v)>;
196
            if constexpr (std::is_same_v<T, std::string>) {
197
                out.kind = CompiledEqProbe::Kind::String;
12✔
198
                out.s_val = v;
12✔
199
                return true;
12✔
200
            } else if constexpr (std::is_same_v<T, std::int64_t>) {
201
                out.kind = CompiledEqProbe::Kind::Int64;
×
202
                out.i64_val = v;
×
203
                return true;
×
204
            } else if constexpr (std::is_same_v<T, std::uint64_t>) {
205
                out.kind = CompiledEqProbe::Kind::UInt64;
2✔
206
                out.u64_val = v;
2✔
207
                return true;
2✔
208
            } else if constexpr (std::is_same_v<T, double>) {
209
                out.kind = CompiledEqProbe::Kind::Double;
×
210
                out.d_val = v;
×
211
                return true;
×
212
            } else if constexpr (std::is_same_v<T, bool>) {
213
                out.kind = CompiledEqProbe::Kind::Bool;
×
214
                out.b_val = v;
×
215
                return true;
×
216
            } else {
217
                return false;
218
            }
219
        },
220
        n.value.value);
14!
221
}
7✔
222

223
// Try to compile the query AST as an AND of EQ leaves. nullopt on
224
// unsupported shapes; the ValueMap path handles those.
225
std::optional<std::vector<CompiledEqProbe>> try_compile_eq_probes(
14✔
226
    const common::query::QueryNode& node) {
227
    using namespace common::query;
228
    return std::visit(
7✔
229
        [&](const auto& n) -> std::optional<std::vector<CompiledEqProbe>> {
14✔
230
            using T = std::decay_t<decltype(n)>;
231
            if constexpr (std::is_same_v<T, CompareNode>) {
232
                CompiledEqProbe p;
14✔
233
                if (!compile_eq_leaf(n, p)) return std::nullopt;
14!
234
                return std::vector<CompiledEqProbe>{std::move(p)};
21!
235
            } else if constexpr (std::is_same_v<T, AndNode>) {
14✔
236
                auto l = try_compile_eq_probes(*n.left);
×
237
                if (!l) return std::nullopt;
×
238
                auto r = try_compile_eq_probes(*n.right);
×
239
                if (!r) return std::nullopt;
×
240
                l->insert(l->end(), std::make_move_iterator(r->begin()),
×
241
                          std::make_move_iterator(r->end()));
×
242
                return l;
×
243
            } else {
×
244
                return std::nullopt;
×
245
            }
246
        },
7✔
247
        node.data);
14!
248
}
249

250
bool probe_matches_value(const CompiledEqProbe& p,
×
251
                         simdjson::ondemand::value val) {
252
    switch (p.kind) {
×
253
        case CompiledEqProbe::Kind::String: {
254
            auto r = val.get_string();
×
255
            if (r.error()) return false;
×
256
            auto sv = r.value_unsafe();
×
257
            return sv.size() == p.s_val.size() &&
×
258
                   std::memcmp(sv.data(), p.s_val.data(), sv.size()) == 0;
×
259
        }
260
        case CompiledEqProbe::Kind::Int64: {
261
            auto t = val.type();
×
262
            if (t.error()) return false;
×
263
            if (t.value_unsafe() == simdjson::ondemand::json_type::number) {
×
264
                auto num = val.get_number();
×
265
                if (num.error()) return false;
×
266
                auto n = num.value_unsafe();
×
267
                if (n.is_int64()) return n.get_int64() == p.i64_val;
×
268
                if (n.is_uint64()) {
×
269
                    if (p.i64_val < 0) return false;
×
270
                    return n.get_uint64() ==
×
271
                           static_cast<std::uint64_t>(p.i64_val);
×
272
                }
273
                return n.get_double() == static_cast<double>(p.i64_val);
×
274
            }
275
            return false;
×
276
        }
277
        case CompiledEqProbe::Kind::UInt64: {
278
            auto num = val.get_number();
×
279
            if (num.error()) return false;
×
280
            auto n = num.value_unsafe();
×
281
            if (n.is_uint64()) return n.get_uint64() == p.u64_val;
×
282
            if (n.is_int64()) {
×
283
                auto v = n.get_int64();
×
284
                if (v < 0) return false;
×
285
                return static_cast<std::uint64_t>(v) == p.u64_val;
×
286
            }
287
            return n.get_double() == static_cast<double>(p.u64_val);
×
288
        }
289
        case CompiledEqProbe::Kind::Double: {
290
            auto r = val.get_double();
×
291
            if (r.error()) return false;
×
292
            return r.value_unsafe() == p.d_val;
×
293
        }
294
        case CompiledEqProbe::Kind::Bool: {
295
            auto r = val.get_bool();
×
296
            if (r.error()) return false;
×
297
            return r.value_unsafe() == p.b_val;
×
298
        }
299
    }
300
    return false;
×
301
}
302

303
// Evaluate compiled AND-of-EQ probes by directly probing simdjson fields.
304
bool eval_compiled_eq(const std::vector<CompiledEqProbe>& probes,
×
305
                      simdjson::ondemand::document_reference doc) {
306
    for (const auto& p : probes) {
×
307
        doc.rewind();
×
308
        auto top_r = doc.find_field_unordered(
×
309
            std::string_view(p.top_key.data(), p.top_key.size()));
310
        if (top_r.error()) return false;
×
311
        auto top_v = top_r.value();
×
312
        if (p.nested_key.empty()) {
×
313
            if (!probe_matches_value(p, top_v)) return false;
×
314
        } else {
315
            auto obj_r = top_v.get_object();
×
316
            if (obj_r.error()) return false;
×
317
            auto inner_r = obj_r.value().find_field_unordered(
×
318
                std::string_view(p.nested_key.data(), p.nested_key.size()));
319
            if (inner_r.error()) return false;
×
320
            if (!probe_matches_value(p, inner_r.value())) return false;
×
321
        }
322
    }
323
    return true;
×
324
}
325

326
// Derive the memmem pre-filter for `q`. Returns an empty filter (accepts
// everything) unless the query is an AND of EQ comparisons whose literals
// could all be rendered. Of those, only literals of at least MIN_LITERAL_LEN
// bytes are kept.
LinePrefilter build_prefilter(const Query& q) {
    // Short literals like `"pid":1000` or `"epoch":0` are common enough in
    // practice that memmem on every line costs more than it saves on the
    // parse side. Only keep literals long enough that rarity is plausible
    // (hashes, filenames, host names).
    constexpr std::size_t MIN_LITERAL_LEN = 16;

    LinePrefilter prefilter;
    std::vector<std::string> literals;
    if (!collect_and_eq_literals(q.root(), literals)) {
        return prefilter;
    }
    for (auto& literal : literals) {
        if (literal.size() < MIN_LITERAL_LEN) continue;
        prefilter.required.push_back(std::move(literal));
    }
    return prefilter;
}
94!
344

345
// Split each chunk read from `stream` into '\n'-separated lines and yield the
// ones that survive filtering, numbered consecutively from `start_line_num`.
//
// Filters, cheapest first:
//   1. `prefilter` (when non-null and non-empty): every required literal must
//      occur in the line. It is also applied to whole chunks; a chunk missing
//      a literal is skipped without being split.
//   2. `query` (when non-null and !chunk_prune_only): full JSON parse plus
//      evaluation via line_matches_query().
// Empty lines are never yielded but still advance the line counter.
coro::AsyncGenerator<Line> yield_lines_from_stream(
    std::unique_ptr<internal::ReaderStream> stream, std::size_t start_line_num,
    const Query* query, bool chunk_prune_only = false,
    const LinePrefilter* prefilter = nullptr) {
    const bool use_prefilter = prefilter != nullptr && !prefilter->empty();
    std::size_t line_num = start_line_num;
    while (!stream->done()) {
        auto chunk = co_await stream->read_async();
        if (chunk.empty()) break;
        const char* data = chunk.data();
        const std::size_t len = chunk.size();

        // Chunk-level pre-filter: if any required literal is absent from this
        // entire buffer, no line within it can match. Skip without splitting.
        // Line numbers must stay correct for subsequent chunks.
        if (use_prefilter &&
            !prefilter->may_match(std::string_view(data, len))) {
            line_num += static_cast<std::size_t>(
                std::count(data, data + len, '\n'));
            continue;
        }

        std::size_t pos = 0;
        while (pos < len) {
            const void* nl_ptr = std::memchr(data + pos, '\n', len - pos);
            const std::size_t end_pos =
                nl_ptr ? static_cast<std::size_t>(
                             static_cast<const char*>(nl_ptr) - data)
                       : len;
            if (end_pos > pos) {
                const std::string_view line_sv(data + pos, end_pos - pos);
                // Run the memmem pre-check before the JSON parse. Previously
                // it ran after line_matches_query(), so every line paid for a
                // full parse and the prefilter saved nothing. Both checks are
                // side-effect free, so the accepted set is unchanged.
                const bool accept =
                    (!use_prefilter || prefilter->may_match(line_sv)) &&
                    (chunk_prune_only || query == nullptr ||
                     line_matches_query(*query, line_sv));
                if (accept) {
                    co_yield Line(line_sv, line_num);
                }
            }
            ++line_num;
            pos = end_pos + 1;
        }
    }
}
5,704!
389

390
// Stream each line range in order through yield_lines_from_stream(), with the
// same query, chunk_prune_only flag and prefilter for every range. `query`
// and `prefilter` are owned by this coroutine frame, so the pointers handed
// to the inner generators stay valid while they run.
coro::AsyncGenerator<Line> yield_lines_from_ranges(
    std::shared_ptr<internal::Reader> reader, std::vector<LineRange> ranges,
    std::size_t buffer_size, Query query, bool chunk_prune_only = false,
    LinePrefilter prefilter = {}) {
    for (const LineRange& window : ranges) {
        auto line_stream = reader->stream(
            internal::StreamConfig()
                .stream_type(internal::StreamType::MULTI_LINES)
                .range_type(internal::RangeType::LINE_RANGE)
                .from(window.start_line)
                .to(window.end_line)
                .buffer_size(buffer_size));
        auto lines = yield_lines_from_stream(std::move(line_stream),
                                             window.start_line, &query,
                                             chunk_prune_only, &prefilter);
        for (;;) {
            auto next_line = co_await lines.next();
            if (!next_line) break;
            co_yield *next_line;
        }
    }
}
×
410

411
// Raw-chunk variants of the yield/read helpers. Same pruning logic as the
412
// line-yielding flavors but emit std::span<const char> buffers untouched
413
// (multi-line boundary respected by stream type). Used by read_json to run
414
// simdjson iterate_many over each chunk instead of parsing line by line.
415
coro::AsyncGenerator<std::span<const char>> yield_chunks_from_stream(
848!
416
    std::unique_ptr<internal::ReaderStream> stream,
417
    const LinePrefilter* prefilter = nullptr) {
53!
418
    while (!stream->done()) {
106!
419
        auto chunk = co_await stream->read_async();
477!
420
        if (chunk.empty()) break;
106✔
421
        if (prefilter && !prefilter->empty() &&
53!
422
            !prefilter->may_match(
×
423
                std::string_view(chunk.data(), chunk.size()))) {
424
            continue;
425
        }
426
        co_yield chunk;
106!
427
    }
106!
428
}
318!
429

430
// Stream each line range in order and forward its raw chunks through
// yield_chunks_from_stream(). `prefilter` is owned by this coroutine frame,
// so the pointer given to the inner generators stays valid while they run.
coro::AsyncGenerator<std::span<const char>> yield_chunks_from_ranges(
    std::shared_ptr<internal::Reader> reader, std::vector<LineRange> ranges,
    std::size_t buffer_size, LinePrefilter prefilter = {}) {
    for (const LineRange& window : ranges) {
        auto chunk_stream = reader->stream(
            internal::StreamConfig()
                .stream_type(internal::StreamType::MULTI_LINES)
                .range_type(internal::RangeType::LINE_RANGE)
                .from(window.start_line)
                .to(window.end_line)
                .buffer_size(buffer_size));
        auto chunks =
            yield_chunks_from_stream(std::move(chunk_stream), &prefilter);
        for (;;) {
            auto next_chunk = co_await chunks.next();
            if (!next_chunk) break;
            co_yield *next_chunk;
        }
    }
}
×
447

448
// Yield raw chunks of an indexed trace within the window described by
// `config` (lines when it has a line range, bytes otherwise). The window is
// clamped to the file.
//
// When a query and an index are available, and pruning is not disabled, the
// chunk pruner may rule out the whole file or narrow the read to candidate
// checkpoints. Candidates are intersected with the caller's window and
// streamed as merged line ranges. Otherwise the whole clamped window is
// streamed. In both paths the query-derived prefilter drops chunks that
// cannot match.
coro::AsyncGenerator<std::span<const char>> read_chunks_indexed(
    std::shared_ptr<internal::Reader> reader, std::string index_path,
    std::string file_path, ReadConfig config, std::optional<Query> query,
    bool extend_to_line_boundary = false) {
    // Hold a read-only RocksDB handle for the generator's lifetime so the
    // per-method opens inside GzipIndexer reuse DBManager's cached handle.
    // Failing to open it is tolerated: the handle is only kept for reuse.
    std::optional<indexer::IndexDatabase> db_keep_alive;
    if (!index_path.empty()) {
        try {
            db_keep_alive.emplace(index_path,
                                  rocksdb::RocksDatabase::OpenMode::ReadOnly);
        } catch (...) {
        }
    }

    LinePrefilter prefilter = query ? build_prefilter(*query) : LinePrefilter{};

    const bool by_lines = config.has_line_range();
    const auto range_type = by_lines ? internal::RangeType::LINE_RANGE
                                     : internal::RangeType::BYTE_RANGE;
    std::size_t start = by_lines ? config.start_line : config.start_byte;
    std::size_t end = by_lines ? config.end_line : config.end_byte;

    // Clamp to the file. A 0 end means "to the end"; a 0 start line is
    // treated as line 1. Windows that begin past the end produce nothing.
    if (by_lines) {
        const auto total_lines = reader->get_num_lines();
        start = std::max<std::size_t>(start, 1);
        end = (end == 0) ? total_lines
                         : std::min<std::size_t>(end, total_lines);
        if (start > total_lines) co_return;
    } else {
        const auto max_bytes = reader->get_max_bytes();
        end = (end == 0) ? max_bytes : std::min<std::size_t>(end, max_bytes);
        if (start >= max_bytes) co_return;
    }

    const bool try_pruning =
        query && !index_path.empty() && !config.skip_pruning;
    if (try_pruning) {
        ChunkPrunerInput pruner_input{index_path, file_path, *query, nullptr};
        ChunkPrunerUtility pruner;
        auto pruner_out = co_await pruner.process(pruner_input);
        if (pruner_out.success && !pruner_out.file_may_match) co_return;

        const bool narrows_file =
            pruner_out.success && !pruner_out.candidate_checkpoints.empty() &&
            pruner_out.candidate_checkpoints.size() <
                pruner_out.total_checkpoints;
        if (narrows_file) {
            indexer::IndexDatabase idx_db(
                index_path, rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto logical = indexer::internal::get_logical_path(file_path);
            int fid = idx_db.get_file_info_id(logical);

            if (fid >= 0) {
                std::unordered_map<std::uint64_t, indexer::IndexerCheckpoint>
                    by_idx;
                for (auto& ckpt : idx_db.query_checkpoints(fid)) {
                    by_idx.emplace(ckpt.checkpoint_idx, std::move(ckpt));
                }

                // Intersect with the caller's window (byte or line) so
                // checkpoint-level parallel work items stay disjoint.
                auto outside_window = [&](const auto& ckpt) {
                    if (by_lines) {
                        return ckpt.last_line_num < start ||
                               ckpt.first_line_num > end;
                    }
                    const std::size_t ckpt_begin = ckpt.uc_offset;
                    const std::size_t ckpt_stop =
                        ckpt.uc_offset + ckpt.uc_size;
                    return ckpt_stop <= start || ckpt_begin >= end;
                };

                // Coalesce runs of consecutive checkpoint indices into a
                // single line range each.
                std::vector<LineRange> ranges;
                std::uint64_t prev_idx = UINT64_MAX;
                for (auto ckpt_idx : pruner_out.candidate_checkpoints) {
                    const auto hit = by_idx.find(ckpt_idx);
                    if (hit == by_idx.end()) continue;
                    const auto& ckpt = hit->second;
                    if (outside_window(ckpt)) continue;

                    const bool continues_run =
                        !ranges.empty() && ckpt_idx == prev_idx + 1;
                    if (continues_run) {
                        ranges.back().end_line = ckpt.last_line_num;
                    } else {
                        ranges.push_back(
                            {ckpt.first_line_num, ckpt.last_line_num});
                    }
                    prev_idx = ckpt_idx;
                }

                if (ranges.empty()) co_return;

                auto pruned = yield_chunks_from_ranges(
                    reader, std::move(ranges), config.buffer_size, prefilter);
                for (;;) {
                    auto next = co_await pruned.next();
                    if (!next) break;
                    co_yield *next;
                }
                co_return;
            }
        }
    }

    // No usable pruning: stream the whole clamped window.
    const auto stream_type = by_lines ? internal::StreamType::MULTI_LINES
                                      : internal::StreamType::MULTI_LINES_BYTES;
    auto stream =
        reader->stream(internal::StreamConfig()
                           .stream_type(stream_type)
                           .range_type(range_type)
                           .from(start)
                           .to(end)
                           .buffer_size(config.buffer_size)
                           .extend_to_line_boundary(extend_to_line_boundary &&
                                                    !by_lines));

    auto chunks = yield_chunks_from_stream(std::move(stream), &prefilter);
    for (;;) {
        auto next = co_await chunks.next();
        if (!next) break;
        co_yield *next;
    }
}
982!
565

566
// Yield matching lines of an indexed trace within the window described by
// `config` (lines when it has a line range, bytes otherwise), clamped to the
// file.
//
// For byte-window reads with a query and an index, the chunk pruner may rule
// out the whole file or narrow the read to candidate checkpoints. Only
// checkpoints overlapping the caller's byte window [start, end) are read, as
// merged line ranges. Otherwise the clamped window is streamed directly. In
// both paths lines go through the prefilter and, unless chunk_prune_only is
// set, the full query.
coro::AsyncGenerator<Line> read_lines_indexed(
    std::shared_ptr<internal::Reader> reader, std::string index_path,
    std::string file_path, ReadConfig config, std::optional<Query> query,
    bool chunk_prune_only = false) {
    // Keep RocksDB alive for the generator's lifetime so per-method opens
    // in GzipIndexer reuse DBManager's cached handle.
    std::optional<indexer::IndexDatabase> db_keep_alive;
    if (!index_path.empty()) {
        try {
            db_keep_alive.emplace(index_path,
                                  rocksdb::RocksDatabase::OpenMode::ReadOnly);
        } catch (...) {
            // Deliberately best effort: this handle only lets later opens
            // reuse a cached one, so failing to open it here is not fatal.
        }
    }

    LinePrefilter prefilter = query ? build_prefilter(*query) : LinePrefilter{};
    auto range_type = config.has_line_range() ? internal::RangeType::LINE_RANGE
                                              : internal::RangeType::BYTE_RANGE;
    std::size_t start =
        config.has_line_range() ? config.start_line : config.start_byte;
    std::size_t end =
        config.has_line_range() ? config.end_line : config.end_byte;

    // Clamp to the file. A 0 end means "to the end"; a 0 start line is
    // treated as line 1. Windows that begin past the end produce nothing.
    if (range_type == internal::RangeType::LINE_RANGE) {
        auto total_lines = reader->get_num_lines();
        if (start == 0) start = 1;
        if (end == 0 || end > total_lines) end = total_lines;
        if (start > total_lines) co_return;
    } else {
        auto max_bytes = reader->get_max_bytes();
        if (end == 0 || end > max_bytes) end = max_bytes;
        if (start >= max_bytes) co_return;
    }

    // Checkpoint pruning is only attempted for byte-window reads.
    if (query && !index_path.empty() &&
        range_type == internal::RangeType::BYTE_RANGE) {
        ChunkPrunerInput pruner_input{index_path, file_path, *query, nullptr};
        ChunkPrunerUtility pruner;
        auto pruner_out = co_await pruner.process(pruner_input);
        if (pruner_out.success && !pruner_out.file_may_match) {
            co_return;  // pruner reports nothing in this file can match
        }

        if (pruner_out.success && !pruner_out.candidate_checkpoints.empty() &&
            pruner_out.candidate_checkpoints.size() <
                pruner_out.total_checkpoints) {
            indexer::IndexDatabase idx_db(
                index_path, rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto logical = indexer::internal::get_logical_path(file_path);
            int fid = idx_db.get_file_info_id(logical);

            if (fid >= 0) {
                auto all_ckpts = idx_db.query_checkpoints(fid);
                std::unordered_map<std::uint64_t, indexer::IndexerCheckpoint>
                    ckpt_map;
                for (auto& ckpt : all_ckpts) {
                    ckpt_map.emplace(ckpt.checkpoint_idx, std::move(ckpt));
                }

                // Merge runs of consecutive candidate checkpoints into line
                // ranges. Only checkpoints overlapping the caller's byte
                // window are kept (the same intersection read_chunks_indexed
                // performs). Without it, every byte-window work item re-read
                // all candidate checkpoints, duplicating lines across
                // parallel readers and returning lines outside its window.
                std::vector<LineRange> ranges;
                std::uint64_t prev_idx = UINT64_MAX;

                for (auto ckpt_idx : pruner_out.candidate_checkpoints) {
                    auto it = ckpt_map.find(ckpt_idx);
                    if (it == ckpt_map.end()) continue;
                    const auto& ckpt = it->second;

                    std::size_t ckpt_start = ckpt.uc_offset;
                    std::size_t ckpt_end = ckpt.uc_offset + ckpt.uc_size;
                    if (ckpt_end <= start || ckpt_start >= end) continue;

                    if (ranges.empty() || ckpt_idx != prev_idx + 1) {
                        ranges.push_back(
                            {ckpt.first_line_num, ckpt.last_line_num});
                    } else {
                        ranges.back().end_line = ckpt.last_line_num;
                    }
                    prev_idx = ckpt_idx;
                }

                if (ranges.empty()) {
                    co_return;
                }

                auto gen = yield_lines_from_ranges(reader, std::move(ranges),
                                                   config.buffer_size, *query,
                                                   chunk_prune_only, prefilter);
                while (auto line = co_await gen.next()) {
                    co_yield *line;
                }
                co_return;
            }
        }
    }

    auto stream =
        reader->stream(internal::StreamConfig()
                           .stream_type(internal::StreamType::MULTI_LINES)
                           .range_type(range_type)
                           .from(start)
                           .to(end)
                           .buffer_size(config.buffer_size));

    // NOTE(review): for BYTE_RANGE reads `start` is a byte offset, so the
    // line numbers reported here are offset-based rather than true line
    // numbers. Confirm whether callers rely on them in that mode.
    auto gen = yield_lines_from_stream(std::move(stream), start,
                                       query ? &*query : nullptr,
                                       chunk_prune_only, &prefilter);
    while (auto line = co_await gen.next()) {
        co_yield *line;
    }
}
8,478!
668

669
// Stream lines of a gzip trace with no index. The configured line range is
// passed through, or 0/0 when the config has none. Lines are tested against
// `query` unless it is absent or the caller asked for chunk pruning only.
coro::AsyncGenerator<Line> read_lines_gz(std::string file_path,
                                         ReadConfig config,
                                         std::optional<Query> query,
                                         bool chunk_prune_only = false) {
    const bool ranged = config.has_line_range();
    const std::size_t first_line = ranged ? config.start_line : 0;
    const std::size_t last_line = ranged ? config.end_line : 0;
    auto source = fileio::lines::sources::async_streaming_gz_lines(
        file_path, first_line, last_line);
    const bool evaluate_query = query.has_value() && !chunk_prune_only;
    while (true) {
        auto line = co_await source.next();
        if (!line) break;
        if (evaluate_query && !line_matches_query(*query, line->content)) {
            continue;
        }
        co_yield *line;
    }
}
38,973!
684

685
// Stream lines from a byte window [config.start_byte, config.end_byte) of an
// uncompressed trace, read with config.buffer_size. Lines are tested against
// `query` unless it is absent or the caller asked for chunk pruning only.
coro::AsyncGenerator<Line> read_lines_plain_bytes(
    std::string file_path, ReadConfig config, std::optional<Query> query,
    bool chunk_prune_only = false) {
    auto source = fileio::lines::sources::async_plain_file_bytes(
        file_path, config.start_byte, config.end_byte, config.buffer_size);
    const bool evaluate_query = query.has_value() && !chunk_prune_only;
    while (true) {
        auto line = co_await source.next();
        if (!line) break;
        if (evaluate_query && !line_matches_query(*query, line->content)) {
            continue;
        }
        co_yield *line;
    }
}
111!
697

698
// Stream lines of an uncompressed trace. The configured line range is passed
// through, or 0/0 when the config has none. Lines are tested against `query`
// unless it is absent or the caller asked for chunk pruning only.
coro::AsyncGenerator<Line> read_lines_plain(std::string file_path,
                                            ReadConfig config,
                                            std::optional<Query> query,
                                            bool chunk_prune_only = false) {
    const bool ranged = config.has_line_range();
    const std::size_t first_line = ranged ? config.start_line : 0;
    const std::size_t last_line = ranged ? config.end_line : 0;
    auto source = fileio::lines::sources::async_plain_file_lines(
        file_path, first_line, last_line);
    const bool evaluate_query = query.has_value() && !chunk_prune_only;
    while (true) {
        auto line = co_await source.next();
        if (!line) break;
        if (evaluate_query && !line_matches_query(*query, line->content)) {
            continue;
        }
        co_yield *line;
    }
}
14,874!
713

714
}  // namespace
715

716
TraceReader::TraceReader(TraceReaderConfig config)
1,371✔
717
    : config_(std::move(config)) {
1,371✔
718
    probe_index();
916!
719
}
913✔
720

721
void TraceReader::probe_index() {
913✔
722
    format_ = IndexerFactory::detect_format(config_.file_path);
913✔
723
    index_path_ = dft_internal::determine_index_path(config_.file_path,
1,373✔
724
                                                     config_.index_dir);
916✔
725
    has_index_ =
915!
726
        (format_ == ArchiveFormat::GZIP || format_ == ArchiveFormat::TAR_GZ) &&
1,304!
727
        fs::exists(index_path_);
1,238!
728
}
915✔
729

730
bool TraceReader::has_index() const { return has_index_; }
338✔
731

732
void TraceReader::ensure_metadata_cached() {
38✔
733
    if (metadata_cached_) return;
38!
734

735
    if (has_index_) {
38✔
736
        auto reader = create_indexed_reader();
26!
737
        cached_max_bytes_ = reader->get_max_bytes();
26!
738
        cached_num_lines_ = reader->get_num_lines();
26!
739
    } else if (format_ == ArchiveFormat::GZIP ||
38!
740
               format_ == ArchiveFormat::TAR_GZ) {
×
741
        cached_max_bytes_ = 0;
12✔
742
        cached_num_lines_ = 0;
12✔
743
    } else {
6✔
744
        std::error_code ec;
×
745
        auto size = fs::file_size(config_.file_path, ec);
×
746
        cached_max_bytes_ = ec ? 0 : static_cast<std::size_t>(size);
×
747
        cached_num_lines_ = 0;
×
748
    }
749
    metadata_cached_ = true;
38✔
750
}
19✔
751

752
std::size_t TraceReader::get_max_bytes() {
24✔
753
    ensure_metadata_cached();
24✔
754
    return cached_max_bytes_;
24✔
755
}
756

757
std::size_t TraceReader::get_num_lines() {
14✔
758
    ensure_metadata_cached();
14✔
759
    return cached_num_lines_;
14✔
760
}
761

762
std::shared_ptr<internal::Reader> TraceReader::create_indexed_reader() {
177✔
763
    auto indexer = IndexerFactory::create(config_.file_path, index_path_,
267✔
764
                                          config_.checkpoint_size, false);
177!
765
    return internal::ReaderFactory::create(indexer);
267!
766
}
180✔
767

768
// Map ReadConfig flags onto a raw stream type:
//   line_aligned == false               -> BYTES
//   line_aligned && multi_line          -> MULTI_LINES_BYTES
//   line_aligned && !multi_line         -> LINE_BYTES
internal::StreamType TraceReader::resolve_raw_stream_type(
    const ReadConfig& config) const {
    if (config.line_aligned) {
        return config.multi_line ? internal::StreamType::MULTI_LINES_BYTES
                                 : internal::StreamType::LINE_BYTES;
    }
    return internal::StreamType::BYTES;
}
774

775
// A line range, when present, wins; otherwise the request is byte-based.
internal::RangeType TraceReader::resolve_range_type(
    const ReadConfig& config) const {
    return config.has_line_range() ? internal::RangeType::LINE_RANGE
                                   : internal::RangeType::BYTE_RANGE;
}
780

781
// Return a line generator for the configured trace, choosing the source by
// how the file can be read:
//   * indexed GZIP / TAR_GZ        -> read_lines_indexed
//   * unindexed GZIP / TAR_GZ      -> read_lines_gz
//   * plain file with a byte range -> read_lines_plain_bytes
//   * any other plain file         -> read_lines_plain
// This is an ordinary function (it returns generators, it does not co_await),
// so a malformed config.query throws QueryParseError at the call site rather
// than on first iteration.
coro::AsyncGenerator<Line> TraceReader::read_lines(ReadConfig config) {
    std::optional<Query> query;
    if (!config.query.empty()) {
        auto parsed = Query::from_string(config.query);
        if (!parsed) {
            throw common::query::QueryParseError(parsed.error());
        }
        query = std::move(*parsed);
    }

    const bool prune_only = config.chunk_prune_only;
    const bool gz_family =
        format_ == ArchiveFormat::GZIP || format_ == ArchiveFormat::TAR_GZ;

    if (has_index_) {
        return read_lines_indexed(create_indexed_reader(), index_path_,
                                  config_.file_path, std::move(config),
                                  std::move(query), prune_only);
    }
    if (gz_family) {
        return read_lines_gz(config_.file_path, std::move(config),
                             std::move(query), prune_only);
    }
    // Decide before `config` is moved into the callee.
    const bool by_bytes = config.has_byte_range();
    if (by_bytes) {
        return read_lines_plain_bytes(config_.file_path, std::move(config),
                                      std::move(query), prune_only);
    }
    return read_lines_plain(config_.file_path, std::move(config),
                            std::move(query), prune_only);
}
807

808
namespace {
809

810
// Convert a simdjson ondemand scalar into a query LiteralValue:
//   string  -> std::string
//   number  -> int64 if it fits, else uint64 if it fits, else double
//   boolean -> bool
// Anything else (null, object, array) and any extraction failure collapses
// to an empty std::string.
common::query::LiteralValue ondemand_to_literal(simdjson::ondemand::value val) {
    using jt = simdjson::ondemand::json_type;
    const auto kind = val.type().value_unsafe();

    if (kind == jt::string) {
        auto text = val.get_string();
        if (!text.error()) return std::string(text.value_unsafe());
    } else if (kind == jt::number) {
        auto number = val.get_number();
        if (!number.error()) {
            auto n = number.value_unsafe();
            if (n.is_int64()) return n.get_int64();
            if (n.is_uint64()) return n.get_uint64();
            return n.get_double();
        }
    } else if (kind == jt::boolean) {
        auto flag = val.get_bool();
        if (!flag.error()) return flag.value_unsafe();
    }
    return std::string{};
}
838

839
}  // namespace
840

841
// Stream parsed JSON events from the trace, optionally filtered by
// config.query.
//
// Two paths:
//  * Indexed GZIP / TAR_GZ: chunks from read_chunks_indexed() are parsed in
//    bulk with simdjson iterate_many. The query (if any) is evaluated on the
//    ondemand document itself. Matching documents are lent to a shared
//    JsonParser without re-parsing. line_number is reported as 0 here.
//  * Everything else: lines come from read_lines() with chunk_prune_only
//    forced on, so read_lines forwards every line. Each line is trimmed and
//    validated, then parsed, and the query is evaluated here.
//
// Query fields are gathered from top-level scalars and from the members of
// top-level objects (one level deep, matched by the member's own key).
//
// The parser and source text referenced by a yielded JsonLine are reused for
// the next event, so treat them as valid only until the generator resumes.
// Because this is a coroutine, a malformed query surfaces as
// QueryParseError when the generator is first advanced.
coro::AsyncGenerator<JsonLine> TraceReader::read_json(ReadConfig config) {
    std::optional<Query> query;
    if (!config.query.empty()) {
        auto parsed = Query::from_string(config.query);
        if (!parsed) throw common::query::QueryParseError(parsed.error());
        query = std::move(*parsed);
    }

    // chunk_prune_only path: dim_stats already proved every event with the
    // predicate field matches; we still need to skip events lacking the
    // field (e.g., metadata "ph":"M" events). Field-presence probe is
    // cheaper than full ValueMap eval.
    // NOTE(review): the probe only looks at top-level keys. Full evaluation
    // below also accepts keys found one level down (e.g. inside an "args"
    // object). If the pruner can set chunk_prune_only for predicates on such
    // nested keys, this path would drop matching events — confirm.
    std::vector<std::string> presence_check_paths;
    if (query && config.chunk_prune_only) {
        const auto& fset = query->fields();
        presence_check_paths.assign(fset.begin(), fset.end());
    }

    // Fast path: indexed gz files go through a chunk generator with
    // simdjson iterate_many. Query is evaluated on the ondemand document
    // directly, so non-matching docs never hit the yield_parser.
    if (has_index_) {
        auto reader = create_indexed_reader();
        auto chunk_gen = read_chunks_indexed(reader, index_path_,
                                             config_.file_path, config, query);

        simdjson::ondemand::parser bulk_parser;
        common::json::JsonParser yield_parser;

        while (auto chunk_opt = co_await chunk_gen.next()) {
            // Bind by reference: the chunk is copied into `padded` below, so
            // copying it here as well is pure overhead when the generator
            // yields an owning buffer.
            const auto& chunk = *chunk_opt;
            if (chunk.empty()) continue;
            auto trimmed = strip_ndjson_bookends(
                std::string_view(chunk.data(), chunk.size()));
            if (trimmed.empty()) continue;
            simdjson::padded_string padded(trimmed);

            auto docs_r = bulk_parser.iterate_many(
                padded, 1 << 20, /*allow_comma_separated=*/false);
            if (docs_r.error()) continue;
            auto& docs = docs_r.value();

            for (auto it = docs.begin(); it != docs.end(); ++it) {
                auto doc_result = *it;
                if (doc_result.error()) continue;
                auto& doc = doc_result.value();

                // View of this document's text inside `padded`.
                std::string_view src(it.source().data(), it.source().size());

                if (query && config.chunk_prune_only) {
                    bool all_present = true;
                    for (const auto& path : presence_check_paths) {
                        auto fld = doc.find_field_unordered(path);
                        if (fld.error()) {
                            all_present = false;
                            break;
                        }
                    }
                    if (!all_present) continue;
                    doc.rewind();
                } else if (query) {
                    common::query::ValueMap fields;
                    auto obj = doc.get_object();
                    if (obj.error()) continue;
                    for (auto field : obj.value()) {
                        if (field.error()) continue;
                        auto key_r = field.unescaped_key();
                        if (key_r.error()) continue;
                        auto val_r = field.value();
                        if (val_r.error()) continue;
                        auto key = key_r.value();
                        auto val = val_r.value();
                        auto type_r = val.type();
                        if (type_r.error()) continue;
                        auto type = type_r.value();
                        if (type == simdjson::ondemand::json_type::object) {
                            auto nested = val.get_object();
                            if (nested.error()) continue;
                            for (auto nf : nested.value()) {
                                if (nf.error()) continue;
                                auto nk_r = nf.unescaped_key();
                                if (nk_r.error()) continue;
                                auto nv_r = nf.value();
                                if (nv_r.error()) continue;
                                auto nk = nk_r.value();
                                if (!query->references(nk)) continue;
                                fields[std::string(nk)] =
                                    ondemand_to_literal(nv_r.value());
                            }
                        } else if (query->references(key)) {
                            fields[std::string(key)] = ondemand_to_literal(val);
                        }
                    }
                    if (!query->evaluate(fields)) continue;
                }

                // Matched (or no query): lend the iterate_many doc to
                // yield_parser without re-parsing. Consumers like
                // build_arrow_row call parser.for_each_field which now
                // iterates the borrowed doc_reference.
                doc.rewind();
                yield_parser.set_borrowed_document(
                    simdjson::ondemand::document_reference(doc));
                co_yield JsonLine{src, 0, &yield_parser};
            }
        }
        co_return;
    }

    // Fallback: non-indexed paths use the per-line pipeline unchanged.
    // chunk_prune_only is forced on so read_lines forwards every line; the
    // query is evaluated on the parsed JSON below instead.
    config.chunk_prune_only = true;
    auto line_gen = read_lines(config);

    common::json::JsonParser parser;

    while (auto opt = co_await line_gen.next()) {
        const char* trimmed = nullptr;
        std::size_t trimmed_len = 0;
        if (!dftracer::utils::json_trim_and_validate_with_comma(
                opt->content.data(), opt->content.size(), trimmed, trimmed_len))
            continue;
        if (!parser.parse(std::string_view(trimmed, trimmed_len))) continue;

        if (query) {
            common::query::ValueMap fields;
            // Object-valued keys are remembered and walked in a second pass
            // (each after a rewind), collecting their referenced members.
            std::vector<std::string> nested_keys;
            parser.for_each_field(
                [&](std::string_view key, simdjson::ondemand::value val) {
                    auto type = val.type().value_unsafe();
                    if (type == simdjson::ondemand::json_type::object) {
                        nested_keys.emplace_back(key);
                    } else if (query->references(key)) {
                        fields[std::string(key)] = ondemand_to_literal(val);
                    }
                });
            for (auto& nk : nested_keys) {
                parser.rewind();
                parser.for_each_field(nk, [&](std::string_view key,
                                              simdjson::ondemand::value val) {
                    if (query->references(key)) {
                        fields[std::string(key)] = ondemand_to_literal(val);
                    }
                });
            }
            if (!query->evaluate(fields)) continue;
            // Leave the parser at the start of the document for the consumer.
            parser.rewind();
        }

        co_yield JsonLine{opt->content, opt->line_number, &parser};
    }
}
992

993
// Stream raw bytes of the trace as spans.
//
//  * Indexed GZIP / TAR_GZ: a Reader stream is opened over either a line
//    range (when config has one; clamped to [1, get_num_lines()]) or a byte
//    range (end clamped to get_max_bytes()). For byte ranges with a query,
//    the chunk pruner runs first. If it succeeds and reports that the file
//    cannot match, nothing is emitted.
//  * Unindexed GZIP / TAR_GZ and plain files: the file is read line by line
//    and each line's content is yielded as its own span.
//    - A line range, when present, is forwarded to the line source, using
//      the same convention as read_lines. It takes precedence over any byte
//      range, mirroring resolve_range_type() on the indexed path; previously
//      a line range was ignored here and the whole file was emitted.
//    - Otherwise lines are selected by byte offset, assuming each line
//      occupies content.size() + 1 bytes (content plus one '\n').
//
// config.query is applied only on the indexed byte-range path. Yielded spans
// should be treated as valid only until the generator is resumed.
coro::AsyncGenerator<std::span<const char>> TraceReader::read_raw(
    ReadConfig config) {
    if (has_index_) {
        // Keep RocksDB alive for the generator's lifetime so per-method
        // opens in GzipIndexer reuse DBManager's cached handle.
        std::optional<indexer::IndexDatabase> db_keep_alive;
        if (!index_path_.empty()) {
            try {
                db_keep_alive.emplace(
                    index_path_, rocksdb::RocksDatabase::OpenMode::ReadOnly);
            } catch (...) {
                // Best effort: the read proceeds without the keep-alive
                // handle if the database cannot be opened here.
            }
        }
        auto reader = create_indexed_reader();
        auto stream_type = resolve_raw_stream_type(config);
        auto range_type = resolve_range_type(config);
        std::size_t start =
            config.has_line_range() ? config.start_line : config.start_byte;
        std::size_t end =
            config.has_line_range() ? config.end_line : config.end_byte;

        if (range_type == internal::RangeType::LINE_RANGE) {
            auto total_lines = reader->get_num_lines();
            if (start == 0) start = 1;
            if (end == 0 || end > total_lines) end = total_lines;
            if (start > total_lines) co_return;
        } else {
            auto max_bytes = reader->get_max_bytes();
            if (end == 0 || end > max_bytes) end = max_bytes;
            if (start >= max_bytes) co_return;
        }

        if (!config.query.empty() && !index_path_.empty() &&
            range_type == internal::RangeType::BYTE_RANGE) {
            auto parsed = Query::from_string(config.query);
            if (!parsed) throw common::query::QueryParseError(parsed.error());
            ChunkPrunerInput pruner_input{index_path_, config_.file_path,
                                          std::move(*parsed), nullptr};
            ChunkPrunerUtility pruner;
            auto pruner_out = co_await pruner.process(pruner_input);
            if (pruner_out.success && !pruner_out.file_may_match) {
                co_return;
            }
        }

        auto stream = reader->stream(internal::StreamConfig()
                                         .stream_type(stream_type)
                                         .range_type(range_type)
                                         .from(start)
                                         .to(end)
                                         .buffer_size(config.buffer_size));

        while (!stream->done()) {
            auto chunk = co_await stream->read_async();
            if (chunk.empty()) break;
            co_yield chunk;
        }
    } else {
        // Line ranges win over byte ranges (as in resolve_range_type()).
        // When a line range is used, byte offsets computed below would be
        // relative to the first yielded line, so byte filtering is disabled.
        const bool by_line = config.has_line_range();
        const std::size_t first_line = by_line ? config.start_line : 0;
        const std::size_t last_line = by_line ? config.end_line : 0;
        const std::size_t start_byte = by_line ? 0 : config.start_byte;
        const std::size_t end_byte = by_line ? 0 : config.end_byte;

        if (format_ == ArchiveFormat::GZIP ||
            format_ == ArchiveFormat::TAR_GZ) {
            auto gen = by_line
                           ? fileio::lines::sources::async_streaming_gz_lines(
                                 config_.file_path, first_line, last_line)
                           : fileio::lines::sources::async_streaming_gz_lines(
                                 config_.file_path);
            std::size_t byte_pos = 0;
            while (auto opt = co_await gen.next()) {
                const auto& line = *opt;
                std::size_t line_end = byte_pos + line.content.size() + 1;
                if (end_byte > 0 && byte_pos >= end_byte) break;
                if (line_end > start_byte) {
                    co_yield std::span<const char>(line.content.data(),
                                                   line.content.size());
                }
                byte_pos = line_end;
            }
        } else {
            auto gen = by_line
                           ? fileio::lines::sources::async_plain_file_lines(
                                 config_.file_path, first_line, last_line)
                           : fileio::lines::sources::async_plain_file_lines(
                                 config_.file_path);
            std::size_t byte_pos = 0;
            while (auto opt = co_await gen.next()) {
                const auto& line = *opt;
                std::size_t line_end = byte_pos + line.content.size() + 1;
                if (end_byte > 0 && byte_pos >= end_byte) break;
                if (line_end > start_byte) {
                    co_yield std::span<const char>(line.content.data(),
                                                   line.content.size());
                }
                byte_pos = line_end;
            }
        }
    }
}
1081

1082
#ifdef DFTRACER_UTILS_ENABLE_ARROW
1083

1084
namespace {
1085

1086
using common::arrow::ArrowExportResult;
1087
using common::arrow::ColumnType;
1088
using common::arrow::RecordBatchBuilder;
1089

1090
// Bump arena for string_views that must survive until builder.finish().
//
// push() copies bytes into the current block. When the current block lacks
// room, it first opens a new block of max(BLOCK_SIZE, len) bytes.
//
// Views returned by push() are not invalidated by later pushes: growing
// `blocks` moves the inner vectors, and moving a std::vector keeps its heap
// buffer.
//
// clear() drops every block but the first and rewinds to its start. After
// that, all previously returned views must be considered invalid.
struct ArrowStringArena {
    static constexpr std::size_t BLOCK_SIZE = 64 * 1024;
    std::vector<std::vector<char>> blocks;
    std::size_t pos = 0;

    ArrowStringArena() { blocks.emplace_back(BLOCK_SIZE); }

    std::string_view push(const char* data, std::size_t len) {
        if (pos + len > blocks.back().size()) {
            blocks.emplace_back(std::max(BLOCK_SIZE, len));
            pos = 0;
        }
        char* dst = blocks.back().data() + pos;
        // memcpy with a null source is undefined even for a zero length, and
        // empty string_views may legitimately carry a null data pointer.
        if (len != 0) std::memcpy(dst, data, len);
        pos += len;
        return {dst, len};
    }

    void clear() {
        if (blocks.size() > 1) blocks.resize(1);
        pos = 0;
    }
};
1114

1115
struct ArrowKeyHint {
615✔
1116
    std::string key;
1117
    std::size_t col_idx = 0;
615✔
1118
    ColumnType type = ColumnType::INT64;
615✔
1119
    bool valid = false;
615✔
1120
};
1121

1122
inline std::size_t resolve_col_idx(RecordBatchBuilder& builder,
31,707✔
1123
                                   std::vector<ArrowKeyHint>& hints,
1124
                                   std::size_t pos, std::string_view key_sv,
1125
                                   ColumnType type) {
1126
    if (pos < hints.size()) {
31,707✔
1127
        auto& h = hints[pos];
30,479✔
1128
        if (h.valid && h.type == type && h.key.size() == key_sv.size() &&
45,681✔
1129
            std::memcmp(h.key.data(), key_sv.data(), key_sv.size()) == 0) {
30,500✔
1130
            return h.col_idx;
30,479✔
1131
        }
1132
    }
12✔
1133
    // Position-keyed miss. Variable-shape rows (e.g., open vs read events
1134
    // with different args fields) push fields to different positions, so
1135
    // the position cache misses constantly while the underlying schema is
1136
    // small (~15 keys). A linear scan over the hint vector with a SIMD
1137
    // memcmp beats RecordBatchBuilder's name_to_index_ hash lookup for this
1138
    // size.
1139
    for (std::size_t i = 0; i < hints.size(); ++i) {
5,689✔
1140
        if (i == pos) continue;
4,465!
1141
        auto& h = hints[i];
4,465✔
1142
        if (h.valid && h.type == type && h.key.size() == key_sv.size() &&
4,742!
1143
            std::memcmp(h.key.data(), key_sv.data(), key_sv.size()) == 0) {
557!
1144
            if (pos < hints.size()) {
×
1145
                auto& slot = hints[pos];
×
1146
                slot.key.assign(key_sv);
×
1147
                slot.type = type;
×
1148
                slot.col_idx = h.col_idx;
×
1149
                slot.valid = true;
×
1150
            }
1151
            return h.col_idx;
×
1152
        }
1153
    }
2,238✔
1154
    std::size_t idx = builder.add_or_get_column(key_sv, type);
1,226✔
1155
    if (pos >= hints.size()) hints.resize(pos + 1);
1,227✔
1156
    auto& h = hints[pos];
1,226✔
1157
    h.key.assign(key_sv);
1,226✔
1158
    h.type = type;
1,228✔
1159
    h.col_idx = idx;
1,228✔
1160
    h.valid = true;
1,228✔
1161
    return idx;
1,228✔
1162
}
15,919✔
1163

1164
// Append a typed scalar value under `key_sv`. Nested objects/arrays are
1165
// always round-tripped as JSON strings (flattening is one level only).
1166
void append_scalar_or_json(RecordBatchBuilder& builder,
31,752✔
1167
                           std::vector<ArrowKeyHint>& hints, std::size_t& pos,
1168
                           std::string_view key_sv,
1169
                           simdjson::ondemand::value val,
1170
                           simdjson::ondemand::json_type type) {
1171
    switch (type) {
31,752!
1172
        case simdjson::ondemand::json_type::number: {
8,139✔
1173
            auto num_r = val.get_number();
16,295✔
1174
            if (num_r.error()) break;
16,248!
1175
            auto num = num_r.value();
16,233!
1176
            if (num.is_int64()) {
16,242✔
1177
                auto idx = resolve_col_idx(builder, hints, pos++, key_sv,
16,062!
1178
                                           ColumnType::INT64);
1179
                builder.append_int64(idx, num.get_int64());
16,057!
1180
            } else if (num.is_uint64()) {
8,239!
1181
                auto idx = resolve_col_idx(builder, hints, pos++, key_sv,
×
1182
                                           ColumnType::UINT64);
1183
                builder.append_uint64(idx, num.get_uint64());
×
1184
            } else {
1185
                auto idx = resolve_col_idx(builder, hints, pos++, key_sv,
180!
1186
                                           ColumnType::DOUBLE);
1187
                builder.append_double(idx, num.get_double());
180!
1188
            }
1189
            break;
16,277✔
1190
        }
1191
        case simdjson::ondemand::json_type::string: {
6,374✔
1192
            auto str_r = val.get_string();
12,769✔
1193
            if (str_r.error()) break;
12,755!
1194
            auto idx = resolve_col_idx(builder, hints, pos++, key_sv,
12,758!
1195
                                       ColumnType::STRING);
1196
            builder.append_string(idx, str_r.value());
12,762!
1197
            break;
12,771✔
1198
        }
1199
        case simdjson::ondemand::json_type::boolean: {
1200
            auto b_r = val.get_bool();
×
1201
            if (b_r.error()) break;
×
1202
            auto idx = resolve_col_idx(builder, hints, pos++, key_sv,
×
1203
                                       ColumnType::BOOL);
1204
            builder.append_bool(idx, b_r.value());
×
1205
            break;
×
1206
        }
1207
        case simdjson::ondemand::json_type::null: {
1208
            auto existing = builder.find_column(key_sv);
×
1209
            if (existing) builder.append_null(*existing);
×
1210
            ++pos;
×
1211
            break;
×
1212
        }
1213
        case simdjson::ondemand::json_type::object:
1,373✔
1214
        case simdjson::ondemand::json_type::array: {
1215
            auto raw_r = val.raw_json();
2,750✔
1216
            auto idx = resolve_col_idx(builder, hints, pos++, key_sv,
2,741!
1217
                                       ColumnType::STRING);
1218
            if (!raw_r.error()) {
2,744✔
1219
                auto sv = raw_r.value();
2,743!
1220
                builder.append_string(idx, sv);
2,742!
1221
            } else {
1,377✔
1222
                builder.append_null(idx);
×
1223
            }
1224
            break;
2,750✔
1225
        }
1226
        default:
1227
            ++pos;
×
1228
            break;
×
1229
    }
1230
}
31,736✔
1231

1232
// Append one Arrow row from an already-parsed simdjson document.
1233
// Dynamic schema: new columns appended as they appear. When flatten_objects
1234
// is true, top-level object values are expanded one level into `parent.child`
1235
// columns; deeper nesting still lands as a JSON string under the flattened
1236
// key. Returns false on error paths so callers can skip the row.
1237
bool arrow_row_from_doc(RecordBatchBuilder& builder,
3,844✔
1238
                        std::vector<ArrowKeyHint>& hints,
1239
                        simdjson::ondemand::document_reference doc,
1240
                        bool flatten_objects = false) {
1241
    auto obj_result = doc.get_object();
3,844✔
1242
    if (obj_result.error()) return false;
3,849!
1243
    char key_buf[512];
1244
    std::size_t pos = 0;
3,848✔
1245
    for (auto field : obj_result.value()) {
34,588✔
1246
        if (field.error()) continue;
31,012!
1247
        auto key_r = field.unescaped_key();
30,716✔
1248
        if (key_r.error()) continue;
30,637!
1249
        auto key_sv = key_r.value();
30,619!
1250
        auto val_r = field.value();
30,653✔
1251
        if (val_r.error()) continue;
30,642!
1252
        auto val = val_r.value();
30,637!
1253
        auto type_r = val.type();
30,615✔
1254
        if (type_r.error()) continue;
30,741!
1255
        auto type = type_r.value();
30,740!
1256

1257
        if (flatten_objects && type == simdjson::ondemand::json_type::object) {
30,739✔
1258
            auto nested = val.get_object();
562✔
1259
            if (nested.error()) continue;
562!
1260
            for (auto nf : nested.value()) {
2,148✔
1261
                if (nf.error()) continue;
1,586!
1262
                auto nk_r = nf.unescaped_key();
1,586✔
1263
                if (nk_r.error()) continue;
1,586!
1264
                auto nk = nk_r.value();
1,586!
1265
                auto nv_r = nf.value();
1,586✔
1266
                if (nv_r.error()) continue;
1,586!
1267
                auto nv = nv_r.value();
1,586!
1268
                auto nt_r = nv.type();
1,586✔
1269
                if (nt_r.error()) continue;
1,586!
1270
                std::size_t needed = key_sv.size() + 1 + nk.size();
1,586✔
1271
                if (needed >= sizeof(key_buf)) continue;
1,586!
1272
                std::memcpy(key_buf, key_sv.data(), key_sv.size());
1,586✔
1273
                key_buf[key_sv.size()] = '.';
1,586✔
1274
                std::memcpy(key_buf + key_sv.size() + 1, nk.data(), nk.size());
1,586✔
1275
                append_scalar_or_json(builder, hints, pos,
2,379!
1276
                                      std::string_view(key_buf, needed), nv,
793✔
1277
                                      nt_r.value());
1,586!
1278
            }
1279
            continue;
562✔
1280
        }
281✔
1281

1282
        append_scalar_or_json(builder, hints, pos, key_sv, val, type);
30,177!
1283
    }
1284
    builder.end_row();
3,820!
1285
    return true;
3,849✔
1286
}
1,925✔
1287

1288
void collect_query_fields(simdjson::ondemand::document_reference doc,
1289
                          const Query& query, common::query::ValueMap& out);
1290

1291
// Run iterate_many over `padded`, build arrow rows, and emit completed
1292
// batches via `yield_one`. Updates `carry` with the truncated tail (if any)
1293
// for the caller to prepend to the next chunk.
1294
template <typename Yield>
1295
void parse_padded_into_arrow(simdjson::ondemand::parser& bulk_parser,
1296
                             simdjson::padded_string& padded,
1297
                             const std::optional<Query>& query, bool flatten,
1298
                             RecordBatchBuilder& builder,
1299
                             ArrowStringArena& arena,
1300
                             std::vector<ArrowKeyHint>& hints,
1301
                             std::size_t batch_size, std::string* carry,
1302
                             Yield&& yield_one) {
1303
    auto docs_r = bulk_parser.iterate_many(padded, 1 << 20, false);
1304
    if (docs_r.error()) {
1305
        if (carry) carry->clear();
1306
        return;
1307
    }
1308
    auto& docs = docs_r.value();
1309
    for (auto it = docs.begin(); it != docs.end(); ++it) {
1310
        auto doc_result = *it;
1311
        if (doc_result.error()) continue;
1312
        auto& doc = doc_result.value();
1313
        if (query) {
1314
            common::query::ValueMap fields;
1315
            collect_query_fields(doc, *query, fields);
1316
            if (!query->evaluate(fields)) continue;
1317
            doc.rewind();
1318
        }
1319
        if (!arrow_row_from_doc(builder, hints, doc, flatten)) continue;
1320
        if (builder.num_rows() >= batch_size) {
1321
            auto result = builder.finish();
1322
            arena.clear();
1323
            if (!builder.is_schema_locked()) builder.lock_schema();
1324
            builder.reset(true);
1325
            builder.reserve(batch_size);
1326
            yield_one(std::move(result));
1327
        }
1328
    }
1329
    if (carry) {
1330
        std::size_t total = padded.size();
1331
        std::size_t truncated = docs.truncated_bytes();
1332
        if (truncated > 0 && truncated <= total) {
1333
            carry->assign(padded.data() + total - truncated,
1334
                          padded.data() + total);
1335
        } else {
1336
            carry->clear();
1337
        }
1338
    }
1339
}
1340

1341
// Build a simdjson-padded buffer containing only the lines in `chunk` that
1342
// pass the line-level prefilter. For queries with no useful prefilter, the
1343
// caller should skip this and feed the raw chunk directly.
1344
std::string collect_matching_lines(std::span<const char> chunk,
×
1345
                                   const LinePrefilter& prefilter) {
1346
    std::string out;
×
1347
    out.reserve(chunk.size());
×
1348
    const char* data = chunk.data();
×
1349
    std::size_t len = chunk.size();
×
1350
    std::size_t pos = 0;
×
1351
    while (pos < len) {
×
1352
        const void* nl = std::memchr(data + pos, '\n', len - pos);
×
1353
        std::size_t end_pos = nl ? static_cast<const char*>(nl) - data : len;
×
1354
        if (end_pos > pos) {
×
1355
            std::string_view line(data + pos, end_pos - pos);
×
1356
            if (prefilter.may_match(line)) {
×
1357
                out.append(line);
×
1358
                out.push_back('\n');
×
1359
            }
1360
        }
1361
        pos = end_pos + 1;
×
1362
    }
1363
    return out;
×
1364
}
×
1365

1366
// Extract fields referenced by the query into a ValueMap, walking one level
1367
// of object nesting. Fields not referenced by the query are skipped.
1368
void collect_query_fields(simdjson::ondemand::document_reference doc,
×
1369
                          const Query& query, common::query::ValueMap& out) {
1370
    auto obj = doc.get_object();
×
1371
    if (obj.error()) return;
×
1372
    for (auto field : obj.value()) {
×
1373
        if (field.error()) continue;
×
1374
        auto key_r = field.unescaped_key();
×
1375
        if (key_r.error()) continue;
×
1376
        auto val_r = field.value();
×
1377
        if (val_r.error()) continue;
×
1378
        auto key = key_r.value();
×
1379
        auto val = val_r.value();
×
1380
        auto type_r = val.type();
×
1381
        if (type_r.error()) continue;
×
1382
        auto type = type_r.value();
×
1383
        if (type == simdjson::ondemand::json_type::object) {
×
1384
            auto nested = val.get_object();
×
1385
            if (nested.error()) continue;
×
1386
            for (auto nf : nested.value()) {
×
1387
                if (nf.error()) continue;
×
1388
                auto nk_r = nf.unescaped_key();
×
1389
                if (nk_r.error()) continue;
×
1390
                auto nv_r = nf.value();
×
1391
                if (nv_r.error()) continue;
×
1392
                if (!query.references(nk_r.value())) continue;
×
1393
                out[std::string(nk_r.value())] =
×
1394
                    ondemand_to_literal(nv_r.value());
×
1395
            }
1396
        } else if (query.references(key)) {
×
1397
            out[std::string(key)] = ondemand_to_literal(val);
×
1398
        }
1399
    }
1400
}
1401

1402
}  // namespace
1403

1404
// Stream the trace selected by `config` as Arrow record batches.
//
// A non-empty config.query is parsed first. A parse failure throws
// common::query::QueryParseError.
//
// Without an index (!has_index_), rows are built from the documents produced
// by read_json(config).
//
// With an index, raw chunks come from read_chunks_indexed(). Each chunk is:
//   1. optionally reduced to prefilter-matching lines;
//   2. parsed with simdjson iterate_many;
//   3. filtered by the query, either through compiled equality probes or a
//      ValueMap evaluation, or by a field-presence check in
//      chunk_prune_only mode;
//   4. appended as rows.
//
// A batch is yielded as soon as it reaches `batch_size` rows. The final
// batch carries any remaining rows.
coro::AsyncGenerator<ArrowExportResult> TraceReader::read_arrow(
    ReadConfig config, std::size_t batch_size) {
    // Parse the filter before any reader or index is opened, so a malformed
    // query is reported up front.
    std::optional<Query> query;
    if (!config.query.empty()) {
        auto parsed = Query::from_string(config.query);
        if (!parsed) throw common::query::QueryParseError(parsed.error());
        query = std::move(*parsed);
    }

    // When chunk_prune_only is set, dim_stats already proved every event in
    // the chunk that has the predicate field matches the literal. We still
    // need to skip events that lack the field (e.g., metadata "ph":"M"
    // events lack pid), since the original predicate would reject them.
    // NOTE(review): the presence check below calls find_field_unordered on
    // the top-level object only. That assumes query->fields() names
    // top-level keys; confirm for fields that live in a nested object such
    // as "args".
    std::vector<std::string> presence_check_paths;
    if (query && config.chunk_prune_only) {
        const auto& fset = query->fields();
        presence_check_paths.assign(fset.begin(), fset.end());
    }

    // For AND-of-EQ predicates, evaluate directly against simdjson without
    // ValueMap (avoids wyhash + per-field std::string allocation per row).
    // Falls back to the ValueMap path on unsupported AST shapes.
    std::vector<CompiledEqProbe> compiled_probes;
    bool use_compiled = false;
    if (query && !config.chunk_prune_only) {
        if (auto p = try_compile_eq_probes(query->root())) {
            compiled_probes = std::move(*p);
            use_compiled = !compiled_probes.empty();
        }
    }

    bool flatten = config.flatten_objects;

    if (!has_index_) {
        // Fallback: drive the per-line read_json path and build rows.
        // NOTE(review): `query` is not consulted in this branch; it
        // presumably relies on read_json(config) applying config.query.
        auto json_gen = read_json(config);
        RecordBatchBuilder builder;
        ArrowStringArena arena;
        std::vector<ArrowKeyHint> hints;
        builder.reserve(batch_size);
        while (auto opt = co_await json_gen.next()) {
            if (!arrow_row_from_doc(builder, hints,
                                    simdjson::ondemand::document_reference(
                                        opt->parser->raw_document()),
                                    flatten))
                continue;
            if (builder.num_rows() >= batch_size) {
                co_yield builder.finish();
                arena.clear();
                if (!builder.is_schema_locked()) builder.lock_schema();
                builder.reset(true);
                builder.reserve(batch_size);
            }
        }
        if (builder.num_rows() > 0) {
            co_yield builder.finish();
        }
        co_return;
    }

    // Keep RocksDB alive for the generator's lifetime so per-method opens
    // in GzipIndexer reuse DBManager's cached handle.
    std::optional<indexer::IndexDatabase> db_keep_alive;
    if (has_index_ && !index_path_.empty()) {
        try {
            db_keep_alive.emplace(index_path_,
                                  rocksdb::RocksDatabase::OpenMode::ReadOnly);
        } catch (...) {
            // Best-effort only: if the read-only open fails, continue
            // without the cached handle (presumably the per-method opens
            // still work, just without reuse).
        }
    }

    auto reader = create_indexed_reader();
    auto chunk_gen = read_chunks_indexed(
        reader, index_path_, config_.file_path, config, query,
        /*extend_to_line_boundary=*/config.end_at_checkpoint);

    LinePrefilter prefilter = (query && !config.chunk_prune_only)
                                  ? build_prefilter(*query)
                                  : LinePrefilter{};
    bool have_line_prefilter = !prefilter.empty();

    simdjson::ondemand::parser bulk_parser;
    RecordBatchBuilder builder;
    ArrowStringArena arena;
    std::vector<ArrowKeyHint> hints;
    builder.reserve(batch_size);

    // Finish the current batch and reset the builder for reuse. The batch is
    // returned once it holds at least batch_size rows, or, when `final` is
    // true, if it holds any rows at all. Otherwise returns nullopt and
    // leaves the builder untouched.
    auto maybe_flush = [&builder, &arena, batch_size](
                           bool final) -> std::optional<ArrowExportResult> {
        if (builder.num_rows() == 0) return std::nullopt;
        if (!final && builder.num_rows() < batch_size) return std::nullopt;
        auto result = builder.finish();
        arena.clear();
        if (!builder.is_schema_locked()) builder.lock_schema();
        builder.reset(true);
        builder.reserve(batch_size);
        return result;
    };

    // Work items with start_byte > 0 that start at a checkpoint begin at a
    // deflate-block boundary that is typically mid-line. The previous worker
    // emitted that spanning line via its tail-flush, so drop bytes up to
    // (and including) the first newline we see.
    //
    // A chunk containing no newline lies wholly inside that spanning line.
    // It is dropped too, and the search continues into the next chunk, so a
    // line fragment is never parsed as a document.
    bool skipping_partial_head =
        config.start_byte > 0 && config.start_at_checkpoint;
    while (auto chunk_opt = co_await chunk_gen.next()) {
        auto chunk = *chunk_opt;
        if (chunk.empty()) continue;

        if (skipping_partial_head) {
            const char* nl = static_cast<const char*>(
                std::memchr(chunk.data(), '\n', chunk.size()));
            if (!nl) continue;  // still inside the spanning line
            skipping_partial_head = false;
            std::size_t skip =
                static_cast<std::size_t>(nl - chunk.data()) + 1;
            if (skip >= chunk.size()) continue;
            chunk = chunk.subspan(skip);
        }

        simdjson::padded_string padded;
        if (have_line_prefilter) {
            auto collected = collect_matching_lines(chunk, prefilter);
            if (collected.empty()) continue;
            padded = simdjson::padded_string(std::move(collected));
        } else {
            auto trimmed = strip_ndjson_bookends(
                std::string_view(chunk.data(), chunk.size()));
            if (trimmed.empty()) continue;
            padded = simdjson::padded_string(trimmed);
        }

        // NOTE(review): each chunk is parsed on its own with a 1 MiB
        // iterate_many window, and this loop keeps no carry buffer. A single
        // document larger than the window, or one cut off at the chunk's
        // end, is therefore not recovered. Presumably chunks always end on
        // line boundaries; confirm.
        auto docs_r = bulk_parser.iterate_many(padded, 1 << 20,
                                               /*allow_comma_separated=*/false);
        if (docs_r.error()) continue;
        auto& docs = docs_r.value();

        for (auto it = docs.begin(); it != docs.end(); ++it) {
            auto doc_result = *it;
            if (doc_result.error()) continue;
            auto& doc = doc_result.value();

            if (query && !config.chunk_prune_only) {
                if (use_compiled) {
                    if (!eval_compiled_eq(compiled_probes, doc)) continue;
                } else {
                    common::query::ValueMap fields;
                    collect_query_fields(doc, *query, fields);
                    if (!query->evaluate(fields)) continue;
                }
                // Filtering advanced the on-demand cursor; rewind before the
                // row builder walks the document again.
                doc.rewind();
            } else if (!presence_check_paths.empty()) {
                bool all_present = true;
                for (const auto& path : presence_check_paths) {
                    auto fld = doc.find_field_unordered(path);
                    if (fld.error()) {
                        all_present = false;
                        break;
                    }
                }
                if (!all_present) continue;
                doc.rewind();
            }

            if (!arrow_row_from_doc(builder, hints, doc, flatten)) continue;

            if (auto flushed = maybe_flush(/*final=*/false)) {
                co_yield std::move(*flushed);
            }
        }
    }

    if (auto flushed = maybe_flush(/*final=*/true)) {
        co_yield std::move(*flushed);
    }
}
1585

1586
#endif  // DFTRACER_UTILS_ENABLE_ARROW
1587

1588
}  // namespace dftracer::utils::utilities::reader
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc