• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28496595030

01 Jul 2026 05:50AM UTC coverage: 50.727% (-1.6%) from 52.278%
28496595030

Pull #83

github

web-flow
Merge 8f1ff4df5 into 2efed6649
Pull Request #83: refactor and improve code QoL

31872 of 80367 branches covered (39.66%)

Branch coverage included in aggregate %.

770 of 1591 new or added lines in 85 files covered. (48.4%)

5070 existing lines in 182 files now uncovered.

32742 of 47009 relevant lines covered (69.65%)

9887.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.35
/src/dftracer/utils/utilities/reader/internal/gzip_reader.cpp
1
#include <dftracer/utils/core/coro/task.h>
2
#include <dftracer/utils/core/utils/timer.h>
3
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
4
#include <dftracer/utils/utilities/indexer/internal/indexer_factory.h>
5
#include <dftracer/utils/utilities/reader/error.h>
6
#include <dftracer/utils/utilities/reader/internal/gzip_reader.h>
7
#include <dftracer/utils/utilities/reader/internal/stream_config.h>
8
#include <dftracer/utils/utilities/reader/internal/streams/gzip_byte_stream.h>
9
#include <dftracer/utils/utilities/reader/internal/streams/gzip_line_byte_stream.h>
10
#include <dftracer/utils/utilities/reader/internal/streams/line_stream.h>
11
#include <dftracer/utils/utilities/reader/internal/streams/multi_line_stream.h>
12
#include <dftracer/utils/utilities/reader/internal/string_line_processor.h>
13

14
#include <cstdio>
15
#include <cstring>
16
#include <limits>
17
#include <string_view>
18

19
static void validate_parameters(
77,329✔
20
    const char *buffer, std::size_t buffer_size, std::size_t start_bytes,
21
    std::size_t end_bytes,
22
    std::size_t max_bytes = std::numeric_limits<std::size_t>::max()) {
23
    if (!buffer || buffer_size == 0) {
77,329!
24
        throw dftracer::utils::utilities::reader::ReaderError(
12!
25
            dftracer::utils::utilities::reader::ReaderError::INVALID_ARGUMENT,
UNCOV
26
            "Invalid buffer parameters");
×
27
    }
28
    if (start_bytes >= end_bytes) {
77,329✔
29
        throw dftracer::utils::utilities::reader::ReaderError(
10!
30
            dftracer::utils::utilities::reader::ReaderError::INVALID_ARGUMENT,
31
            "start_bytes must be less than end_bytes");
10!
32
    }
33
    if (max_bytes != SIZE_MAX) {
77,319!
34
        if (end_bytes > max_bytes) {
77,319✔
35
            throw dftracer::utils::utilities::reader::ReaderError(
2!
36
                dftracer::utils::utilities::reader::ReaderError::
37
                    INVALID_ARGUMENT,
38
                "end_bytes exceeds maximum available bytes");
2!
39
        }
40
        if (start_bytes > max_bytes) {
77,317!
NEW
41
            throw dftracer::utils::utilities::reader::ReaderError(
×
42
                dftracer::utils::utilities::reader::ReaderError::
43
                    INVALID_ARGUMENT,
44
                "start_bytes exceeds maximum available bytes");
×
45
        }
46
    }
77,317✔
47
}
77,329✔
48

49
static void check_reader_state(bool is_open, const void *indexer) {
80,117✔
50
    if (!is_open || !indexer) {
80,117!
NEW
51
        throw dftracer::utils::utilities::reader::ReaderError(
×
52
            dftracer::utils::utilities::reader::ReaderError::
53
                INITIALIZATION_ERROR,
NEW
54
            "Reader is not open");
×
55
    }
56
}
80,117✔
57

58
// Initial value of GzipReader::default_buffer_size: 1 MiB.
static constexpr std::size_t DEFAULT_READER_BUFFER_SIZE = std::size_t{1} << 20;
59

60
namespace dftracer::utils::utilities::reader::internal {
61

62
// Builds a reader for `gz_path_`, creating its indexer through
// IndexerFactory.
//
// gz_path_         path of the gzip archive
// idx_path_        path of the index belonging to the archive
// index_ckpt_size  checkpoint size forwarded to IndexerFactory::create
//
// Throws ReaderError(INITIALIZATION_ERROR) when indexer creation raises any
// std::exception; the original message is appended to the error text.
GzipReader::GzipReader(const std::string &gz_path_,
                       const std::string &idx_path_,
                       std::size_t index_ckpt_size)
    : gz_path(gz_path_),
      index_path(idx_path_),
      is_open(false),
      default_buffer_size(DEFAULT_READER_BUFFER_SIZE),
      indexer(nullptr) {
    namespace indexer_ns = dftracer::utils::utilities::indexer::internal;

    try {
        indexer = indexer_ns::IndexerFactory::create(gz_path, index_path,
                                                     index_ckpt_size, false);
        // Only reached when create() did not throw.
        is_open = true;

        DFTRACER_UTILS_LOG_DEBUG(
            "Successfully created GZIP reader for gz: %s and index: %s",
            gz_path.c_str(), index_path.c_str());
    } catch (const std::exception &e) {
        // Re-wrap every failure as a reader initialization error.
        throw ReaderError(
            ReaderError::INITIALIZATION_ERROR,
            std::string("Failed to initialize reader with indexer: ") +
                e.what());
    }
}
968✔
84

85
// Builds a reader around an already constructed indexer.
//
// The archive and index paths are taken from the indexer itself.
// Throws ReaderError(INITIALIZATION_ERROR) when `indexer_` is null.
GzipReader::GzipReader(
    std::shared_ptr<dftracer::utils::utilities::indexer::internal::Indexer>
        indexer_)
    : default_buffer_size(DEFAULT_READER_BUFFER_SIZE),
      indexer(std::move(indexer_)) {
    if (indexer == nullptr) {
        throw ReaderError(ReaderError::INITIALIZATION_ERROR,
                          "Invalid indexer provided");
    }

    gz_path = indexer->get_archive_path();
    index_path = indexer->get_index_path();
    is_open = true;
}
256✔
98

99
// Drops any cached stream and marks the reader as closed.
GzipReader::~GzipReader() {
    DFTRACER_UTILS_LOG_DEBUG("Destroying GZIP reader for gz: %s and index: %s",
                             gz_path.c_str(), index_path.c_str());
    // Clear the cache directly instead of going through reset(): reset()
    // calls check_reader_state(), which throws when the reader is not open
    // or has no indexer (the state a moved-from GzipReader is left in).  An
    // exception escaping a destructor terminates the program.
    stream_cache_.clear();
    is_open = false;
}
1,200✔
105

106
GzipReader::GzipReader(GzipReader &&other) noexcept
×
107
    : gz_path(std::move(other.gz_path)),
×
108
      index_path(std::move(other.index_path)),
×
109
      is_open(other.is_open),
×
110
      default_buffer_size(other.default_buffer_size),
×
111
      indexer(std::move(other.indexer)) {
×
112
    other.is_open = false;
×
113
}
×
114

115
// Move assignment: takes over the paths, the buffer size setting and the
// indexer of `other`, then flags `other` as closed.  Self-assignment is a
// no-op.
GzipReader &GzipReader::operator=(GzipReader &&other) noexcept {
    if (this != &other) {
        // Any cached stream was created for the archive/indexer this object
        // held so far (stream() initializes streams with *indexer).  Drop it
        // before the indexer is replaced so nothing stale outlives it.
        // NOTE(review): whether cached streams keep a reference to the
        // indexer is not visible here - confirm against the stream classes.
        stream_cache_.clear();

        gz_path = std::move(other.gz_path);
        index_path = std::move(other.index_path);
        is_open = other.is_open;
        default_buffer_size = other.default_buffer_size;
        indexer = std::move(other.indexer);
        // The source gave up its indexer, so it must no longer report open.
        other.is_open = false;
    }
    return *this;
}
126

127
std::size_t GzipReader::get_max_bytes() const {
187✔
128
    check_reader_state(is_open, indexer.get());
187✔
129
    std::size_t max_bytes =
187✔
130
        static_cast<std::size_t>(indexer.get()->get_max_bytes());
187✔
131
    DFTRACER_UTILS_LOG_DEBUG("Maximum bytes available: %zu", max_bytes);
187!
132
    return max_bytes;
187✔
UNCOV
133
}
×
134

135
std::size_t GzipReader::get_num_lines() const {
55✔
136
    check_reader_state(is_open, indexer.get());
55✔
137
    std::size_t num_lines = static_cast<std::size_t>(indexer->get_num_lines());
55✔
138
    DFTRACER_UTILS_LOG_DEBUG("Total lines available: %zu", num_lines);
55!
139
    return num_lines;
55✔
UNCOV
140
}
×
141

142
const std::string &GzipReader::get_archive_path() const { return gz_path; }
7✔
143

144
const std::string &GzipReader::get_index_path() const { return index_path; }
4✔
145

146
void GzipReader::set_buffer_size(std::size_t size) {
×
147
    default_buffer_size = size;
×
148
}
×
149

150
void GzipReader::reset() {
604✔
151
    check_reader_state(is_open, indexer.get());
604✔
152
    stream_cache_.clear();
604✔
153
}
604✔
154

155
// Reads raw bytes of the range [start_bytes, end_bytes) into `buffer`.
//
// A cached BYTES stream is reused when stream_cache_.can_continue() accepts
// the request; otherwise a new stream is created and cached.
//
// Returns the number of bytes the stream delivered for this call.
// Throws ReaderError when the reader is not open or the arguments fail
// validate_parameters().
coro::CoroTask<std::size_t> GzipReader::read_async(std::size_t start_bytes,
                                                   std::size_t end_bytes,
                                                   char *buffer,
                                                   std::size_t buffer_size) {
    check_reader_state(is_open, indexer.get());
    validate_parameters(buffer, buffer_size, start_bytes, end_bytes,
                        indexer->get_max_bytes());

    DFTRACER_UTILS_LOG_DEBUG(
        "GzipReader::read - request: start_bytes=%zu, end_bytes=%zu, "
        "buffer_size=%zu",
        start_bytes, end_bytes, buffer_size);

    const bool reuse_cached = stream_cache_.can_continue(
        StreamType::BYTES, gz_path, start_bytes, end_bytes);
    if (reuse_cached) {
        DFTRACER_UTILS_LOG_DEBUG(
            "%s", "GzipReader::read - reusing cached byte stream");
    } else {
        DFTRACER_UTILS_LOG_DEBUG("%s",
                                 "GzipReader::read - creating new byte stream");
        auto fresh_stream = stream(StreamConfig()
                                       .stream_type(StreamType::BYTES)
                                       .range_type(RangeType::BYTE_RANGE)
                                       .from(start_bytes)
                                       .to(end_bytes));
        stream_cache_.update(std::move(fresh_stream), StreamType::BYTES,
                             gz_path, start_bytes, end_bytes);
    }

    std::size_t bytes_delivered =
        co_await stream_cache_.get()->read_async(buffer, buffer_size);
    DFTRACER_UTILS_LOG_DEBUG("GzipReader::read - returned %zu bytes",
                             bytes_delivered);

    // Remember where this read stopped so a follow-up read can continue.
    stream_cache_.update_position(start_bytes + bytes_delivered);

    co_return bytes_delivered;
}
37,422!
194

195
// Reads from the byte range [start_bytes, end_bytes) through a
// MULTI_LINES_BYTES stream into `buffer`.
//
// `end_bytes` is clamped to the indexer's maximum before validation.  A
// cached stream is reused when stream_cache_.can_continue() accepts the
// request; otherwise a new stream is created and cached.
//
// Returns the number of bytes the stream delivered for this call.
// Throws ReaderError when the reader is not open or the arguments fail
// validate_parameters().
coro::CoroTask<std::size_t> GzipReader::read_line_bytes_async(
    std::size_t start_bytes, std::size_t end_bytes, char *buffer,
    std::size_t buffer_size) {
    check_reader_state(is_open, indexer.get());

    // Query the indexer once; the value is needed for both the clamp and the
    // validation below.
    const auto max_bytes = indexer->get_max_bytes();
    if (end_bytes > max_bytes) {
        end_bytes = max_bytes;
    }

    validate_parameters(buffer, buffer_size, start_bytes, end_bytes,
                        max_bytes);

    // Check if we can reuse cached stream
    if (!stream_cache_.can_continue(StreamType::MULTI_LINES_BYTES, gz_path,
                                    start_bytes, end_bytes)) {
        auto new_stream = stream(StreamConfig()
                                     .stream_type(StreamType::MULTI_LINES_BYTES)
                                     .range_type(RangeType::BYTE_RANGE)
                                     .from(start_bytes)
                                     .to(end_bytes));
        stream_cache_.update(std::move(new_stream),
                             StreamType::MULTI_LINES_BYTES, gz_path,
                             start_bytes, end_bytes);
    }

    std::size_t result =
        co_await stream_cache_.get()->read_async(buffer, buffer_size);

    // Update position for next potential read
    stream_cache_.update_position(start_bytes + result);

    co_return result;
}
713,317!
228

229
// Reads the lines [start_line, end_line] (1-based, inclusive) and returns
// them concatenated in one string.
//
// A cached MULTI_LINES stream is reused when stream_cache_.can_continue()
// accepts the request; otherwise a new stream is created and cached.
//
// Throws ReaderError(INVALID_ARGUMENT) for a zero line number, for
// start_line > end_line, or when either bound exceeds the number of lines
// reported by the indexer.
coro::CoroTask<std::string> GzipReader::read_lines_async(std::size_t start_line,
                                                         std::size_t end_line) {
    check_reader_state(is_open, indexer.get());

    if (start_line < 1 || end_line < 1) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Line numbers must be 1-based (start from 1)");
    }
    if (end_line < start_line) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Start line must be <= end line");
    }

    std::size_t total_lines = indexer->get_num_lines();
    if (start_line > total_lines || end_line > total_lines) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Line numbers exceed total lines in file (" +
                              std::to_string(total_lines) + ")");
    }

    const bool reuse_cached = stream_cache_.can_continue(
        StreamType::MULTI_LINES, gz_path, start_line, end_line);
    if (!reuse_cached) {
        auto fresh_stream = stream(StreamConfig()
                                       .stream_type(StreamType::MULTI_LINES)
                                       .range_type(RangeType::LINE_RANGE)
                                       .from(start_line)
                                       .to(end_line));
        stream_cache_.update(std::move(fresh_stream), StreamType::MULTI_LINES,
                             gz_path, start_line, end_line);
    }

    // Reserve up front, guessing roughly 100 bytes per requested line.
    std::string collected;
    const std::size_t requested_lines = end_line - start_line + 1;
    collected.reserve(requested_lines * 100);

    std::vector<char> chunk(default_buffer_size);

    // Drain the stream chunk by chunk; a zero-byte read also ends the loop.
    for (;;) {
        if (stream_cache_.get()->done()) {
            break;
        }
        std::size_t chunk_len = co_await stream_cache_.get()->read_async(
            chunk.data(), chunk.size());
        if (chunk_len == 0) {
            break;
        }
        collected.append(chunk.data(), chunk_len);
    }

    co_return collected;
}
3,142✔
279

280
// Feeds the lines [start_line, end_line] (1-based, inclusive) to `processor`
// one line per process() call, without the trailing '\n'.
//
// processor.begin() is called before the first line and processor.end()
// after the last one.  Iteration stops early when process() yields false;
// end() is still called in that case.
//
// Throws ReaderError(INVALID_ARGUMENT) for a zero line number, for
// start_line > end_line, or when either bound exceeds the number of lines
// reported by the indexer.
coro::CoroTask<void> GzipReader::read_lines_with_processor_async(
    std::size_t start_line, std::size_t end_line, LineProcessor &processor) {
    check_reader_state(is_open, indexer.get());

    if (start_line < 1 || end_line < 1) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Line numbers must be 1-based (start from 1)");
    }
    if (end_line < start_line) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Start line must be <= end line");
    }

    std::size_t total_lines = indexer->get_num_lines();
    if (start_line > total_lines || end_line > total_lines) {
        throw ReaderError(ReaderError::INVALID_ARGUMENT,
                          "Line numbers exceed total lines in file (" +
                              std::to_string(total_lines) + ")");
    }

    processor.begin(start_line, end_line);

    // A LINE stream hands back exactly one line per read.
    auto single_lines = stream(StreamConfig()
                                   .stream_type(StreamType::LINE)
                                   .range_type(RangeType::LINE_RANGE)
                                   .from(start_line)
                                   .to(end_line));

    std::vector<char> line_buf(default_buffer_size);

    bool keep_going = true;
    while (keep_going && !single_lines->done()) {
        std::size_t received =
            co_await single_lines->read_async(line_buf.data(), line_buf.size());
        if (received == 0) {
            break;
        }

        // The stream delivers the line including its '\n'; the processor
        // expects it without.
        const bool has_newline = line_buf[received - 1] == '\n';
        std::size_t line_length = has_newline ? received - 1 : received;

        if (!co_await processor.process(line_buf.data(), line_length)) {
            keep_going = false;
        }
    }

    processor.end();
}
×
332

333
// Feeds the byte range [start_bytes, end_bytes) to `processor`, one stream
// read per process() call, using a LINE_BYTES stream.
//
// `end_bytes` is clamped to the indexer's maximum.  When the resulting range
// is empty the function returns without calling the processor at all.
// Otherwise processor.begin() is called first and processor.end() last.
//
// As in read_lines_with_processor_async(), iteration stops as soon as
// process() yields false; end() is still called.
//
// Throws ReaderError(INITIALIZATION_ERROR) when the reader is not open.
coro::CoroTask<void> GzipReader::read_line_bytes_with_processor_async(
    std::size_t start_bytes, std::size_t end_bytes, LineProcessor &processor) {
    check_reader_state(is_open, indexer.get());

    // Query the indexer once for the clamp.
    const auto max_bytes = indexer->get_max_bytes();
    if (end_bytes > max_bytes) {
        end_bytes = max_bytes;
    }

    if (start_bytes >= end_bytes) {
        co_return;
    }

    processor.begin(start_bytes, end_bytes);

    auto lines_stream = stream(StreamConfig()
                                   .stream_type(StreamType::LINE_BYTES)
                                   .range_type(RangeType::BYTE_RANGE)
                                   .from(start_bytes)
                                   .to(end_bytes));

    std::vector<char> buffer(default_buffer_size);

    while (!lines_stream->done()) {
        std::size_t bytes_read =
            co_await lines_stream->read_async(buffer.data(), buffer.size());
        if (bytes_read == 0) break;

        // Honor the processor's request to stop instead of discarding its
        // result.
        if (!co_await processor.process(buffer.data(), bytes_read)) {
            break;
        }
    }

    processor.end();
}
×
364

365
// True when the reader is flagged open and holds a non-null indexer.
bool GzipReader::is_valid() const {
    return is_open && indexer != nullptr;
}
37!
366

367
// Returns the constant format name "GZIP".
std::string GzipReader::get_format_name() const {
    return std::string("GZIP");
}
1✔
368

369
std::unique_ptr<ReaderStream> GzipReader::stream(const StreamConfig &config) {
1,788✔
370
    check_reader_state(is_open, indexer.get());
1,788✔
371

372
    // Extract config parameters
373
    StreamType stream_type = config.stream_type();
1,788✔
374
    RangeType range_type = config.range_type();
1,788✔
375
    std::size_t start = config.start();
1,788✔
376
    std::size_t end = config.end();
1,788✔
377
    std::size_t buffer_size = config.buffer_size();
1,788✔
378
    bool extend_to_line_boundary = config.extend_to_line_boundary();
1,788✔
379

380
    // Convert line range to byte range if needed
381
    std::size_t start_bytes = start;
1,788✔
382
    std::size_t end_bytes = end;
1,788✔
383
    std::size_t actual_start_line =
1,788✔
384
        1;  // Track what line number start_bytes corresponds to
385

386
    if (range_type == RangeType::LINE_RANGE) {
1,788✔
387
        // Convert line numbers to byte offsets using checkpoints
388
        if (start == 0 || end == 0) {
226!
389
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
390
                              "Line numbers must be 1-based (start from 1)");
×
391
        }
392
        if (start > end) {
226!
393
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
394
                              "Start line must be <= end line");
×
395
        }
396

397
        std::size_t total_lines = indexer->get_num_lines();
226✔
398
        if (start > total_lines || end > total_lines) {
226!
399
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
400
                              "Line numbers exceed total lines in file (" +
×
401
                                  std::to_string(total_lines) + ")");
×
402
        }
403

404
        // Get checkpoints for the line range
405
        std::vector<
406
            dftracer::utils::utilities::indexer::internal::IndexerCheckpoint>
407
            checkpoints = indexer->get_checkpoints_for_line_range(start, end);
226✔
408

409
        DFTRACER_UTILS_LOG_DEBUG("Line range %zu-%zu: found %zu checkpoints",
226!
410
                                 start, end, checkpoints.size());
411

412
        if (checkpoints.empty()) {
226✔
413
            // No checkpoints, read from beginning
414
            start_bytes = 0;
90✔
415
            end_bytes = indexer->get_max_bytes();
90!
416
            actual_start_line = 1;
90✔
417
            DFTRACER_UTILS_LOG_DEBUG(
90!
418
                "No checkpoints found, using full file: start_bytes=%zu, "
419
                "end_bytes=%zu, max_bytes=%zu",
420
                start_bytes, end_bytes, indexer->get_max_bytes());
421
        } else {
90✔
422
            // Use checkpoint to determine byte range.
423
            //
424
            // Checkpoint uc_offset values fall at deflate block boundaries
425
            // which may land in the middle of a text line.  When we start
426
            // decompressing from such a mid-line position the first "line"
427
            // seen by MultiLineStream is a partial fragment.  If
428
            // actual_start_line == start_line the fragment is emitted as
429
            // the requested first line, producing wrong content.
430
            //
431
            // To avoid this we choose a checkpoint whose last_line_num is
432
            // strictly less than (start - 1), guaranteeing
433
            // actual_start_line < start.  MultiLineStream then filters
434
            // out the (potentially partial) early lines before reaching
435
            // the requested range.
436
            auto all_checkpoints = indexer->get_checkpoints();
136!
437
            bool found_start = false;
136✔
438

439
            // Walk checkpoints from the end to find the latest one whose
440
            // line range ends before (start - 1).
441
            for (auto it = all_checkpoints.rbegin();
11,364✔
442
                 it != all_checkpoints.rend(); ++it) {
11,228!
443
                if (it->last_line_num < start - 1) {
5,616!
444
                    start_bytes = it->uc_offset;
70!
445
                    actual_start_line = it->last_line_num + 1;
70!
446
                    found_start = true;
70✔
447
                    break;
70✔
448
                }
449
            }
5,546✔
450

451
            if (!found_start) {
136✔
452
                // No suitable checkpoint found -- start from beginning
453
                start_bytes = 0;
66✔
454
                actual_start_line = 1;
66✔
455
            }
66✔
456

457
            const auto &last_checkpoint = checkpoints.back();
136✔
458
            end_bytes = last_checkpoint.uc_offset + last_checkpoint.uc_size;
136✔
459

460
            DFTRACER_UTILS_LOG_DEBUG(
136!
461
                "Using checkpoints: matched_first_idx=%zu "
462
                "(first_line=%zu, last_line=%zu), "
463
                "end_checkpoint_idx=%zu (first_line=%zu, last_line=%zu), "
464
                "byte_range=%zu-%zu, actual_start_line=%zu",
465
                checkpoints[0].checkpoint_idx, checkpoints[0].first_line_num,
466
                checkpoints[0].last_line_num, last_checkpoint.checkpoint_idx,
467
                last_checkpoint.first_line_num, last_checkpoint.last_line_num,
468
                start_bytes, end_bytes, actual_start_line);
469
        }
136✔
470
    }
226✔
471

472
    // Create appropriate stream type
473
    switch (stream_type) {
1,788!
474
        case StreamType::BYTES: {
475
            auto byte_stream = std::make_unique<GzipByteStream>(buffer_size);
193✔
476
            byte_stream->initialize(gz_path, start_bytes, end_bytes, *indexer);
193!
477
            return byte_stream;
193✔
478
        }
193✔
479
        case StreamType::LINE_BYTES: {
480
            // Single line-aligned bytes at a time
481
            auto line_byte_stream =
482
                std::make_unique<GzipLineByteStream>(buffer_size);
160✔
483
            line_byte_stream->set_extend_to_line_boundary(
320!
484
                extend_to_line_boundary);
160✔
485
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
320!
486
                                         *indexer);
160✔
487

488
            // Wrap with LineStream to return one line-aligned chunk at a time
489
            if (range_type == RangeType::LINE_RANGE) {
160✔
490
                return std::make_unique<LineStream>(
2!
491
                    std::move(line_byte_stream), start, end, actual_start_line);
492
            } else {
493
                return std::make_unique<LineStream>(
158!
494
                    std::move(line_byte_stream));
495
            }
496
        }
160✔
497
        case StreamType::MULTI_LINES_BYTES: {
498
            // Multiple line-aligned bytes per read
499
            auto line_byte_stream =
500
                std::make_unique<GzipLineByteStream>(buffer_size);
1,200✔
501
            line_byte_stream->set_extend_to_line_boundary(
2,400!
502
                extend_to_line_boundary);
1,200✔
503
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
2,400!
504
                                         *indexer);
1,200✔
505
            return line_byte_stream;
1,200✔
506
        }
1,200✔
507
        case StreamType::LINE: {
508
            // Single parsed line per read
509
            auto line_byte_stream =
510
                std::make_unique<GzipLineByteStream>(buffer_size);
54✔
511
            line_byte_stream->set_extend_to_line_boundary(
108!
512
                extend_to_line_boundary);
54✔
513
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
108!
514
                                         *indexer);
54✔
515

516
            if (range_type == RangeType::LINE_RANGE) {
54✔
517
                return std::make_unique<LineStream>(
53!
518
                    std::move(line_byte_stream), start, end, actual_start_line);
519
            } else {
520
                return std::make_unique<LineStream>(
1!
521
                    std::move(line_byte_stream));
522
            }
523
        }
54✔
524
        case StreamType::MULTI_LINES: {
525
            // Multiple parsed lines per read
526
            auto line_byte_stream =
527
                std::make_unique<GzipLineByteStream>(buffer_size);
181✔
528
            line_byte_stream->set_extend_to_line_boundary(
362!
529
                extend_to_line_boundary);
181✔
530
            line_byte_stream->initialize(gz_path, start_bytes, end_bytes,
362!
531
                                         *indexer);
181✔
532

533
            if (range_type == RangeType::LINE_RANGE) {
181✔
534
                return std::make_unique<MultiLineStream>(
169!
535
                    std::move(line_byte_stream), start, end, actual_start_line);
536
            } else {
537
                return std::make_unique<MultiLineStream>(
12!
538
                    std::move(line_byte_stream));
539
            }
540
        }
181✔
541
        default:
542
            throw ReaderError(ReaderError::INVALID_ARGUMENT,
×
543
                              "Invalid stream type");
×
544
    }
545
}
1,788✔
546

547
}  // namespace dftracer::utils::utilities::reader::internal
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc