• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MikkelSchubert / adapterremoval / #117

25 May 2025 03:01PM UTC coverage: 66.932% (-0.07%) from 67.006%
#117

push

travis-ci

web-flow
iwyu and reduce build-time inter-dependencies (#144)

26 of 145 new or added lines in 20 files covered. (17.93%)

89 existing lines in 5 files now uncovered.

9738 of 14549 relevant lines covered (66.93%)

3041.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/scheduler.cpp
1
// SPDX-License-Identifier: GPL-3.0-or-later
2
// SPDX-FileCopyrightText: 2015 Mikkel Schubert <mikkelsch@gmail.com>
3
#include "scheduler.hpp" // declarations
#include "buffer.hpp"    // for buffer
#include "debug.hpp"     // for AR_REQUIRE, AR_FAIL
#include "fastq.hpp"     // for fastq
#include "logging.hpp"   // for error, log_stream, debug
#include <algorithm>     // for push_heap, pop_heap
#include <cstddef>       // for size_t
#include <exception>     // for exception
#include <functional>    // for greater
#include <memory>        // for __shared_ptr_access, operator<, unique...
#include <string>        // for string
#include <system_error>  // for system_error
#include <thread>        // for thread
#include <utility>       // for pair, move
#include <vector>        // for vector
13

14
namespace adapterremoval {
15

16
/**
 * Indicates the role of a worker thread; passed to scheduler::run_wrapper to
 * select which processing loop (calc or IO) the thread executes.
 */
enum class threadtype
{
  //! Mainly compute threads; may optionally perform IO
  cpu,
  //! Pure IO threads; compute work must be minimized
  io
};
24

25
///////////////////////////////////////////////////////////////////////////////
26
// analytical_chunk
27

28
// Out of line implementation to avoid adding dependency on fastq to every file
// including the scheduler header; the defaulted destructor is otherwise trivial
analytical_chunk::~analytical_chunk() = default;
×
30

31
///////////////////////////////////////////////////////////////////////////////
32
// analytical_step
33

34
/** Records the processing order and the name used in error/debug messages */
analytical_step::analytical_step(processing_order step_order, std::string name)
  : m_step_order(step_order)
  , m_name(std::move(name))
{
}
39

40
///////////////////////////////////////////////////////////////////////////////
41
// scheduler_step
42

43
using data_chunk = std::pair<size_t, chunk_ptr>;
44

45
/** Class wrapping a analytical_step. */
46
class scheduler_step
47
{
48
public:
49
  /** Constructor; name is used in error messages related to bugs */
50
  explicit scheduler_step(std::unique_ptr<analytical_step> value)
×
51
    : m_ptr(std::move(value))
×
52
  {
53
    AR_REQUIRE(m_ptr);
×
54
  }
55

56
  ~scheduler_step() = default;
×
57

58
  /**
59
   * Returns true if the next chunk is queued; only applicable to ordered tasks,
60
   * since unordered tasks are always added to a scheduler queue and looping on
61
   * the same task will result in queues going out of sync. Requires locking.
62
   */
63
  bool can_run_next() const
×
64
  {
65
    return ordering() != processing_order::unordered && !m_queue.empty() &&
×
66
           m_queue.front().first == m_next_chunk;
×
67
  }
68

69
  /** FIFO queues a task; requires locking */
70
  bool push_chunk(size_t chunk_id, chunk_ptr ptr)
×
71
  {
72
    m_queue.emplace_back(chunk_id, std::move(ptr));
×
73
    std::push_heap(m_queue.begin(), m_queue.end(), std::greater<>{});
×
74

75
    return m_next_chunk == chunk_id ||
×
76
           ordering() == processing_order::unordered;
×
77
  }
78

79
  /** Returns the oldest task; requires locking */
80
  data_chunk pop_chunk()
×
81
  {
82
    std::pop_heap(m_queue.begin(), m_queue.end(), std::greater<>{});
×
83
    auto task = std::move(m_queue.back());
×
84
    m_queue.pop_back();
×
85

86
    return task;
×
87
  }
88

89
  /** Name of the analytical task; for error messages when bugs are detected */
90
  const std::string& name() const { return m_ptr->name(); }
×
91

92
  /** Name of the analytical task; for error messages when bugs are detected */
93
  processing_order ordering() const { return m_ptr->ordering(); }
×
94

95
  /** Number of chunks waiting to be processed by this task; requires locking */
96
  size_t chunks_queued() const { return m_queue.size(); }
×
97

98
  /** Returns new ID for (sparse) output from ordered tasks; requires locking */
99
  size_t new_chunk_id() { return m_last_chunk++; }
×
100

101
  /** Processes a data chunk and returns the output chunks; assumes `can_run` */
102
  chunk_vec process(chunk_ptr chunk) const
×
103
  {
104
    return m_ptr->process(std::move(chunk));
×
105
  }
106

107
  /** Increment the next chunk to be processed; requires locking */
108
  bool increment_next_chunk()
×
109
  {
110
    m_next_chunk++;
×
111
    return can_run_next();
×
112
  }
113

114
  /** Perform any final cleanup associated with the task; requires locking */
115
  void finalize() const { return m_ptr->finalize(); }
×
116

117
  scheduler_step(const scheduler_step&) = delete;
118
  scheduler_step(scheduler_step&&) = delete;
119
  scheduler_step& operator=(const scheduler_step&) = delete;
120
  scheduler_step& operator=(scheduler_step&&) = delete;
121

122
private:
123
  //! Analytical step implementation
124
  std::unique_ptr<analytical_step> m_ptr = nullptr;
125
  //! The next chunk to be processed
126
  size_t m_next_chunk = 0;
127
  //! Running counter of output; for (possibly) sparse output of ordered steps
128
  size_t m_last_chunk = 0;
129
  //! (Ordered) vector of chunks to be processed
130
  std::vector<chunk_pair> m_queue{};
131
};
132

133
///////////////////////////////////////////////////////////////////////////////
134
// scheduler
135

136
size_t
137
scheduler::add_step(std::unique_ptr<analytical_step> step)
×
138
{
139
  AR_REQUIRE(step);
×
140

141
  const size_t step_id = m_steps.size();
×
142
  m_steps.push_back(std::make_shared<scheduler_step>(std::move(step)));
×
143

144
  return step_id;
×
145
}
146

147
/**
 * Runs the pipeline using `nthreads` compute threads (including the calling
 * thread) plus two dedicated IO threads. Returns true if every queued chunk
 * was processed and all steps finalized; false if chunks were left over.
 * Exceptions from thread creation/joining and from `finalize` are logged
 * and re-thrown.
 */
bool
scheduler::run(int nthreads)
{
  AR_REQUIRE(!m_steps.empty());
  AR_REQUIRE(nthreads >= 1);
  // No chunks may have been produced yet, i.e. `run` was not called before
  AR_REQUIRE(!m_chunk_counter);
  // The last step added is assumed to be the initial/producing step
  AR_REQUIRE(m_steps.back()->ordering() == processing_order::ordered ||
             m_steps.back()->ordering() == processing_order::ordered_io);

  // Upper bound on simultaneously live tasks: three per compute thread
  m_tasks_max = static_cast<size_t>(nthreads) * 3;

  std::vector<std::thread> threads;

  try {
    // CPU bound threads; the calling thread acts as the final compute thread
    for (int i = 0; i < nthreads - 1; ++i) {
      threads.emplace_back(run_wrapper, this, threadtype::cpu);
    }

    // IO only threads
    for (int i = 0; i < 2; ++i) {
      threads.emplace_back(run_wrapper, this, threadtype::io);
    }
  } catch (const std::system_error& error) {
    // NOTE(review): if creation fails after some threads were started, the
    // joinable threads are destroyed during unwinding, which calls
    // std::terminate — confirm whether workers should be signalled and
    // joined before re-throwing.
    log::error() << "Failed to create worker threads";
    throw;
  }

  // Run the main thread; the only calculation thread when "single-threaded"
  run_wrapper(this, threadtype::cpu);

  for (auto& thread : threads) {
    try {
      thread.join();
    } catch (const std::system_error& error) {
      log::error() << "Failed to join worker threads";
      throw;
    }
  }

  // All workers have exited; any chunks still queued indicate a bug
  for (const auto& step : m_steps) {
    if (step->chunks_queued()) {
      log::error() << "Not all parts run for step " << step->name() << "; "
                   << step->chunks_queued() << " parts left";
      return false;
    }
  }

  // Steps are added in reverse order, so iterate backwards to finalize the
  // producing step first and the final consumer steps last
  for (auto it = m_steps.rbegin(); it != m_steps.rend(); ++it) {
    try {
      (*it)->finalize();
    } catch (const std::exception&) {
      log::error() << "Failed to finalize task: " << (*it)->name();
      throw;
    }
  }

  return true;
}
208

209
void
210
scheduler::run_wrapper(scheduler* sch, threadtype thread_type)
×
211
{
212
  switch (thread_type) {
×
213
    case threadtype::cpu:
×
214
      sch->run_calc_loop();
×
215
      break;
×
216
    case threadtype::io:
×
217
      sch->run_io_loop();
×
218
      break;
×
219
    default:
×
220
      AR_FAIL("unexpected threadtype value");
×
221
  }
222

223
  // Signal any waiting threads
224
  sch->m_condition_calc.notify_all();
×
225
  sch->m_condition_io.notify_all();
×
226
}
227

228
/**
 * Worker loop for pure-IO threads: pops steps from the IO queue, processes
 * their oldest queued chunk with the queue lock released, and wakes other
 * threads as needed. Exits once no tasks remain (`m_tasks`) and no further
 * tasks can be created (`m_tasks_max` reset to zero by run_calc_loop when
 * the producing step runs dry).
 */
void
scheduler::run_io_loop()
{
  std::unique_lock<std::mutex> lock(m_queue_lock);

  while (true) {
    step_ptr step;
    if (!m_queue_io.empty()) {
      step = m_queue_io.front();
      m_queue_io.pop();
    } else if (m_tasks || m_tasks_max) {
      // Tasks are still running or may still be produced; wait for more work
      m_condition_io.wait(lock);
      continue;
    } else {
      break;
    }

    // If more IO work remains queued, wake a sibling once the lock is dropped
    const bool wake_thread = !m_queue_io.empty();
    data_chunk chunk = step->pop_chunk();

    lock.unlock();
    if (wake_thread) {
      m_condition_io.notify_one();
    }

    // Perform the (potentially slow) IO without holding the queue lock
    chunk_vec chunks = step->process(std::move(chunk.second));
    // Currently only (final) output steps
    AR_REQUIRE(chunks.empty());
    lock.lock();

    // Indicate that the next chunk can be processed
    if (step->increment_next_chunk()) {
      m_queue_io.push(step);
    }

    // One less task in memory
    m_tasks--;

    // Queue additional read tasks
    if (!m_queue_calc.empty() || m_tasks < m_tasks_max) {
      m_condition_calc.notify_one();
    }
  }

  // Loop has terminated; wake everyone so other threads can also shut down
  m_condition_io.notify_all();
  m_condition_calc.notify_all();
}
275

276
/**
 * Worker loop for compute threads: drains the calc queue, produces new input
 * chunks from the producer step (the last step added) while below the task
 * cap, and routes resulting chunks to the queues of downstream steps.
 */
void
scheduler::run_calc_loop()
{
  std::unique_lock<std::mutex> lock(m_queue_lock);

  while (true) {
    step_ptr step;
    if (!m_queue_calc.empty()) {
      // Try to do some queued, non-IO work first
      step = m_queue_calc.front();
      m_queue_calc.pop();
    } else if (m_tasks < m_tasks_max) {
      // If all (or no) tasks are running and IO is idle then read another chunk
      m_tasks++;
      // Push an empty chunk to the producing step to request more input
      step = m_steps.back();
      if (!step->push_chunk(m_chunk_counter++, chunk_ptr())) {
        continue;
      }
    } else if (m_tasks || m_tasks_max) {
      // There are either tasks running (which may produce new tasks) or tasks
      // that cannot yet be run due to IO already being active.
      m_condition_calc.wait(lock);
      continue;
    } else {
      break;
    }

    // If more compute work is available, wake a sibling once the lock drops
    const bool wake_thread = !m_queue_calc.empty() || (m_tasks < m_tasks_max);
    data_chunk chunk = step->pop_chunk();

    lock.unlock();
    if (wake_thread) {
      m_condition_calc.notify_one();
    }

    // Perform the (potentially expensive) work without holding the lock
    chunk_vec chunks = step->process(std::move(chunk.second));
    lock.lock();

    if (chunks.empty() && step == m_steps.back()) {
      // The source has stopped producing chunks; nothing more to do
      m_tasks_max = 0;
    }

    // Schedule each of the resulting blocks
    for (auto& result : chunks) {
      AR_REQUIRE(result.first < m_steps.size());
      const step_ptr& recipient = m_steps.at(result.first);

      // By default the output chunk keeps the ID of the source chunk
      auto next_id = chunk.first;
      if (step->ordering() != processing_order::unordered) {
        // Ordered steps are allowed to not return results, so the chunk
        // numbering is remembered for down-stream steps
        next_id = recipient->new_chunk_id();
      }

      // Queue on the IO or calc queue depending on the recipient's ordering
      if (recipient->push_chunk(next_id, std::move(result.second))) {
        if (recipient->ordering() == processing_order::ordered_io) {
          m_queue_io.push(recipient);
        } else {
          m_queue_calc.push(recipient);
        }
      }

      m_tasks++;
    }

    if (step->ordering() != processing_order::unordered) {
      // Indicate that the next chunk can be processed
      if (step->increment_next_chunk()) {
        m_queue_calc.push(step);
      }
    }

    // One less task in memory
    m_tasks--;

    if (!m_queue_io.empty()) {
      m_condition_io.notify_one();
    }
  }

  // Loop has terminated; wake everyone so other threads can also shut down
  m_condition_io.notify_all();
  m_condition_calc.notify_all();
}
361

362
} // namespace adapterremoval
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc