• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MikkelSchubert / adapterremoval / #117

25 May 2025 03:01PM UTC coverage: 66.932% (-0.07%) from 67.006%
#117

push

travis-ci

web-flow
iwyu and reduce build-time inter-dependencies (#144)

26 of 145 new or added lines in 20 files covered. (17.93%)

89 existing lines in 5 files now uncovered.

9738 of 14549 relevant lines covered (66.93%)

3041.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/fastq_io.cpp
1
// SPDX-License-Identifier: GPL-3.0-or-later
2
// SPDX-FileCopyrightText: 2015 Mikkel Schubert <mikkelsch@gmail.com>
3
#include "fastq_io.hpp"    // declarations
4
#include "commontypes.hpp" // for output_format
5
#include "debug.hpp"       // for AR_REQUIRE, AR_REQUIRE_SINGLE_THREAD
6
#include "errors.hpp"      // for io_error, gzip_error, fastq_error
7
#include "fastq.hpp"       // for fastq
8
#include "fastq_enc.hpp"   // for MATE_SEPARATOR
9
#include "output.hpp"      // for output_file
10
#include "progress.hpp"    // for progress_timer
11
#include "statistics.hpp"  // for fastq_statistics, fastq_stats_ptr, stat...
12
#include "strutils.hpp"    // for shell_escape, string_vec, ends_with
13
#include "userconfig.hpp"  // for userconfig
14
#include "utilities.hpp"   // for prng_seed
15
#include <algorithm>       // for max, min
16
#include <cerrno>          // for errno
17
#include <isa-l.h>         // for isal_gzip_header
18
#include <libdeflate.h>    // for libdeflate_alloc_compressor, libdeflate...
19
#include <memory>          // for unique_ptr, make_unique, __shared_ptr_a...
20
#include <sstream>         // for basic_ostream, basic_ostringstream, ope...
21
#include <string>          // for string<<
22
#include <string_view>     // for string_view
23
#include <utility>         // for move, swap
24

25
namespace adapterremoval {
26

27
// Default bgzip header, as described in the SAM spec. v1.6 section 4.1.
// Includes 2 trailing placeholder bytes for total block size (BSIZE)
constexpr std::string_view BGZF_HEADER = {
  "\37\213\10\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0",
  18
};

// End-of-file marker for bgzip files; see SAM spec. v1.6 section 4.1.2
constexpr std::string_view BGZF_EOF = {
  "\37\213\10\4\0\0\0\0\0\377\6\0\102\103\2\0\33\0\3\0\0\0\0\0\0\0\0\0",
  28,
};
39

40
////////////////////////////////////////////////////////////////////////////////
41
// Helper function for isa-l
42

43
namespace {
44

45
//! The compression level used for block/stream compression with isa-l
constexpr size_t ISAL_COMPRESSION_LEVEL = 1;
//! The default buffer size for compression at level ISAL_COMPRESSION_LEVEL;
//! ISAL_DEF_LVL1_SMALL corresponds to level 1 above
constexpr size_t ISAL_BUFFER_SIZE = ISAL_DEF_LVL1_SMALL;
49

50
/**
51
 * ISA-l streaming is enabled only at compression level 1, since little
52
 * difference was observed between levels 1 to 3. However, it still offers a
53
 * faster compression (with a lower ratio) than libdeflate level 1.
54
 */
55
bool
56
is_isal_streaming_enabled(const output_file file, unsigned compression_level)
×
57
{
58
  switch (file.format) {
×
59
    case output_format::fastq:
60
    case output_format::sam:
61
    case output_format::bam:
62
    case output_format::ubam:
63
      return false;
64
    case output_format::fastq_gzip:
×
65
    case output_format::sam_gzip:
×
66
      return compression_level == ISAL_COMPRESSION_LEVEL;
×
67
    default:
×
68
      AR_FAIL("invalid output format");
×
69
  }
70
}
71

72
} // namespace
73

74
///////////////////////////////////////////////////////////////////////////////
75
// Implementations for 'read_fastq'
76

77
//! Input layout handled by a read_fastq step; see select_filenames()
enum class read_fastq::file_type
{
  read_1,      //!< Mate 1 reads only
  read_2,      //!< Mate 2 reads only
  interleaved  //!< Alternating mate 1 / mate 2 reads from a single source
};
83

84
namespace {
85

86
bool
NEW
87
read_record(joined_line_readers& reader, std::vector<fastq>& chunk)
×
88
{
89
  // Line numbers change as we attempt to read the record, and potentially
90
  // points to the next record in the case of invalid qualities/nucleotides
91
  const auto line_number = reader.linenumber();
×
92

93
  try {
×
94
    chunk.emplace_back();
×
95
    auto& record = chunk.back();
×
96

97
    if (record.read_unsafe(reader)) {
×
98
      return true;
99
    } else {
100
      chunk.pop_back();
×
101
      return false;
×
102
    }
103
  } catch (const fastq_error& error) {
×
104
    std::ostringstream stream;
×
105
    stream << "Error reading FASTQ record from '" << reader.filename()
×
106
           << "' at line " << line_number << "; aborting:\n"
×
107
           << indent_lines(error.what());
×
108

109
    throw fastq_error(stream.str());
×
110
  }
×
111
}
112

113
/** Returns the set of input files matching the given input mode. */
const string_vec&
select_filenames(const userconfig& config, const read_fastq::file_type mode)
{
  if (mode == read_fastq::file_type::read_2) {
    return config.input_files_2;
  } else if (mode == read_fastq::file_type::read_1 ||
             mode == read_fastq::file_type::interleaved) {
    // Interleaved input is read from the mate 1 file list
    return config.input_files_1;
  } else {
    AR_FAIL("invalid read_fastq::file_type value");
  }
}
126

127
/** Estimates an upper bound for the required capacity for gzip compression */
128
constexpr size_t
129
estimate_capacity(size_t input_size, bool eof)
×
130
{
131
  return input_size                    //
×
132
         + BGZF_HEADER.size()          // Standard bgzip header
133
         + 1                           // BFINAL | BTYPE
134
         + 4                           // LEN + NLEN
135
         + 4                           // CRC32
136
         + 4                           // ISIZE
137
         + (eof ? BGZF_EOF.size() : 0) // Standard bgzip tail
×
138
    ;
139
}
140

141
} // namespace
142

143
read_fastq::read_fastq(const userconfig& config,
                       const size_t next_step,
                       const read_fastq::file_type mode,
                       statistics& stats)
  : analytical_step(processing_order::ordered, "read_fastq")
  , m_reader(select_filenames(config, mode))
  , m_next_step(next_step)
  , m_mode(mode)
  , m_head(config.head)
  // A user-specified mate separator counts as already identified, skipping
  // the guessing step in process()
  , m_mate_separator(config.mate_separator)
  , m_mate_separator_identified(config.mate_separator)
  , m_duplication_1(stats.duplication_1)
  , m_duplication_2(stats.duplication_2)
{
  AR_REQUIRE(m_duplication_1 && m_duplication_2);
}
159

160
void
161
read_fastq::add_steps(scheduler& sch,
×
162
                      const userconfig& config,
163
                      size_t next_step,
164
                      statistics& stats)
165
{
166
  const auto add_step = [&sch, &config, &stats](auto next, auto type) {
×
167
    return sch.add<read_fastq>(config, next, type, stats);
×
168
  };
169

170
  if (config.interleaved_input) {
×
171
    add_step(next_step, read_fastq::file_type::interleaved);
×
172
  } else if (config.paired_ended_mode) {
×
173
    next_step = add_step(next_step, read_fastq::file_type::read_2);
×
174
    add_step(next_step, read_fastq::file_type::read_1);
×
175
  } else {
176
    add_step(next_step, read_fastq::file_type::read_1);
×
177
  }
178
}
179

180
chunk_vec
read_fastq::process(chunk_ptr chunk)
{
  AR_REQUIRE_SINGLE_THREAD(m_lock);

  if (m_mode == file_type::read_1 || m_mode == file_type::interleaved) {
    // The scheduler only terminates when the first step stops returning chunks
    if (m_eof) {
      return {};
    }

    // This step is the source of chunks for the pipeline
    chunk = std::make_unique<analytical_chunk>();
  } else {
    // Mate 2 reads are appended to chunks produced by the mate 1 reader
    AR_REQUIRE(!m_eof && chunk);
  }

  auto& reads_1 = chunk->reads_1;
  auto& reads_2 = chunk->reads_2;

  if (m_mode == file_type::read_1 || m_mode == file_type::interleaved) {
    if (m_mode == file_type::read_1) {
      read_single_end(reads_1, *m_duplication_1);
    } else {
      read_interleaved(reads_1, reads_2);
    }
  } else if (m_mode == file_type::read_2) {
    read_single_end(reads_2, *m_duplication_2);
  } else {
    AR_FAIL("invalid file_type value");
  }

  // Both mates are visible here in read_2 and interleaved modes
  if (m_mode != file_type::read_1) {
    if (reads_1.size() != reads_2.size()) {
      throw fastq_error("Found unequal number of mate 1 and mate 2 reads; "
                        "input files may be truncated. Please fix before "
                        "continuing.");
    } else if (!m_mate_separator_identified) {
      AR_REQUIRE(reads_1.size() == reads_2.size());
      // Mate separators are identified using the first block, in order to
      // reduce the need for locking in the post-processing step

      // Attempt to determine the mate separator character
      m_mate_separator = fastq::guess_mate_separator(reads_1, reads_2);
      m_mate_separator_identified = true;
    }
  }

  // Head must be checked after the first loop, to produce at least one chunk
  m_eof |= !m_head;
  chunk->eof = m_eof;
  chunk->mate_separator = m_mate_separator;
  chunk->first = m_first;
  m_first = false;

  chunk_vec chunks;
  chunks.emplace_back(m_next_step, std::move(chunk));
  return chunks;
}
238

239
void
NEW
240
read_fastq::read_single_end(std::vector<fastq>& reads,
×
241
                            duplication_statistics& stats)
242
{
243
  for (; reads.size() < INPUT_READS && m_head && !m_eof; m_head--) {
×
244
    if (read_record(m_reader, reads)) {
×
245
      stats.process(reads.back());
×
246
    } else {
247
      m_eof = true;
×
248
    }
249
  }
250
}
251

252
void
NEW
253
read_fastq::read_interleaved(std::vector<fastq>& reads_1,
×
254
                             std::vector<fastq>& reads_2)
255
{
256
  for (; reads_1.size() < INPUT_READS && m_head && !m_eof; m_head--) {
×
257
    if (read_record(m_reader, reads_1)) {
×
258
      m_duplication_1->process(reads_1.back());
×
259
    } else {
260
      m_eof = true;
×
261
      break;
×
262
    }
263

264
    if (read_record(m_reader, reads_2)) {
×
265
      m_duplication_2->process(reads_2.back());
×
266
    }
267
  }
268
}
269

270
void
read_fastq::finalize()
{
  AR_REQUIRE_SINGLE_THREAD(m_lock);
  // All input must have been consumed before the pipeline shuts down
  AR_REQUIRE(m_eof);
}
276

277
///////////////////////////////////////////////////////////////////////////////
278
// Implementations for 'post_process_fastq_stats'
279

NEW
280
// Creates a pair of per-mate FASTQ statistics sharing the same sampling rate
// and seed
post_process_fastq::stats_pair::stats_pair(double sample_rate,
                                           unsigned int seed)
  : stats_1(std::make_unique<fastq_statistics>(sample_rate, seed))
  , stats_2(std::make_unique<fastq_statistics>(sample_rate, seed))
{
}
286

287
///////////////////////////////////////////////////////////////////////////////
288
// Implementations for 'post_process_fastq'
289

290
post_process_fastq::post_process_fastq(const userconfig& config,
                                       size_t next_step,
                                       statistics& stats)
  : analytical_step(processing_order::unordered, "post_process_fastq")
  , m_statistics_1(stats.input_1)
  , m_statistics_2(stats.input_2)
  , m_next_step(next_step)
  , m_encoding(config.io_encoding)
  , m_timer(std::make_unique<progress_timer>(config.log_progress))
{
  AR_REQUIRE(m_statistics_1 && m_statistics_2);

  // One statistics pair per worker thread, so process() can collect stats
  // without locking; merged into the final statistics in finalize()
  for (size_t i = 0; i < config.max_threads; ++i) {
    m_stats.emplace_back(config.report_sample_rate, prng_seed());
  }
}
306

307
// Defined out-of-line to ensure that the progress_timer definition is
// available to the unique_ptr destructor
post_process_fastq::~post_process_fastq() = default;
×
NEW
309

×
310
chunk_vec
post_process_fastq::process(chunk_ptr chunk)
{
  AR_REQUIRE(chunk);
  auto& reads_1 = chunk->reads_1;
  auto& reads_2 = chunk->reads_2;

  // Borrow a per-thread statistics pair to avoid locking while processing
  auto stats = m_stats.acquire();

  // Either paired input (equal counts) or single-end input (no mate 2 reads)
  AR_REQUIRE((reads_1.size() == reads_2.size()) || reads_2.empty());
  if (reads_2.empty()) {
    for (auto& read_1 : reads_1) {
      read_1.post_process(m_encoding);
      stats->stats_1->process(read_1);
    }
  } else {
    auto it_1 = reads_1.begin();
    auto it_2 = reads_2.begin();
    for (; it_1 != reads_1.end(); ++it_1, ++it_2) {
      fastq::normalize_paired_reads(*it_1, *it_2, chunk->mate_separator);

      it_1->post_process(m_encoding);
      stats->stats_1->process(*it_1);

      it_2->post_process(m_encoding);
      stats->stats_2->process(*it_2);
    }

    // fastq::normalize_paired_reads replaces the mate separator if present
    if (chunk->mate_separator) {
      chunk->mate_separator = MATE_SEPARATOR;
    }
  }

  m_stats.release(stats);

  {
    // The progress timer is shared between the unordered worker threads
    std::unique_lock<std::mutex> lock(m_timer_lock);
    m_timer->increment(reads_1.size() + reads_2.size());
  }

  chunk_vec chunks;
  chunks.emplace_back(m_next_step, std::move(chunk));

  return chunks;
}
UNCOV
356

×
357
void
post_process_fastq::finalize()
{
  AR_REQUIRE_SINGLE_THREAD(m_timer_lock);

  // Merge the per-thread statistics pairs into the final statistics objects
  while (auto it = m_stats.try_acquire()) {
    *m_statistics_1 += *it->stats_1;
    *m_statistics_2 += *it->stats_2;
  }

  m_timer->finalize();
}
UNCOV
369

×
370
///////////////////////////////////////////////////////////////////////////////
371
// Implementations for 'split_fastq'
372

373
split_fastq::split_fastq(const userconfig& config,
                         const output_file& file,
                         size_t next_step)
  : analytical_step(processing_order::ordered, "split_fastq")
  , m_next_step(next_step)
  // In streaming mode a running CRC32 is maintained across blocks (see
  // process()), which the final gzip trailer depends on
  , m_isal_stream(is_isal_streaming_enabled(file, config.compression_level))
{
}
×
381

382
void
split_fastq::finalize()
{
  AR_REQUIRE_SINGLE_THREAD(m_lock);

  AR_REQUIRE(m_eof);
  // The trailing (partial) block must have been flushed by process() on EOF
  AR_REQUIRE(m_buffer.capacity() == 0);
}
×
UNCOV
390

×
391
chunk_vec
split_fastq::process(const chunk_ptr chunk)
{
  AR_REQUIRE(chunk);
  AR_REQUIRE_SINGLE_THREAD(m_lock);
  AR_REQUIRE(!m_eof);
  m_eof = chunk->eof;

  chunk_vec chunks;
  // Repartition the incoming buffers into blocks of exactly BGZF_BLOCK_SIZE
  for (const auto& src : chunk->buffers) {
    for (size_t src_offset = 0; src_offset < src.size();) {
      // Copy as much as fits into the current (partially filled) block
      const auto n =
        std::min(src.size() - src_offset, BGZF_BLOCK_SIZE - m_buffer.size());
      m_buffer.append(src.data() + src_offset, n);

      src_offset += n;

      if (m_buffer.size() == BGZF_BLOCK_SIZE) {
        auto block = std::make_unique<analytical_chunk>();

        if (m_isal_stream) {
          // Maintain a running CRC32 across blocks for the gzip trailer
          m_isal_crc32 =
            crc32_gzip_refl(m_isal_crc32, m_buffer.data(), m_buffer.size());
        }

        block->uncompressed_size = m_buffer.size();
        block->buffers.emplace_back(std::move(m_buffer));

        chunks.emplace_back(m_next_step, std::move(block));

        // Start a fresh block; the moved-from buffer is not reused
        m_buffer = buffer();
        m_buffer.reserve(BGZF_BLOCK_SIZE);
      }
    }
  }

  if (m_eof) {
    // Flush the final (possibly partial or empty) block, marked as the last
    auto block = std::make_unique<analytical_chunk>();
    block->eof = true;

    if (m_isal_stream) {
      m_isal_crc32 =
        crc32_gzip_refl(m_isal_crc32, m_buffer.data(), m_buffer.size());
    }

    // The final block carries the accumulated CRC32 for the stream trailer
    block->crc32 = m_isal_crc32;
    block->uncompressed_size = m_buffer.size();
    block->buffers.emplace_back(std::move(m_buffer));

    chunks.emplace_back(m_next_step, std::move(block));
  }

  return chunks;
}
UNCOV
445

×
446
///////////////////////////////////////////////////////////////////////////////
447
// Implementations for 'gzip_split_fastq'
448

449
namespace {
450

451
/**
 * Deflates input_buffer into output_buffer, starting at output_offset, as a
 * single stateless compression call; `eof` marks the end of the deflate
 * stream. Returns the number of compressed bytes written. The caller must
 * size output_buffer so that the entire compressed result fits.
 */
size_t
isal_deflate_block(buffer& input_buffer,
                   buffer& output_buffer,
                   const size_t output_offset,
                   const bool eof)
{
  isal_zstream stream{};
  isal_deflate_stateless_init(&stream);

  stream.flush = FULL_FLUSH;
  stream.end_of_stream = eof;

  // The level buffer must match the chosen compression level (level 1 here)
  stream.level = ISAL_COMPRESSION_LEVEL;
  stream.level_buf_size = ISAL_BUFFER_SIZE;
  buffer level_buffer{ stream.level_buf_size };
  stream.level_buf = level_buffer.data();

  stream.avail_in = input_buffer.size();
  stream.next_in = input_buffer.data();
  stream.next_out = output_buffer.data() + output_offset;
  stream.avail_out = output_buffer.size() - output_offset;

  const auto ec = isal_deflate_stateless(&stream);
  switch (ec) {
    case COMP_OK:
      break;
    case INVALID_FLUSH:
      throw gzip_error("isal_deflate_stateless: invalid flush");
    case ISAL_INVALID_LEVEL:
      throw gzip_error("isal_deflate_stateless: invalid level");
    case ISAL_INVALID_LEVEL_BUF:
      throw gzip_error("isal_deflate_stateless: invalid buffer size");
    default: {
      std::ostringstream os;
      os << "isal_deflate_stateless: unknown error " << ec;

      throw gzip_error(os.str());
    }
  }

  // The easily compressible input should fit in a single output block
  AR_REQUIRE(stream.avail_in == 0);

  return stream.total_out;
}
UNCOV
496

×
497
} // namespace
498

499
gzip_split_fastq::gzip_split_fastq(const userconfig& config,
                                   const output_file& file,
                                   size_t next_step)
  : analytical_step(processing_order::unordered, "gzip_split_fastq")
  , m_config(config)
  // Selects between raw deflate blocks for a single gzip stream and
  // self-contained bgzip blocks; see process()
  , m_isal_stream(is_isal_streaming_enabled(file, config.compression_level))
  , m_format(file.format)
  , m_next_step(next_step)
{
}
×
509

510
chunk_vec
gzip_split_fastq::process(chunk_ptr chunk)
{
  AR_REQUIRE(chunk);
  AR_REQUIRE(chunk->buffers.size() == 1);

  buffer& input_buffer = chunk->buffers.front();
  buffer output_buffer;

  if (m_isal_stream) {
    // Streaming mode: emit a raw deflate block; the gzip header is written by
    // write_fastq's constructor and the trailer (using chunk->crc32) by
    // write_fastq::process on EOF
    output_buffer.resize(estimate_capacity(input_buffer.size(), false));
    const auto output_size =
      isal_deflate_block(input_buffer, output_buffer, 0, chunk->eof);
    output_buffer.resize(output_size);
  } else {
    // Block mode: each chunk becomes a self-contained bgzip block
    if (m_format == output_format::ubam || m_config.compression_level == 0) {
      // Store the data uncompressed in a single stored deflate block
      output_buffer.reserve(estimate_capacity(input_buffer.size(), chunk->eof));
      output_buffer.append(BGZF_HEADER);
      output_buffer.append_u8(1); // BFINAL=1, BTYPE=00; see RFC1951
      output_buffer.append_u16(input_buffer.size());
      output_buffer.append_u16(~input_buffer.size());
      output_buffer.append(input_buffer);
    } else if (m_config.compression_level == ISAL_COMPRESSION_LEVEL) {
      // isa-l handles the fastest compression level; each bgzip block is a
      // complete deflate stream, hence eof is always true here
      output_buffer.reserve(estimate_capacity(input_buffer.size(), chunk->eof));
      output_buffer.append(BGZF_HEADER);
      output_buffer.resize(output_buffer.capacity());

      const auto output_size = isal_deflate_block(input_buffer,
                                                  output_buffer,
                                                  BGZF_HEADER.size(),
                                                  true);

      // Resize the buffer to the actually used size
      output_buffer.resize(output_size + BGZF_HEADER.size());
    } else {
      // Libdeflate compression levels 1 to 12 are mapped onto 2 to 13
      AR_REQUIRE(m_config.compression_level >= 2 &&
                 m_config.compression_level <= 13);
      auto* compressor =
        libdeflate_alloc_compressor(m_config.compression_level - 1);
      const auto output_bound =
        libdeflate_deflate_compress_bound(compressor, input_buffer.size());

      output_buffer.reserve(estimate_capacity(output_bound, chunk->eof));
      output_buffer.append(BGZF_HEADER);
      output_buffer.resize(output_buffer.capacity());

      const auto output_size =
        libdeflate_deflate_compress(compressor,
                                    input_buffer.data(),
                                    input_buffer.size(),
                                    output_buffer.data() + BGZF_HEADER.size(),
                                    output_buffer.size() - BGZF_HEADER.size());
      libdeflate_free_compressor(compressor);
      // The easily compressible input should fit in a single output block
      AR_REQUIRE(output_size);

      // Resize the buffer to the actually used size
      output_buffer.resize(output_size + BGZF_HEADER.size());
    }

    // Per-block gzip trailer; see SAM spec. v1.6 section 4.1
    const auto input_crc32 =
      libdeflate_crc32(0, input_buffer.data(), input_buffer.size());
    output_buffer.append_u32(input_crc32);         // checksum of data
    output_buffer.append_u32(input_buffer.size()); // size of data

    AR_REQUIRE(output_buffer.size() <= BGZF_MAX_BLOCK_SIZE);
    // Write the final block size; -1 to fit 65536 in 16 bit. Offset 16 is the
    // BSIZE placeholder at the end of BGZF_HEADER
    output_buffer.put_u16(16, output_buffer.size() - 1);

    if (chunk->eof) {
      output_buffer.append(BGZF_EOF);
    }
  }

  // Enable reuse of the analytical_chunks
  std::swap(input_buffer, output_buffer);

  chunk_vec chunks;
  chunks.emplace_back(m_next_step, std::move(chunk));

  return chunks;
}
UNCOV
593

×
594
///////////////////////////////////////////////////////////////////////////////
595
// Implementations for 'write_fastq'
596

597
write_fastq::write_fastq(const userconfig& config, const output_file& file)
  // Allow disk IO and writing to STDOUT at the same time
  : analytical_step(processing_order::ordered_io, "write_fastq")
  , m_output(file.name)
  , m_isal_stream(is_isal_streaming_enabled(file, config.compression_level))
{
  if (m_isal_stream) {
    // In streaming mode the gzip header is written once, up front; the
    // compressed payload is produced by the upstream steps and the trailer
    // is appended by process() on EOF
    buffer level_buf(ISAL_BUFFER_SIZE);
    buffer output_buf(OUTPUT_BLOCK_SIZE);

    struct isal_zstream stream = {};
    struct isal_gzip_header header = {};

    isal_gzip_header_init(&header);
    isal_deflate_init(&stream);
    // No payload is compressed here (avail_in = 0); only the header is
    // written into output_buf
    stream.avail_in = 0;
    stream.flush = NO_FLUSH;
    stream.level = ISAL_COMPRESSION_LEVEL;
    stream.level_buf = level_buf.data();
    stream.level_buf_size = level_buf.size();
    stream.gzip_flag = IGZIP_GZIP_NO_HDR;
    stream.next_out = output_buf.data();
    stream.avail_out = output_buf.size();

    const auto ret = isal_write_gzip_header(&stream, &header);
    AR_REQUIRE(ret == 0, "buffer was not large enough for gzip header");

    // Trim to the number of header bytes actually produced
    output_buf.resize(stream.total_out);

    m_output.write(output_buf);
  }
}
×
629

630
chunk_vec
write_fastq::process(chunk_ptr chunk)
{
  AR_REQUIRE(chunk);
  AR_REQUIRE_SINGLE_THREAD(m_lock);
  AR_REQUIRE(!m_eof);

  try {
    m_eof = chunk->eof;
    // The total uncompressed size is accumulated for the gzip trailer (ISIZE)
    m_uncompressed_bytes += chunk->uncompressed_size;

    // In streaming mode the trailer written below is the final write instead
    const auto mode = (m_eof && !m_isal_stream) ? flush::on : flush::off;

    m_output.write(chunk->buffers, mode);

    if (m_eof && m_isal_stream) {
      // Terminate the gzip stream with the accumulated CRC32 and input size
      buffer trailer;
      trailer.append_u32(chunk->crc32);
      trailer.append_u32(m_uncompressed_bytes);

      m_output.write(trailer, flush::on);
    }
  } catch (const io_error&) {
    std::ostringstream msg;
    msg << "Error writing to FASTQ file " << shell_escape(m_output.filename());

    throw io_error(msg.str(), errno);
  }

  return {};
}
UNCOV
661

×
662
void
write_fastq::finalize()
{
  AR_REQUIRE_SINGLE_THREAD(m_lock);
  AR_REQUIRE(m_eof);

  // Close file to trigger any exceptions due to badbit / failbit
  try {
    m_output.close();
  } catch (const io_error&) {
    std::ostringstream msg;
    msg << "Error closing FASTQ file " << shell_escape(m_output.filename());

    throw io_error(msg.str(), errno);
  }
}
×
UNCOV
678

×
679
} // namespace adapterremoval
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc