• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28423703495

30 Jun 2026 05:59AM UTC coverage: 51.998% (-0.3%) from 52.278%
28423703495

Pull #83

github

web-flow
Merge fb542a938 into 2efed6649
Pull Request #83: refactor and improve code QoL

37282 of 93303 branches covered (39.96%)

Branch coverage included in aggregate %.

801 of 1525 new or added lines in 78 files covered. (52.52%)

98 existing lines in 37 files now uncovered.

33674 of 43157 relevant lines covered (78.03%)

20306.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.67
/src/dftracer/utils/utilities/reader/internal/gzip_reader.cpp
1
#include <dftracer/utils/core/coro/task.h>
2
#include <dftracer/utils/core/utils/timer.h>
3
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
4
#include <dftracer/utils/utilities/indexer/internal/indexer_factory.h>
5
#include <dftracer/utils/utilities/reader/error.h>
6
#include <dftracer/utils/utilities/reader/internal/gzip_reader.h>
7
#include <dftracer/utils/utilities/reader/internal/stream_config.h>
8
#include <dftracer/utils/utilities/reader/internal/streams/gzip_byte_stream.h>
9
#include <dftracer/utils/utilities/reader/internal/streams/gzip_line_byte_stream.h>
10
#include <dftracer/utils/utilities/reader/internal/streams/line_stream.h>
11
#include <dftracer/utils/utilities/reader/internal/streams/multi_line_stream.h>
12
#include <dftracer/utils/utilities/reader/internal/string_line_processor.h>
13

14
#include <cstdio>
15
#include <cstring>
16
#include <limits>
17
#include <string_view>
18

19
static void validate_parameters(
154,660✔
20
    const char *buffer, std::size_t buffer_size, std::size_t start_bytes,
21
    std::size_t end_bytes,
22
    std::size_t max_bytes = std::numeric_limits<std::size_t>::max()) {
23
    if (!buffer || buffer_size == 0) {
154,660!
24
        throw dftracer::utils::utilities::reader::ReaderError(
12!
25
            dftracer::utils::utilities::reader::ReaderError::INVALID_ARGUMENT,
UNCOV
26
            "Invalid buffer parameters");
×
27
    }
28
    if (start_bytes >= end_bytes) {
154,660✔
29
        throw dftracer::utils::utilities::reader::ReaderError(
20!
30
            dftracer::utils::utilities::reader::ReaderError::INVALID_ARGUMENT,
31
            "start_bytes must be less than end_bytes");
30!
32
    }
33
    if (max_bytes != SIZE_MAX) {
154,640✔
34
        if (end_bytes > max_bytes) {
154,640✔
35
            throw dftracer::utils::utilities::reader::ReaderError(
4!
36
                dftracer::utils::utilities::reader::ReaderError::
37
                    INVALID_ARGUMENT,
38
                "end_bytes exceeds maximum available bytes");
6!
39
        }
40
        if (start_bytes > max_bytes) {
154,636✔
NEW
41
            throw dftracer::utils::utilities::reader::ReaderError(
×
42
                dftracer::utils::utilities::reader::ReaderError::
43
                    INVALID_ARGUMENT,
44
                "start_bytes exceeds maximum available bytes");
×
45
        }
46
    }
77,315✔
47
}
154,648✔
48

49
static void check_reader_state(bool is_open, const void *indexer) {
160,237✔
50
    if (!is_open || !indexer) {
160,237!
NEW
51
        throw dftracer::utils::utilities::reader::ReaderError(
×
52
            dftracer::utils::utilities::reader::ReaderError::
53
                INITIALIZATION_ERROR,
NEW
54
            "Reader is not open");
×
55
    }
56
}
160,245✔
57

58
// Default scratch-buffer size (1 MiB) assigned to default_buffer_size by the
// GzipReader constructors.
static constexpr std::size_t DEFAULT_READER_BUFFER_SIZE =
    std::size_t{1024} * 1024;
59

60
namespace dftracer::utils::utilities::reader::internal {
61

62
// Opens a reader for the archive `gz_path_` using the index at `idx_path_`.
// The indexer is obtained from IndexerFactory; any std::exception raised
// while creating it is rethrown as ReaderError(INITIALIZATION_ERROR).
GzipReader::GzipReader(const std::string &gz_path_,
                       const std::string &idx_path_,
                       std::size_t index_ckpt_size)
    : gz_path(gz_path_),
      index_path(idx_path_),
      is_open(false),
      default_buffer_size(DEFAULT_READER_BUFFER_SIZE),
      indexer(nullptr) {
    namespace indexer_ns = dftracer::utils::utilities::indexer::internal;

    try {
        indexer = indexer_ns::IndexerFactory::create(gz_path, index_path,
                                                     index_ckpt_size, false);
        // Only mark the reader open once the indexer exists.
        is_open = true;

        DFTRACER_UTILS_LOG_DEBUG(
            "Successfully created GZIP reader for gz: %s and index: %s",
            gz_path.c_str(), index_path.c_str());
    } catch (const std::exception &cause) {
        throw ReaderError(
            ReaderError::INITIALIZATION_ERROR,
            std::string("Failed to initialize reader with indexer: ") +
                cause.what());
    }
}
1,470✔
84

85
// Builds a reader around an already-constructed indexer. The archive and
// index paths are taken from the indexer itself.
// Throws ReaderError(INITIALIZATION_ERROR) when `indexer_` is null.
GzipReader::GzipReader(
    std::shared_ptr<dftracer::utils::utilities::indexer::internal::Indexer>
        indexer_)
    : default_buffer_size(DEFAULT_READER_BUFFER_SIZE),
      indexer(std::move(indexer_)) {
    if (indexer == nullptr) {
        throw ReaderError(ReaderError::INITIALIZATION_ERROR,
                          "Invalid indexer provided");
    }

    is_open = true;
    gz_path = indexer->get_archive_path();
    index_path = indexer->get_index_path();
}
384✔
98

99
// Releases any cached stream and marks the reader closed.
//
// The cache is cleared directly instead of through reset(): reset() validates
// the reader state and throws when the reader is closed or has no indexer,
// which is exactly the state of a moved-from GzipReader. An exception leaving
// a destructor ends in std::terminate, so destruction must not take that path.
GzipReader::~GzipReader() {
    DFTRACER_UTILS_LOG_DEBUG("Destroying GZIP reader for gz: %s and index: %s",
                             gz_path.c_str(), index_path.c_str());
    stream_cache_.clear();
    is_open = false;
}
1,800✔
105

106
// Move constructor: takes over the paths, buffer size and indexer of `other`
// and leaves `other` flagged as closed.
GzipReader::GzipReader(GzipReader &&other) noexcept
    : gz_path(std::move(other.gz_path)),
      index_path(std::move(other.index_path)),
      is_open(std::exchange(other.is_open, false)),
      default_buffer_size(other.default_buffer_size),
      indexer(std::move(other.indexer)) {}
×
114

115
// Move assignment: takes over the paths, buffer size and indexer of `other`
// and leaves `other` flagged as closed. Self-assignment is a no-op.
GzipReader &GzipReader::operator=(GzipReader &&other) noexcept {
    if (this == &other) {
        return *this;
    }

    // Any stream cached by this reader was created for the archive and
    // indexer that are about to be replaced; drop it so a later read cannot
    // continue on a stream belonging to the previous state.
    stream_cache_.clear();

    gz_path = std::move(other.gz_path);
    index_path = std::move(other.index_path);
    is_open = other.is_open;
    default_buffer_size = other.default_buffer_size;
    indexer = std::move(other.indexer);
    other.is_open = false;
    return *this;
}
126

127
std::size_t GzipReader::get_max_bytes() const {
374✔
128
    check_reader_state(is_open, indexer.get());
374✔
129
    std::size_t max_bytes =
187✔
130
        static_cast<std::size_t>(indexer.get()->get_max_bytes());
374✔
131
    DFTRACER_UTILS_LOG_DEBUG("Maximum bytes available: %zu", max_bytes);
371!
132
    return max_bytes;
374✔
133
}
134

135
std::size_t GzipReader::get_num_lines() const {
110✔
136
    check_reader_state(is_open, indexer.get());
110✔
137
    std::size_t num_lines = static_cast<std::size_t>(indexer->get_num_lines());
110✔
138
    DFTRACER_UTILS_LOG_DEBUG("Total lines available: %zu", num_lines);
110!
139
    return num_lines;
110✔
140
}
141

142
const std::string &GzipReader::get_archive_path() const { return gz_path; }
14✔
143

144
const std::string &GzipReader::get_index_path() const { return index_path; }
8✔
145

146
void GzipReader::set_buffer_size(std::size_t size) {
×
147
    default_buffer_size = size;
×
148
}
×
149

150
void GzipReader::reset() {
1,208✔
151
    check_reader_state(is_open, indexer.get());
1,208✔
152
    stream_cache_.clear();
1,208✔
153
}
1,208✔
154

155
// Reads raw bytes from the range [start_bytes, end_bytes) request into
// `buffer` (at most `buffer_size` bytes per call) and returns the number of
// bytes produced by the underlying stream.
//
// A cached BYTES stream is reused when the stream cache reports that it can
// continue this request; otherwise a new stream is created and cached.
// Throws ReaderError if the reader is not open or the arguments are invalid.
coro::CoroTask<std::size_t> GzipReader::read_async(std::size_t start_bytes,
                                                   std::size_t end_bytes,
                                                   char *buffer,
                                                   std::size_t buffer_size) {
    check_reader_state(is_open, indexer.get());
    validate_parameters(buffer, buffer_size, start_bytes, end_bytes,
                        indexer->get_max_bytes());

    DFTRACER_UTILS_LOG_DEBUG(
        "GzipReader::read - request: start_bytes=%zu, end_bytes=%zu, "
        "buffer_size=%zu",
        start_bytes, end_bytes, buffer_size);

    const bool reuse_cached = stream_cache_.can_continue(
        StreamType::BYTES, gz_path, start_bytes, end_bytes);

    if (reuse_cached) {
        DFTRACER_UTILS_LOG_DEBUG(
            "%s", "GzipReader::read - reusing cached byte stream");
    } else {
        DFTRACER_UTILS_LOG_DEBUG("%s",
                                 "GzipReader::read - creating new byte stream");
        auto byte_stream = stream(StreamConfig()
                                      .stream_type(StreamType::BYTES)
                                      .range_type(RangeType::BYTE_RANGE)
                                      .from(start_bytes)
                                      .to(end_bytes));
        stream_cache_.update(std::move(byte_stream), StreamType::BYTES,
                             gz_path, start_bytes, end_bytes);
    }

    const std::size_t bytes_read =
        co_await stream_cache_.get()->read_async(buffer, buffer_size);
    DFTRACER_UTILS_LOG_DEBUG("GzipReader::read - returned %zu bytes",
                             bytes_read);

    // Record where this read stopped so a follow-up read can continue.
    stream_cache_.update_position(start_bytes + bytes_read);

    co_return bytes_read;
}
62,386!
194

195
// Reads from the byte range through a MULTI_LINES_BYTES stream into `buffer`
// and returns the number of bytes produced by that stream.
//
// `end_bytes` is first clamped to the indexer's maximum byte count; the
// remaining arguments are then validated. A cached stream is reused when the
// stream cache reports it can continue this request.
// Throws ReaderError if the reader is not open or the arguments are invalid.
coro::CoroTask<std::size_t> GzipReader::read_line_bytes_async(
    std::size_t start_bytes, std::size_t end_bytes, char *buffer,
    std::size_t buffer_size) {
    check_reader_state(is_open, indexer.get());

    if (end_bytes > indexer->get_max_bytes()) {
        end_bytes = indexer->get_max_bytes();
    }

    validate_parameters(buffer, buffer_size, start_bytes, end_bytes,
                        indexer->get_max_bytes());

    const bool reuse_cached = stream_cache_.can_continue(
        StreamType::MULTI_LINES_BYTES, gz_path, start_bytes, end_bytes);

    if (!reuse_cached) {
        auto line_bytes_stream =
            stream(StreamConfig()
                       .stream_type(StreamType::MULTI_LINES_BYTES)
                       .range_type(RangeType::BYTE_RANGE)
                       .from(start_bytes)
                       .to(end_bytes));
        stream_cache_.update(std::move(line_bytes_stream),
                             StreamType::MULTI_LINES_BYTES, gz_path,
                             start_bytes, end_bytes);
    }

    const std::size_t bytes_read =
        co_await stream_cache_.get()->read_async(buffer, buffer_size);

    // Record where this read stopped so a follow-up read can continue.
    stream_cache_.update_position(start_bytes + bytes_read);

    co_return bytes_read;
}
842,997!
228

229
// Reads the 1-based line range [start_line, end_line] and returns everything
// the MULTI_LINES stream produced, concatenated into one string.
//
// Throws ReaderError(INVALID_ARGUMENT) when a line number is 0, when
// start_line > end_line, or when either bound exceeds the indexed line count;
// throws ReaderError(INITIALIZATION_ERROR) when the reader is not open.
coro::CoroTask<std::string> GzipReader::read_lines_async(std::size_t start_line,
                                                         std::size_t end_line) {
    check_reader_state(is_open, indexer.get());

    if (start_line == 0 || end_line == 0) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Line numbers must be 1-based (start from 1)");
    }
    if (start_line > end_line) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Start line must be <= end line");
    }

    const std::size_t total_lines = indexer->get_num_lines();
    if (start_line > total_lines || end_line > total_lines) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Line numbers exceed total lines in file (" +
                              std::to_string(total_lines) + ")");
    }

    // Reuse the cached stream when it can continue this request.
    const bool reuse_cached = stream_cache_.can_continue(
        StreamType::MULTI_LINES, gz_path, start_line, end_line);
    if (!reuse_cached) {
        auto lines_stream = stream(StreamConfig()
                                       .stream_type(StreamType::MULTI_LINES)
                                       .range_type(RangeType::LINE_RANGE)
                                       .from(start_line)
                                       .to(end_line));
        stream_cache_.update(std::move(lines_stream), StreamType::MULTI_LINES,
                             gz_path, start_line, end_line);
    }

    // Reserve up front, guessing roughly 100 bytes per requested line, to
    // limit reallocations while appending.
    std::string collected;
    const std::size_t requested_lines = end_line - start_line + 1;
    collected.reserve(requested_lines * 100);

    std::vector<char> chunk(default_buffer_size);

    for (;;) {
        if (stream_cache_.get()->done()) {
            break;
        }
        const std::size_t bytes_read = co_await stream_cache_.get()->read_async(
            chunk.data(), chunk.size());
        if (bytes_read == 0) {
            break;
        }
        collected.append(chunk.data(), bytes_read);
    }

    co_return collected;
}
3,468!
279

280
// Streams the 1-based line range [start_line, end_line] to `processor`, one
// line per process() call with a trailing '\n' removed.
//
// processor.begin() is called before the first line and processor.end() after
// the last one; reading stops early when process() yields a false value.
// Throws ReaderError(INVALID_ARGUMENT) when a line number is 0, when
// start_line > end_line, or when either bound exceeds the indexed line count;
// throws ReaderError(INITIALIZATION_ERROR) when the reader is not open.
coro::CoroTask<void> GzipReader::read_lines_with_processor_async(
    std::size_t start_line, std::size_t end_line, LineProcessor &processor) {
    check_reader_state(is_open, indexer.get());

    if (start_line == 0 || end_line == 0) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Line numbers must be 1-based (start from 1)");
    }
    if (start_line > end_line) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Start line must be <= end line");
    }

    const std::size_t total_lines = indexer->get_num_lines();
    if (start_line > total_lines || end_line > total_lines) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Line numbers exceed total lines in file (" +
                              std::to_string(total_lines) + ")");
    }

    processor.begin(start_line, end_line);

    // A LINE stream hands back a single line per read.
    auto single_line_stream = stream(StreamConfig()
                                         .stream_type(StreamType::LINE)
                                         .range_type(RangeType::LINE_RANGE)
                                         .from(start_line)
                                         .to(end_line));

    std::vector<char> chunk(default_buffer_size);

    while (!single_line_stream->done()) {
        const std::size_t bytes_read = co_await single_line_stream->read_async(
            chunk.data(), chunk.size());
        if (bytes_read == 0) {
            break;
        }

        // The stream includes the terminating '\n'; the processor is given
        // the line without it.
        const bool ends_with_newline = chunk[bytes_read - 1] == '\n';
        const std::size_t line_length =
            ends_with_newline ? bytes_read - 1 : bytes_read;

        if (!co_await processor.process(chunk.data(), line_length)) {
            break;  // processor asked to stop
        }
    }

    processor.end();
}
×
332

333
// Streams the byte range [start_bytes, end_bytes) request to `processor`
// through a LINE_BYTES stream, passing every chunk the stream returns to
// process() unchanged.
//
// `end_bytes` is clamped to the indexer's maximum byte count. If the range is
// empty after clamping, the call returns without touching the processor.
// Otherwise processor.begin() is called first and processor.end() last.
// Reading stops early when process() yields a false value, matching
// read_lines_with_processor_async.
// Throws ReaderError(INITIALIZATION_ERROR) when the reader is not open.
coro::CoroTask<void> GzipReader::read_line_bytes_with_processor_async(
    std::size_t start_bytes, std::size_t end_bytes, LineProcessor &processor) {
    check_reader_state(is_open, indexer.get());

    if (end_bytes > indexer->get_max_bytes()) {
        end_bytes = indexer->get_max_bytes();
    }

    if (start_bytes >= end_bytes) {
        co_return;
    }

    processor.begin(start_bytes, end_bytes);

    auto lines_stream = stream(StreamConfig()
                                   .stream_type(StreamType::LINE_BYTES)
                                   .range_type(RangeType::BYTE_RANGE)
                                   .from(start_bytes)
                                   .to(end_bytes));

    std::vector<char> buffer(default_buffer_size);

    while (!lines_stream->done()) {
        const std::size_t bytes_read =
            co_await lines_stream->read_async(buffer.data(), buffer.size());
        if (bytes_read == 0) {
            break;
        }
        // Honor the processor's stop request instead of discarding it.
        if (!co_await processor.process(buffer.data(), bytes_read)) {
            break;
        }
    }

    processor.end();
}
×
364

365
// True when the reader is flagged open and holds an indexer.
bool GzipReader::is_valid() const {
    return is_open && indexer.get() != nullptr;
}
74!
366

367
// Name of the archive format handled by this reader.
std::string GzipReader::get_format_name() const {
    return std::string("GZIP");
}
2!
368

369
std::unique_ptr<ReaderStream> GzipReader::stream(const StreamConfig &config) {
3,559✔
370
    check_reader_state(is_open, indexer.get());
3,559!
371

372
    // Extract config parameters
373
    StreamType stream_type = config.stream_type();
3,570✔
374
    RangeType range_type = config.range_type();
3,558✔
375
    std::size_t start = config.start();
3,559✔
376
    std::size_t end = config.end();
3,564✔
377
    std::size_t buffer_size = config.buffer_size();
3,561✔
378
    bool extend_to_line_boundary = config.extend_to_line_boundary();
3,560✔
379

380
    // Convert line range to byte range if needed
381
    std::size_t start_bytes = start;
3,559✔
382
    std::size_t end_bytes = end;
3,559✔
383
    std::size_t actual_start_line =
3,559✔
384
        1;  // Track what line number start_bytes corresponds to
385

386
    if (range_type == RangeType::LINE_RANGE) {
3,559✔
387
        // Convert line numbers to byte offsets using checkpoints
388
        if (start == 0 || end == 0) {
451!
389
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
390
                              "Line numbers must be 1-based (start from 1)");
1!
391
        }
392
        if (start > end) {
450✔
393
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
394
                              "Start line must be <= end line");
×
395
        }
396

397
        std::size_t total_lines = indexer->get_num_lines();
450!
398
        if (start > total_lines || end > total_lines) {
452!
399
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
400
                              "Line numbers exceed total lines in file (" +
×
401
                                  std::to_string(total_lines) + ")");
1!
402
        }
403

404
        // Get checkpoints for the line range
405
        std::vector<
406
            dftracer::utils::utilities::indexer::internal::IndexerCheckpoint>
407
            checkpoints = indexer->get_checkpoints_for_line_range(start, end);
451!
408

409
        DFTRACER_UTILS_LOG_DEBUG("Line range %zu-%zu: found %zu checkpoints",
451!
410
                                 start, end, checkpoints.size());
411

412
        if (checkpoints.empty()) {
452✔
413
            // No checkpoints, read from beginning
414
            start_bytes = 0;
180✔
415
            end_bytes = indexer->get_max_bytes();
180!
416
            actual_start_line = 1;
180✔
417
            DFTRACER_UTILS_LOG_DEBUG(
180!
418
                "No checkpoints found, using full file: start_bytes=%zu, "
419
                "end_bytes=%zu, max_bytes=%zu",
420
                start_bytes, end_bytes, indexer->get_max_bytes());
421
        } else {
90✔
422
            // Use checkpoint to determine byte range.
423
            //
424
            // Checkpoint uc_offset values fall at deflate block boundaries
425
            // which may land in the middle of a text line.  When we start
426
            // decompressing from such a mid-line position the first "line"
427
            // seen by MultiLineStream is a partial fragment.  If
428
            // actual_start_line == start_line the fragment is emitted as
429
            // the requested first line, producing wrong content.
430
            //
431
            // To avoid this we choose a checkpoint whose last_line_num is
432
            // strictly less than (start - 1), guaranteeing
433
            // actual_start_line < start.  MultiLineStream then filters
434
            // out the (potentially partial) early lines before reaching
435
            // the requested range.
436
            auto all_checkpoints = indexer->get_checkpoints();
271!
437
            bool found_start = false;
272✔
438

439
            // Walk checkpoints from the end to find the latest one whose
440
            // line range ends before (start - 1).
441
            for (auto it = all_checkpoints.rbegin();
11,778✔
442
                 it != all_checkpoints.rend(); ++it) {
17,071!
443
                if (it->last_line_num < start - 1) {
11,253!
444
                    start_bytes = it->uc_offset;
139!
445
                    actual_start_line = it->last_line_num + 1;
139!
446
                    found_start = true;
139✔
447
                    break;
139✔
448
                }
449
            }
5,685✔
450

451
            if (!found_start) {
272✔
452
                // No suitable checkpoint found -- start from beginning
453
                start_bytes = 0;
133✔
454
                actual_start_line = 1;
133✔
455
            }
67✔
456

457
            const auto &last_checkpoint = checkpoints.back();
272✔
458
            end_bytes = last_checkpoint.uc_offset + last_checkpoint.uc_size;
272✔
459

460
            DFTRACER_UTILS_LOG_DEBUG(
272!
461
                "Using checkpoints: matched_first_idx=%zu "
462
                "(first_line=%zu, last_line=%zu), "
463
                "end_checkpoint_idx=%zu (first_line=%zu, last_line=%zu), "
464
                "byte_range=%zu-%zu, actual_start_line=%zu",
465
                checkpoints[0].checkpoint_idx, checkpoints[0].first_line_num,
466
                checkpoints[0].last_line_num, last_checkpoint.checkpoint_idx,
467
                last_checkpoint.first_line_num, last_checkpoint.last_line_num,
468
                start_bytes, end_bytes, actual_start_line);
469
        }
272✔
470
    }
452✔
471

472
    // Create appropriate stream type
473
    switch (stream_type) {
3,560!
474
        case StreamType::BYTES: {
193✔
475
            auto byte_stream = std::make_unique<GzipByteStream>(buffer_size);
386!
476
            byte_stream->initialize(gz_path, start_bytes, end_bytes, *indexer);
386!
477
            return byte_stream;
386✔
478
        }
386✔
479
        case StreamType::LINE_BYTES: {
154✔
480
            // Single line-aligned bytes at a time
481
            auto line_byte_stream =
482
                std::make_unique<GzipLineByteStream>(buffer_size);
314!
483
            line_byte_stream->set_extend_to_line_boundary(
479!
484
                extend_to_line_boundary);
160✔
485
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
639!
486
                                         *indexer);
320✔
487

488
            // Wrap with LineStream to return one line-aligned chunk at a time
489
            if (range_type == RangeType::LINE_RANGE) {
320✔
490
                return std::make_unique<LineStream>(
6!
491
                    std::move(line_byte_stream), start, end, actual_start_line);
4✔
492
            } else {
493
                return std::make_unique<LineStream>(
474!
494
                    std::move(line_byte_stream));
316✔
495
            }
496
        }
320✔
497
        case StreamType::MULTI_LINES_BYTES: {
1,190✔
498
            // Multiple line-aligned bytes per read
499
            auto line_byte_stream =
500
                std::make_unique<GzipLineByteStream>(buffer_size);
2,390!
501
            line_byte_stream->set_extend_to_line_boundary(
3,595!
502
                extend_to_line_boundary);
1,200✔
503
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
4,795!
504
                                         *indexer);
2,396✔
505
            return line_byte_stream;
2,398✔
506
        }
2,398✔
507
        case StreamType::LINE: {
54✔
508
            // Single parsed line per read
509
            auto line_byte_stream =
510
                std::make_unique<GzipLineByteStream>(buffer_size);
108!
511
            line_byte_stream->set_extend_to_line_boundary(
162!
512
                extend_to_line_boundary);
54✔
513
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
216!
514
                                         *indexer);
108✔
515

516
            if (range_type == RangeType::LINE_RANGE) {
108✔
517
                return std::make_unique<LineStream>(
159!
518
                    std::move(line_byte_stream), start, end, actual_start_line);
106✔
519
            } else {
520
                return std::make_unique<LineStream>(
3!
521
                    std::move(line_byte_stream));
2✔
522
            }
523
        }
108✔
524
        case StreamType::MULTI_LINES: {
181✔
525
            // Multiple parsed lines per read
526
            auto line_byte_stream =
527
                std::make_unique<GzipLineByteStream>(buffer_size);
362!
528
            line_byte_stream->set_extend_to_line_boundary(
543!
529
                extend_to_line_boundary);
181✔
530
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
724!
531
                                         *indexer);
362✔
532

533
            if (range_type == RangeType::LINE_RANGE) {
362✔
534
                return std::make_unique<MultiLineStream>(
507!
535
                    std::move(line_byte_stream), start, end, actual_start_line);
338✔
536
            } else {
537
                return std::make_unique<MultiLineStream>(
36!
538
                    std::move(line_byte_stream));
24✔
539
            }
540
        }
362✔
541
        default:
542
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
543
                              "Invalid stream type");
×
544
    }
545
}
1,788✔
546

547
}  // namespace dftracer::utils::utilities::reader::internal
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc