MikkelSchubert / adapterremoval / #68 (push, via travis-ci; committer: web-flow)

19 Mar 2025 04:27PM UTC coverage: 27.092% (-0.02%) from 27.114%

add default terminate handler (#90)

This allows error handling in the scheduler to be simplified and makes
debugging easier. The auto-generated GDB wrapper scripts are updated to
break on this handler.
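
The handler itself lives outside this file, but the technique is small enough
to sketch; a minimal, illustrative example using std::set_terminate (the names
and wiring below are hypothetical, not the project's actual code):

#include <cstdlib>   // for std::abort
#include <exception> // for std::set_terminate

// Illustrative only: a GDB breakpoint on this single function will catch
// every std::terminate() call, e.g. an exception escaping a worker thread.
[[noreturn]] void
example_terminate_handler()
{
  std::abort();
}

int
main()
{
  // install the handler before any worker threads are created
  std::set_terminate(&example_terminate_handler);
  // ... program body ...
}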

0 of 31 new or added lines in 2 files covered. (0.0%)

5 existing lines in 2 files now uncovered.

2597 of 9586 relevant lines covered (27.09%)

4264.39 hits per line

Source File

/src/scheduler.cpp (0.0% of lines covered)
/*************************************************************************\
 * AdapterRemoval - cleaning next-generation sequencing reads            *
 *                                                                       *
 * Copyright (C) 2015 by Mikkel Schubert - mikkelsch@gmail.com           *
 *                                                                       *
 * This program is free software: you can redistribute it and/or modify  *
 * it under the terms of the GNU General Public License as published by  *
 * the Free Software Foundation, either version 3 of the License, or     *
 * (at your option) any later version.                                   *
 *                                                                       *
 * This program is distributed in the hope that it will be useful,       *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of        *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
 * GNU General Public License for more details.                          *
 *                                                                       *
 * You should have received a copy of the GNU General Public License     *
 * along with this program.  If not, see <http://www.gnu.org/licenses/>. *
\*************************************************************************/
#include "buffer.hpp"   // for buffer
#include "debug.hpp"    // for AR_REQUIRE, AR_FAIL
#include "fastq.hpp"    // for fastq
#include "logging.hpp"  // for error, log_stream, debug
#include "strutils.hpp" // for indent_lines
#include <algorithm>    // for max
#include <exception>    // for exception
#include <functional>   // for greater
#include <memory>       // for __shared_ptr_access, operator<, unique...
#include <system_error> // for system_error
#include <thread>       // for thread

#include "scheduler.hpp"

namespace adapterremoval {

/** Indicates the role of a worker thread */
enum class threadtype
{
  //! Mainly compute threads; may optionally perform IO
  cpu,
  //! Pure IO threads; compute work must be minimized
  io
};

///////////////////////////////////////////////////////////////////////////////
// analytical_step

analytical_step::analytical_step(processing_order step_order, std::string name)
  : m_step_order(step_order)
  , m_name(std::move(name))
{
}

///////////////////////////////////////////////////////////////////////////////
// scheduler_step

using data_chunk = std::pair<size_t, chunk_ptr>;

/** Class wrapping an analytical_step. */
class scheduler_step
{
public:
  /** Constructor; name is used in error messages related to bugs */
  explicit scheduler_step(std::unique_ptr<analytical_step> value)
    : m_ptr(std::move(value))
  {
    AR_REQUIRE(m_ptr);
  }

  ~scheduler_step() = default;

  /**
   * Returns true if the next chunk is queued; only applicable to ordered tasks,
   * since unordered tasks are always added to a scheduler queue and looping on
   * the same task will result in queues going out of sync. Requires locking.
   */
  bool can_run_next() const
  {
    return ordering() != processing_order::unordered && !m_queue.empty() &&
           m_queue.front().first == m_next_chunk;
  }

  /** FIFO queues a task; requires locking */
  bool push_chunk(size_t chunk_id, chunk_ptr ptr)
  {
    m_queue.emplace_back(chunk_id, std::move(ptr));
    std::push_heap(m_queue.begin(), m_queue.end(), std::greater<>{});

    return m_next_chunk == chunk_id ||
           ordering() == processing_order::unordered;
  }

  /** Returns the oldest task; requires locking */
  data_chunk pop_chunk()
  {
    std::pop_heap(m_queue.begin(), m_queue.end(), std::greater<>{});
    auto task = std::move(m_queue.back());
    m_queue.pop_back();

    return task;
  }

  /** Name of the analytical task; for error messages when bugs are detected */
  const std::string& name() const { return m_ptr->name(); }

  /** Processing order of the analytical task (ordered, ordered_io, unordered) */
  processing_order ordering() const { return m_ptr->ordering(); }

  /** Number of chunks waiting to be processed by this task; requires locking */
  size_t chunks_queued() const { return m_queue.size(); }

  /** Returns new ID for (sparse) output from ordered tasks; requires locking */
  size_t new_chunk_id() { return m_last_chunk++; }

  /** Processes a data chunk and returns the output chunks; assumes `can_run` */
  chunk_vec process(chunk_ptr chunk) const
  {
    return m_ptr->process(std::move(chunk));
  }

  /** Increment the next chunk to be processed; requires locking */
  bool increment_next_chunk()
  {
    m_next_chunk++;
    return can_run_next();
  }

  /** Perform any final cleanup associated with the task; requires locking */
  void finalize() const { return m_ptr->finalize(); }

  scheduler_step(const scheduler_step&) = delete;
  scheduler_step(scheduler_step&&) = delete;
  scheduler_step& operator=(const scheduler_step&) = delete;
  scheduler_step& operator=(scheduler_step&&) = delete;

private:
  //! Analytical step implementation
  std::unique_ptr<analytical_step> m_ptr = nullptr;
  //! The next chunk to be processed
  size_t m_next_chunk = 0;
  //! Running counter of output; for (possibly) sparse output of ordered steps
  size_t m_last_chunk = 0;
  //! (Ordered) vector of chunks to be processed
  std::vector<chunk_pair> m_queue{};
};
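
// Note on m_queue above: push_chunk()/pop_chunk() maintain it as a min-heap
// via std::push_heap/std::pop_heap with std::greater<>, which compares the
// (chunk id, chunk_ptr) pairs lexicographically. The queued chunk with the
// lowest id is therefore always at m_queue.front(), which is what allows
// can_run_next() to compare it directly against m_next_chunk.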

///////////////////////////////////////////////////////////////////////////////
// scheduler

size_t
scheduler::add_step(std::unique_ptr<analytical_step> step)
{
  AR_REQUIRE(step);

  const size_t step_id = m_steps.size();
  m_steps.push_back(std::make_shared<scheduler_step>(std::move(step)));

  return step_id;
}

bool
scheduler::run(int nthreads)
{
  AR_REQUIRE(!m_steps.empty());
  AR_REQUIRE(nthreads >= 1);
  AR_REQUIRE(!m_chunk_counter);
  // The last step added is assumed to be the initial/producing step
  AR_REQUIRE(m_steps.back()->ordering() == processing_order::ordered ||
             m_steps.back()->ordering() == processing_order::ordered_io);

  m_tasks_max = static_cast<size_t>(nthreads) * 3;

  std::vector<std::thread> threads;

  try {
    // CPU bound threads
    for (int i = 0; i < nthreads - 1; ++i) {
      threads.emplace_back(run_wrapper, this, threadtype::cpu);
    }

    // IO only threads
    for (int i = 0; i < 2; ++i) {
      threads.emplace_back(run_wrapper, this, threadtype::io);
    }
  } catch (const std::system_error& error) {
    log::error() << "Failed to create worker threads";
    throw;
  }

  // Run the main thread; the only calculation thread when "single-threaded"
  run_wrapper(this, threadtype::cpu);

  for (auto& thread : threads) {
    try {
      thread.join();
    } catch (const std::system_error& error) {
      log::error() << "Failed to join worker threads";
      throw;
    }
  }

  for (const auto& step : m_steps) {
    if (step->chunks_queued()) {
      log::error() << "Not all parts run for step " << step->name() << "; "
                   << step->chunks_queued() << " parts left";
      return false;
    }
  }

  // Steps are added in reverse order
  for (auto it = m_steps.rbegin(); it != m_steps.rend(); ++it) {
    try {
      (*it)->finalize();
    } catch (const std::exception&) {
      log::error() << "Failed to finalize task: " << (*it)->name();
      throw;
    }
  }

  return true;
}
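
// Thread accounting in run() above: nthreads - 1 CPU workers plus two
// dedicated IO workers are spawned, and the calling thread then runs a CPU
// loop itself, giving nthreads compute loops in total. m_tasks_max caps the
// number of chunks in flight at 3 * nthreads; run_calc_loop() later resets
// it to zero, which doubles as the end-of-input signal for the loops below.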

void
scheduler::run_wrapper(scheduler* sch, threadtype thread_type)
{
  switch (thread_type) {
    case threadtype::cpu:
      sch->run_calc_loop();
      break;
    case threadtype::io:
      sch->run_io_loop();
      break;
    default:
      AR_FAIL("unexpected threadtype value");
  }

  // Signal any waiting threads
  sch->m_condition_calc.notify_all();
  sch->m_condition_io.notify_all();
}

void
scheduler::run_io_loop()
{
  std::unique_lock<std::mutex> lock(m_queue_lock);

  while (true) {
    step_ptr step;
    if (!m_queue_io.empty()) {
      step = m_queue_io.front();
      m_queue_io.pop();
    } else if (m_tasks || m_tasks_max) {
      m_condition_io.wait(lock);
      continue;
    } else {
      break;
    }

    const bool wake_thread = !m_queue_io.empty();
    data_chunk chunk = step->pop_chunk();

    lock.unlock();
    if (wake_thread) {
      m_condition_io.notify_one();
    }

    chunk_vec chunks = step->process(std::move(chunk.second));
    // Currently only (final) output steps
    AR_REQUIRE(chunks.empty());
    lock.lock();

    // Indicate that the next chunk can be processed
    if (step->increment_next_chunk()) {
      m_queue_io.push(step);
    }

    // One less task in memory
    m_tasks--;

    // Queue additional read tasks
    if (!m_queue_calc.empty() || m_tasks < m_tasks_max) {
      m_condition_calc.notify_one();
    }
  }

  m_condition_io.notify_all();
  m_condition_calc.notify_all();
}

void
scheduler::run_calc_loop()
{
  std::unique_lock<std::mutex> lock(m_queue_lock);

  while (true) {
    step_ptr step;
    if (!m_queue_calc.empty()) {
      // Otherwise try to do some non-IO work
      step = m_queue_calc.front();
      m_queue_calc.pop();
    } else if (m_tasks < m_tasks_max) {
      // If all (or no) tasks are running and IO is idle then read another chunk
      m_tasks++;
      step = m_steps.back();
      if (!step->push_chunk(m_chunk_counter++, chunk_ptr())) {
        continue;
      }
    } else if (m_tasks || m_tasks_max) {
      // There are either tasks running (which may produce new tasks) or tasks
      // that cannot yet be run due to IO already being active.
      m_condition_calc.wait(lock);
      continue;
    } else {
      break;
    }

    const bool wake_thread = !m_queue_calc.empty() || (m_tasks < m_tasks_max);
    data_chunk chunk = step->pop_chunk();

    lock.unlock();
    if (wake_thread) {
      m_condition_calc.notify_one();
    }

    chunk_vec chunks = step->process(std::move(chunk.second));
    lock.lock();

    if (chunks.empty() && step == m_steps.back()) {
      // The source has stopped producing chunks; nothing more to do
      m_tasks_max = 0;
    }

    // Schedule each of the resulting blocks
    for (auto& result : chunks) {
      AR_REQUIRE(result.first < m_steps.size());
      const step_ptr& recipient = m_steps.at(result.first);

      // Inherit reference count from source chunk
      auto next_id = chunk.first;
      if (step->ordering() != processing_order::unordered) {
        // Ordered steps are allowed to not return results, so the chunk
        // numbering is remembered for downstream steps
        next_id = recipient->new_chunk_id();
      }

      if (recipient->push_chunk(next_id, std::move(result.second))) {
        if (recipient->ordering() == processing_order::ordered_io) {
          m_queue_io.push(recipient);
        } else {
          m_queue_calc.push(recipient);
        }
      }

      m_tasks++;
    }

    if (step->ordering() != processing_order::unordered) {
      // Indicate that the next chunk can be processed
      if (step->increment_next_chunk()) {
        m_queue_calc.push(step);
      }
    }

    // One less task in memory
    m_tasks--;

    if (!m_queue_io.empty()) {
      m_condition_io.notify_one();
    }
  }

  m_condition_io.notify_all();
  m_condition_calc.notify_all();
}

} // namespace adapterremoval
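
For orientation, here is a minimal sketch of how this API appears to be
driven, inferred solely from the calls visible above; the null_source class,
its overrides, and the wiring in main() are hypothetical, not part of the
project:

#include <memory> // for make_unique

using namespace adapterremoval;

// Hypothetical producing step. scheduler::run() treats the *last* step added
// as the source, so it must be registered last.
class null_source : public analytical_step
{
public:
  null_source()
    : analytical_step(processing_order::ordered, "null_source")
  {
  }

  // signature inferred from scheduler_step::process() above
  chunk_vec process(chunk_ptr /* chunk */) override
  {
    // Returning no chunks causes run_calc_loop() to set m_tasks_max to zero,
    // which signals end-of-input to every worker loop.
    return {};
  }

  // assuming analytical_step declares a virtual finalize(), as invoked above
  void finalize() override {}
};

int
main()
{
  scheduler sch;
  sch.add_step(std::make_unique<null_source>());

  // 4 compute loops (the main thread plus 3 workers) and 2 IO-only threads
  return sch.run(4) ? 0 : 1;
}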