• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MikkelSchubert / adapterremoval / #39

20 Aug 2024 01:31PM UTC coverage: 79.602% (-3.8%) from 83.365%
#39

push

travis-ci

MikkelSchubert
update schema URL to use github pages

2279 of 2863 relevant lines covered (79.6%)

14257.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.77
/src/managed_io.cpp
1
/*************************************************************************\
2
 * AdapterRemoval - cleaning next-generation sequencing reads            *
3
 *                                                                       *
4
 * Copyright (C) 2015 by Mikkel Schubert - mikkelsch@gmail.com           *
5
 *                                                                       *
6
 * This program is free software: you can redistribute it and/or modify  *
7
 * it under the terms of the GNU General Public License as published by  *
8
 * the Free Software Foundation, either version 3 of the License, or     *
9
 * (at your option) any later version.                                   *
10
 *                                                                       *
11
 * This program is distributed in the hope that it will be useful,       *
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of        *
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
14
 * GNU General Public License for more details.                          *
15
 *                                                                       *
16
 * You should have received a copy of the GNU General Public License     *
17
 * along with this program.  If not, see <http://www.gnu.org/licenses/>. *
18
\*************************************************************************/
19
#include "managed_io.hpp" // declarations
20
#include "debug.hpp"      // for AR_REQUIRE
21
#include "errors.hpp"     // for io_error
22
#include "logging.hpp"    // for log::warn
23
#include "strutils.hpp"   // for log_escape
24
#include <algorithm>      // for any_of
25
#include <cerrno>         // for EMFILE, errno
26
#include <cstddef>        // for size_t
27
#include <cstdio>         // for fopen, fread, fwrite, ...
28
#include <exception>      // for std::exception
29
#include <fcntl.h>        // for posix_fadvise
30
#include <mutex>          // for mutex, lock_guard
31
#include <string>         // for string
32
#include <sys/stat.h>     // for fstat
33

34
namespace adapterremoval {
35

36
class io_manager
37
{
38
public:
39
  static void open(managed_reader* reader)
1✔
40
  {
41
    if (reader->m_file) {
1✔
42
      return;
43
    } else if (reader->filename() == "/dev/stdin") {
3✔
44
      reader->m_file = stdin;
×
45
    } else if (reader->filename() != "-") {
3✔
46
      reader->m_file = io_manager::fopen(reader->filename(), "rb");
2✔
47

48
#if _XOPEN_SOURCE >= 600 || _POSIX_C_SOURCE >= 200112L
49
      posix_fadvise(fileno(reader->m_file), 0, 0, POSIX_FADV_WILLNEED);
×
50
      posix_fadvise(fileno(reader->m_file), 0, 0, POSIX_FADV_SEQUENTIAL);
×
51
#endif
52
    } else {
53
      // Merged I/O depends on filenames being identical
54
      AR_FAIL("unhandled STDIN marker");
×
55
    }
56
  }
57

58
  static void open(managed_writer* writer)
59
  {
60
    if (writer->m_file) {
×
61
      return;
62
    } else if (writer->filename() == "/dev/stdout") {
×
63
      writer->m_file = stdout;
×
64
    } else if (writer->filename() == "/dev/stderr") {
×
65
      // Not sure why anyone would do this, but ¯\_(ツ)_/¯
66
      writer->m_file = stderr;
×
67
    } else if (writer->filename() != "-") {
×
68
      writer->m_file =
×
69
        io_manager::fopen(writer->filename(), writer->m_created ? "ab" : "wb");
×
70
    } else {
71
      // Merged I/O depends on filenames being identical
72
      AR_FAIL("unhandled STDOUT marker");
×
73
    }
74

75
    writer->m_created = true;
×
76
    writer->m_stream =
×
77
      io_manager::is_stream(writer->filename(), writer->m_file);
×
78
  }
79

80
  /** Adds writer to list of inactive writers */
81
  static void add(managed_writer* writer)
82
  {
83
    std::lock_guard<std::mutex> lock(m_lock);
×
84

85
    AR_REQUIRE(!writer->m_prev);
×
86
    AR_REQUIRE(!writer->m_next);
×
87
    AR_REQUIRE(!m_head == !m_tail);
×
88
    if (m_head) {
×
89
      writer->m_next = m_head;
×
90
      m_head->m_prev = writer;
×
91
    }
92

93
    m_head = writer;
×
94

95
    if (!m_tail) {
×
96
      m_tail = writer;
×
97
    }
98

99
    AR_REQUIRE(m_head && m_tail);
×
100
    AR_REQUIRE(!m_head->m_prev);
×
101
    AR_REQUIRE(!m_tail->m_next);
×
102
  }
103

104
  /* Removes the writer from the list of inactive writers */
105
  static void remove(managed_writer* writer)
106
  {
107
    std::lock_guard<std::mutex> lock(m_lock);
×
108

109
    AR_REQUIRE(!m_head == !m_tail);
×
110
    AR_REQUIRE(!m_head || !m_head->m_prev);
×
111
    AR_REQUIRE(!m_tail || !m_tail->m_next);
×
112

113
    if (writer == m_head) {
×
114
      m_head = writer->m_next;
×
115
    }
116

117
    if (writer == m_tail) {
×
118
      m_tail = writer->m_prev;
×
119
    }
120

121
    AR_REQUIRE(!m_head == !m_tail);
×
122

123
    if (writer->m_prev) {
×
124
      writer->m_prev->m_next = writer->m_next;
×
125
    }
126

127
    if (writer->m_next) {
×
128
      writer->m_next->m_prev = writer->m_prev;
×
129
    }
130

131
    writer->m_prev = nullptr;
×
132
    writer->m_next = nullptr;
×
133

134
    AR_REQUIRE(writer != m_head);
×
135
    AR_REQUIRE(writer != m_tail);
×
136
    AR_REQUIRE(!writer->m_prev);
×
137
    AR_REQUIRE(!writer->m_next);
×
138
    AR_REQUIRE(!m_head || !m_head->m_prev);
×
139
    AR_REQUIRE(!m_tail || !m_tail->m_next);
×
140
  }
141

142
  io_manager() = delete;
143
  ~io_manager() = delete;
144
  io_manager(const io_manager&) = delete;
145
  io_manager(io_manager&&) = delete;
146
  io_manager& operator=(const io_manager&) = delete;
147
  io_manager& operator=(io_manager&&) = delete;
148

149
private:
150
  static FILE* fopen(const std::string& filename, const char* mode)
1✔
151
  {
152
    while (true) {
1✔
153
      FILE* handle = ::fopen(filename.c_str(), mode);
2✔
154

155
      if (handle) {
1✔
156
        AR_REQUIRE(!::ferror(handle));
×
157
        return handle;
×
158
      } else if (errno == EMFILE) {
1✔
159
        close_one();
×
160
      } else {
161
        throw io_error("failed to open file", errno);
2✔
162
      }
163
    }
164
  }
165

166
  static bool is_stream(const std::string& filename, FILE* handle)
167
  {
168
    if (handle == stdin || handle == stdout || handle == stderr) {
×
169
      return true;
170
    }
171

172
    struct stat statbuf = {};
×
173
    if (fstat(fileno(handle), &statbuf) == 0) {
×
174
      return S_ISFIFO(statbuf.st_mode);
×
175
    }
176

177
    log::warn() << "Could not fstat " << log_escape(filename);
×
178

179
    // Assumed to be a stream to be safe
180
    return true;
×
181
  }
182

183
  /** Try to close the least recently used writer */
184
  static void close_one()
185
  {
186
    AR_REQUIRE(!m_head == !m_tail);
×
187
    if (!m_warning_printed) {
×
188
      log::warn() << "Number of available file-handles (ulimit -n) is too low. "
×
189
                  << "AdapterRemoval will dynamically close/re-open files as "
×
190
                  << "required, but performance may suffer as a result.";
×
191

192
      m_warning_printed = true;
×
193
    }
194

195
    if (m_tail) {
×
196
      if (fclose(m_tail->m_file)) {
×
197
        m_tail->m_file = nullptr;
×
198
        throw io_error("failed to close file", errno);
×
199
      }
200
      m_tail->m_file = nullptr;
×
201

202
      remove(m_tail);
×
203
    } else {
204
      throw io_error(
×
205
        "available number of file-handles too low; could not open any files");
×
206
    }
207
  }
208

209
  //! Indicates if a performance warning has been printed
210
  static bool m_warning_printed;
211
  //! Most recently used managed_writer
212
  static managed_writer* m_head;
213
  //! Least recently used managed_writer
214
  static managed_writer* m_tail;
215
  //! Lock used to control access to internal state
216
  static std::mutex m_lock;
217
};
218

219
bool io_manager::m_warning_printed = false;
220
managed_writer* io_manager::m_head = nullptr;
221
managed_writer* io_manager::m_tail = nullptr;
222
std::mutex io_manager::m_lock{};
223

224
///////////////////////////////////////////////////////////////////////////////
225

226
managed_reader::managed_reader(FILE* handle)
14✔
227
  : m_filename("<unknown file>")
29✔
228
  , m_file(handle)
14✔
229
{
230
  AR_REQUIRE(handle);
18✔
231
}
14✔
232

233
managed_reader::managed_reader(std::string filename)
1✔
234
  : m_filename(std::move(filename))
2✔
235
{
236
  io_manager::open(this);
1✔
237
}
1✔
238

239
/** Closes the handle, if any; a failure to close is reported via AR_FAIL */
managed_reader::~managed_reader()
{
  if (m_file && ::fclose(m_file) != 0) {
    // Saved before building the message: the order in which function arguments
    // are evaluated is unspecified, and the string handling could modify errno
    const int error = errno;

    AR_FAIL(format_io_error("error closing " + log_escape(m_filename), error));
  }
}
13✔
245

246
void
247
managed_reader::close()
×
248
{
249
  if (m_file) {
×
250
    if (::fclose(m_file) != 0) {
×
251
      m_file = nullptr;
×
252
      throw io_error("error closing " + log_escape(m_filename), errno);
×
253
    }
254

255
    m_file = nullptr;
×
256
  }
257
}
258

259
size_t
260
managed_reader::read(void* buffer, size_t size)
33✔
261
{
262
  AR_REQUIRE(buffer);
33✔
263
  const auto nread = ::fread(buffer, 1, size, m_file);
33✔
264
  if (ferror(m_file)) {
33✔
265
    throw io_error("error reading " + log_escape(m_filename), errno);
×
266
  }
267

268
  return nread;
33✔
269
}
270

271
///////////////////////////////////////////////////////////////////////////////
272

273
/** Locker  */
274
class writer_lock
275
{
276
public:
277
  explicit writer_lock(managed_writer* writer)
×
278
    : m_writer(writer)
×
279
  {
280
    AR_REQUIRE(m_writer);
×
281

282
    // Remove from global queue to prevent other threads from manipulating it
283
    io_manager::remove(m_writer);
×
284
    io_manager::open(m_writer);
×
285
  };
286

287
  ~writer_lock()
×
288
  {
289
    // Streams cannot be managed, since they cannot be reopened
290
    if (m_writer->m_file && !m_writer->m_stream) {
×
291
      // Allow this writer to be closed if we run out of file handles
292
      io_manager::add(m_writer);
×
293
    }
294
  }
295

296
  void write(const void* buffer, size_t size)
×
297
  {
298
    AR_REQUIRE(buffer || size == 0);
×
299
    AR_REQUIRE(m_writer && m_writer->m_file);
×
300
    if (size) {
×
301
      const auto ret = ::fwrite(buffer, 1, size, m_writer->m_file);
×
302
      if (ret != size) {
×
303
        throw io_error("error writing to " + log_escape(m_writer->m_filename),
×
304
                       errno);
×
305
      }
306
    }
307
  }
308

309
  void flush()
×
310
  {
311
    AR_REQUIRE(m_writer && m_writer->m_file);
×
312
    if (::fflush(m_writer->m_file)) {
×
313
      throw io_error("error flushing file " + log_escape(m_writer->filename()),
×
314
                     errno);
×
315
    }
316
  }
317

318
  writer_lock(const writer_lock&) = delete;
319
  writer_lock(writer_lock&&) = delete;
320
  writer_lock& operator=(const writer_lock&) = delete;
321
  writer_lock& operator=(writer_lock&&) = delete;
322

323
private:
324
  managed_writer* m_writer;
325
};
326

327
///////////////////////////////////////////////////////////////////////////////
328

329
namespace {
330

331
bool
332
any_nonempty_buffers(const buffer_vec& buffers)
×
333
{
334
  return std::any_of(buffers.begin(), buffers.end(), [](const auto& it) {
×
335
    return it.size() != 0;
×
336
  });
×
337
}
338

339
} // namespace
340

341
managed_writer::managed_writer(std::string filename)
×
342
  : m_filename(std::move(filename))
×
343
{
344
}
345

346
/**
 * Closes the writer on destruction. Exceptions derived from std::exception
 * are not allowed to escape; they are reported via AR_FAIL instead.
 */
managed_writer::~managed_writer()
{
  try {
    this->close();
  } catch (const std::exception&) {
    AR_FAIL("unhandled exception");
  }
}
354

355
void
356
managed_writer::write(const buffer& buf, const flush mode)
×
357
{
358
  if (buf.size() || mode == flush::on) {
×
359
    writer_lock writer{ this };
×
360

361
    writer.write(buf.data(), buf.size());
×
362

363
    if (mode == flush::on) {
×
364
      writer.flush();
×
365
    }
366
  }
367
}
368

369
void
370
managed_writer::write(const buffer_vec& buffers, const flush mode)
×
371
{
372
  if (mode == flush::on || any_nonempty_buffers(buffers)) {
×
373
    writer_lock writer{ this };
×
374

375
    for (const auto& buf : buffers) {
×
376
      writer.write(buf.data(), buf.size());
×
377
    }
378

379
    if (mode == flush::on) {
×
380
      writer.flush();
×
381
    }
382
  }
383
}
384

385
void
386
managed_writer::write(const std::string& buf, const flush mode)
×
387
{
388
  if (!buf.empty() || mode == flush::on) {
×
389
    writer_lock writer{ this };
×
390

391
    writer.write(buf.data(), buf.length());
×
392

393
    if (mode == flush::on) {
×
394
      writer.flush();
×
395
    }
396
  }
397
}
398

399
void
400
managed_writer::close()
×
401
{
402
  io_manager::remove(this);
×
403
  if (m_file) {
×
404
    if (fclose(m_file)) {
×
405
      m_file = nullptr;
×
406
      throw io_error("failed to close file", errno);
×
407
    }
408

409
    m_file = nullptr;
×
410
  }
411
}
412

413
} // namespace adapterremoval
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc