• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23531027933

25 Mar 2026 08:05AM UTC coverage: 48.592% (-1.5%) from 50.098%
23531027933

Pull #57

github

web-flow
Merge d1070e289 into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18900 of 49456 branches covered (38.22%)

Branch coverage included in aggregate %.

1604 of 1954 new or added lines in 25 files covered. (82.09%)

3407 existing lines in 135 files now uncovered.

18487 of 27485 relevant lines covered (67.26%)

240991.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.71
/src/dftracer/utils/utilities/reader/trace_reader.cpp
1
#include <dftracer/utils/core/common/archive_format.h>
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/utilities/common/json/json_value.h>
4
#include <dftracer/utils/utilities/common/query/query.h>
5
#include <dftracer/utils/utilities/composites/dft/indexing/chunk_pruner_utility.h>
6
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
7
#include <dftracer/utils/utilities/fileio/lines/sources/async_plain_file_line_generator.h>
8
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>
9
#include <dftracer/utils/utilities/indexer/internal/indexer_factory.h>
10
#include <dftracer/utils/utilities/reader/internal/reader.h>
11
#include <dftracer/utils/utilities/reader/internal/reader_factory.h>
12
#include <dftracer/utils/utilities/reader/internal/stream.h>
13
#include <dftracer/utils/utilities/reader/internal/stream_config.h>
14
#include <dftracer/utils/utilities/reader/internal/stream_type.h>
15
#include <dftracer/utils/utilities/reader/trace_reader.h>
16
#include <yyjson.h>
17

18
#include <cstring>
19
#include <optional>
20
#include <span>
21

22
namespace dftracer::utils::utilities::reader {
23

24
namespace dft_internal = composites::dft::internal;
25
using common::json::JsonValue;
26
using common::query::Query;
27
using composites::dft::indexing::ChunkPrunerInput;
28
using composites::dft::indexing::ChunkPrunerUtility;
29
using indexer::internal::IndexerFactory;
30

31
namespace {
32

33
bool line_matches_query(const Query& q, std::string_view content) {
2,188✔
34
    yyjson_doc* doc = yyjson_read(content.data(), content.size(), 0);
2,188✔
35
    if (!doc) return false;
2,188✔
36
    yyjson_val* root = yyjson_doc_get_root(doc);
2,169✔
37
    bool result = false;
2,169✔
38
    if (root && yyjson_is_obj(root)) {
2,169!
39
        JsonValue json(root);
2,169✔
40
        result = q.evaluate(json);
2,169✔
41
    }
2,169✔
42
    yyjson_doc_free(doc);
2,169✔
43
    return result;
2,169✔
44
}
2,188✔
45

46
}  // namespace
47

48
// Takes ownership of `config` and immediately probes for an index file and
// the archive format (see probe_index()).
TraceReader::TraceReader(TraceReaderConfig config) : config_(std::move(config)) {
    probe_index();
}
235✔
52

53
void TraceReader::probe_index() {
235✔
54
    idx_path_ = dft_internal::determine_index_path(config_.file_path,
470✔
55
                                                   config_.index_dir);
235✔
56
    has_index_ = fs::exists(idx_path_);
235!
57
    format_ = IndexerFactory::detect_format(config_.file_path);
235✔
58
}
235✔
59

60
// Returns the has_index_ flag recorded by probe_index(), i.e. whether the
// resolved index path existed at that time.
bool TraceReader::has_index() const {
    return has_index_;
}
111✔
61

62
void TraceReader::ensure_metadata_cached() {
19✔
63
    if (metadata_cached_) return;
19!
64

65
    if (has_index_) {
19✔
66
        auto reader = create_indexed_reader();
13✔
67
        cached_max_bytes_ = reader->get_max_bytes();
13!
68
        cached_num_lines_ = reader->get_num_lines();
13!
69
    } else if (format_ == ArchiveFormat::GZIP ||
19!
70
               format_ == ArchiveFormat::TAR_GZ) {
×
71
        cached_max_bytes_ = 0;
6✔
72
        cached_num_lines_ = 0;
6✔
73
    } else {
6✔
74
        std::error_code ec;
×
75
        auto size = fs::file_size(config_.file_path, ec);
×
76
        cached_max_bytes_ = ec ? 0 : static_cast<std::size_t>(size);
×
77
        cached_num_lines_ = 0;
×
78
    }
79
    metadata_cached_ = true;
19✔
80
}
19✔
81

82
std::size_t TraceReader::get_max_bytes() {
13✔
83
    ensure_metadata_cached();
13✔
84
    return cached_max_bytes_;
13✔
85
}
86

87
std::size_t TraceReader::get_num_lines() {
6✔
88
    ensure_metadata_cached();
6✔
89
    return cached_num_lines_;
6✔
90
}
91

92
std::shared_ptr<internal::Reader> TraceReader::create_indexed_reader() {
36✔
93
    auto indexer = IndexerFactory::create(config_.file_path, idx_path_,
72✔
94
                                          config_.checkpoint_size, false);
36✔
95
    return internal::ReaderFactory::create(indexer);
36!
96
}
36✔
97

98
// Maps the alignment flags of `config` to the stream type used by read_raw():
//   not line_aligned           -> BYTES
//   line_aligned + multi_line  -> MULTI_LINES_BYTES
//   line_aligned only          -> LINE_BYTES
internal::StreamType TraceReader::resolve_raw_stream_type(
    const ReadConfig& config) const {
    if (config.line_aligned) {
        return config.multi_line ? internal::StreamType::MULTI_LINES_BYTES
                                 : internal::StreamType::LINE_BYTES;
    }
    return internal::StreamType::BYTES;
}
6✔
104

105
// Selects LINE_RANGE when `config` reports a line range, BYTE_RANGE otherwise.
internal::RangeType TraceReader::resolve_range_type(
    const ReadConfig& config) const {
    return config.has_line_range() ? internal::RangeType::LINE_RANGE
                                   : internal::RangeType::BYTE_RANGE;
}
23✔
110

111
// Streams the trace as `Line` values, optionally keeping only the lines that
// satisfy `config.query`.
//
// Source selection:
//   * no index, GZIP / TAR_GZ -> async_streaming_gz_lines
//   * no index, other format  -> async_plain_file_lines
//   * index present           -> indexed reader with a MULTI_LINES stream
//                                whose chunks are split on '\n' here
//
// Without an index only a line range is honoured: byte offsets from `config`
// are not consulted and 0/0 is passed to the line generator instead.
//
// Throws common::query::QueryParseError when `config.query` is non-empty and
// cannot be parsed.
coro::AsyncGenerator<Line> TraceReader::read_lines(ReadConfig config) {
    std::optional<Query> filter;
    if (!config.query.empty()) {
        auto parse_result = Query::from_string(config.query);
        if (!parse_result) {
            throw common::query::QueryParseError(parse_result.error());
        }
        filter = std::move(*parse_result);
    }

    const bool by_lines = config.has_line_range();

    // ---- No index: fall back to the streaming line generators. ----
    if (!has_index_) {
        std::size_t first = by_lines ? config.start_line : 0;
        std::size_t last = by_lines ? config.end_line : 0;

        const bool gzip_like = format_ == ArchiveFormat::GZIP ||
                               format_ == ArchiveFormat::TAR_GZ;
        if (gzip_like) {
            auto gen = fileio::lines::sources::async_streaming_gz_lines(
                config_.file_path, first, last);
            while (true) {
                auto item = co_await gen.next();
                if (!item) break;
                if (!filter || line_matches_query(*filter, item->content)) {
                    co_yield *item;
                }
            }
        } else {
            auto gen = fileio::lines::sources::async_plain_file_lines(
                config_.file_path, first, last);
            while (true) {
                auto item = co_await gen.next();
                if (!item) break;
                if (!filter || line_matches_query(*filter, item->content)) {
                    co_yield *item;
                }
            }
        }
        co_return;
    }

    // ---- Index present: read through the indexed reader. ----
    auto indexed = create_indexed_reader();
    const auto range_type = resolve_range_type(config);
    std::size_t first = by_lines ? config.start_line : config.start_byte;
    std::size_t last = by_lines ? config.end_line : config.end_byte;

    // Clamp the requested range to what the reader reports; an `end` of 0
    // means "up to the last line / byte".
    if (range_type == internal::RangeType::LINE_RANGE) {
        auto line_count = indexed->get_num_lines();
        if (first == 0) first = 1;  // a start line of 0 is treated as line 1
        if (first > line_count) co_return;
        if (last == 0 || last > line_count) last = line_count;
    } else {
        auto byte_count = indexed->get_max_bytes();
        if (first >= byte_count) co_return;
        if (last == 0 || last > byte_count) last = byte_count;
    }

    // Ask the chunk pruner whether the file can match at all. This is only
    // attempted for byte ranges, with a query and a non-empty index path.
    const bool can_prune = filter.has_value() && !idx_path_.empty() &&
                           range_type == internal::RangeType::BYTE_RANGE;
    if (can_prune) {
        ChunkPrunerInput prune_request{idx_path_, config_.file_path, *filter,
                                       nullptr};
        ChunkPrunerUtility pruner;
        auto verdict = co_await pruner.process(prune_request);
        if (verdict.success && !verdict.file_may_match) {
            co_return;
        }
    }

    auto stream =
        indexed->stream(internal::StreamConfig()
                            .stream_type(internal::StreamType::MULTI_LINES)
                            .range_type(range_type)
                            .from(first)
                            .to(last)
                            .buffer_size(config.buffer_size));

    // NOTE(review): in BYTE_RANGE mode `first` is a byte offset, so the
    // numbers attached to yielded lines count up from that offset rather than
    // from a real line number — confirm this is intended.
    std::size_t line_num = first;
    while (!stream->done()) {
        auto chunk = co_await stream->read_async();
        if (chunk.empty()) break;

        const char* base = chunk.data();
        const std::size_t size = chunk.size();
        for (std::size_t cursor = 0; cursor < size;) {
            const void* hit = std::memchr(base + cursor, '\n', size - cursor);
            // A chunk that does not end in '\n' contributes its tail as the
            // final line.
            const std::size_t stop =
                hit ? static_cast<std::size_t>(
                          static_cast<const char*>(hit) - base)
                    : size;

            // Empty lines are never yielded but still advance the counter.
            if (stop > cursor) {
                auto text = std::string_view(base + cursor, stop - cursor);
                if (!filter || line_matches_query(*filter, text)) {
                    co_yield Line(text, line_num);
                }
            }
            ++line_num;
            cursor = stop + 1;
        }
    }
}
23,455✔
203

204
// Streams the trace as raw character spans.
//
// Source selection:
//   * no index, GZIP / TAR_GZ -> async_streaming_gz_lines, one span per line
//   * no index, other format  -> async_plain_file_lines, one span per line
//   * index present           -> indexed reader; the stream type is chosen by
//                                resolve_raw_stream_type()
//
// Without an index only `config.start_byte` / `config.end_byte` are applied;
// `config.query` and any line range are not consulted on those paths.
//
// Throws common::query::QueryParseError when the index-based pruning step
// runs and `config.query` cannot be parsed.
coro::AsyncGenerator<std::span<const char>> TraceReader::read_raw(
    ReadConfig config) {
    // ---- No index: emulate a byte range on top of the line generators. ----
    if (!has_index_) {
        const bool gzip_like = format_ == ArchiveFormat::GZIP ||
                               format_ == ArchiveFormat::TAR_GZ;
        if (gzip_like) {
            auto gen = fileio::lines::sources::async_streaming_gz_lines(
                config_.file_path);
            std::size_t offset = 0;
            while (true) {
                auto item = co_await gen.next();
                if (!item) break;
                const auto& text = item->content;
                // NOTE(review): the +1 presumably accounts for a newline that
                // is not part of `content` — confirm against the generator.
                const std::size_t next_offset = offset + text.size() + 1;
                if (config.end_byte > 0 && offset >= config.end_byte) break;
                if (next_offset > config.start_byte) {
                    co_yield std::span<const char>(text.data(), text.size());
                }
                offset = next_offset;
            }
        } else {
            auto gen = fileio::lines::sources::async_plain_file_lines(
                config_.file_path);
            std::size_t offset = 0;
            while (true) {
                auto item = co_await gen.next();
                if (!item) break;
                const auto& text = item->content;
                // NOTE(review): the +1 presumably accounts for a newline that
                // is not part of `content` — confirm against the generator.
                const std::size_t next_offset = offset + text.size() + 1;
                if (config.end_byte > 0 && offset >= config.end_byte) break;
                if (next_offset > config.start_byte) {
                    co_yield std::span<const char>(text.data(), text.size());
                }
                offset = next_offset;
            }
        }
        co_return;
    }

    // ---- Index present: read through the indexed reader. ----
    auto indexed = create_indexed_reader();
    const auto stream_type = resolve_raw_stream_type(config);
    const auto range_type = resolve_range_type(config);
    const bool by_lines = config.has_line_range();
    std::size_t first = by_lines ? config.start_line : config.start_byte;
    std::size_t last = by_lines ? config.end_line : config.end_byte;

    // Clamp the requested range to what the reader reports; an `end` of 0
    // means "up to the last line / byte".
    if (range_type == internal::RangeType::LINE_RANGE) {
        auto line_count = indexed->get_num_lines();
        if (first == 0) first = 1;  // a start line of 0 is treated as line 1
        if (first > line_count) co_return;
        if (last == 0 || last > line_count) last = line_count;
    } else {
        auto byte_count = indexed->get_max_bytes();
        if (first >= byte_count) co_return;
        if (last == 0 || last > byte_count) last = byte_count;
    }

    // The query is used only to decide whether the whole file can be skipped;
    // the yielded chunks themselves are not filtered.
    // NOTE(review): the query is parsed only on this path and only after the
    // range checks above, so an invalid query is not reported in every case —
    // confirm whether it should be validated up front as read_lines() does.
    const bool can_prune = !config.query.empty() && !idx_path_.empty() &&
                           range_type == internal::RangeType::BYTE_RANGE;
    if (can_prune) {
        auto parse_result = Query::from_string(config.query);
        if (!parse_result) {
            throw common::query::QueryParseError(parse_result.error());
        }
        ChunkPrunerInput prune_request{idx_path_, config_.file_path,
                                       std::move(*parse_result), nullptr};
        ChunkPrunerUtility pruner;
        auto verdict = co_await pruner.process(prune_request);
        if (verdict.success && !verdict.file_may_match) {
            co_return;
        }
    }

    auto stream = indexed->stream(internal::StreamConfig()
                                      .stream_type(stream_type)
                                      .range_type(range_type)
                                      .from(first)
                                      .to(last)
                                      .buffer_size(config.buffer_size));

    // Forward chunks untouched until the stream is done or hands back an
    // empty chunk.
    while (!stream->done()) {
        auto chunk = co_await stream->read_async();
        if (chunk.empty()) break;
        co_yield chunk;
    }
}
5,697✔
282

283
}  // namespace dftracer::utils::utilities::reader
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc