MikkelSchubert / adapterremoval, build #45 (push via travis-ci / web-flow)
Commit: "attempt to fix coveralls run"
20 Sep 2024 06:49PM UTC. Coverage: 26.244%, down 49.2 points from 75.443%
2458 of 9366 relevant lines covered (26.24%); 4362.23 hits per line

Source File

/src/scheduler.cpp (0.0% of lines covered)
/*************************************************************************\
 * AdapterRemoval - cleaning next-generation sequencing reads            *
 *                                                                       *
 * Copyright (C) 2015 by Mikkel Schubert - mikkelsch@gmail.com           *
 *                                                                       *
 * This program is free software: you can redistribute it and/or modify  *
 * it under the terms of the GNU General Public License as published by  *
 * the Free Software Foundation, either version 3 of the License, or     *
 * (at your option) any later version.                                   *
 *                                                                       *
 * This program is distributed in the hope that it will be useful,       *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of        *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
 * GNU General Public License for more details.                          *
 *                                                                       *
 * You should have received a copy of the GNU General Public License     *
 * along with this program.  If not, see <http://www.gnu.org/licenses/>. *
\*************************************************************************/
#include "buffer.hpp"   // for buffer
#include "debug.hpp"    // for AR_REQUIRE, AR_FAIL
#include "fastq.hpp"    // for fastq
#include "logging.hpp"  // for error, log_stream, debug
#include "strutils.hpp" // for indent_lines
#include <algorithm>    // for max
#include <exception>    // for exception
#include <functional>   // for greater
#include <memory>       // for __shared_ptr_access, operator<, unique...
#include <system_error> // for system_error
#include <thread>       // for thread

#include "scheduler.hpp"

namespace adapterremoval {

/** Indicates the role of a worker thread */
enum class threadtype
{
  //! Mainly compute threads; may optionally perform IO
  cpu,
  //! Pure IO threads; compute work must be minimized
  io
};

///////////////////////////////////////////////////////////////////////////////
// analytical_step

analytical_step::analytical_step(processing_order step_order, std::string name)
  : m_step_order(step_order)
  , m_name(std::move(name))
{
}

///////////////////////////////////////////////////////////////////////////////
// scheduler_step

using data_chunk = std::pair<size_t, chunk_ptr>;
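// A data_chunk pairs a running chunk ID with the chunk data itself; the ID
// records the order in which chunks were produced, so that ordered steps can
// process them in sequence.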

/** Class wrapping an analytical_step. */
class scheduler_step
{
public:
  /** Constructor; the step's name is used in error messages related to bugs */
  explicit scheduler_step(std::unique_ptr<analytical_step> value)
    : m_ptr(std::move(value))
  {
    AR_REQUIRE(m_ptr);
  }

  ~scheduler_step() = default;

  /**
   * Returns true if the next chunk is queued; only applicable to ordered tasks,
   * since unordered tasks are always added to a scheduler queue and looping on
   * the same task will result in queues going out of sync. Requires locking.
   */
  bool can_run_next() const
  {
    return ordering() != processing_order::unordered && !m_queue.empty() &&
           m_queue.front().first == m_next_chunk;
  }
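  // Note: m_queue is kept as a min-heap on chunk IDs (std::push_heap /
  // std::pop_heap with std::greater<>), so front() is always the
  // lowest-numbered chunk still waiting; the check above relies on this.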

  /** Queues a chunk and returns true if it is ready to be processed, i.e. it
   * is the next chunk in line or the step is unordered; requires locking */
  bool push_chunk(size_t chunk_id, chunk_ptr ptr)
  {
    m_queue.emplace_back(chunk_id, std::move(ptr));
    std::push_heap(m_queue.begin(), m_queue.end(), std::greater<>{});

    return m_next_chunk == chunk_id ||
           ordering() == processing_order::unordered;
  }

  /** Returns the oldest task; requires locking */
  data_chunk pop_chunk()
  {
    std::pop_heap(m_queue.begin(), m_queue.end(), std::greater<>{});
    auto task = std::move(m_queue.back());
    m_queue.pop_back();

    return task;
  }

  /** Name of the analytical task; for error messages when bugs are detected */
  const std::string& name() const { return m_ptr->name(); }

  /** Processing order (ordered/ordered_io/unordered) of the analytical task */
  processing_order ordering() const { return m_ptr->ordering(); }

  /** Number of chunks waiting to be processed by this task; requires locking */
  size_t chunks_queued() const { return m_queue.size(); }

  /** Returns new ID for (sparse) output from ordered tasks; requires locking */
  size_t new_chunk_id() { return m_last_chunk++; }

  /** Processes a data chunk and returns the output chunks; assumes
   * `can_run_next` */
  chunk_vec process(chunk_ptr chunk) const
  {
    return m_ptr->process(std::move(chunk));
  }

  /** Increment the next chunk to be processed; requires locking */
  bool increment_next_chunk()
  {
    m_next_chunk++;
    return can_run_next();
  }

  /** Perform any final cleanup associated with the task; requires locking */
  void finalize() const { m_ptr->finalize(); }

  scheduler_step(const scheduler_step&) = delete;
  scheduler_step(scheduler_step&&) = delete;
  scheduler_step& operator=(const scheduler_step&) = delete;
  scheduler_step& operator=(scheduler_step&&) = delete;

private:
  //! Analytical step implementation
  std::unique_ptr<analytical_step> m_ptr = nullptr;
  //! The next chunk to be processed
  size_t m_next_chunk = 0;
  //! Running counter of output; for (possibly) sparse output of ordered steps
  size_t m_last_chunk = 0;
  //! Chunks waiting to be processed, kept as a min-heap on chunk ID
  std::vector<chunk_pair> m_queue{};
};

///////////////////////////////////////////////////////////////////////////////
// scheduler

size_t
scheduler::add_step(std::unique_ptr<analytical_step> step)
{
  AR_REQUIRE(step);

  const size_t step_id = m_steps.size();
  m_steps.push_back(std::make_shared<scheduler_step>(std::move(step)));

  return step_id;
}
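// The IDs returned by add_step are indices into m_steps; processing steps use
// these IDs to address the recipients of the chunks they return.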

bool
scheduler::run(int nthreads)
{
  AR_REQUIRE(!m_steps.empty());
  AR_REQUIRE(nthreads >= 1);
  AR_REQUIRE(!m_chunk_counter);
  // The last step added is assumed to be the initial/producing step
  AR_REQUIRE(m_steps.back()->ordering() == processing_order::ordered ||
             m_steps.back()->ordering() == processing_order::ordered_io);

  m_tasks_max = static_cast<size_t>(nthreads) * 3;
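  // m_tasks_max caps the number of chunks in memory at three per thread,
  // bounding peak memory usage while keeping enough work queued to occupy
  // all threads.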

  std::vector<std::thread> threads;

  try {
    // CPU bound threads
    for (int i = 0; i < nthreads - 1; ++i) {
      threads.emplace_back(run_wrapper, this, threadtype::cpu);
    }

    // IO only threads
    for (int i = 0; i < 2; ++i) {
      threads.emplace_back(run_wrapper, this, threadtype::io);
    }
  } catch (const std::system_error& error) {
    log::error() << "Failed to create threads:\n" << indent_lines(error.what());
    m_errors = true;
  }
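  // Two dedicated IO threads are created above, so that ordered output steps
  // can make progress even while every compute thread is busy.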

  // Run the main thread; the only calculation thread when "single-threaded"
  run_wrapper(this, threadtype::cpu);

  for (auto& thread : threads) {
    try {
      thread.join();
    } catch (const std::system_error& error) {
      log::error() << "Failed to join thread: " << error.what();
      m_errors = true;
    }
  }

  if (m_errors) {
    return false;
  }

  for (const auto& step : m_steps) {
    if (step->chunks_queued()) {
      log::error() << "Not all parts run for step " << step->name() << "; "
                   << step->chunks_queued() << " parts left";

      m_errors = true;
    }
  }

  if (m_errors) {
    return false;
  }

  // Steps were added in reverse order (the producing step last), so iterate
  // in reverse to finalize the producer before its downstream steps
  for (auto it = m_steps.rbegin(); it != m_steps.rend(); ++it) {
    try {
      (*it)->finalize();
    } catch (const std::exception&) {
      log::error() << "Failed to finalize task: " << (*it)->name();
      throw;
    }
  }

  return true;
}

void
scheduler::run_wrapper(scheduler* sch, threadtype thread_type)
{
  try {
    switch (thread_type) {
      case threadtype::cpu:
        sch->run_calc_loop();
        break;
      case threadtype::io:
        sch->run_io_loop();
        break;
      default:
        AR_FAIL("unexpected threadtype value");
    }
  } catch (const std::exception& error) {
    sch->m_errors = true;
    log::error() << error.what();
  } catch (...) {
    AR_FAIL("Unhandled, non-standard exception in thread");
  }

  // Signal any waiting threads
  sch->m_condition_calc.notify_all();
  sch->m_condition_io.notify_all();
}
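// Any exception escaping a worker loop sets m_errors, and the notify_all
// calls above wake every sleeping thread so that all loops (which test
// !m_errors) can terminate promptly.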

void
scheduler::run_io_loop()
{
  std::unique_lock<std::mutex> lock(m_queue_lock);

  while (!m_errors) {
    step_ptr step;
    if (!m_queue_io.empty()) {
      step = m_queue_io.front();
      m_queue_io.pop();
    } else if (m_tasks || m_tasks_max) {
      m_condition_io.wait(lock);
      continue;
    } else {
      break;
    }
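    // The final else-branch exits the loop once m_tasks and m_tasks_max are
    // both zero, i.e. no chunks remain in memory and the producer has
    // signaled end-of-input by clearing m_tasks_max (see run_calc_loop).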

    const bool wake_thread = !m_queue_io.empty();
    data_chunk chunk = step->pop_chunk();

    lock.unlock();
    if (wake_thread) {
      m_condition_io.notify_one();
    }

    chunk_vec chunks = step->process(std::move(chunk.second));
    // Currently only used by (final) output steps, which return no chunks
    AR_REQUIRE(chunks.empty());
    lock.lock();

    // Indicate that the next chunk can be processed
    if (step->increment_next_chunk()) {
      m_queue_io.push(step);
    }

    // One less task in memory
    m_tasks--;

    // Queue additional read tasks
    if (!m_queue_calc.empty() || m_tasks < m_tasks_max) {
      m_condition_calc.notify_one();
    }
  }

  m_condition_io.notify_all();
  m_condition_calc.notify_all();
}
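// Both worker loops hold m_queue_lock only while touching scheduler state;
// the lock is released around process() so that the actual work runs without
// serializing the other threads.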

void
scheduler::run_calc_loop()
{
  std::unique_lock<std::mutex> lock(m_queue_lock);

  while (!m_errors) {
    step_ptr step;
    if (!m_queue_calc.empty()) {
      // Try to do some non-IO work first
      step = m_queue_calc.front();
      m_queue_calc.pop();
    } else if (m_tasks < m_tasks_max) {
      // If there is room for more chunks in memory, then queue another read
      m_tasks++;
      step = m_steps.back();
      if (!step->push_chunk(m_chunk_counter++, chunk_ptr())) {
        continue;
      }
    } else if (m_tasks || m_tasks_max) {
      // There are either tasks running (which may produce new tasks) or tasks
      // that cannot yet be run due to IO already being active.
      m_condition_calc.wait(lock);
      continue;
    } else {
      break;
    }
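    // An empty chunk_ptr pushed to the producing step (above) is the request
    // for a fresh chunk to be read; push_chunk returns false if that chunk ID
    // is not yet next in line, in which case the loop simply retries.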

    const bool wake_thread = !m_queue_calc.empty() || (m_tasks < m_tasks_max);
    data_chunk chunk = step->pop_chunk();

    lock.unlock();
    if (wake_thread) {
      m_condition_calc.notify_one();
    }

    chunk_vec chunks = step->process(std::move(chunk.second));
    lock.lock();

    if (chunks.empty() && step == m_steps.back()) {
      // The source has stopped producing chunks; nothing more to do
      m_tasks_max = 0;
    }
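    // Clearing m_tasks_max doubles as the end-of-input signal: no further
    // reads are queued, and both worker loops terminate once m_tasks also
    // drops to zero.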

    // Schedule each of the resulting blocks
    for (auto& result : chunks) {
      AR_REQUIRE(result.first < m_steps.size());
      const step_ptr& recipient = m_steps.at(result.first);

      // By default, output inherits the chunk ID of the source chunk
      auto next_id = chunk.first;
      if (step->ordering() != processing_order::unordered) {
        // Ordered steps are allowed to not return results, so the chunk
        // numbering is remembered for downstream steps
        next_id = recipient->new_chunk_id();
      }

      if (recipient->push_chunk(next_id, std::move(result.second))) {
        if (recipient->ordering() == processing_order::ordered_io) {
          m_queue_io.push(recipient);
        } else {
          m_queue_calc.push(recipient);
        }
      }

      m_tasks++;
    }
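    // A false return from push_chunk above means the recipient is an ordered
    // step still waiting for an earlier chunk; the chunk stays in the
    // recipient's heap until increment_next_chunk catches up to it.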

    if (step->ordering() != processing_order::unordered) {
      // Indicate that the next chunk can be processed
      if (step->increment_next_chunk()) {
        m_queue_calc.push(step);
      }
    }

    // One less task in memory
    m_tasks--;

    if (!m_queue_io.empty()) {
      m_condition_io.notify_one();
    }
  }

  m_condition_io.notify_all();
  m_condition_calc.notify_all();
}

} // namespace adapterremoval