• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.33
/src/dftracer/utils/utilities/reader/trace_reader.cpp
1
#include <dftracer/utils/core/common/archive_format.h>
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/core/utils/string.h>
4
#include <dftracer/utils/utilities/common/json/json_value.h>
5
#include <dftracer/utils/utilities/common/query/query.h>
6
#include <dftracer/utils/utilities/composites/dft/indexing/chunk_pruner_utility.h>
7
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
8
#include <dftracer/utils/utilities/fileio/lines/sources/async_plain_file_bytes_generator.h>
9
#include <dftracer/utils/utilities/fileio/lines/sources/async_plain_file_line_generator.h>
10
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>
11
#include <dftracer/utils/utilities/indexer/index_database.h>
12
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
13
#include <dftracer/utils/utilities/indexer/internal/indexer_factory.h>
14
#include <dftracer/utils/utilities/reader/internal/reader.h>
15
#include <dftracer/utils/utilities/reader/internal/reader_factory.h>
16
#include <dftracer/utils/utilities/reader/internal/stream.h>
17
#include <dftracer/utils/utilities/reader/internal/stream_config.h>
18
#include <dftracer/utils/utilities/reader/internal/stream_type.h>
19
#include <dftracer/utils/utilities/reader/internal/trace_reader_prefilter.h>
20
#include <dftracer/utils/utilities/reader/internal/trace_reader_shared.h>
21
#include <dftracer/utils/utilities/reader/trace_reader.h>
22
#include <simdjson.h>
23

24
#include <algorithm>
25
#include <cstring>
26
#include <optional>
27
#include <span>
28
#include <type_traits>
29
#include <unordered_map>
30

31
namespace dftracer::utils::utilities::reader {
32

33
namespace dft_internal = composites::dft::internal;
34
using common::json::JsonValue;
35
using common::query::Query;
36
using composites::dft::indexing::ChunkPrunerInput;
37
using composites::dft::indexing::ChunkPrunerUtility;
38
using indexer::internal::IndexerFactory;
39

40
using internal::build_prefilter;
41
using internal::LinePrefilter;
42
using internal::ondemand_to_literal;
43
using internal::read_chunks_indexed;
44
using internal::strip_ndjson_bookends;
45

46
namespace {
47

48
// Per-thread simdjson DOM parser; line_matches_query parses every candidate
// line through it instead of constructing a parser per call.
thread_local simdjson::dom::parser tl_parser;
3✔
49

50
bool line_matches_query(const Query& q, std::string_view content) {
5,420✔
51
    auto result = tl_parser.parse(content.data(), content.size());
5,420✔
52
    if (result.error()) return false;
5,420✔
53
    auto root = result.value_unsafe();
5,382✔
54
    if (!root.is_object()) return false;
5,382✔
55
    JsonValue json(root);
5,382✔
56
    return q.evaluate(json);
5,382!
57
}
2,710✔
58

59
// A span of lines to stream. In this file it is filled from a checkpoint's
// first_line_num / last_line_num and passed to StreamConfig::from / ::to
// with RangeType::LINE_RANGE.
struct LineRange {
    std::size_t start_line;  // first line of the span
    std::size_t end_line;    // last line of the span
};
63

64
// Splits every buffer read from `stream` on '\n' and yields the accepted
// lines together with a running line number.
//
// Parameters:
//   stream           - source of buffers; drained until done() or an empty
//                      read.
//   start_line_num   - number assigned to the first line seen.
//   query            - optional predicate; may be null.
//   chunk_prune_only - when true, lines are not evaluated against `query`.
//   prefilter        - optional literal prefilter; may be null or empty.
//
// A line is yielded when it is non-empty, passes the prefilter (if one is
// active) and, unless chunk_prune_only is set or query is null, satisfies
// line_matches_query. Empty lines are counted but never yielded.
//
// The prefilter is consulted before the JSON parse so that lines it rejects
// are never parsed; the set of accepted lines is the same either way because
// both conditions must hold.
coro::AsyncGenerator<Line> yield_lines_from_stream(
    std::unique_ptr<internal::ReaderStream> stream, std::size_t start_line_num,
    const Query* query, bool chunk_prune_only = false,
    const LinePrefilter* prefilter = nullptr) {
    // Both flags are invariant for the lifetime of the generator.
    const bool use_prefilter = prefilter != nullptr && !prefilter->empty();
    const bool evaluate_query = !chunk_prune_only && query != nullptr;

    std::size_t line_num = start_line_num;
    while (!stream->done()) {
        auto chunk = co_await stream->read_async();
        if (chunk.empty()) break;
        const char* data = chunk.data();
        const std::size_t len = chunk.size();

        // Chunk-level pre-filter: if any required literal is absent from this
        // entire buffer, no line within it can match. Skip without splitting.
        // Line numbers must stay correct for subsequent chunks.
        if (use_prefilter &&
            !prefilter->may_match(std::string_view(data, len))) {
            line_num += std::count(data, data + len, '\n');
            continue;
        }

        std::size_t pos = 0;
        while (pos < len) {
            const void* nl_ptr = std::memchr(data + pos, '\n', len - pos);
            const std::size_t end_pos =
                nl_ptr ? static_cast<std::size_t>(
                             static_cast<const char*>(nl_ptr) - data)
                       : len;
            if (end_pos > pos) {
                const std::string_view line_sv(data + pos, end_pos - pos);
                const bool accept =
                    (!use_prefilter || prefilter->may_match(line_sv)) &&
                    (!evaluate_query || line_matches_query(*query, line_sv));
                if (accept) {
                    co_yield Line(line_sv, line_num);
                }
            }
            // Every segment, empty or not, consumes one line number.
            ++line_num;
            pos = end_pos + 1;
        }
    }
}
5,704!
108

109
// Streams each LineRange in `ranges` in order and forwards the lines produced
// by yield_lines_from_stream.
//
// One MULTI_LINES / LINE_RANGE stream is opened on `reader` per range, and
// line numbering restarts at that range's start_line. `query` and `prefilter`
// are taken by value so the pointers handed to the inner generator stay valid
// for as long as this generator's frame lives.
coro::AsyncGenerator<Line> yield_lines_from_ranges(
    std::shared_ptr<internal::Reader> reader, std::vector<LineRange> ranges,
    std::size_t buffer_size, Query query, bool chunk_prune_only = false,
    LinePrefilter prefilter = {}) {
    for (std::size_t idx = 0; idx < ranges.size(); ++idx) {
        const LineRange& span = ranges[idx];

        auto range_stream =
            reader->stream(internal::StreamConfig()
                               .stream_type(internal::StreamType::MULTI_LINES)
                               .range_type(internal::RangeType::LINE_RANGE)
                               .from(span.start_line)
                               .to(span.end_line)
                               .buffer_size(buffer_size));

        auto inner =
            yield_lines_from_stream(std::move(range_stream), span.start_line,
                                    &query, chunk_prune_only, &prefilter);
        for (;;) {
            auto produced = co_await inner.next();
            if (!produced) break;
            co_yield *produced;
        }
    }
}
×
129

130
// Raw-chunk variants of the yield/read helpers. Same pruning logic as the
131
// line-yielding flavors but emit std::span<const char> buffers untouched
132
// (multi-line boundary respected by stream type). Used by read_json to run
133
// simdjson iterate_many over each chunk instead of parsing line by line.
134
// Yields the buffers read from `stream` unchanged, dropping any buffer that
// a non-empty `prefilter` says cannot match. Stops when the stream reports
// done() or returns an empty buffer.
coro::AsyncGenerator<std::span<const char>> yield_chunks_from_stream(
    std::unique_ptr<internal::ReaderStream> stream,
    const LinePrefilter* prefilter = nullptr) {
    while (!stream->done()) {
        auto buffer = co_await stream->read_async();
        if (buffer.empty()) {
            break;
        }

        const bool filter_active = prefilter != nullptr && !prefilter->empty();
        const bool may_contain_match =
            !filter_active ||
            prefilter->may_match(
                std::string_view(buffer.data(), buffer.size()));

        if (may_contain_match) {
            co_yield buffer;
        }
    }
}
318!
148

149
// Chunk-level counterpart of yield_lines_from_ranges: opens one MULTI_LINES /
// LINE_RANGE stream per entry of `ranges` and forwards the buffers that
// yield_chunks_from_stream lets through. `prefilter` is held by value so the
// pointer passed to the inner generator outlives it.
coro::AsyncGenerator<std::span<const char>> yield_chunks_from_ranges(
    std::shared_ptr<internal::Reader> reader, std::vector<LineRange> ranges,
    std::size_t buffer_size, LinePrefilter prefilter = {}) {
    for (std::size_t idx = 0; idx < ranges.size(); ++idx) {
        const LineRange& span = ranges[idx];

        auto range_stream =
            reader->stream(internal::StreamConfig()
                               .stream_type(internal::StreamType::MULTI_LINES)
                               .range_type(internal::RangeType::LINE_RANGE)
                               .from(span.start_line)
                               .to(span.end_line)
                               .buffer_size(buffer_size));

        auto inner =
            yield_chunks_from_stream(std::move(range_stream), &prefilter);
        for (;;) {
            auto produced = co_await inner.next();
            if (!produced) break;
            co_yield *produced;
        }
    }
}
×
166

167
// Yields lines from an indexed archive through `reader`.
//
// Parameters:
//   reader           - indexed reader used to open streams.
//   index_path       - path of the index database; may be empty.
//   file_path        - path of the trace file (used for pruner lookups).
//   config           - caller's window (line range or byte range) and buffer
//                      size. A zero `end` means "to the end".
//   query            - optional predicate.
//   chunk_prune_only - forwarded to yield_lines_from_stream; when true lines
//                      are not evaluated against the query individually.
//
// Flow:
//   1. Clamp the window against the reader's totals; return if it starts
//      past the end.
//   2. For byte-range reads with a query and an index, ask the chunk pruner
//      which checkpoints may match. If the file cannot match, yield nothing.
//      If only a strict subset of checkpoints are candidates, stream just
//      those checkpoints (merged when consecutive) that overlap the caller's
//      byte window, mirroring read_chunks_indexed so that windowed reads do
//      not return lines from checkpoints outside the window.
//   3. Otherwise stream the whole window.
coro::AsyncGenerator<Line> read_lines_indexed(
    std::shared_ptr<internal::Reader> reader, std::string index_path,
    std::string file_path, ReadConfig config, std::optional<Query> query,
    bool chunk_prune_only = false) {
    // Keep RocksDB alive for the generator's lifetime so per-method opens
    // in GzipIndexer reuse DBManager's cached handle.
    std::optional<indexer::IndexDatabase> db_keep_alive;
    if (!index_path.empty()) {
        try {
            db_keep_alive.emplace(index_path,
                                  rocksdb::RocksDatabase::OpenMode::ReadOnly);
        } catch (...) {
            // Deliberately best effort: the keep-alive handle is only an
            // optimisation, reads below open what they need themselves.
        }
    }

    LinePrefilter prefilter = query ? build_prefilter(*query) : LinePrefilter{};
    auto range_type = config.has_line_range() ? internal::RangeType::LINE_RANGE
                                              : internal::RangeType::BYTE_RANGE;
    std::size_t start =
        config.has_line_range() ? config.start_line : config.start_byte;
    std::size_t end =
        config.has_line_range() ? config.end_line : config.end_byte;

    if (range_type == internal::RangeType::LINE_RANGE) {
        auto total_lines = reader->get_num_lines();
        if (start == 0) start = 1;
        if (end == 0 || end > total_lines) end = total_lines;
        if (start > total_lines) co_return;
    } else {
        auto max_bytes = reader->get_max_bytes();
        if (end == 0 || end > max_bytes) end = max_bytes;
        if (start >= max_bytes) co_return;
    }

    if (query && !index_path.empty() &&
        range_type == internal::RangeType::BYTE_RANGE) {
        ChunkPrunerInput pruner_input{index_path, file_path, *query, nullptr};
        ChunkPrunerUtility pruner;
        auto pruner_out = co_await pruner.process(pruner_input);
        if (pruner_out.success && !pruner_out.file_may_match) {
            co_return;
        }

        if (pruner_out.success && !pruner_out.candidate_checkpoints.empty() &&
            pruner_out.candidate_checkpoints.size() <
                pruner_out.total_checkpoints) {
            indexer::IndexDatabase idx_db(
                index_path, rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto logical = indexer::internal::get_logical_path(file_path);
            int fid = idx_db.get_file_info_id(logical);

            if (fid >= 0) {
                auto all_ckpts = idx_db.query_checkpoints(fid);
                std::unordered_map<std::uint64_t, indexer::IndexerCheckpoint>
                    ckpt_map;
                for (auto& ckpt : all_ckpts) {
                    ckpt_map.emplace(ckpt.checkpoint_idx, std::move(ckpt));
                }

                std::vector<LineRange> ranges;
                std::uint64_t prev_idx = UINT64_MAX;

                for (auto ckpt_idx : pruner_out.candidate_checkpoints) {
                    auto it = ckpt_map.find(ckpt_idx);
                    if (it == ckpt_map.end()) continue;
                    const auto& ckpt = it->second;

                    // Intersect with the caller's byte window; this branch
                    // is only reached for BYTE_RANGE reads. Skipped
                    // checkpoints do not update prev_idx, so they also break
                    // the merge run below.
                    const std::size_t ckpt_start = ckpt.uc_offset;
                    const std::size_t ckpt_end =
                        ckpt.uc_offset + ckpt.uc_size;
                    if (ckpt_end <= start || ckpt_start >= end) continue;

                    // Merge consecutive checkpoints into one range.
                    if (ranges.empty() || ckpt_idx != prev_idx + 1) {
                        ranges.push_back(
                            {ckpt.first_line_num, ckpt.last_line_num});
                    } else {
                        ranges.back().end_line = ckpt.last_line_num;
                    }
                    prev_idx = ckpt_idx;
                }

                if (ranges.empty()) {
                    co_return;
                }

                auto gen = yield_lines_from_ranges(reader, std::move(ranges),
                                                   config.buffer_size, *query,
                                                   chunk_prune_only, prefilter);
                while (auto line = co_await gen.next()) {
                    co_yield *line;
                }
                co_return;
            }
        }
    }

    auto stream =
        reader->stream(internal::StreamConfig()
                           .stream_type(internal::StreamType::MULTI_LINES)
                           .range_type(range_type)
                           .from(start)
                           .to(end)
                           .buffer_size(config.buffer_size));

    // NOTE(review): for BYTE_RANGE reads `start` is a byte offset, yet it is
    // used here as the starting line number - confirm this is intended.
    auto gen = yield_lines_from_stream(std::move(stream), start,
                                       query ? &*query : nullptr,
                                       chunk_prune_only, &prefilter);
    while (auto line = co_await gen.next()) {
        co_yield *line;
    }
}
8,492!
269

270
// Streams lines of a gzip file that has no index.
//
// The line window comes from `config` when it carries a line range; otherwise
// 0/0 is passed to async_streaming_gz_lines. Each line is forwarded when
// chunk_prune_only is set, when there is no query, or when the line satisfies
// line_matches_query.
coro::AsyncGenerator<Line> read_lines_gz(std::string file_path,
                                         ReadConfig config,
                                         std::optional<Query> query,
                                         bool chunk_prune_only = false) {
    std::size_t first_line = 0;
    std::size_t last_line = 0;
    if (config.has_line_range()) {
        first_line = config.start_line;
        last_line = config.end_line;
    }

    auto source = fileio::lines::sources::async_streaming_gz_lines(
        file_path, first_line, last_line);

    const bool needs_eval = !chunk_prune_only && query.has_value();
    for (;;) {
        auto item = co_await source.next();
        if (!item) break;
        if (needs_eval && !line_matches_query(*query, item->content)) {
            continue;
        }
        co_yield *item;
    }
}
39,936!
285

286
// Streams lines of an uncompressed file restricted to the byte window in
// `config` (start_byte, end_byte, buffer_size are forwarded to
// async_plain_file_bytes). Each line is forwarded when chunk_prune_only is
// set, when there is no query, or when it satisfies line_matches_query.
coro::AsyncGenerator<Line> read_lines_plain_bytes(
    std::string file_path, ReadConfig config, std::optional<Query> query,
    bool chunk_prune_only = false) {
    auto source = fileio::lines::sources::async_plain_file_bytes(
        file_path, config.start_byte, config.end_byte, config.buffer_size);

    const bool needs_eval = !chunk_prune_only && query.has_value();
    for (;;) {
        auto item = co_await source.next();
        if (!item) break;
        if (needs_eval && !line_matches_query(*query, item->content)) {
            continue;
        }
        co_yield *item;
    }
}
111!
298

299
// Streams lines of an uncompressed file by line window.
//
// The window comes from `config` when it carries a line range; otherwise 0/0
// is passed to async_plain_file_lines. Each line is forwarded when
// chunk_prune_only is set, when there is no query, or when it satisfies
// line_matches_query.
coro::AsyncGenerator<Line> read_lines_plain(std::string file_path,
                                            ReadConfig config,
                                            std::optional<Query> query,
                                            bool chunk_prune_only = false) {
    std::size_t first_line = 0;
    std::size_t last_line = 0;
    if (config.has_line_range()) {
        first_line = config.start_line;
        last_line = config.end_line;
    }

    auto source = fileio::lines::sources::async_plain_file_lines(
        file_path, first_line, last_line);

    const bool needs_eval = !chunk_prune_only && query.has_value();
    for (;;) {
        auto item = co_await source.next();
        if (!item) break;
        if (needs_eval && !line_matches_query(*query, item->content)) {
            continue;
        }
        co_yield *item;
    }
}
14,901!
314

315
}  // namespace
316

317
namespace internal {
318

319
// Yields raw buffers from an indexed archive through `reader`.
//
// The caller's window is taken from `config` (line range if present, byte
// range otherwise; a zero end means "to the end") and clamped against the
// reader's totals. When a query and an index are available and
// config.skip_pruning is false, the chunk pruner is consulted:
//   - file cannot match            -> nothing is yielded;
//   - strict subset of checkpoints -> only candidate checkpoints overlapping
//                                     the window are streamed, consecutive
//                                     ones merged into a single line range;
//   - otherwise                    -> the whole window is streamed.
// `extend_to_line_boundary` is only honoured for byte-range reads.
coro::AsyncGenerator<std::span<const char>> read_chunks_indexed(
    std::shared_ptr<internal::Reader> reader, std::string index_path,
    std::string file_path, ReadConfig config, std::optional<Query> query,
    bool extend_to_line_boundary) {
    // Keep RocksDB alive for the generator's lifetime so per-method opens
    // in GzipIndexer reuse DBManager's cached handle.
    std::optional<indexer::IndexDatabase> index_db_guard;
    if (!index_path.empty()) {
        try {
            index_db_guard.emplace(index_path,
                                   rocksdb::RocksDatabase::OpenMode::ReadOnly);
        } catch (...) {
        }
    }

    LinePrefilter literal_filter =
        query ? build_prefilter(*query) : LinePrefilter{};

    const bool by_lines = config.has_line_range();
    const auto range_type = by_lines ? internal::RangeType::LINE_RANGE
                                     : internal::RangeType::BYTE_RANGE;
    const bool by_bytes = range_type == internal::RangeType::BYTE_RANGE;
    std::size_t window_begin = by_lines ? config.start_line : config.start_byte;
    std::size_t window_end = by_lines ? config.end_line : config.end_byte;

    // Clamp the window; bail out when it starts beyond the data.
    if (by_bytes) {
        const auto byte_total = reader->get_max_bytes();
        if (window_end == 0 || window_end > byte_total) {
            window_end = byte_total;
        }
        if (window_begin >= byte_total) co_return;
    } else {
        const auto line_total = reader->get_num_lines();
        if (window_begin == 0) window_begin = 1;
        if (window_end == 0 || window_end > line_total) {
            window_end = line_total;
        }
        if (window_begin > line_total) co_return;
    }

    const bool try_pruning =
        query.has_value() && !index_path.empty() && !config.skip_pruning;
    if (try_pruning) {
        ChunkPrunerInput pruner_input{index_path, file_path, *query, nullptr};
        ChunkPrunerUtility pruner;
        auto verdict = co_await pruner.process(pruner_input);
        if (verdict.success && !verdict.file_may_match) {
            co_return;
        }

        const bool strict_subset =
            verdict.success && !verdict.candidate_checkpoints.empty() &&
            verdict.candidate_checkpoints.size() < verdict.total_checkpoints;
        if (strict_subset) {
            indexer::IndexDatabase idx_db(
                index_path, rocksdb::RocksDatabase::OpenMode::ReadOnly);
            auto logical_name = indexer::internal::get_logical_path(file_path);
            int file_id = idx_db.get_file_info_id(logical_name);

            if (file_id >= 0) {
                auto stored = idx_db.query_checkpoints(file_id);
                std::unordered_map<std::uint64_t, indexer::IndexerCheckpoint>
                    by_index;
                for (auto& entry : stored) {
                    by_index.emplace(entry.checkpoint_idx, std::move(entry));
                }

                std::vector<LineRange> selected;
                std::uint64_t last_taken = UINT64_MAX;
                for (auto candidate : verdict.candidate_checkpoints) {
                    auto found = by_index.find(candidate);
                    if (found == by_index.end()) continue;
                    const auto& ckpt = found->second;

                    // Intersect with the caller's window (byte or line) so
                    // checkpoint-level parallel work items stay disjoint.
                    bool outside_window = false;
                    if (by_bytes) {
                        const std::size_t first_byte = ckpt.uc_offset;
                        const std::size_t past_last_byte =
                            ckpt.uc_offset + ckpt.uc_size;
                        outside_window = past_last_byte <= window_begin ||
                                         first_byte >= window_end;
                    } else {
                        outside_window = ckpt.last_line_num < window_begin ||
                                         ckpt.first_line_num > window_end;
                    }
                    if (outside_window) continue;

                    const bool extends_previous =
                        !selected.empty() && candidate == last_taken + 1;
                    if (extends_previous) {
                        selected.back().end_line = ckpt.last_line_num;
                    } else {
                        selected.push_back(
                            {ckpt.first_line_num, ckpt.last_line_num});
                    }
                    last_taken = candidate;
                }

                if (selected.empty()) {
                    co_return;
                }

                auto pruned = yield_chunks_from_ranges(
                    reader, std::move(selected), config.buffer_size,
                    literal_filter);
                for (;;) {
                    auto piece = co_await pruned.next();
                    if (!piece) break;
                    co_yield *piece;
                }
                co_return;
            }
        }
    }

    // No usable pruning result: stream the whole (clamped) window.
    auto stream_type = by_bytes ? internal::StreamType::MULTI_LINES_BYTES
                                : internal::StreamType::MULTI_LINES;
    auto window_stream =
        reader->stream(internal::StreamConfig()
                           .stream_type(stream_type)
                           .range_type(range_type)
                           .from(window_begin)
                           .to(window_end)
                           .buffer_size(config.buffer_size)
                           .extend_to_line_boundary(extend_to_line_boundary &&
                                                    by_bytes));

    auto unpruned =
        yield_chunks_from_stream(std::move(window_stream), &literal_filter);
    for (;;) {
        auto piece = co_await unpruned.next();
        if (!piece) break;
        co_yield *piece;
    }
}
1,065!
436

437
}  // namespace internal
438

439
// Takes ownership of `config` and immediately probes for an index so that
// format_, index_path_ and has_index_ are set before any read.
TraceReader::TraceReader(TraceReaderConfig config)
    : config_(std::move(config)) {
    this->probe_index();
}
1,403✔
443

444
void TraceReader::probe_index() {
935✔
445
    format_ = IndexerFactory::detect_format(config_.file_path);
935✔
446
    index_path_ = dft_internal::determine_index_path(config_.file_path,
1,402✔
447
                                                     config_.index_dir);
936✔
448
    has_index_ =
933!
449
        (format_ == ArchiveFormat::GZIP || format_ == ArchiveFormat::TAR_GZ) &&
1,333✔
450
        fs::exists(index_path_);
1,267!
451
}
934✔
452

453
// Reports the result of the index probe performed at construction.
bool TraceReader::has_index() const {
    return has_index_;
}
342✔
454

455
void TraceReader::ensure_metadata_cached() {
38✔
456
    if (metadata_cached_) return;
38!
457

458
    if (has_index_) {
38✔
459
        auto reader = create_indexed_reader();
26!
460
        cached_max_bytes_ = reader->get_max_bytes();
26!
461
        cached_num_lines_ = reader->get_num_lines();
26!
462
    } else if (format_ == ArchiveFormat::GZIP ||
38!
463
               format_ == ArchiveFormat::TAR_GZ) {
×
464
        cached_max_bytes_ = 0;
12✔
465
        cached_num_lines_ = 0;
12✔
466
    } else {
6✔
467
        std::error_code ec;
×
468
        auto size = fs::file_size(config_.file_path, ec);
×
469
        cached_max_bytes_ = ec ? 0 : static_cast<std::size_t>(size);
×
470
        cached_num_lines_ = 0;
×
471
    }
472
    metadata_cached_ = true;
38✔
473
}
19✔
474

475
std::size_t TraceReader::get_max_bytes() {
24✔
476
    ensure_metadata_cached();
24✔
477
    return cached_max_bytes_;
24✔
478
}
479

480
std::size_t TraceReader::get_num_lines() {
14✔
481
    ensure_metadata_cached();
14✔
482
    return cached_num_lines_;
14✔
483
}
484

485
std::shared_ptr<internal::Reader> TraceReader::create_indexed_reader() {
180✔
486
    auto indexer = IndexerFactory::create(config_.file_path, index_path_,
270✔
487
                                          config_.checkpoint_size, false);
180!
488
    return internal::ReaderFactory::create(indexer);
269!
489
}
180✔
490

491
// Maps the alignment flags of `config` to a raw stream type:
//   not line_aligned          -> BYTES
//   line_aligned, multi_line  -> MULTI_LINES_BYTES
//   line_aligned only         -> LINE_BYTES
internal::StreamType TraceReader::resolve_raw_stream_type(
    const ReadConfig& config) const {
    if (config.line_aligned) {
        return config.multi_line ? internal::StreamType::MULTI_LINES_BYTES
                                 : internal::StreamType::LINE_BYTES;
    }
    return internal::StreamType::BYTES;
}
6✔
497

498
// LINE_RANGE when `config` carries a line range, BYTE_RANGE otherwise.
internal::RangeType TraceReader::resolve_range_type(
    const ReadConfig& config) const {
    return config.has_line_range() ? internal::RangeType::LINE_RANGE
                                   : internal::RangeType::BYTE_RANGE;
}
6✔
503

504
// Entry point for line-oriented reads.
//
// Parses config.query (if any) up front and throws
// common::query::QueryParseError when it is malformed, then dispatches to the
// generator matching the file:
//   indexed archive        -> read_lines_indexed
//   gzip without an index  -> read_lines_gz
//   plain file, byte range -> read_lines_plain_bytes
//   plain file otherwise   -> read_lines_plain
coro::AsyncGenerator<Line> TraceReader::read_lines(ReadConfig config) {
    std::optional<Query> compiled;
    if (!config.query.empty()) {
        auto outcome = Query::from_string(config.query);
        if (!outcome) {
            throw common::query::QueryParseError(outcome.error());
        }
        compiled = std::move(*outcome);
    }

    // Read before `config` is moved into the chosen generator.
    const bool prune_only = config.chunk_prune_only;

    if (has_index_) {
        return read_lines_indexed(create_indexed_reader(), index_path_,
                                  config_.file_path, std::move(config),
                                  std::move(compiled), prune_only);
    }

    const bool gzip_family = format_ == ArchiveFormat::GZIP ||
                             format_ == ArchiveFormat::TAR_GZ;
    if (gzip_family) {
        return read_lines_gz(config_.file_path, std::move(config),
                             std::move(compiled), prune_only);
    }

    if (!config.has_byte_range()) {
        return read_lines_plain(config_.file_path, std::move(config),
                                std::move(compiled), prune_only);
    }
    return read_lines_plain_bytes(config_.file_path, std::move(config),
                                  std::move(compiled), prune_only);
}
461✔
530

531
// Yields parsed JSON events, optionally filtered by config.query.
//
// Throws common::query::QueryParseError when config.query is non-empty and
// cannot be parsed.
//
// Two paths:
//   - Indexed files: buffers from read_chunks_indexed are parsed with
//     simdjson iterate_many. Each document is either probed for field
//     presence (chunk_prune_only) or evaluated against the query, and
//     matching documents are lent to `yield_parser`. The line number of
//     events yielded on this path is always 0.
//   - Everything else: lines from read_lines are trimmed, parsed one by one
//     and evaluated here; read_lines itself is told not to evaluate the
//     query (chunk_prune_only is forced to true).
//
// Each yielded JsonLine points at a parser owned by this generator's frame.
coro::AsyncGenerator<JsonLine> TraceReader::read_json(ReadConfig config) {
    std::optional<Query> query;
    if (!config.query.empty()) {
        auto parsed = Query::from_string(config.query);
        if (!parsed) throw common::query::QueryParseError(parsed.error());
        query = std::move(*parsed);
    }

    // chunk_prune_only path: dim_stats already proved every event with the
    // predicate field matches; we still need to skip events lacking the
    // field (e.g., metadata "ph":"M" events). Field-presence probe is
    // cheaper than full ValueMap eval.
    std::vector<std::string> presence_check_paths;
    if (query && config.chunk_prune_only) {
        const auto& fset = query->fields();
        presence_check_paths.assign(fset.begin(), fset.end());
    }

    // Fast path: indexed gz files go through a chunk generator with
    // simdjson iterate_many. Query is evaluated on the ondemand document
    // directly, so non-matching docs never hit the yield_parser.
    if (has_index_) {
        auto reader = create_indexed_reader();
        auto chunk_gen = read_chunks_indexed(reader, index_path_,
                                             config_.file_path, config, query);

        simdjson::ondemand::parser bulk_parser;
        common::json::JsonParser yield_parser;

        while (auto chunk_opt = co_await chunk_gen.next()) {
            auto chunk = *chunk_opt;
            if (chunk.empty()) continue;
            auto trimmed = strip_ndjson_bookends(
                std::string_view(chunk.data(), chunk.size()));
            if (trimmed.empty()) continue;
            // iterate_many needs padded input, so the buffer is copied.
            simdjson::padded_string padded(trimmed);

            auto docs_r = bulk_parser.iterate_many(
                padded, 1 << 20, /*allow_comma_separated=*/false);
            if (docs_r.error()) continue;
            auto& docs = docs_r.value();

            for (auto it = docs.begin(); it != docs.end(); ++it) {
                auto doc_result = *it;
                if (doc_result.error()) continue;
                auto& doc = doc_result.value();

                std::string_view src(it.source().data(), it.source().size());

                if (query && config.chunk_prune_only) {
                    // Presence probe only: every referenced field must exist.
                    bool all_present = true;
                    for (const auto& path : presence_check_paths) {
                        auto fld = doc.find_field_unordered(path);
                        if (fld.error()) {
                            all_present = false;
                            break;
                        }
                    }
                    if (!all_present) continue;
                    doc.rewind();
                } else if (query) {
                    // Collect the referenced fields from the top level and
                    // from one level of nested objects, then evaluate.
                    common::query::ValueMap fields;
                    auto obj = doc.get_object();
                    if (obj.error()) continue;
                    for (auto field : obj.value()) {
                        if (field.error()) continue;
                        auto key_r = field.unescaped_key();
                        if (key_r.error()) continue;
                        auto val_r = field.value();
                        if (val_r.error()) continue;
                        auto key = key_r.value();
                        auto val = val_r.value();
                        auto type_r = val.type();
                        if (type_r.error()) continue;
                        auto type = type_r.value();
                        if (type == simdjson::ondemand::json_type::object) {
                            auto nested = val.get_object();
                            if (nested.error()) continue;
                            for (auto nf : nested.value()) {
                                if (nf.error()) continue;
                                auto nk_r = nf.unescaped_key();
                                if (nk_r.error()) continue;
                                auto nv_r = nf.value();
                                if (nv_r.error()) continue;
                                auto nk = nk_r.value();
                                if (!query->references(nk)) continue;
                                fields[std::string(nk)] =
                                    ondemand_to_literal(nv_r.value());
                            }
                        } else if (query->references(key)) {
                            fields[std::string(key)] = ondemand_to_literal(val);
                        }
                    }
                    if (!query->evaluate(fields)) continue;
                }

                // Matched (or no query): lend the iterate_many doc to
                // yield_parser without re-parsing. Consumers like
                // build_arrow_row call parser.for_each_field which now
                // iterates the borrowed doc_reference.
                doc.rewind();
                yield_parser.set_borrowed_document(
                    simdjson::ondemand::document_reference(doc));
                co_yield JsonLine{src, 0, &yield_parser};
            }
        }
        co_return;
    }

    // Fallback: non-indexed paths use the per-line pipeline unchanged.
    // The query is evaluated below, so read_lines must not evaluate it too.
    config.chunk_prune_only = true;
    auto line_gen = read_lines(config);

    common::json::JsonParser parser;

    while (auto opt = co_await line_gen.next()) {
        const char* trimmed = nullptr;
        std::size_t trimmed_len = 0;
        if (!dftracer::utils::json_trim_and_validate_with_comma(
                opt->content.data(), opt->content.size(), trimmed, trimmed_len))
            continue;
        if (!parser.parse(std::string_view(trimmed, trimmed_len))) continue;

        if (query) {
            common::query::ValueMap fields;
            std::vector<std::string> nested_keys;
            parser.for_each_field(
                [&](std::string_view key, simdjson::ondemand::value val) {
                    // Skip fields whose type cannot be determined, as the
                    // indexed path above does, instead of reading an
                    // unchecked result.
                    auto type_r = val.type();
                    if (type_r.error()) return;
                    auto type = type_r.value_unsafe();
                    if (type == simdjson::ondemand::json_type::object) {
                        nested_keys.emplace_back(key);
                    } else if (query->references(key)) {
                        fields[std::string(key)] = ondemand_to_literal(val);
                    }
                });
            // Second pass: pull referenced fields out of each nested object.
            for (auto& nk : nested_keys) {
                parser.rewind();
                parser.for_each_field(nk, [&](std::string_view key,
                                              simdjson::ondemand::value val) {
                    if (query->references(key)) {
                        fields[std::string(key)] = ondemand_to_literal(val);
                    }
                });
            }
            if (!query->evaluate(fields)) continue;
            // Leave the parser at the start for the consumer.
            parser.rewind();
        }

        co_yield JsonLine{opt->content, opt->line_number, &parser};
    }
}
24,061!
682

683
// Yields the trace's raw content as a sequence of character spans.
//
// Indexed files (has_index_): the requested line or byte range is clamped
// against the indexed reader's totals and then streamed chunk by chunk. For
// byte ranges with a non-empty query and a known index path, the chunk
// pruner is consulted first; nothing is yielded when it succeeds and
// reports that the file cannot match.
//
// Non-indexed files: the file is walked line by line (streaming gzip source
// for GZIP / TAR_GZ, plain-file source otherwise). Only config.start_byte
// and config.end_byte are consulted on these paths; the running offset is
// advanced by each line's content size plus one.
//
// Throws common::query::QueryParseError when config.query cannot be parsed
// on the pruning path.
coro::AsyncGenerator<std::span<const char>> TraceReader::read_raw(
    ReadConfig config) {
    // ---- Indexed path ---------------------------------------------------
    if (has_index_) {
        // Hold a read-only handle on the index database for as long as this
        // generator lives, so that the per-method opens performed by
        // GzipIndexer can reuse DBManager's cached handle.
        std::optional<indexer::IndexDatabase> index_db_guard;
        if (!index_path_.empty()) {
            try {
                index_db_guard.emplace(
                    index_path_, rocksdb::RocksDatabase::OpenMode::ReadOnly);
            } catch (...) {
                // Best effort only: the guard is an optimisation, so a
                // failed open is ignored and reading proceeds without it.
            }
        }

        auto indexed = create_indexed_reader();
        auto raw_stream_kind = resolve_raw_stream_type(config);
        auto range_kind = resolve_range_type(config);

        std::size_t first =
            config.has_line_range() ? config.start_line : config.start_byte;
        std::size_t last =
            config.has_line_range() ? config.end_line : config.end_byte;

        if (range_kind == internal::RangeType::LINE_RANGE) {
            const auto line_count = indexed->get_num_lines();
            // A start of 0 is bumped to line 1.
            if (first == 0) first = 1;
            if (first > line_count) co_return;
            // An end of 0 means "to the last line"; overshoots are clamped.
            if (last == 0 || last > line_count) last = line_count;
        } else {
            const auto byte_limit = indexed->get_max_bytes();
            if (first >= byte_limit) co_return;
            // An end of 0 means "to the end"; overshoots are clamped.
            if (last == 0 || last > byte_limit) last = byte_limit;
        }

        // Index-based pruning is attempted only for byte ranges, and only
        // when there is both a query and an index to consult.
        const bool prune_candidate =
            !config.query.empty() && !index_path_.empty() &&
            range_kind == internal::RangeType::BYTE_RANGE;
        if (prune_candidate) {
            auto query_expr = Query::from_string(config.query);
            if (!query_expr) {
                throw common::query::QueryParseError(query_expr.error());
            }
            ChunkPrunerInput prune_request{index_path_, config_.file_path,
                                           std::move(*query_expr), nullptr};
            ChunkPrunerUtility chunk_pruner;
            auto verdict = co_await chunk_pruner.process(prune_request);
            // Skip the file only on a successful, definite "cannot match".
            const bool definitely_no_match =
                verdict.success && !verdict.file_may_match;
            if (definitely_no_match) {
                co_return;
            }
        }

        auto chunk_stream = indexed->stream(internal::StreamConfig()
                                                .stream_type(raw_stream_kind)
                                                .range_type(range_kind)
                                                .from(first)
                                                .to(last)
                                                .buffer_size(config.buffer_size));

        for (;;) {
            if (chunk_stream->done()) break;
            auto piece = co_await chunk_stream->read_async();
            // An empty read ends the stream even if done() is still false.
            if (piece.empty()) break;
            co_yield piece;
        }
        co_return;
    }

    // ---- Non-indexed gzip path ------------------------------------------
    if (format_ == ArchiveFormat::GZIP || format_ == ArchiveFormat::TAR_GZ) {
        auto gz_lines =
            fileio::lines::sources::async_streaming_gz_lines(config_.file_path);
        std::size_t offset = 0;
        for (;;) {
            auto entry = co_await gz_lines.next();
            if (!entry) break;
            const auto& row = *entry;
            // Stop once the current line starts at or past the end bound
            // (an end_byte of 0 disables the bound).
            if (config.end_byte > 0 && offset >= config.end_byte) break;
            // +1 presumably accounts for the line terminator that is not
            // part of row.content -- TODO confirm against the generator.
            const std::size_t next_offset = offset + row.content.size() + 1;
            if (next_offset > config.start_byte) {
                co_yield std::span<const char>(row.content.data(),
                                               row.content.size());
            }
            offset = next_offset;
        }
        co_return;
    }

    // ---- Non-indexed plain-file path ------------------------------------
    auto plain_lines =
        fileio::lines::sources::async_plain_file_lines(config_.file_path);
    std::size_t offset = 0;
    for (;;) {
        auto entry = co_await plain_lines.next();
        if (!entry) break;
        const auto& row = *entry;
        // Same byte-window filtering as the gzip path above.
        if (config.end_byte > 0 && offset >= config.end_byte) break;
        const std::size_t next_offset = offset + row.content.size() + 1;
        if (next_offset > config.start_byte) {
            co_yield std::span<const char>(row.content.data(),
                                           row.content.size());
        }
        offset = next_offset;
    }
}
6,252!
771

772
}  // namespace dftracer::utils::utilities::reader
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc