• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28286012595

27 Jun 2026 10:04AM UTC coverage: 51.056% (-1.3%) from 52.356%
28286012595

Pull #79

github

web-flow
Merge 6c6535a19 into 8eb383f39
Pull Request #79: Add Valgrind memory checking (C++, Python, MPI) and fix the bugs it found

32079 of 80165 branches covered (40.02%)

Branch coverage included in aggregate %.

129 of 149 new or added lines in 11 files covered. (86.58%)

5116 existing lines in 181 files now uncovered.

32739 of 46790 relevant lines covered (69.97%)

9929.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.26
/src/dftracer/utils/utilities/reader/internal/gzip_reader.cpp
1
#include <dftracer/utils/core/coro/task.h>
2
#include <dftracer/utils/core/utils/timer.h>
3
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
4
#include <dftracer/utils/utilities/indexer/internal/indexer_factory.h>
5
#include <dftracer/utils/utilities/reader/internal/error.h>
6
#include <dftracer/utils/utilities/reader/internal/gzip_reader.h>
7
#include <dftracer/utils/utilities/reader/internal/stream_config.h>
8
#include <dftracer/utils/utilities/reader/internal/streams/gzip_byte_stream.h>
9
#include <dftracer/utils/utilities/reader/internal/streams/gzip_line_byte_stream.h>
10
#include <dftracer/utils/utilities/reader/internal/streams/line_stream.h>
11
#include <dftracer/utils/utilities/reader/internal/streams/multi_line_stream.h>
12
#include <dftracer/utils/utilities/reader/internal/string_line_processor.h>
13

14
#include <cstdio>
15
#include <cstring>
16
#include <limits>
17
#include <string_view>
18

19
static void validate_parameters(
77,315✔
20
    const char *buffer, std::size_t buffer_size, std::size_t start_bytes,
21
    std::size_t end_bytes,
22
    std::size_t max_bytes = std::numeric_limits<std::size_t>::max()) {
23
    if (!buffer || buffer_size == 0) {
77,315!
24
        throw dftracer::utils::utilities::reader::internal::ReaderError(
12!
25
            dftracer::utils::utilities::reader::internal::ReaderError::
26
                INVALID_ARGUMENT,
27
            "Invalid buffer parameters");
×
28
    }
29
    if (start_bytes >= end_bytes) {
77,315✔
30
        throw dftracer::utils::utilities::reader::internal::ReaderError(
10!
31
            dftracer::utils::utilities::reader::internal::ReaderError::
32
                INVALID_ARGUMENT,
33
            "start_bytes must be less than end_bytes");
10!
34
    }
35
    if (max_bytes != SIZE_MAX) {
77,305!
36
        if (end_bytes > max_bytes) {
77,305✔
37
            throw dftracer::utils::utilities::reader::internal::ReaderError(
2!
38
                dftracer::utils::utilities::reader::internal::ReaderError::
39
                    INVALID_ARGUMENT,
40
                "end_bytes exceeds maximum available bytes");
2!
41
        }
42
        if (start_bytes > max_bytes) {
77,303!
43
            throw dftracer::utils::utilities::reader::internal::ReaderError(
×
44
                dftracer::utils::utilities::reader::internal::ReaderError::
45
                    INVALID_ARGUMENT,
46
                "start_bytes exceeds maximum available bytes");
×
47
        }
48
    }
77,303✔
49
}
77,315✔
50

51
/**
 * Precondition guard shared by the reader entry points.
 *
 * @param is_open  Open flag of the reader.
 * @param indexer  Indexer pointer of the reader (only tested for null).
 * @throws std::runtime_error ("Reader is not open") unless the reader is
 *         flagged open and the indexer pointer is non-null.
 */
static void check_reader_state(bool is_open, const void *indexer) {
    const bool usable = is_open && indexer != nullptr;
    if (usable) {
        return;
    }
    throw std::runtime_error("Reader is not open");
}
80,109✔
56

57
static constexpr std::size_t DEFAULT_READER_BUFFER_SIZE = 1 * 1024 * 1024;
58

59
namespace dftracer::utils::utilities::reader::internal {
60

61
/**
 * Construct a reader from an archive path and an index path.
 *
 * The indexer is obtained from IndexerFactory::create(); the reader is only
 * flagged open once that call has succeeded.
 *
 * @param gz_path_         Path of the gzip archive.
 * @param idx_path_        Path of the index.
 * @param index_ckpt_size  Checkpoint size forwarded to the indexer factory.
 * @throws ReaderError with INITIALIZATION_ERROR wrapping the message of any
 *         std::exception raised while creating the indexer.
 */
GzipReader::GzipReader(const std::string &gz_path_,
                       const std::string &idx_path_,
                       std::size_t index_ckpt_size)
    : gz_path(gz_path_),
      index_path(idx_path_),
      is_open(false),
      default_buffer_size(DEFAULT_READER_BUFFER_SIZE),
      indexer(nullptr) {
    namespace idx = dftracer::utils::utilities::indexer::internal;

    try {
        indexer = idx::IndexerFactory::create(gz_path, index_path,
                                              index_ckpt_size, false);
        is_open = true;

        DFTRACER_UTILS_LOG_DEBUG(
            "Successfully created GZIP reader for gz: %s and index: %s",
            gz_path.c_str(), index_path.c_str());
    } catch (const std::exception &cause) {
        std::string message = "Failed to initialize reader with indexer: ";
        message += cause.what();
        throw ReaderError(ReaderError::INITIALIZATION_ERROR, message);
    }
}
964✔
83

84
/**
 * Construct a reader around an already-created indexer.
 *
 * The archive and index paths are taken from the indexer itself.
 *
 * @param indexer_  Shared indexer; must not be null.
 * @throws ReaderError with INITIALIZATION_ERROR when `indexer_` is null.
 */
GzipReader::GzipReader(
    std::shared_ptr<dftracer::utils::utilities::indexer::internal::Indexer>
        indexer_)
    : default_buffer_size(DEFAULT_READER_BUFFER_SIZE),
      indexer(std::move(indexer_)) {
    if (indexer == nullptr) {
        throw ReaderError(ReaderError::INITIALIZATION_ERROR,
                          "Invalid indexer provided");
    }

    is_open = true;

    auto &source = *indexer;
    gz_path = source.get_archive_path();
    index_path = source.get_index_path();
}
256✔
97

98
/**
 * Release the cached stream and mark the reader closed.
 *
 * The cache is cleared directly instead of through reset(): reset() first
 * validates the reader state and throws std::runtime_error when the reader
 * is not open or has no indexer. That is exactly the state of a moved-from
 * reader (the move operations clear `is_open` and take the indexer), and an
 * exception escaping a destructor terminates the program.
 */
GzipReader::~GzipReader() {
    DFTRACER_UTILS_LOG_DEBUG("Destroying GZIP reader for gz: %s and index: %s",
                             gz_path.c_str(), index_path.c_str());
    stream_cache_.clear();
    is_open = false;
}
1,200✔
104

105
/**
 * Move constructor.
 *
 * Takes over the paths, open flag, buffer size and indexer of `other` and
 * flags `other` as closed. `stream_cache_` does not appear in the
 * initializer list, so no cached stream is taken from `other`.
 */
GzipReader::GzipReader(GzipReader &&other) noexcept
    : gz_path(std::move(other.gz_path)),
      index_path(std::move(other.index_path)),
      is_open(other.is_open),
      default_buffer_size(other.default_buffer_size),
      indexer(std::move(other.indexer))
{
    // The source no longer owns an indexer; make that visible to callers.
    other.is_open = false;
}
×
113

114
/**
 * Move assignment.
 *
 * Takes over the paths, open flag, buffer size and indexer of `other` and
 * flags `other` as closed. Self-assignment is a no-op.
 *
 * @return *this
 */
GzipReader &GzipReader::operator=(GzipReader &&other) noexcept {
    if (this == &other) {
        return *this;
    }

    // Any stream cached so far was created by stream() against the indexer
    // this object is about to give up, so it must not be continued after the
    // assignment. NOTE(review): whether the stream keeps a reference to that
    // indexer is not visible here -- dropping the cache is safe either way.
    stream_cache_.clear();

    gz_path = std::move(other.gz_path);
    index_path = std::move(other.index_path);
    is_open = other.is_open;
    default_buffer_size = other.default_buffer_size;
    indexer = std::move(other.indexer);
    other.is_open = false;

    return *this;
}
125

126
std::size_t GzipReader::get_max_bytes() const {
184✔
127
    check_reader_state(is_open, indexer.get());
184✔
128
    std::size_t max_bytes =
184✔
129
        static_cast<std::size_t>(indexer.get()->get_max_bytes());
184✔
130
    DFTRACER_UTILS_LOG_DEBUG("Maximum bytes available: %zu", max_bytes);
131
    return max_bytes;
184✔
132
}
133

134
std::size_t GzipReader::get_num_lines() const {
55✔
135
    check_reader_state(is_open, indexer.get());
55✔
136
    std::size_t num_lines = static_cast<std::size_t>(indexer->get_num_lines());
55✔
137
    DFTRACER_UTILS_LOG_DEBUG("Total lines available: %zu", num_lines);
138
    return num_lines;
55✔
139
}
140

141
/// Returns the archive path stored in this reader (`gz_path`).
const std::string &GzipReader::get_archive_path() const {
    return gz_path;
}
7✔
142

143
/// Returns the index path stored in this reader (`index_path`).
const std::string &GzipReader::get_index_path() const {
    return index_path;
}
4✔
144

145
void GzipReader::set_buffer_size(std::size_t size) {
×
146
    default_buffer_size = size;
×
147
}
×
148

149
void GzipReader::reset() {
604✔
150
    check_reader_state(is_open, indexer.get());
604✔
151
    stream_cache_.clear();
604✔
152
}
604✔
153

154
/**
 * Read raw bytes from the range [start_bytes, end_bytes) request into
 * `buffer`.
 *
 * A cached BYTES stream is reused when the stream cache reports that it can
 * continue this request; otherwise a new stream is created and cached.
 *
 * @param start_bytes  Start offset of the requested range.
 * @param end_bytes    End offset of the requested range.
 * @param buffer       Destination buffer (non-null).
 * @param buffer_size  Size of `buffer` (non-zero).
 * @return Number of bytes the stream reported for this read.
 * @throws std::runtime_error when the reader is not open.
 * @throws ReaderError when the arguments fail validation.
 */
coro::CoroTask<std::size_t> GzipReader::read_async(std::size_t start_bytes,
                                                   std::size_t end_bytes,
                                                   char *buffer,
                                                   std::size_t buffer_size) {
    check_reader_state(is_open, indexer.get());
    validate_parameters(buffer, buffer_size, start_bytes, end_bytes,
                        indexer->get_max_bytes());

    DFTRACER_UTILS_LOG_DEBUG(
        "GzipReader::read - request: start_bytes=%zu, end_bytes=%zu, "
        "buffer_size=%zu",
        start_bytes, end_bytes, buffer_size);

    const bool reuse_cached = stream_cache_.can_continue(
        StreamType::BYTES, gz_path, start_bytes, end_bytes);

    if (reuse_cached) {
        DFTRACER_UTILS_LOG_DEBUG(
            "%s", "GzipReader::read - reusing cached byte stream");
    } else {
        DFTRACER_UTILS_LOG_DEBUG("%s",
                                 "GzipReader::read - creating new byte stream");
        auto fresh_stream = stream(StreamConfig()
                                       .stream_type(StreamType::BYTES)
                                       .range_type(RangeType::BYTE_RANGE)
                                       .from(start_bytes)
                                       .to(end_bytes));
        stream_cache_.update(std::move(fresh_stream), StreamType::BYTES,
                             gz_path, start_bytes, end_bytes);
    }

    const std::size_t bytes_read =
        co_await stream_cache_.get()->read_async(buffer, buffer_size);
    DFTRACER_UTILS_LOG_DEBUG("GzipReader::read - returned %zu bytes",
                             bytes_read);

    // Record where this read ended so a follow-up request can be matched
    // against the cached stream.
    stream_cache_.update_position(start_bytes + bytes_read);

    co_return bytes_read;
}
37,422!
193

194
/**
 * Read from a byte range through a MULTI_LINES_BYTES stream into `buffer`.
 *
 * `end_bytes` is first clamped to the indexer's maximum byte count, then the
 * arguments are validated. A cached stream is reused when the stream cache
 * reports that it can continue this request.
 *
 * @param start_bytes  Start offset of the requested range.
 * @param end_bytes    End offset of the requested range (clamped).
 * @param buffer       Destination buffer (non-null).
 * @param buffer_size  Size of `buffer` (non-zero).
 * @return Number of bytes the stream reported for this read.
 * @throws std::runtime_error when the reader is not open.
 * @throws ReaderError when the arguments fail validation.
 */
coro::CoroTask<std::size_t> GzipReader::read_line_bytes_async(
    std::size_t start_bytes, std::size_t end_bytes, char *buffer,
    std::size_t buffer_size) {
    check_reader_state(is_open, indexer.get());

    // Clamp instead of rejecting requests that run past the end.
    if (end_bytes > indexer->get_max_bytes()) {
        end_bytes = indexer->get_max_bytes();
    }

    validate_parameters(buffer, buffer_size, start_bytes, end_bytes,
                        indexer->get_max_bytes());

    const bool reuse_cached = stream_cache_.can_continue(
        StreamType::MULTI_LINES_BYTES, gz_path, start_bytes, end_bytes);

    if (!reuse_cached) {
        auto fresh_stream =
            stream(StreamConfig()
                       .stream_type(StreamType::MULTI_LINES_BYTES)
                       .range_type(RangeType::BYTE_RANGE)
                       .from(start_bytes)
                       .to(end_bytes));
        stream_cache_.update(std::move(fresh_stream),
                             StreamType::MULTI_LINES_BYTES, gz_path,
                             start_bytes, end_bytes);
    }

    const std::size_t bytes_read =
        co_await stream_cache_.get()->read_async(buffer, buffer_size);

    // Record where this read ended so a follow-up request can be matched
    // against the cached stream.
    stream_cache_.update_position(start_bytes + bytes_read);

    co_return bytes_read;
}
713,163!
227

228
/**
 * Read the lines start_line..end_line (1-based) and return them as one
 * string.
 *
 * The data is pulled from a MULTI_LINES stream in chunks of
 * `default_buffer_size` bytes until the stream is done or a read returns
 * zero bytes.
 *
 * @param start_line  First requested line; must be >= 1.
 * @param end_line    Last requested line; must be >= start_line.
 * @return Concatenation of everything the stream produced.
 * @throws std::runtime_error when the reader is not open, a line number is
 *         zero, start_line > end_line, or either exceeds the indexer's line
 *         count.
 */
coro::CoroTask<std::string> GzipReader::read_lines_async(std::size_t start_line,
                                                         std::size_t end_line) {
    check_reader_state(is_open, indexer.get());

    const bool has_zero_line = (start_line == 0) || (end_line == 0);
    if (has_zero_line) {
        throw std::runtime_error("Line numbers must be 1-based (start from 1)");
    }
    if (end_line < start_line) {
        throw std::runtime_error("Start line must be <= end line");
    }

    const std::size_t total_lines = indexer->get_num_lines();
    if (total_lines < start_line || total_lines < end_line) {
        throw std::runtime_error("Line numbers exceed total lines in file (" +
                                 std::to_string(total_lines) + ")");
    }

    const bool reuse_cached = stream_cache_.can_continue(
        StreamType::MULTI_LINES, gz_path, start_line, end_line);
    if (!reuse_cached) {
        auto fresh_stream = stream(StreamConfig()
                                       .stream_type(StreamType::MULTI_LINES)
                                       .range_type(RangeType::LINE_RANGE)
                                       .from(start_line)
                                       .to(end_line));
        stream_cache_.update(std::move(fresh_stream), StreamType::MULTI_LINES,
                             gz_path, start_line, end_line);
    }

    // Reserve up front using a rough guess of ~100 bytes per line to limit
    // reallocations while appending.
    std::string text;
    const std::size_t requested_lines = end_line - start_line + 1;
    text.reserve(requested_lines * 100);

    std::vector<char> chunk(default_buffer_size);

    while (!stream_cache_.get()->done()) {
        const std::size_t got = co_await stream_cache_.get()->read_async(
            chunk.data(), chunk.size());
        if (got == 0) {
            break;
        }
        text.append(chunk.data(), got);
    }

    co_return text;
}
3,133✔
275

276
/**
 * Feed the lines start_line..end_line (1-based) to `processor`, one line per
 * call.
 *
 * processor.begin() is called once before the first line and
 * processor.end() once after the last one. A trailing '\n' is stripped from
 * each line before it is handed over. Processing stops early when
 * processor.process() yields false.
 *
 * @param start_line  First requested line; must be >= 1.
 * @param end_line    Last requested line; must be >= start_line.
 * @param processor   Receiver of the lines.
 * @throws std::runtime_error when the reader is not open, a line number is
 *         zero, start_line > end_line, or either exceeds the indexer's line
 *         count.
 */
coro::CoroTask<void> GzipReader::read_lines_with_processor_async(
    std::size_t start_line, std::size_t end_line, LineProcessor &processor) {
    check_reader_state(is_open, indexer.get());

    const bool has_zero_line = (start_line == 0) || (end_line == 0);
    if (has_zero_line) {
        throw std::runtime_error("Line numbers must be 1-based (start from 1)");
    }
    if (end_line < start_line) {
        throw std::runtime_error("Start line must be <= end line");
    }

    const std::size_t total_lines = indexer->get_num_lines();
    if (total_lines < start_line || total_lines < end_line) {
        throw std::runtime_error("Line numbers exceed total lines in file (" +
                                 std::to_string(total_lines) + ")");
    }

    processor.begin(start_line, end_line);

    // A LINE stream hands back exactly one line per read.
    auto line_stream = stream(StreamConfig()
                                  .stream_type(StreamType::LINE)
                                  .range_type(RangeType::LINE_RANGE)
                                  .from(start_line)
                                  .to(end_line));

    std::vector<char> chunk(default_buffer_size);

    while (!line_stream->done()) {
        const std::size_t got =
            co_await line_stream->read_async(chunk.data(), chunk.size());
        if (got == 0) {
            break;
        }

        // The stream delivers the line including its '\n'; the processor
        // expects it without.
        const bool ends_with_newline = chunk[got - 1] == '\n';
        const std::size_t line_length = ends_with_newline ? got - 1 : got;

        if (!co_await processor.process(chunk.data(), line_length)) {
            break;  // processor asked to stop; end() is still called below
        }
    }

    processor.end();
}
×
325

326
/**
 * Feed the content of a byte range to `processor` through a LINE_BYTES
 * stream.
 *
 * `end_bytes` is clamped to the indexer's maximum byte count. When the
 * resulting range is empty the coroutine returns without calling the
 * processor at all. Otherwise processor.begin() is called once, each chunk
 * read from the stream is passed to processor.process(), and
 * processor.end() is called once at the end.
 *
 * Processing stops early when processor.process() yields false, matching
 * read_lines_with_processor_async(); previously the result was discarded and
 * the processor kept receiving data after asking to stop.
 *
 * @param start_bytes  Start offset of the requested range.
 * @param end_bytes    End offset of the requested range (clamped).
 * @param processor    Receiver of the data.
 * @throws std::runtime_error when the reader is not open.
 */
coro::CoroTask<void> GzipReader::read_line_bytes_with_processor_async(
    std::size_t start_bytes, std::size_t end_bytes, LineProcessor &processor) {
    check_reader_state(is_open, indexer.get());

    if (end_bytes > indexer.get()->get_max_bytes()) {
        end_bytes = indexer.get()->get_max_bytes();
    }

    // Nothing to read: leave the processor untouched.
    if (start_bytes >= end_bytes) {
        co_return;
    }

    processor.begin(start_bytes, end_bytes);

    auto lines_stream = stream(StreamConfig()
                                   .stream_type(StreamType::LINE_BYTES)
                                   .range_type(RangeType::BYTE_RANGE)
                                   .from(start_bytes)
                                   .to(end_bytes));

    std::vector<char> buffer(default_buffer_size);

    while (!lines_stream->done()) {
        const std::size_t bytes_read =
            co_await lines_stream->read_async(buffer.data(), buffer.size());
        if (bytes_read == 0) break;

        if (!co_await processor.process(buffer.data(), bytes_read)) {
            break;  // processor asked to stop; end() is still called below
        }
    }

    processor.end();
}
×
357

358
/// True when the reader is flagged open and holds a non-null indexer.
bool GzipReader::is_valid() const {
    if (!is_open) {
        return false;
    }
    return indexer != nullptr;
}
37!
359

360
/// Name of the format handled by this reader; always "GZIP".
std::string GzipReader::get_format_name() const {
    return std::string("GZIP");
}
1✔
361

362
std::unique_ptr<ReaderStream> GzipReader::stream(const StreamConfig &config) {
1,788✔
363
    check_reader_state(is_open, indexer.get());
1,788✔
364

365
    // Extract config parameters
366
    StreamType stream_type = config.stream_type();
1,788✔
367
    RangeType range_type = config.range_type();
1,788✔
368
    std::size_t start = config.start();
1,788✔
369
    std::size_t end = config.end();
1,788✔
370
    std::size_t buffer_size = config.buffer_size();
1,788✔
371
    bool extend_to_line_boundary = config.extend_to_line_boundary();
1,788✔
372

373
    // Convert line range to byte range if needed
374
    std::size_t start_bytes = start;
1,788✔
375
    std::size_t end_bytes = end;
1,788✔
376
    std::size_t actual_start_line =
1,788✔
377
        1;  // Track what line number start_bytes corresponds to
378

379
    if (range_type == RangeType::LINE_RANGE) {
1,788✔
380
        // Convert line numbers to byte offsets using checkpoints
381
        if (start == 0 || end == 0) {
226!
382
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
383
                              "Line numbers must be 1-based (start from 1)");
×
384
        }
385
        if (start > end) {
226!
386
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
387
                              "Start line must be <= end line");
×
388
        }
389

390
        std::size_t total_lines = indexer->get_num_lines();
226✔
391
        if (start > total_lines || end > total_lines) {
226!
392
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
393
                              "Line numbers exceed total lines in file (" +
×
UNCOV
394
                                  std::to_string(total_lines) + ")");
×
395
        }
396

397
        // Get checkpoints for the line range
398
        std::vector<
399
            dftracer::utils::utilities::indexer::internal::IndexerCheckpoint>
400
            checkpoints = indexer->get_checkpoints_for_line_range(start, end);
226✔
401

402
        DFTRACER_UTILS_LOG_DEBUG("Line range %zu-%zu: found %zu checkpoints",
403
                                 start, end, checkpoints.size());
404

405
        if (checkpoints.empty()) {
226✔
406
            // No checkpoints, read from beginning
407
            start_bytes = 0;
90✔
408
            end_bytes = indexer->get_max_bytes();
90!
409
            actual_start_line = 1;
90✔
410
            DFTRACER_UTILS_LOG_DEBUG(
411
                "No checkpoints found, using full file: start_bytes=%zu, "
412
                "end_bytes=%zu, max_bytes=%zu",
413
                start_bytes, end_bytes, indexer->get_max_bytes());
414
        } else {
90✔
415
            // Use checkpoint to determine byte range.
416
            //
417
            // Checkpoint uc_offset values fall at deflate block boundaries
418
            // which may land in the middle of a text line.  When we start
419
            // decompressing from such a mid-line position the first "line"
420
            // seen by MultiLineStream is a partial fragment.  If
421
            // actual_start_line == start_line the fragment is emitted as
422
            // the requested first line, producing wrong content.
423
            //
424
            // To avoid this we choose a checkpoint whose last_line_num is
425
            // strictly less than (start - 1), guaranteeing
426
            // actual_start_line < start.  MultiLineStream then filters
427
            // out the (potentially partial) early lines before reaching
428
            // the requested range.
429
            auto all_checkpoints = indexer->get_checkpoints();
136!
430
            bool found_start = false;
136✔
431

432
            // Walk checkpoints from the end to find the latest one whose
433
            // line range ends before (start - 1).
434
            for (auto it = all_checkpoints.rbegin();
11,380✔
435
                 it != all_checkpoints.rend(); ++it) {
11,244!
436
                if (it->last_line_num < start - 1) {
5,624!
437
                    start_bytes = it->uc_offset;
70!
438
                    actual_start_line = it->last_line_num + 1;
70!
439
                    found_start = true;
70✔
440
                    break;
70✔
441
                }
442
            }
5,554✔
443

444
            if (!found_start) {
136✔
445
                // No suitable checkpoint found -- start from beginning
446
                start_bytes = 0;
66✔
447
                actual_start_line = 1;
66✔
448
            }
66✔
449

450
            const auto &last_checkpoint = checkpoints.back();
136✔
451
            end_bytes = last_checkpoint.uc_offset + last_checkpoint.uc_size;
136✔
452

453
            DFTRACER_UTILS_LOG_DEBUG(
454
                "Using checkpoints: matched_first_idx=%zu "
455
                "(first_line=%zu, last_line=%zu), "
456
                "end_checkpoint_idx=%zu (first_line=%zu, last_line=%zu), "
457
                "byte_range=%zu-%zu, actual_start_line=%zu",
458
                checkpoints[0].checkpoint_idx, checkpoints[0].first_line_num,
459
                checkpoints[0].last_line_num, last_checkpoint.checkpoint_idx,
460
                last_checkpoint.first_line_num, last_checkpoint.last_line_num,
461
                start_bytes, end_bytes, actual_start_line);
462
        }
136✔
463
    }
226✔
464

465
    // Create appropriate stream type
466
    switch (stream_type) {
1,788!
467
        case StreamType::BYTES: {
468
            auto byte_stream = std::make_unique<GzipByteStream>(buffer_size);
193✔
469
            byte_stream->initialize(gz_path, start_bytes, end_bytes, *indexer);
193!
470
            return byte_stream;
193✔
471
        }
193✔
472
        case StreamType::LINE_BYTES: {
473
            // Single line-aligned bytes at a time
474
            auto line_byte_stream =
475
                std::make_unique<GzipLineByteStream>(buffer_size);
160✔
476
            line_byte_stream->set_extend_to_line_boundary(
320!
477
                extend_to_line_boundary);
160✔
478
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
320!
479
                                         *indexer);
160✔
480

481
            // Wrap with LineStream to return one line-aligned chunk at a time
482
            if (range_type == RangeType::LINE_RANGE) {
160✔
483
                return std::make_unique<LineStream>(
2!
484
                    std::move(line_byte_stream), start, end, actual_start_line);
485
            } else {
486
                return std::make_unique<LineStream>(
158!
487
                    std::move(line_byte_stream));
488
            }
489
        }
160✔
490
        case StreamType::MULTI_LINES_BYTES: {
491
            // Multiple line-aligned bytes per read
492
            auto line_byte_stream =
493
                std::make_unique<GzipLineByteStream>(buffer_size);
1,200✔
494
            line_byte_stream->set_extend_to_line_boundary(
2,400!
495
                extend_to_line_boundary);
1,200✔
496
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
2,400!
497
                                         *indexer);
1,200✔
498
            return line_byte_stream;
1,200✔
499
        }
1,200✔
500
        case StreamType::LINE: {
501
            // Single parsed line per read
502
            auto line_byte_stream =
503
                std::make_unique<GzipLineByteStream>(buffer_size);
54✔
504
            line_byte_stream->set_extend_to_line_boundary(
108!
505
                extend_to_line_boundary);
54✔
506
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
108!
507
                                         *indexer);
54✔
508

509
            if (range_type == RangeType::LINE_RANGE) {
54✔
510
                return std::make_unique<LineStream>(
53!
511
                    std::move(line_byte_stream), start, end, actual_start_line);
512
            } else {
513
                return std::make_unique<LineStream>(
1!
514
                    std::move(line_byte_stream));
515
            }
516
        }
54✔
517
        case StreamType::MULTI_LINES: {
518
            // Multiple parsed lines per read
519
            auto line_byte_stream =
520
                std::make_unique<GzipLineByteStream>(buffer_size);
181✔
521
            line_byte_stream->set_extend_to_line_boundary(
362!
522
                extend_to_line_boundary);
181✔
523
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
362!
524
                                         *indexer);
181✔
525

526
            if (range_type == RangeType::LINE_RANGE) {
181✔
527
                return std::make_unique<MultiLineStream>(
169!
528
                    std::move(line_byte_stream), start, end, actual_start_line);
529
            } else {
530
                return std::make_unique<MultiLineStream>(
12!
531
                    std::move(line_byte_stream));
532
            }
533
        }
181✔
534
        default:
535
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
536
                              "Invalid stream type");
×
537
    }
538
}
1,788✔
539

540
}  // namespace dftracer::utils::utilities::reader::internal
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc