• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MikkelSchubert / adapterremoval / #36

22 Jul 2024 09:33AM UTC coverage: 87.26% (-12.7%) from 100.0%
#36

push

travis-ci

MikkelSchubert
remove duplicate tests

2185 of 2504 relevant lines covered (87.26%)

16293.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.42
/src/managed_io.cpp
1
/*************************************************************************\
2
 * AdapterRemoval - cleaning next-generation sequencing reads            *
3
 *                                                                       *
4
 * Copyright (C) 2015 by Mikkel Schubert - mikkelsch@gmail.com           *
5
 *                                                                       *
6
 * This program is free software: you can redistribute it and/or modify  *
7
 * it under the terms of the GNU General Public License as published by  *
8
 * the Free Software Foundation, either version 3 of the License, or     *
9
 * (at your option) any later version.                                   *
10
 *                                                                       *
11
 * This program is distributed in the hope that it will be useful,       *
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of        *
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
14
 * GNU General Public License for more details.                          *
15
 *                                                                       *
16
 * You should have received a copy of the GNU General Public License     *
17
 * along with this program.  If not, see <http://www.gnu.org/licenses/>. *
18
\*************************************************************************/
19
#include "managed_io.hpp" // declarations
20
#include "debug.hpp"      // for AR_REQUIRE
21
#include "errors.hpp"     // for io_error
22
#include "logging.hpp"    // for log::warn
23
#include "strutils.hpp"   // for log_escape
24
#include <cerrno>         // for EMFILE, errno
25
#include <cstddef>        // for size_t
26
#include <cstdio>         // for fopen, fread, fwrite, ...
27
#include <mutex>          // for mutex, lock_guard
28
#include <string>         // for string
29
#include <sys/stat.h>     // for fstat
30

31
namespace adapterremoval {
32

33
class io_manager
34
{
35
public:
36
  static void open(managed_reader* reader)
1✔
37
  {
38
    if (reader->m_file) {
1✔
39
      return;
40
    } else if (reader->filename() == "/dev/stdin") {
3✔
41
      reader->m_file = stdin;
×
42
    } else if (reader->filename() != "-") {
3✔
43
      reader->m_file = io_manager::fopen(reader->filename(), "rb");
2✔
44
    } else {
45
      // Merged I/O depends on filenames being identical
46
      AR_FAIL("unhandled STDIN marker");
×
47
    }
48
  }
49

50
  static void open(managed_writer* writer)
51
  {
52
    if (writer->m_file) {
×
53
      return;
54
    } else if (writer->filename() == "/dev/stdout") {
×
55
      writer->m_file = stdout;
×
56
    } else if (writer->filename() == "/dev/stderr") {
×
57
      // Not sure why anyone would do this, but ¯\_(ツ)_/¯
58
      writer->m_file = stderr;
×
59
    } else if (writer->filename() != "-") {
×
60
      writer->m_file =
×
61
        io_manager::fopen(writer->filename(), writer->m_created ? "ab" : "wb");
×
62
    } else {
63
      // Merged I/O depends on filenames being identical
64
      AR_FAIL("unhandled STDOUT marker");
×
65
    }
66

67
    writer->m_created = true;
×
68
    writer->m_stream =
×
69
      io_manager::is_stream(writer->filename(), writer->m_file);
×
70
  }
71

72
  /** Adds writer to list of inactive writers */
73
  static void add(managed_writer* writer)
74
  {
75
    std::lock_guard<std::mutex> lock(m_lock);
×
76

77
    AR_REQUIRE(!writer->m_prev);
×
78
    AR_REQUIRE(!writer->m_next);
×
79
    AR_REQUIRE(!m_head == !m_tail);
×
80
    if (m_head) {
×
81
      writer->m_next = m_head;
×
82
      m_head->m_prev = writer;
×
83
    }
84

85
    m_head = writer;
×
86

87
    if (!m_tail) {
×
88
      m_tail = writer;
×
89
    }
90

91
    AR_REQUIRE(m_head && m_tail);
×
92
    AR_REQUIRE(!m_head->m_prev);
×
93
    AR_REQUIRE(!m_tail->m_next);
×
94
  }
95

96
  /* Removes the writer from the list of inactive writers */
97
  static void remove(managed_writer* writer)
98
  {
99
    std::lock_guard<std::mutex> lock(m_lock);
×
100

101
    AR_REQUIRE(!m_head == !m_tail);
×
102
    AR_REQUIRE(!m_head || !m_head->m_prev);
×
103
    AR_REQUIRE(!m_tail || !m_tail->m_next);
×
104

105
    if (writer == m_head) {
×
106
      m_head = writer->m_next;
×
107
    }
108

109
    if (writer == m_tail) {
×
110
      m_tail = writer->m_prev;
×
111
    }
112

113
    AR_REQUIRE(!m_head == !m_tail);
×
114

115
    if (writer->m_prev) {
×
116
      writer->m_prev->m_next = writer->m_next;
×
117
    }
118

119
    if (writer->m_next) {
×
120
      writer->m_next->m_prev = writer->m_prev;
×
121
    }
122

123
    writer->m_prev = nullptr;
×
124
    writer->m_next = nullptr;
×
125

126
    AR_REQUIRE(writer != m_head);
×
127
    AR_REQUIRE(writer != m_tail);
×
128
    AR_REQUIRE(!writer->m_prev);
×
129
    AR_REQUIRE(!writer->m_next);
×
130
    AR_REQUIRE(!m_head || !m_head->m_prev);
×
131
    AR_REQUIRE(!m_tail || !m_tail->m_next);
×
132
  }
133

134
  io_manager() = delete;
135
  ~io_manager() = delete;
136
  io_manager(const io_manager&) = delete;
137
  io_manager(io_manager&&) = delete;
138
  io_manager& operator=(const io_manager&) = delete;
139
  io_manager& operator=(io_manager&&) = delete;
140

141
private:
142
  static FILE* fopen(const std::string& filename, const char* mode)
1✔
143
  {
144
    while (true) {
1✔
145
      FILE* handle = ::fopen(filename.c_str(), mode);
2✔
146

147
      if (handle) {
1✔
148
        AR_REQUIRE(!::ferror_unlocked(handle));
×
149
        return handle;
×
150
      } else if (errno == EMFILE) {
1✔
151
        close_one();
×
152
      } else {
153
        throw io_error("failed to open file", errno);
2✔
154
      }
155
    }
156
  }
157

158
  static bool is_stream(const std::string& filename, FILE* handle)
159
  {
160
    if (handle == stdin || handle == stdout || handle == stderr) {
×
161
      return true;
162
    }
163

164
    struct stat statbuf = {};
×
165
    if (fstat(fileno(handle), &statbuf) == 0) {
×
166
      return S_ISFIFO(statbuf.st_mode);
×
167
    }
168

169
    log::warn() << "Could not fstat " << log_escape(filename);
×
170

171
    // Assumed to be a stream to be safe
172
    return true;
×
173
  }
174

175
  /** Try to close the least recently used writer */
176
  static void close_one()
177
  {
178
    AR_REQUIRE(!m_head == !m_tail);
×
179
    if (!m_warning_printed) {
×
180
      log::warn() << "Number of available file-handles (ulimit -n) is too low. "
×
181
                  << "AdapterRemoval will dynamically close/re-open files as "
×
182
                  << "required, but performance may suffer as a result.";
×
183

184
      m_warning_printed = true;
×
185
    }
186

187
    if (m_tail) {
×
188
      if (fclose(m_tail->m_file)) {
×
189
        m_tail->m_file = nullptr;
×
190
        throw io_error("failed to close file", errno);
×
191
      }
192
      m_tail->m_file = nullptr;
×
193

194
      remove(m_tail);
×
195
    } else {
196
      throw io_error(
×
197
        "available number of file-handles too low; could not open any files");
×
198
    }
199
  }
200

201
  //! Indicates if a performance warning has been printed
202
  static bool m_warning_printed;
203
  //! Most recently used managed_writer
204
  static managed_writer* m_head;
205
  //! Least recently used managed_writer
206
  static managed_writer* m_tail;
207
  //! Lock used to control access to internal state
208
  static std::mutex m_lock;
209
};
210

211
bool io_manager::m_warning_printed = false;
212
managed_writer* io_manager::m_head = nullptr;
213
managed_writer* io_manager::m_tail = nullptr;
214
std::mutex io_manager::m_lock{};
215

216
///////////////////////////////////////////////////////////////////////////////
217

218
managed_reader::managed_reader(FILE* handle)
14✔
219
  : m_filename("<unknown file>")
29✔
220
  , m_file(handle)
14✔
221
{
222
  AR_REQUIRE(handle);
18✔
223
}
14✔
224

225
managed_reader::managed_reader(std::string filename)
1✔
226
  : m_filename(std::move(filename))
2✔
227
{
228
  io_manager::open(this);
1✔
229
}
1✔
230

231
managed_reader::~managed_reader()
13✔
232
{
233
  if (m_file && ::fclose(m_file) != 0) {
13✔
234
    AR_FAIL(format_io_error("error closing " + log_escape(m_filename), errno));
×
235
  }
236
}
13✔
237

238
void
239
managed_reader::close()
×
240
{
241
  if (m_file) {
×
242
    if (::fclose(m_file) != 0) {
×
243
      m_file = nullptr;
×
244
      throw io_error("error closing " + log_escape(m_filename), errno);
×
245
    }
246

247
    m_file = nullptr;
×
248
  }
249
}
250

251
size_t
252
managed_reader::read(void* buffer, size_t size)
33✔
253
{
254
  const auto nread = ::fread_unlocked(buffer, 1, size, m_file);
33✔
255
  if (ferror(m_file)) {
33✔
256
    throw io_error("error reading " + log_escape(m_filename), errno);
×
257
  }
258

259
  return nread;
33✔
260
}
261

262
///////////////////////////////////////////////////////////////////////////////
263

264
/** Locker  */
265
class writer_lock
266
{
267
public:
268
  explicit writer_lock(managed_writer* writer)
×
269
    : m_writer(writer)
×
270
  {
271
    AR_REQUIRE(m_writer);
×
272

273
    // Remove from global queue to prevent other threads from manipulating it
274
    io_manager::remove(m_writer);
×
275
    io_manager::open(m_writer);
×
276
  };
277

278
  ~writer_lock()
×
279
  {
280
    // Streams cannot be managed, since they cannot be reopened
281
    if (m_writer->m_file && !m_writer->m_stream) {
×
282
      // Allow this writer to be closed if we run out of file handles
283
      io_manager::add(m_writer);
×
284
    }
285
  }
286

287
  void write(const void* buffer, size_t size)
×
288
  {
289
    AR_REQUIRE(m_writer && m_writer->m_file);
×
290
    const auto ret = ::fwrite_unlocked(buffer, 1, size, m_writer->m_file);
×
291
    if (ret != size) {
×
292
      throw io_error("error writing to " + log_escape(m_writer->m_filename),
×
293
                     errno);
×
294
    }
295
  }
296

297
  void flush()
×
298
  {
299
    AR_REQUIRE(m_writer && m_writer->m_file);
×
300
    if (::fflush_unlocked(m_writer->m_file)) {
×
301
      throw io_error("error flushing file " + log_escape(m_writer->filename()),
×
302
                     errno);
×
303
    }
304
  }
305

306
  writer_lock(const writer_lock&) = delete;
307
  writer_lock(writer_lock&&) = delete;
308
  writer_lock& operator=(const writer_lock&) = delete;
309
  writer_lock& operator=(writer_lock&&) = delete;
310

311
private:
312
  managed_writer* m_writer;
313
};
314

315
///////////////////////////////////////////////////////////////////////////////
316

317
managed_writer::managed_writer(std::string filename)
×
318
  : m_filename(std::move(filename))
×
319
{
320
}
321

322
managed_writer::~managed_writer()
×
323
{
324
  AR_REQUIRE(!m_file);
×
325
}
326

327
void
328
managed_writer::write(const buffer& buf, const flush mode)
×
329
{
330
  if (buf.size() || mode == flush::on) {
×
331
    writer_lock writer{ this };
×
332

333
    writer.write(buf.get_signed(), buf.size());
×
334

335
    if (mode == flush::on) {
×
336
      writer.flush();
×
337
    }
338
  }
339
}
340

341
void
342
managed_writer::write(const buffer_vec& buffers, const flush mode)
×
343
{
344
  if (!buffers.empty() || mode == flush::on) {
×
345
    writer_lock writer{ this };
×
346

347
    for (const auto& buf : buffers) {
×
348
      writer.write(buf.get_signed(), buf.size());
×
349
    }
350

351
    if (mode == flush::on) {
×
352
      writer.flush();
×
353
    }
354
  }
355
}
356

357
void
358
managed_writer::write(const std::string& buf, const flush mode)
×
359
{
360
  if (!buf.empty() || mode == flush::on) {
×
361
    writer_lock writer{ this };
×
362

363
    writer.write(buf.data(), buf.length());
×
364

365
    if (mode == flush::on) {
×
366
      writer.flush();
×
367
    }
368
  }
369
}
370

371
void
372
managed_writer::close()
×
373
{
374
  io_manager::remove(this);
×
375
  if (m_file) {
×
376
    if (fclose(m_file)) {
×
377
      m_file = nullptr;
×
378
      throw io_error("failed to close file", errno);
×
379
    }
380

381
    m_file = nullptr;
×
382
  }
383
}
384

385
} // namespace adapterremoval
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc