• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MikkelSchubert / adapterremoval / #81

07 Apr 2025 10:01PM UTC coverage: 27.335% (-0.5%) from 27.836%
#81

push

travis-ci

web-flow
fixes/improvements to duplicate statistics (#106)

* fix duplication statistics gathering and add minimal regression tests
* add duplicate statistics plots to html report
* fixed wrongly escaped legend settings for this and other plots
* add implicit argument to `--report-duplication`
* support k/m/g suffixes for `--report-duplication` arguments

0 of 82 new or added lines in 6 files covered. (0.0%)

548 existing lines in 5 files now uncovered.

2719 of 9947 relevant lines covered (27.33%)

4053.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/fastq_io.cpp
1
// SPDX-License-Identifier: GPL-3.0-or-later
2
// SPDX-FileCopyrightText: 2015 Mikkel Schubert <mikkelsch@gmail.com>
3
#include "fastq_io.hpp"      // declarations
4
#include "debug.hpp"         // for AR_REQUIRE, AR_REQUIRE_SINGLE_THREAD
5
#include "errors.hpp"        // for io_error, gzip_error, fastq_error
6
#include "fastq.hpp"         // for fastq
7
#include "fastq_enc.hpp"     // for MATE_SEPARATOR
8
#include "output.hpp"        // for output_file
9
#include "simd.hpp"          // for size_t
10
#include "statistics.hpp"    // for fastq_statistics, fastq_stats_ptr, stat...
11
#include "strutils.hpp"      // for shell_escape, string_vec, ends_with
12
#include "userconfig.hpp"    // for userconfig
13
#include "utilities.hpp"     // for prng_seed
14
#include <algorithm>         // for max, min
15
#include <cerrno>            // for errno
16
#include <cstring>           // for size_t, memcpy
17
#include <isa-l/crc.h>       // for crc32_gzip_refl
18
#include <isa-l/igzip_lib.h> // for isal_zstream, isal_deflate_init, isal_d...
19
#include <libdeflate.h>      // for libdeflate_alloc_compressor, libdeflate...
20
#include <memory>            // for unique_ptr, make_unique, __shared_ptr_a...
21
#include <sstream>           // for basic_ostream, basic_ostringstream, ope...
22
#include <utility>           // for move, swap
23

24
namespace adapterremoval {
25

26
// Default BGZF block header, as described in the SAM spec. v1.6 section 4.1.
// The final 2 bytes are placeholders for the total block size (BSIZE).
constexpr std::string_view BGZF_HEADER{
  "\37\213\10\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0",
  18
};

// End-of-file marker for BGZF files; see SAM spec. v1.6 section 4.1.2
constexpr std::string_view BGZF_EOF{
  "\37\213\10\4\0\0\0\0\0\377\6\0\102\103\2\0\33\0\3\0\0\0\0\0\0\0\0\0",
  28
};

//! Roughly how much extra space is needed for headers, CRC32, and ISIZE
constexpr size_t BGZF_META =
  BGZF_HEADER.size() + 4 /* CRC32 */ + 4 /* ISIZE */;
41

42
////////////////////////////////////////////////////////////////////////////////
43
// Helper function for isa-l
44

45
namespace {
46

47
//! The compression level used for block/stream compression with isa-l
48
constexpr size_t ISAL_COMPRESSION_LEVEL = 1;
49
//! The default buffer size for compression at level ISAL_COMPRESSION_LEVEL
50
constexpr size_t ISAL_BUFFER_SIZE = ISAL_DEF_LVL1_SMALL;
51

52
/**
53
 * ISA-l streaming is enabled only at compression level 1, since little
54
 * difference was observed between levels 1 to 3. However, it still offers a
55
 * faster compression (with a lower ratio) than libdeflate level 1.
56
 */
57
bool
58
is_isal_streaming_enabled(const output_file file, unsigned compression_level)
×
59
{
60
  switch (file.format) {
×
61
    case output_format::fastq:
62
    case output_format::sam:
63
    case output_format::bam:
64
    case output_format::ubam:
65
      return false;
66
    case output_format::fastq_gzip:
×
67
    case output_format::sam_gzip:
×
68
      return compression_level == ISAL_COMPRESSION_LEVEL;
×
69
    default:
×
70
      AR_FAIL("invalid output format");
×
71
  }
72
}
73

74
} // namespace
75

76
///////////////////////////////////////////////////////////////////////////////
77
// Implementations for 'read_fastq'
78

79
enum class read_fastq::file_type
80
{
81
  read_1,
82
  read_2,
83
  interleaved
84
};
85

86
namespace {
87

88
bool
89
read_record(joined_line_readers& reader, fastq_vec& chunk)
×
90
{
91
  // Line numbers change as we attempt to read the record, and potentially
92
  // points to the next record in the case of invalid qualities/nucleotides
93
  const auto line_number = reader.linenumber();
×
94

95
  try {
×
96
    chunk.emplace_back();
×
97
    auto& record = chunk.back();
×
98

99
    if (record.read_unsafe(reader)) {
×
100
      return true;
101
    } else {
102
      chunk.pop_back();
×
103
      return false;
×
104
    }
105
  } catch (const fastq_error& error) {
×
106
    std::ostringstream stream;
×
107
    stream << "Error reading FASTQ record from '" << reader.filename()
×
108
           << "' at line " << line_number << "; aborting:\n"
×
109
           << indent_lines(error.what());
×
110

111
    throw fastq_error(stream.str());
×
112
  }
×
113
}
114

115
/**
 * Returns the list of input files matching the reader `mode`: mate 1 and
 * interleaved readers use `input_files_1`, mate 2 readers use `input_files_2`.
 * Any other value is reported via AR_FAIL.
 */
const string_vec&
select_filenames(const userconfig& config, const read_fastq::file_type mode)
{
  using file_type = read_fastq::file_type;

  if (mode == file_type::read_1 || mode == file_type::interleaved) {
    return config.input_files_1;
  }

  if (mode == file_type::read_2) {
    return config.input_files_2;
  }

  AR_FAIL("invalid read_fastq::file_type value");
}
128

129
} // namespace
130

131
/**
 * Creates an ordered step that reads the input files selected by `mode` and
 * passes the resulting chunks on to `next_step`.
 *
 * The mate separator is treated as already identified if one was set in
 * `config`. Both duplication statistics objects in `stats` must be non-null.
 */
read_fastq::read_fastq(const userconfig& config,
                       const size_t next_step,
                       const read_fastq::file_type mode,
                       statistics& stats)
  : analytical_step(processing_order::ordered, "read_fastq")
  , m_reader(select_filenames(config, mode))
  , m_next_step(next_step)
  , m_mode(mode)
  , m_head(config.head)
  , m_mate_separator(config.mate_separator)
  // A user-supplied (non-zero) separator means there is nothing to guess
  , m_mate_separator_identified(config.mate_separator)
  , m_duplication_1(stats.duplication_1)
  , m_duplication_2(stats.duplication_2)
{
  AR_REQUIRE(m_duplication_1 && m_duplication_2);
}
147

148
void
149
read_fastq::add_steps(scheduler& sch,
×
150
                      const userconfig& config,
151
                      size_t next_step,
152
                      statistics& stats)
153
{
NEW
154
  const auto add_step = [&sch, &config, &stats](auto next_step, auto type) {
×
NEW
155
    return sch.add<read_fastq>(config, next_step, type, stats);
×
156
  };
157

158
  if (config.interleaved_input) {
×
NEW
159
    add_step(next_step, read_fastq::file_type::interleaved);
×
160
  } else if (config.paired_ended_mode) {
×
NEW
161
    next_step = add_step(next_step, read_fastq::file_type::read_2);
×
NEW
162
    add_step(next_step, read_fastq::file_type::read_1);
×
163
  } else {
NEW
164
    add_step(next_step, read_fastq::file_type::read_1);
×
165
  }
166
}
167

168
/**
 * Reads the next batch of records into a chunk and forwards it to the next
 * step.
 *
 * Mate 1 and interleaved readers create a new chunk on every call (the
 * incoming chunk is ignored) and return no chunks once EOF was reached. Mate 2
 * readers fill `reads_2` of the chunk they are handed.
 *
 * @throws fastq_error if the number of mate 1 and mate 2 reads differ.
 */
chunk_vec
read_fastq::process(chunk_ptr chunk)
{
  AR_REQUIRE_SINGLE_THREAD(m_lock);

  const bool creates_chunk =
    (m_mode == file_type::read_1) || (m_mode == file_type::interleaved);

  if (!creates_chunk) {
    AR_REQUIRE(!m_eof && chunk);
  } else if (m_eof) {
    // The scheduler only terminates when the first step stops returning chunks
    return {};
  } else {
    chunk = std::make_unique<analytical_chunk>();
  }

  auto& mate_1 = chunk->reads_1;
  auto& mate_2 = chunk->reads_2;

  switch (m_mode) {
    case file_type::read_1:
      read_single_end(mate_1, *m_duplication_1);
      break;
    case file_type::interleaved:
      read_interleaved(mate_1, mate_2);
      break;
    case file_type::read_2:
      read_single_end(mate_2, *m_duplication_2);
      break;
    default:
      AR_FAIL("invalid file_type value");
  }

  // Both mates are available once the mate 2 / interleaved reader has run
  if (m_mode != file_type::read_1) {
    if (mate_1.size() != mate_2.size()) {
      throw fastq_error("Found unequal number of mate 1 and mate 2 reads; "
                        "input files may be truncated. Please fix before "
                        "continuing.");
    }

    if (!m_mate_separator_identified) {
      AR_REQUIRE(mate_1.size() == mate_2.size());
      // Mate separators are identified using the first block, in order to
      // reduce the need for locking in the post-processing step

      // Attempt to determine the mate separator character
      m_mate_separator = fastq::guess_mate_separator(mate_1, mate_2);
      m_mate_separator_identified = true;
    }
  }

  // Head must be checked after the first loop, to produce at least one chunk
  m_eof |= !m_head;
  chunk->eof = m_eof;
  chunk->mate_separator = m_mate_separator;
  chunk->first = m_first;
  m_first = false;

  chunk_vec result;
  result.emplace_back(m_next_step, std::move(chunk));
  return result;
}
226

227
void
NEW
228
read_fastq::read_single_end(fastq_vec& reads, duplication_statistics& stats)
×
229
{
230
  for (; reads.size() < INPUT_READS && m_head && !m_eof; m_head--) {
×
NEW
231
    if (read_record(m_reader, reads)) {
×
NEW
232
      stats.process(reads.back());
×
233
    } else {
NEW
234
      m_eof = true;
×
235
    }
236
  }
237
}
238

239
void
240
read_fastq::read_interleaved(fastq_vec& reads_1, fastq_vec& reads_2)
×
241
{
242
  for (; reads_1.size() < INPUT_READS && m_head && !m_eof; m_head--) {
×
NEW
243
    if (read_record(m_reader, reads_1)) {
×
NEW
244
      m_duplication_1->process(reads_1.back());
×
245
    } else {
NEW
246
      m_eof = true;
×
NEW
247
      break;
×
248
    }
249

NEW
250
    if (read_record(m_reader, reads_2)) {
×
NEW
251
      m_duplication_2->process(reads_2.back());
×
252
    }
253
  }
254
}
255

256
void
257
read_fastq::finalize()
×
258
{
259
  AR_REQUIRE_SINGLE_THREAD(m_lock);
×
260
  AR_REQUIRE(m_eof);
×
261
}
262

263
///////////////////////////////////////////////////////////////////////////////
264
// Implementations for 'post_process_fastq'
265

266
/**
 * Creates an unordered step that post-processes reads and collects input
 * statistics, forwarding chunks to `next_step`.
 *
 * One statistics accumulator is created per configured thread, each with its
 * own PRNG seed; both input statistics objects in `stats` must be non-null.
 */
post_process_fastq::post_process_fastq(const userconfig& config,
                                       size_t next_step,
                                       statistics& stats)
  : analytical_step(processing_order::unordered, "post_process_fastq")
  , m_statistics_1(stats.input_1)
  , m_statistics_2(stats.input_2)
  , m_next_step(next_step)
  , m_encoding(config.io_encoding)
  , m_timer(config.log_progress)
{
  AR_REQUIRE(m_statistics_1 && m_statistics_2);

  for (size_t thread = 0; thread < config.max_threads; ++thread) {
    m_stats.emplace_back(config.report_sample_rate, prng_seed());
  }
}
282

283
/**
 * Post-processes every read in `chunk` using the configured encoding and
 * records per-read statistics, then forwards the chunk to the next step.
 *
 * For paired reads, each pair is first normalized using the chunk's mate
 * separator; a non-zero separator is afterwards replaced by MATE_SEPARATOR.
 */
chunk_vec
post_process_fastq::process(chunk_ptr chunk)
{
  AR_REQUIRE(chunk);
  auto& mate_1 = chunk->reads_1;
  auto& mate_2 = chunk->reads_2;

  auto local = m_stats.acquire();

  AR_REQUIRE((mate_1.size() == mate_2.size()) || mate_2.empty());
  if (!mate_2.empty()) {
    auto second = mate_2.begin();
    for (auto first = mate_1.begin(); first != mate_1.end();
         ++first, ++second) {
      fastq::normalize_paired_reads(*first, *second, chunk->mate_separator);

      first->post_process(m_encoding);
      local->stats_1.process(*first);

      second->post_process(m_encoding);
      local->stats_2.process(*second);
    }

    // fastq::normalize_paired_reads replaces the mate separator if present
    if (chunk->mate_separator) {
      chunk->mate_separator = MATE_SEPARATOR;
    }
  } else {
    for (auto& read : mate_1) {
      read.post_process(m_encoding);
      local->stats_1.process(read);
    }
  }

  m_stats.release(local);

  {
    // The progress timer is shared between all worker threads
    std::unique_lock<std::mutex> guard(m_timer_lock);
    m_timer.increment(mate_1.size() + mate_2.size());
  }

  chunk_vec result;
  result.emplace_back(m_next_step, std::move(chunk));

  return result;
}
329

330
void
331
post_process_fastq::finalize()
×
332
{
333
  AR_REQUIRE_SINGLE_THREAD(m_timer_lock);
×
334

335
  while (auto it = m_stats.try_acquire()) {
×
336
    *m_statistics_1 += it->stats_1;
×
337
    *m_statistics_2 += it->stats_2;
×
338
  }
339

340
  m_timer.finalize();
×
341
}
342

343
///////////////////////////////////////////////////////////////////////////////
344
// Implementations for 'split_fastq'
345

346
/**
 * Creates an ordered step that splits output for `file` into blocks, which
 * are passed on to `next_step`. Whether ISA-l streaming is used is decided
 * from the file format and the configured compression level.
 */
split_fastq::split_fastq(const userconfig& config,
                         const output_file& file,
                         size_t next_step)
  : analytical_step(processing_order::ordered, "split_fastq")
  , m_next_step(next_step)
  , m_isal_stream(is_isal_streaming_enabled(file, config.compression_level))
{
}
354

355
void
356
split_fastq::finalize()
×
357
{
358
  AR_REQUIRE_SINGLE_THREAD(m_lock);
×
359

360
  AR_REQUIRE(m_eof);
×
361
  AR_REQUIRE(m_buffer.capacity() == 0);
×
362
}
363

364
/**
 * Repacks the buffers in `chunk` into blocks of exactly BGZF_BLOCK_SIZE
 * bytes, carrying any remainder over to the next call.
 *
 * On the final chunk the remaining (possibly empty) data is emitted as a
 * block marked `eof`. When ISA-l streaming is enabled, a running CRC32 over
 * all data is maintained and attached to that final block.
 */
chunk_vec
split_fastq::process(const chunk_ptr chunk)
{
  AR_REQUIRE(chunk);
  AR_REQUIRE_SINGLE_THREAD(m_lock);
  AR_REQUIRE(!m_eof);
  m_eof = chunk->eof;

  chunk_vec blocks;
  for (const auto& src : chunk->buffers) {
    size_t consumed = 0;

    while (consumed < src.size()) {
      // Copy as much as fits into what is left of the current block
      const auto pending = src.size() - consumed;
      const auto space = BGZF_BLOCK_SIZE - m_buffer.size();
      const auto count = std::min(pending, space);

      m_buffer.append(src.data() + consumed, count);
      consumed += count;

      if (m_buffer.size() != BGZF_BLOCK_SIZE) {
        continue;
      }

      auto block = std::make_unique<analytical_chunk>();

      if (m_isal_stream) {
        m_isal_crc32 =
          crc32_gzip_refl(m_isal_crc32, m_buffer.data(), m_buffer.size());
      }

      block->uncompressed_size = m_buffer.size();
      block->buffers.emplace_back(std::move(m_buffer));

      blocks.emplace_back(m_next_step, std::move(block));

      // Start a fresh buffer for the next block
      m_buffer = buffer();
      m_buffer.reserve(BGZF_BLOCK_SIZE);
    }
  }

  if (m_eof) {
    auto last = std::make_unique<analytical_chunk>();
    last->eof = true;

    if (m_isal_stream) {
      m_isal_crc32 =
        crc32_gzip_refl(m_isal_crc32, m_buffer.data(), m_buffer.size());
    }

    last->crc32 = m_isal_crc32;
    last->uncompressed_size = m_buffer.size();
    last->buffers.emplace_back(std::move(m_buffer));

    blocks.emplace_back(m_next_step, std::move(last));
  }

  return blocks;
}
418

419
///////////////////////////////////////////////////////////////////////////////
420
// Implementations for 'gzip_split_fastq'
421

422
namespace {
423

424
size_t
425
isal_deflate_block(buffer& input_buffer,
×
426
                   buffer& output_buffer,
427
                   const size_t output_offset,
428
                   const bool eof)
429
{
430
  isal_zstream stream{};
×
431
  isal_deflate_stateless_init(&stream);
×
432

433
  stream.flush = FULL_FLUSH;
×
434
  stream.end_of_stream = eof;
×
435

436
  stream.level = ISAL_COMPRESSION_LEVEL;
×
437
  stream.level_buf_size = ISAL_BUFFER_SIZE;
×
438
  buffer level_buffer{ stream.level_buf_size };
×
439
  stream.level_buf = level_buffer.data();
×
440

441
  stream.avail_in = input_buffer.size();
×
442
  stream.next_in = input_buffer.data();
×
443
  stream.next_out = output_buffer.data() + output_offset;
×
444
  stream.avail_out = output_buffer.size() - output_offset;
×
445

446
  switch (isal_deflate_stateless(&stream)) {
×
447
    case COMP_OK:
×
448
      break;
×
449
    case INVALID_FLUSH:
×
450
      throw gzip_error("isal_deflate_stateless: invalid flush");
×
451
    case ISAL_INVALID_LEVEL:
×
452
      throw gzip_error("isal_deflate_stateless: invalid level");
×
453
    case ISAL_INVALID_LEVEL_BUF:
×
454
      throw gzip_error("isal_deflate_stateless: invalid buffer size");
×
455
    default:
×
456
      throw gzip_error("isal_deflate_stateless: unexpected error");
×
457
  }
458

459
  // The easily compressible input should fit in a single output block
460
  AR_REQUIRE(stream.avail_in == 0);
×
461

462
  return stream.total_out;
×
463
}
464

465
} // namespace
466

467
/**
 * Creates an unordered step that compresses individual blocks for `file` and
 * forwards them to `next_step`. Whether ISA-l streaming is used is decided
 * from the file format and the configured compression level.
 */
gzip_split_fastq::gzip_split_fastq(const userconfig& config,
                                   const output_file& file,
                                   size_t next_step)
  : analytical_step(processing_order::unordered, "gzip_split_fastq")
  , m_config(config)
  , m_isal_stream(is_isal_streaming_enabled(file, config.compression_level))
  , m_format(file.format)
  , m_next_step(next_step)
{
}
477

478
/**
 * Compresses the single buffer in `chunk` in place and forwards the chunk to
 * the next step.
 *
 * In ISA-l streaming mode the block is written as raw deflate data belonging
 * to one continuous stream. Otherwise a self-contained BGZF block is built:
 * header, payload (stored, ISA-l, or libdeflate depending on format and
 * compression level), CRC32, and input size; the BGZF EOF marker is appended
 * to the final block.
 *
 * @throws gzip_error if compression fails or the libdeflate compressor could
 *         not be allocated.
 */
chunk_vec
gzip_split_fastq::process(chunk_ptr chunk)
{
  AR_REQUIRE(chunk);
  AR_REQUIRE(chunk->buffers.size() == 1);

  buffer& input_buffer = chunk->buffers.front();
  buffer output_buffer;

  if (m_isal_stream) {
    output_buffer.resize(input_buffer.size());
    const auto output_size =
      isal_deflate_block(input_buffer, output_buffer, 0, chunk->eof);
    output_buffer.resize(output_size);
  } else {
    if (m_format == output_format::ubam || m_config.compression_level == 0) {
      output_buffer.reserve(input_buffer.size() + BGZF_META);
      output_buffer.append(BGZF_HEADER);
      output_buffer.append_u8(1); // BFINAL=1, BTYPE=00; see RFC1951
      output_buffer.append_u16(input_buffer.size());
      output_buffer.append_u16(~input_buffer.size());
      output_buffer.append(input_buffer);
    } else if (m_config.compression_level == ISAL_COMPRESSION_LEVEL) {
      // Reserve room for the header/trailer like the other branches do; the
      // header is written into this reservation, so without the extra space
      // small blocks could be left with little or no room for the payload
      output_buffer.reserve(input_buffer.size() + BGZF_META);
      output_buffer.append(BGZF_HEADER);
      output_buffer.resize(output_buffer.capacity());

      const auto output_size = isal_deflate_block(input_buffer,
                                                  output_buffer,
                                                  BGZF_HEADER.size(),
                                                  true);

      // Resize the buffer to the actually used size
      output_buffer.resize(output_size + BGZF_HEADER.size());
    } else {
      // Libdeflate compression levels 1 to 12 are mapped onto 2 to 13
      AR_REQUIRE(m_config.compression_level >= 2 &&
                 m_config.compression_level <= 13);

      // Owned via unique_ptr so that the compressor is released even if
      // growing the output buffer throws
      using compressor_ptr =
        std::unique_ptr<libdeflate_compressor,
                        decltype(&libdeflate_free_compressor)>;
      const compressor_ptr compressor{
        libdeflate_alloc_compressor(m_config.compression_level - 1),
        &libdeflate_free_compressor
      };

      if (!compressor) {
        throw gzip_error("libdeflate_alloc_compressor: allocation failed");
      }

      const auto output_bound =
        libdeflate_deflate_compress_bound(compressor.get(),
                                          input_buffer.size());

      output_buffer.reserve(output_bound + BGZF_META);
      output_buffer.append(BGZF_HEADER);
      output_buffer.resize(output_buffer.capacity());

      const auto output_size =
        libdeflate_deflate_compress(compressor.get(),
                                    input_buffer.data(),
                                    input_buffer.size(),
                                    output_buffer.data() + BGZF_HEADER.size(),
                                    output_buffer.size() - BGZF_HEADER.size());
      // The easily compressible input should fit in a single output block
      AR_REQUIRE(output_size);

      // Resize the buffer to the actually used size
      output_buffer.resize(output_size + BGZF_HEADER.size());
    }

    const auto input_crc32 =
      libdeflate_crc32(0, input_buffer.data(), input_buffer.size());
    output_buffer.append_u32(input_crc32);         // checksum of data
    output_buffer.append_u32(input_buffer.size()); // size of data

    AR_REQUIRE(output_buffer.size() <= BGZF_MAX_BLOCK_SIZE);
    // Write the final block size; -1 to fit 65536 in 16 bit
    output_buffer.put_u16(16, output_buffer.size() - 1);

    if (chunk->eof) {
      output_buffer.append(BGZF_EOF);
    }
  }

  // Enable reuse of the analytical_chunks
  std::swap(input_buffer, output_buffer);

  chunk_vec chunks;
  chunks.emplace_back(m_next_step, std::move(chunk));

  return chunks;
}
561

562
///////////////////////////////////////////////////////////////////////////////
563
// Implementations for 'write_fastq'
564

565
/**
 * Creates the step that writes finished blocks to `file`. In ISA-l streaming
 * mode the gzip header is written immediately, since the blocks that follow
 * contain only deflate data.
 */
write_fastq::write_fastq(const userconfig& config, const output_file& file)
  // Allow disk IO and writing to STDOUT at the same time
  : analytical_step(processing_order::ordered_io, "write_fastq")
  , m_output(file.name)
  , m_isal_stream(is_isal_streaming_enabled(file, config.compression_level))
{
  if (!m_isal_stream) {
    return;
  }

  buffer level_buf(ISAL_BUFFER_SIZE);
  buffer output_buf(OUTPUT_BLOCK_SIZE);

  struct isal_zstream stream = {};
  struct isal_gzip_header header = {};

  isal_gzip_header_init(&header);
  isal_deflate_init(&stream);
  stream.avail_in = 0;
  stream.flush = NO_FLUSH;
  stream.level = ISAL_COMPRESSION_LEVEL;
  stream.level_buf = level_buf.data();
  stream.level_buf_size = level_buf.size();
  stream.gzip_flag = IGZIP_GZIP_NO_HDR;
  stream.next_out = output_buf.data();
  stream.avail_out = output_buf.size();

  const auto ret = isal_write_gzip_header(&stream, &header);
  AR_REQUIRE(ret == 0, "buffer was not large enough for gzip header");

  // Only the bytes actually produced for the header are written
  output_buf.resize(stream.total_out);

  m_output.write(output_buf);
}
597

598
/**
 * Writes the buffers in `chunk` to the output file. In ISA-l streaming mode
 * the final chunk is followed by a trailer holding the CRC32 and the total
 * uncompressed size; the output is flushed with the last write.
 *
 * @return Always empty; this is the last step for the file.
 * @throws io_error if writing fails with `std::ios_base::failure`.
 */
chunk_vec
write_fastq::process(chunk_ptr chunk)
{
  AR_REQUIRE(chunk);
  AR_REQUIRE_SINGLE_THREAD(m_lock);
  AR_REQUIRE(!m_eof);

  try {
    m_eof = chunk->eof;
    m_uncompressed_bytes += chunk->uncompressed_size;

    const bool write_trailer = m_eof && m_isal_stream;

    // Flush on the final write only; that is the trailer, if one is needed
    if (m_eof && !write_trailer) {
      m_output.write(chunk->buffers, flush::on);
    } else {
      m_output.write(chunk->buffers, flush::off);
    }

    if (write_trailer) {
      buffer trailer;
      trailer.append_u32(chunk->crc32);
      trailer.append_u32(m_uncompressed_bytes);

      m_output.write(trailer, flush::on);
    }
  } catch (const std::ios_base::failure&) {
    std::ostringstream message;
    message << "Error writing to FASTQ file "
            << shell_escape(m_output.filename());

    throw io_error(message.str(), errno);
  }

  return {};
}
629

630
void
631
write_fastq::finalize()
×
632
{
633
  AR_REQUIRE_SINGLE_THREAD(m_lock);
×
634
  AR_REQUIRE(m_eof);
×
635

636
  // Close file to trigger any exceptions due to badbit / failbit
637
  try {
×
638
    m_output.close();
×
639
  } catch (const std::ios_base::failure&) {
×
640
    std::ostringstream msg;
×
641
    msg << "Error closing FASTQ file " << shell_escape(m_output.filename());
×
642

643
    throw io_error(msg.str(), errno);
×
644
  }
×
645
}
646

647
} // namespace adapterremoval
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc