• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MikkelSchubert / adapterremoval / #48

12 Jan 2025 09:43AM UTC coverage: 27.139%. Remained the same
#48

push

travis-ci

MikkelSchubert
silence warning on musl build

2595 of 9562 relevant lines covered (27.14%)

4274.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.77
/src/managed_io.cpp
1
/*************************************************************************\
2
 * AdapterRemoval - cleaning next-generation sequencing reads            *
3
 *                                                                       *
4
 * Copyright (C) 2015 by Mikkel Schubert - mikkelsch@gmail.com           *
5
 *                                                                       *
6
 * This program is free software: you can redistribute it and/or modify  *
7
 * it under the terms of the GNU General Public License as published by  *
8
 * the Free Software Foundation, either version 3 of the License, or     *
9
 * (at your option) any later version.                                   *
10
 *                                                                       *
11
 * This program is distributed in the hope that it will be useful,       *
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of        *
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
14
 * GNU General Public License for more details.                          *
15
 *                                                                       *
16
 * You should have received a copy of the GNU General Public License     *
17
 * along with this program.  If not, see <http://www.gnu.org/licenses/>. *
18
\*************************************************************************/
19
#include "managed_io.hpp" // declarations
20
#include "debug.hpp"      // for AR_REQUIRE
21
#include "errors.hpp"     // for io_error
22
#include "logging.hpp"    // for log::warn
23
#include "strutils.hpp"   // for log_escape
24
#include <algorithm>      // for any_of
25
#include <cerrno>         // for EMFILE, errno
26
#include <cstddef>        // for size_t
27
#include <cstdio>         // for fopen, fread, fwrite, ...
28
#include <exception>      // for std::exception
29
#include <fcntl.h>        // for posix_fadvise
30
#include <mutex>          // for mutex, lock_guard
31
#include <string>         // for string
32
#include <sys/stat.h>     // for fstat
33

34
namespace adapterremoval {
35

36
class io_manager
37
{
38
public:
39
  static void open(managed_reader* reader)
1✔
40
  {
41
    if (reader->m_file) {
1✔
42
      return;
43
    } else if (reader->filename() == "/dev/stdin") {
3✔
44
      reader->m_file = stdin;
×
45
    } else if (reader->filename() != "-") {
3✔
46
      reader->m_file = io_manager::fopen(reader->filename(), "rb");
2✔
47

48
#if (defined(_XOPEN_SOURCE) && _XOPEN_SOURCE >= 600) ||                        \
49
  (defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L)
50
      posix_fadvise(fileno(reader->m_file), 0, 0, POSIX_FADV_WILLNEED);
×
51
      posix_fadvise(fileno(reader->m_file), 0, 0, POSIX_FADV_SEQUENTIAL);
×
52
#endif
53
    } else {
54
      // Merged I/O depends on filenames being identical
55
      AR_FAIL("unhandled STDIN marker");
×
56
    }
57
  }
58

59
  static void open(managed_writer* writer)
60
  {
61
    if (writer->m_file) {
×
62
      return;
63
    } else if (writer->filename() == "/dev/stdout") {
×
64
      writer->m_file = stdout;
×
65
    } else if (writer->filename() == "/dev/stderr") {
×
66
      // Not sure why anyone would do this, but ¯\_(ツ)_/¯
67
      writer->m_file = stderr;
×
68
    } else if (writer->filename() != "-") {
×
69
      writer->m_file =
×
70
        io_manager::fopen(writer->filename(), writer->m_created ? "ab" : "wb");
×
71
    } else {
72
      // Merged I/O depends on filenames being identical
73
      AR_FAIL("unhandled STDOUT marker");
×
74
    }
75

76
    writer->m_created = true;
×
77
    writer->m_stream =
×
78
      io_manager::is_stream(writer->filename(), writer->m_file);
×
79
  }
80

81
  /** Adds writer to list of inactive writers */
82
  static void add(managed_writer* writer)
83
  {
84
    std::lock_guard<std::mutex> lock(m_lock);
×
85

86
    AR_REQUIRE(!writer->m_prev);
×
87
    AR_REQUIRE(!writer->m_next);
×
88
    AR_REQUIRE(!m_head == !m_tail);
×
89
    if (m_head) {
×
90
      writer->m_next = m_head;
×
91
      m_head->m_prev = writer;
×
92
    }
93

94
    m_head = writer;
×
95

96
    if (!m_tail) {
×
97
      m_tail = writer;
×
98
    }
99

100
    AR_REQUIRE(m_head && m_tail);
×
101
    AR_REQUIRE(!m_head->m_prev);
×
102
    AR_REQUIRE(!m_tail->m_next);
×
103
  }
104

105
  /* Removes the writer from the list of inactive writers */
106
  static void remove(managed_writer* writer)
107
  {
108
    std::lock_guard<std::mutex> lock(m_lock);
×
109

110
    AR_REQUIRE(!m_head == !m_tail);
×
111
    AR_REQUIRE(!m_head || !m_head->m_prev);
×
112
    AR_REQUIRE(!m_tail || !m_tail->m_next);
×
113

114
    if (writer == m_head) {
×
115
      m_head = writer->m_next;
×
116
    }
117

118
    if (writer == m_tail) {
×
119
      m_tail = writer->m_prev;
×
120
    }
121

122
    AR_REQUIRE(!m_head == !m_tail);
×
123

124
    if (writer->m_prev) {
×
125
      writer->m_prev->m_next = writer->m_next;
×
126
    }
127

128
    if (writer->m_next) {
×
129
      writer->m_next->m_prev = writer->m_prev;
×
130
    }
131

132
    writer->m_prev = nullptr;
×
133
    writer->m_next = nullptr;
×
134

135
    AR_REQUIRE(writer != m_head);
×
136
    AR_REQUIRE(writer != m_tail);
×
137
    AR_REQUIRE(!writer->m_prev);
×
138
    AR_REQUIRE(!writer->m_next);
×
139
    AR_REQUIRE(!m_head || !m_head->m_prev);
×
140
    AR_REQUIRE(!m_tail || !m_tail->m_next);
×
141
  }
142

143
  io_manager() = delete;
144
  ~io_manager() = delete;
145
  io_manager(const io_manager&) = delete;
146
  io_manager(io_manager&&) = delete;
147
  io_manager& operator=(const io_manager&) = delete;
148
  io_manager& operator=(io_manager&&) = delete;
149

150
private:
151
  static FILE* fopen(const std::string& filename, const char* mode)
1✔
152
  {
153
    while (true) {
1✔
154
      FILE* handle = ::fopen(filename.c_str(), mode);
2✔
155

156
      if (handle) {
1✔
157
        AR_REQUIRE(!::ferror(handle));
×
158
        return handle;
×
159
      } else if (errno == EMFILE) {
1✔
160
        close_one();
×
161
      } else {
162
        throw io_error("failed to open file", errno);
2✔
163
      }
164
    }
165
  }
166

167
  static bool is_stream(const std::string& filename, FILE* handle)
168
  {
169
    if (handle == stdin || handle == stdout || handle == stderr) {
×
170
      return true;
171
    }
172

173
    struct stat statbuf = {};
×
174
    if (fstat(fileno(handle), &statbuf) == 0) {
×
175
      return S_ISFIFO(statbuf.st_mode);
×
176
    }
177

178
    log::warn() << "Could not fstat " << log_escape(filename);
×
179

180
    // Assumed to be a stream to be safe
181
    return true;
×
182
  }
183

184
  /** Try to close the least recently used writer */
185
  static void close_one()
186
  {
187
    AR_REQUIRE(!m_head == !m_tail);
×
188
    if (!m_warning_printed) {
×
189
      log::warn() << "Number of available file-handles (ulimit -n) is too low. "
×
190
                  << "AdapterRemoval will dynamically close/re-open files as "
×
191
                  << "required, but performance may suffer as a result.";
×
192

193
      m_warning_printed = true;
×
194
    }
195

196
    if (m_tail) {
×
197
      if (fclose(m_tail->m_file)) {
×
198
        m_tail->m_file = nullptr;
×
199
        throw io_error("failed to close file", errno);
×
200
      }
201
      m_tail->m_file = nullptr;
×
202

203
      remove(m_tail);
×
204
    } else {
205
      throw io_error(
×
206
        "available number of file-handles too low; could not open any files");
×
207
    }
208
  }
209

210
  //! Indicates if a performance warning has been printed
211
  static bool m_warning_printed;
212
  //! Most recently used managed_writer
213
  static managed_writer* m_head;
214
  //! Least recently used managed_writer
215
  static managed_writer* m_tail;
216
  //! Lock used to control access to internal state
217
  static std::mutex m_lock;
218
};
219

220
bool io_manager::m_warning_printed = false;
221
managed_writer* io_manager::m_head = nullptr;
222
managed_writer* io_manager::m_tail = nullptr;
223
std::mutex io_manager::m_lock{};
224

225
///////////////////////////////////////////////////////////////////////////////
226

227
managed_reader::managed_reader(FILE* handle)
14✔
228
  : m_filename("<unknown file>")
29✔
229
  , m_file(handle)
14✔
230
{
231
  AR_REQUIRE(handle);
21✔
232
}
14✔
233

234
managed_reader::managed_reader(std::string filename)
1✔
235
  : m_filename(std::move(filename))
2✔
236
{
237
  io_manager::open(this);
1✔
238
}
1✔
239

240
managed_reader::~managed_reader()
13✔
241
{
242
  if (m_file && ::fclose(m_file) != 0) {
13✔
243
    AR_FAIL(format_io_error("error closing " + log_escape(m_filename), errno));
×
244
  }
245
}
13✔
246

247
void
248
managed_reader::close()
×
249
{
250
  if (m_file) {
×
251
    if (::fclose(m_file) != 0) {
×
252
      m_file = nullptr;
×
253
      throw io_error("error closing " + log_escape(m_filename), errno);
×
254
    }
255

256
    m_file = nullptr;
×
257
  }
258
}
259

260
size_t
261
managed_reader::read(void* buffer, size_t size)
33✔
262
{
263
  AR_REQUIRE(buffer);
33✔
264
  const auto nread = ::fread(buffer, 1, size, m_file);
33✔
265
  if (ferror(m_file)) {
33✔
266
    throw io_error("error reading " + log_escape(m_filename), errno);
×
267
  }
268

269
  return nread;
33✔
270
}
271

272
///////////////////////////////////////////////////////////////////////////////
273

274
/** Locker  */
275
class writer_lock
276
{
277
public:
278
  explicit writer_lock(managed_writer* writer)
×
279
    : m_writer(writer)
×
280
  {
281
    AR_REQUIRE(m_writer);
×
282

283
    // Remove from global queue to prevent other threads from manipulating it
284
    io_manager::remove(m_writer);
×
285
    io_manager::open(m_writer);
×
286
  };
287

288
  ~writer_lock()
×
289
  {
290
    // Streams cannot be managed, since they cannot be reopened
291
    if (m_writer->m_file && !m_writer->m_stream) {
×
292
      // Allow this writer to be closed if we run out of file handles
293
      io_manager::add(m_writer);
×
294
    }
295
  }
296

297
  void write(const void* buffer, size_t size)
×
298
  {
299
    AR_REQUIRE(buffer || size == 0);
×
300
    AR_REQUIRE(m_writer && m_writer->m_file);
×
301
    if (size) {
×
302
      const auto ret = ::fwrite(buffer, 1, size, m_writer->m_file);
×
303
      if (ret != size) {
×
304
        throw io_error("error writing to " + log_escape(m_writer->m_filename),
×
305
                       errno);
×
306
      }
307
    }
308
  }
309

310
  void flush()
×
311
  {
312
    AR_REQUIRE(m_writer && m_writer->m_file);
×
313
    if (::fflush(m_writer->m_file)) {
×
314
      throw io_error("error flushing file " + log_escape(m_writer->filename()),
×
315
                     errno);
×
316
    }
317
  }
318

319
  writer_lock(const writer_lock&) = delete;
320
  writer_lock(writer_lock&&) = delete;
321
  writer_lock& operator=(const writer_lock&) = delete;
322
  writer_lock& operator=(writer_lock&&) = delete;
323

324
private:
325
  managed_writer* m_writer;
326
};
327

328
///////////////////////////////////////////////////////////////////////////////
329

330
namespace {
331

332
bool
333
any_nonempty_buffers(const buffer_vec& buffers)
×
334
{
335
  return std::any_of(buffers.begin(), buffers.end(), [](const auto& it) {
×
336
    return it.size() != 0;
×
337
  });
×
338
}
339

340
} // namespace
341

342
managed_writer::managed_writer(std::string filename)
×
343
  : m_filename(std::move(filename))
×
344
{
345
}
346

347
managed_writer::~managed_writer()
×
348
{
349
  try {
×
350
    close();
×
351
  } catch (const std::exception&) {
×
352
    AR_FAIL("unhandled exception");
×
353
  }
354
}
355

356
void
357
managed_writer::write(const buffer& buf, const flush mode)
×
358
{
359
  if (buf.size() || mode == flush::on) {
×
360
    writer_lock writer{ this };
×
361

362
    writer.write(buf.data(), buf.size());
×
363

364
    if (mode == flush::on) {
×
365
      writer.flush();
×
366
    }
367
  }
368
}
369

370
void
371
managed_writer::write(const buffer_vec& buffers, const flush mode)
×
372
{
373
  if (mode == flush::on || any_nonempty_buffers(buffers)) {
×
374
    writer_lock writer{ this };
×
375

376
    for (const auto& buf : buffers) {
×
377
      writer.write(buf.data(), buf.size());
×
378
    }
379

380
    if (mode == flush::on) {
×
381
      writer.flush();
×
382
    }
383
  }
384
}
385

386
void
387
managed_writer::write(const std::string& buf, const flush mode)
×
388
{
389
  if (!buf.empty() || mode == flush::on) {
×
390
    writer_lock writer{ this };
×
391

392
    writer.write(buf.data(), buf.length());
×
393

394
    if (mode == flush::on) {
×
395
      writer.flush();
×
396
    }
397
  }
398
}
399

400
void
401
managed_writer::close()
×
402
{
403
  io_manager::remove(this);
×
404
  if (m_file) {
×
405
    if (fclose(m_file)) {
×
406
      m_file = nullptr;
×
407
      throw io_error("failed to close file", errno);
×
408
    }
409

410
    m_file = nullptr;
×
411
  }
412
}
413

414
} // namespace adapterremoval
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc