• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MikkelSchubert / adapterremoval / #38

07 Aug 2024 07:47PM UTC coverage: 83.365% (-3.5%) from 86.839%
#38

push

travis-ci

MikkelSchubert
additional tests

2190 of 2627 relevant lines covered (83.37%)

15528.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.07
/src/managed_io.cpp
1
/*************************************************************************\
2
 * AdapterRemoval - cleaning next-generation sequencing reads            *
3
 *                                                                       *
4
 * Copyright (C) 2015 by Mikkel Schubert - mikkelsch@gmail.com           *
5
 *                                                                       *
6
 * This program is free software: you can redistribute it and/or modify  *
7
 * it under the terms of the GNU General Public License as published by  *
8
 * the Free Software Foundation, either version 3 of the License, or     *
9
 * (at your option) any later version.                                   *
10
 *                                                                       *
11
 * This program is distributed in the hope that it will be useful,       *
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of        *
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
14
 * GNU General Public License for more details.                          *
15
 *                                                                       *
16
 * You should have received a copy of the GNU General Public License     *
17
 * along with this program.  If not, see <http://www.gnu.org/licenses/>. *
18
\*************************************************************************/
19
#include "managed_io.hpp" // declarations
20
#include "debug.hpp"      // for AR_REQUIRE
21
#include "errors.hpp"     // for io_error
22
#include "logging.hpp"    // for log::warn
23
#include "strutils.hpp"   // for log_escape
24
#include <algorithm>      // for any_of
25
#include <cerrno>         // for EMFILE, errno
26
#include <cstddef>        // for size_t
27
#include <cstdio>         // for fopen, fread, fwrite, ...
28
#include <fcntl.h>        // for posix_fadvise
29
#include <mutex>          // for mutex, lock_guard
30
#include <string>         // for string
31
#include <sys/stat.h>     // for fstat
32

33
namespace adapterremoval {
34

35
class io_manager
36
{
37
public:
38
  static void open(managed_reader* reader)
1✔
39
  {
40
    if (reader->m_file) {
1✔
41
      return;
42
    } else if (reader->filename() == "/dev/stdin") {
3✔
43
      reader->m_file = stdin;
×
44
    } else if (reader->filename() != "-") {
3✔
45
      reader->m_file = io_manager::fopen(reader->filename(), "rb");
2✔
46

47
#if _XOPEN_SOURCE >= 600 || _POSIX_C_SOURCE >= 200112L
48
      posix_fadvise(fileno(reader->m_file), 0, 0, POSIX_FADV_WILLNEED);
×
49
      posix_fadvise(fileno(reader->m_file), 0, 0, POSIX_FADV_SEQUENTIAL);
×
50
#endif
51
    } else {
52
      // Merged I/O depends on filenames being identical
53
      AR_FAIL("unhandled STDIN marker");
×
54
    }
55
  }
56

57
  static void open(managed_writer* writer)
58
  {
59
    if (writer->m_file) {
×
60
      return;
61
    } else if (writer->filename() == "/dev/stdout") {
×
62
      writer->m_file = stdout;
×
63
    } else if (writer->filename() == "/dev/stderr") {
×
64
      // Not sure why anyone would do this, but ¯\_(ツ)_/¯
65
      writer->m_file = stderr;
×
66
    } else if (writer->filename() != "-") {
×
67
      writer->m_file =
×
68
        io_manager::fopen(writer->filename(), writer->m_created ? "ab" : "wb");
×
69
    } else {
70
      // Merged I/O depends on filenames being identical
71
      AR_FAIL("unhandled STDOUT marker");
×
72
    }
73

74
    writer->m_created = true;
×
75
    writer->m_stream =
×
76
      io_manager::is_stream(writer->filename(), writer->m_file);
×
77
  }
78

79
  /** Adds writer to list of inactive writers */
80
  static void add(managed_writer* writer)
81
  {
82
    std::lock_guard<std::mutex> lock(m_lock);
×
83

84
    AR_REQUIRE(!writer->m_prev);
×
85
    AR_REQUIRE(!writer->m_next);
×
86
    AR_REQUIRE(!m_head == !m_tail);
×
87
    if (m_head) {
×
88
      writer->m_next = m_head;
×
89
      m_head->m_prev = writer;
×
90
    }
91

92
    m_head = writer;
×
93

94
    if (!m_tail) {
×
95
      m_tail = writer;
×
96
    }
97

98
    AR_REQUIRE(m_head && m_tail);
×
99
    AR_REQUIRE(!m_head->m_prev);
×
100
    AR_REQUIRE(!m_tail->m_next);
×
101
  }
102

103
  /* Removes the writer from the list of inactive writers */
104
  static void remove(managed_writer* writer)
105
  {
106
    std::lock_guard<std::mutex> lock(m_lock);
×
107

108
    AR_REQUIRE(!m_head == !m_tail);
×
109
    AR_REQUIRE(!m_head || !m_head->m_prev);
×
110
    AR_REQUIRE(!m_tail || !m_tail->m_next);
×
111

112
    if (writer == m_head) {
×
113
      m_head = writer->m_next;
×
114
    }
115

116
    if (writer == m_tail) {
×
117
      m_tail = writer->m_prev;
×
118
    }
119

120
    AR_REQUIRE(!m_head == !m_tail);
×
121

122
    if (writer->m_prev) {
×
123
      writer->m_prev->m_next = writer->m_next;
×
124
    }
125

126
    if (writer->m_next) {
×
127
      writer->m_next->m_prev = writer->m_prev;
×
128
    }
129

130
    writer->m_prev = nullptr;
×
131
    writer->m_next = nullptr;
×
132

133
    AR_REQUIRE(writer != m_head);
×
134
    AR_REQUIRE(writer != m_tail);
×
135
    AR_REQUIRE(!writer->m_prev);
×
136
    AR_REQUIRE(!writer->m_next);
×
137
    AR_REQUIRE(!m_head || !m_head->m_prev);
×
138
    AR_REQUIRE(!m_tail || !m_tail->m_next);
×
139
  }
140

141
  io_manager() = delete;
142
  ~io_manager() = delete;
143
  io_manager(const io_manager&) = delete;
144
  io_manager(io_manager&&) = delete;
145
  io_manager& operator=(const io_manager&) = delete;
146
  io_manager& operator=(io_manager&&) = delete;
147

148
private:
149
  static FILE* fopen(const std::string& filename, const char* mode)
1✔
150
  {
151
    while (true) {
1✔
152
      FILE* handle = ::fopen(filename.c_str(), mode);
2✔
153

154
      if (handle) {
1✔
155
        AR_REQUIRE(!::ferror(handle));
×
156
        return handle;
×
157
      } else if (errno == EMFILE) {
1✔
158
        close_one();
×
159
      } else {
160
        throw io_error("failed to open file", errno);
2✔
161
      }
162
    }
163
  }
164

165
  static bool is_stream(const std::string& filename, FILE* handle)
166
  {
167
    if (handle == stdin || handle == stdout || handle == stderr) {
×
168
      return true;
169
    }
170

171
    struct stat statbuf = {};
×
172
    if (fstat(fileno(handle), &statbuf) == 0) {
×
173
      return S_ISFIFO(statbuf.st_mode);
×
174
    }
175

176
    log::warn() << "Could not fstat " << log_escape(filename);
×
177

178
    // Assumed to be a stream to be safe
179
    return true;
×
180
  }
181

182
  /** Try to close the least recently used writer */
183
  static void close_one()
184
  {
185
    AR_REQUIRE(!m_head == !m_tail);
×
186
    if (!m_warning_printed) {
×
187
      log::warn() << "Number of available file-handles (ulimit -n) is too low. "
×
188
                  << "AdapterRemoval will dynamically close/re-open files as "
×
189
                  << "required, but performance may suffer as a result.";
×
190

191
      m_warning_printed = true;
×
192
    }
193

194
    if (m_tail) {
×
195
      if (fclose(m_tail->m_file)) {
×
196
        m_tail->m_file = nullptr;
×
197
        throw io_error("failed to close file", errno);
×
198
      }
199
      m_tail->m_file = nullptr;
×
200

201
      remove(m_tail);
×
202
    } else {
203
      throw io_error(
×
204
        "available number of file-handles too low; could not open any files");
×
205
    }
206
  }
207

208
  //! Indicates if a performance warning has been printed
209
  static bool m_warning_printed;
210
  //! Most recently used managed_writer
211
  static managed_writer* m_head;
212
  //! Least recently used managed_writer
213
  static managed_writer* m_tail;
214
  //! Lock used to control access to internal state
215
  static std::mutex m_lock;
216
};
217

218
bool io_manager::m_warning_printed = false;
219
managed_writer* io_manager::m_head = nullptr;
220
managed_writer* io_manager::m_tail = nullptr;
221
std::mutex io_manager::m_lock{};
222

223
///////////////////////////////////////////////////////////////////////////////
224

225
managed_reader::managed_reader(FILE* handle)
14✔
226
  : m_filename("<unknown file>")
29✔
227
  , m_file(handle)
14✔
228
{
229
  AR_REQUIRE(handle);
18✔
230
}
14✔
231

232
managed_reader::managed_reader(std::string filename)
1✔
233
  : m_filename(std::move(filename))
2✔
234
{
235
  io_manager::open(this);
1✔
236
}
1✔
237

238
/** Closes the handle, if one is still open; a failed fclose invokes AR_FAIL. */
managed_reader::~managed_reader()
{
  if (!m_file) {
    return;
  }

  if (::fclose(m_file) != 0) {
    AR_FAIL(format_io_error("error closing " + log_escape(m_filename), errno));
  }
}
13✔
244

245
void
246
managed_reader::close()
×
247
{
248
  if (m_file) {
×
249
    if (::fclose(m_file) != 0) {
×
250
      m_file = nullptr;
×
251
      throw io_error("error closing " + log_escape(m_filename), errno);
×
252
    }
253

254
    m_file = nullptr;
×
255
  }
256
}
257

258
size_t
259
managed_reader::read(void* buffer, size_t size)
33✔
260
{
261
  AR_REQUIRE(buffer);
33✔
262
  const auto nread = ::fread(buffer, 1, size, m_file);
33✔
263
  if (ferror(m_file)) {
33✔
264
    throw io_error("error reading " + log_escape(m_filename), errno);
×
265
  }
266

267
  return nread;
33✔
268
}
269

270
///////////////////////////////////////////////////////////////////////////////
271

272
/** Locker  */
273
class writer_lock
274
{
275
public:
276
  explicit writer_lock(managed_writer* writer)
×
277
    : m_writer(writer)
×
278
  {
279
    AR_REQUIRE(m_writer);
×
280

281
    // Remove from global queue to prevent other threads from manipulating it
282
    io_manager::remove(m_writer);
×
283
    io_manager::open(m_writer);
×
284
  };
285

286
  ~writer_lock()
×
287
  {
288
    // Streams cannot be managed, since they cannot be reopened
289
    if (m_writer->m_file && !m_writer->m_stream) {
×
290
      // Allow this writer to be closed if we run out of file handles
291
      io_manager::add(m_writer);
×
292
    }
293
  }
294

295
  void write(const void* buffer, size_t size)
×
296
  {
297
    AR_REQUIRE(buffer || size == 0);
×
298
    AR_REQUIRE(m_writer && m_writer->m_file);
×
299
    if (size) {
×
300
      const auto ret = ::fwrite(buffer, 1, size, m_writer->m_file);
×
301
      if (ret != size) {
×
302
        throw io_error("error writing to " + log_escape(m_writer->m_filename),
×
303
                       errno);
×
304
      }
305
    }
306
  }
307

308
  void flush()
×
309
  {
310
    AR_REQUIRE(m_writer && m_writer->m_file);
×
311
    if (::fflush(m_writer->m_file)) {
×
312
      throw io_error("error flushing file " + log_escape(m_writer->filename()),
×
313
                     errno);
×
314
    }
315
  }
316

317
  writer_lock(const writer_lock&) = delete;
318
  writer_lock(writer_lock&&) = delete;
319
  writer_lock& operator=(const writer_lock&) = delete;
320
  writer_lock& operator=(writer_lock&&) = delete;
321

322
private:
323
  managed_writer* m_writer;
324
};
325

326
///////////////////////////////////////////////////////////////////////////////
327

328
namespace {
329

330
bool
331
any_nonempty_buffers(const buffer_vec& buffers)
×
332
{
333
  return std::any_of(buffers.begin(), buffers.end(), [](const auto& it) {
×
334
    return it.size() != 0;
×
335
  });
×
336
}
337

338
} // namespace
339

340
managed_writer::managed_writer(std::string filename)
×
341
  : m_filename(std::move(filename))
×
342
{
343
}
344

345
/** Asserts that no handle is open when the writer is destroyed. */
managed_writer::~managed_writer()
{
  // The destructor does not close the file itself
  AR_REQUIRE(!m_file);
}
349

350
void
351
managed_writer::write(const buffer& buf, const flush mode)
×
352
{
353
  if (buf.size() || mode == flush::on) {
×
354
    writer_lock writer{ this };
×
355

356
    writer.write(buf.data(), buf.size());
×
357

358
    if (mode == flush::on) {
×
359
      writer.flush();
×
360
    }
361
  }
362
}
363

364
void
365
managed_writer::write(const buffer_vec& buffers, const flush mode)
×
366
{
367
  if (mode == flush::on || any_nonempty_buffers(buffers)) {
×
368
    writer_lock writer{ this };
×
369

370
    for (const auto& buf : buffers) {
×
371
      writer.write(buf.data(), buf.size());
×
372
    }
373

374
    if (mode == flush::on) {
×
375
      writer.flush();
×
376
    }
377
  }
378
}
379

380
void
381
managed_writer::write(const std::string& buf, const flush mode)
×
382
{
383
  if (!buf.empty() || mode == flush::on) {
×
384
    writer_lock writer{ this };
×
385

386
    writer.write(buf.data(), buf.length());
×
387

388
    if (mode == flush::on) {
×
389
      writer.flush();
×
390
    }
391
  }
392
}
393

394
void
395
managed_writer::close()
×
396
{
397
  io_manager::remove(this);
×
398
  if (m_file) {
×
399
    if (fclose(m_file)) {
×
400
      m_file = nullptr;
×
401
      throw io_error("failed to close file", errno);
×
402
    }
403

404
    m_file = nullptr;
×
405
  }
406
}
407

408
} // namespace adapterremoval
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc