• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

celerity / celerity-runtime / 11970970296

22 Nov 2024 10:35AM UTC coverage: 94.911% (+0.1%) from 94.802%
11970970296

Pull #298

github

web-flow
Merge 7621d2b92 into 28eadd65d
Pull Request #298: Add scheduler lookahead to elide buffer resizes

3189 of 3626 branches covered (87.95%)

Branch coverage included in aggregate %.

233 of 234 new or added lines in 6 files covered. (99.57%)

1 existing line in 1 file now uncovered.

7049 of 7161 relevant lines covered (98.44%)

1254159.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.23
/src/scheduler.cc
1
#include "scheduler.h"
2

3
#include "affinity.h"
4
#include "command_graph.h"
5
#include "command_graph_generator.h"
6
#include "double_buffered_queue.h"
7
#include "instruction_graph_generator.h"
8
#include "log.h"
9
#include "named_threads.h"
10
#include "print_utils.h"
11
#include "ranges.h"
12
#include "recorders.h"
13
#include "tracy.h"
14
#include "types.h"
15
#include "utils.h"
16

17
#include <cassert>
18
#include <cstddef>
19
#include <cstdlib>
20
#include <deque>
21
#include <exception>
22
#include <memory>
23
#include <optional>
24
#include <string>
25
#include <thread>
26
#include <utility>
27
#include <variant>
28

29
#include <matchbox.hh>
30

31

32
namespace celerity::detail::scheduler_detail {

// Events passed into the scheduler. One struct per notification; they are aggregated into the std::variant queue types below.

/// A new task was generated in the application thread.
struct event_task_available {
	const task* tsk;
};
/// command_graph_generator emitted a command that awaits compilation by the instruction graph generator.
struct event_command_available {
	const command* cmd;
	std::optional<instruction_graph_generator::scheduling_hint> hint; // nullopt iff lookahead == none (hints are never inspected then)
};
/// A buffer was created in the application thread.
struct event_buffer_created {
	buffer_id bid;
	celerity::range<3> range;
	size_t elem_size;
	size_t elem_align;
	allocation_id user_allocation_id; // compared against null_allocation_id to detect host (user-) initialized buffers
};
/// The debug name of a buffer was changed in the application thread.
struct event_buffer_debug_name_changed {
	buffer_id bid;
	std::string debug_name;
};
/// A buffer was destroyed in the application thread.
struct event_buffer_destroyed {
	buffer_id bid;
};
/// A host object was created in the application thread.
struct event_host_object_created {
	host_object_id hoid;
	bool owns_instance;
};
/// A host object was destroyed in the application thread.
struct event_host_object_destroyed {
	host_object_id hoid;
};
/// The epoch with the given task id has been reached (forwarded via scheduler::notify_epoch_reached).
struct event_epoch_reached {
	task_id tid;
};
/// The lookahead setting was changed via scheduler::set_lookahead.
struct event_set_lookahead {
	experimental::lookahead lookahead;
};
/// All queued commands must be compiled regardless of the lookahead setting (see scheduler::flush_commands).
struct event_flush_commands {};
/// Test-only: run an inspector functor on the scheduler thread to examine internal state.
struct test_event_inspect {
	scheduler_detail::test_inspector inspect;
};

/// An event passed from task_manager or runtime through the public scheduler interface.
using task_event = std::variant<event_task_available, event_buffer_created, event_buffer_debug_name_changed, event_buffer_destroyed, event_host_object_created,
    event_host_object_destroyed, event_epoch_reached, event_set_lookahead, event_flush_commands, test_event_inspect>;
76

77
class task_queue {
78
  public:
79
        void push(task_event&& evt) { m_global_queue.push(std::move(evt)); }
7,092✔
80

81
        task_event wait_and_pop() {
7,092✔
82
                if(m_local_queue.empty()) {
7,092✔
83
                        // We can frequently suspend / resume the scheduler thread without adding latency as long as the executor remains busy
84
                        m_global_queue.wait_while_empty();
4,310✔
85
                        const auto& batch = m_global_queue.pop_all();
4,310✔
86
                        m_local_queue.insert(m_local_queue.end(), batch.begin(), batch.end());
4,310✔
87
                        assert(!m_local_queue.empty());
4,310✔
88
                }
89
                auto evt = std::move(m_local_queue.front());
7,092✔
90
                m_local_queue.pop_front();
7,092✔
91
                return evt;
7,092✔
92
        }
93

94
        void assert_empty() {
246✔
95
                assert(m_global_queue.pop_all().empty());
246✔
96
                assert(m_local_queue.empty());
246✔
97
        }
246✔
98

99
  private:
100
        double_buffered_queue<task_event> m_global_queue;
101
        std::deque<task_event> m_local_queue; // "triple buffer" here because double_buffered_queue only gives us a temporary reference to its read-end
102
};
103

104
/// An event originating from command_graph_generator, or forwarded from the task_queue because it requires in-order processing with commands.
/// Buffered in command_queue and applied by scheduler_impl::process_command_queue_event, subject to lookahead-based delaying.
using command_event = std::variant<event_command_available, event_buffer_debug_name_changed, event_buffer_destroyed, event_host_object_destroyed,
    event_flush_commands, event_set_lookahead>;
107

108
class command_queue {
109
  public:
110
        bool should_dequeue(const experimental::lookahead lookahead) const {
12,860✔
111
                if(m_queue.empty()) return false;
12,860✔
112
                if(lookahead == experimental::lookahead::none) return true;                        // unconditionally dequeue, and do not inspect scheduling hints
6,775✔
113
                if(m_num_flushes_in_queue > 0) return true;                                        // force-dequeue until all flushing events are processed
6,672✔
114
                if(!std::holds_alternative<event_command_available>(m_queue.front())) return true; // only commands carry a hint and are thus worth delaying
3,815✔
115
                const auto& avail = std::get<event_command_available>(m_queue.front());
3,442✔
116
                assert(avail.hint.has_value()); // only nullopt when lookahead == none, which we checked above
3,442✔
117
                if(avail.hint == instruction_graph_generator::scheduling_hint::is_self_contained) return true; // don't delay scheduling of self-contained commands
3,442✔
118
                if(lookahead == experimental::lookahead::infinite) return false;
1,056✔
119
                assert(lookahead == experimental::lookahead::automatic);
967✔
120
                return m_num_horizons_since_last_mergeable_cmd >= 2; // heuristic: passing two horizons suggests we have arrived at an "allocation steady state"
967✔
121
        }
122

123
        void push(command_event&& evt) {
5,768✔
124
                if(is_flush(evt)) m_num_flushes_in_queue += 1;
5,768✔
125
                if(const auto avail = std::get_if<event_command_available>(&evt)) {
5,768✔
126
                        if(utils::isa<horizon_command>(avail->cmd)) { m_num_horizons_since_last_mergeable_cmd += 1; }
4,237✔
127
                        if(avail->hint == instruction_graph_generator::scheduling_hint::could_merge_with_future_commands) { m_num_horizons_since_last_mergeable_cmd = 0; }
4,237✔
128
                }
129
                m_queue.push_back(std::move(evt));
5,768✔
130
        }
5,768✔
131

132
        command_event pop() {
5,768✔
133
                assert(!m_queue.empty());
5,768✔
134
                auto evt = std::move(m_queue.front());
5,768✔
135
                m_queue.pop_front();
5,768✔
136
                if(is_flush(evt)) { m_num_flushes_in_queue -= 1; }
5,768✔
137
                return evt;
5,768✔
UNCOV
138
        }
×
139

140
        void assert_empty() { assert(m_queue.empty()); }
246!
141

142
  private:
143
        std::deque<command_event> m_queue;
144
        int m_num_flushes_in_queue = 0;
145
        int m_num_horizons_since_last_mergeable_cmd = 0;
146

147
        static bool is_flush(const command_event& evt) {
11,536✔
148
                if(std::holds_alternative<event_flush_commands>(evt)) return true;
11,536✔
149
                // Flushing on all changes to the lookahead setting avoids complicated decisions on when to "anticipate" commands from incoming tasks
150
                if(std::holds_alternative<event_set_lookahead>(evt)) return true;
9,532✔
151
                if(const auto avail = std::get_if<event_command_available>(&evt)) {
9,504✔
152
                        return utils::isa<fence_command>(avail->cmd) || utils::isa<epoch_command>(avail->cmd);
8,474✔
153
                }
154
                return false;
1,030✔
155
        }
156
};
157

158
struct scheduler_impl {
	// Note: member order is load-bearing — the generators reference the graphs/recorders, so those must be constructed first.
	command_graph cdag;
	command_recorder* crec; // recorder pointer as passed at construction, forwarded to cggen
	command_graph_generator cggen;
	instruction_graph idag;
	instruction_recorder* irec; // recorder pointer as passed at construction, forwarded to iggen
	instruction_graph_generator iggen;

	// Current lookahead setting; only modified on the scheduler thread via event_set_lookahead (see process_command_queue_event)
	experimental::lookahead lookahead = experimental::lookahead::automatic;

	// "class" keyword disambiguates the type from the identically-named member
	class task_queue task_queue;
	class command_queue command_queue;

	// Set once the shutdown epoch task has been observed; its completion (shutdown_epoch_reached) terminates the scheduling loop
	std::optional<task_id> shutdown_epoch_created = std::nullopt;
	bool shutdown_epoch_reached = false;

	std::thread thread; // only started when constructed with start_thread == true

	scheduler_impl(bool start_thread, size_t num_nodes, node_id local_node_id, const system_info& system, scheduler::delegate* dlg, command_recorder* crec,
	    instruction_recorder* irec, const scheduler::policy_set& policy);

	// immovable: self-referential via `thread`
	scheduler_impl(const scheduler_impl&) = delete;
	scheduler_impl(scheduler_impl&&) = delete;
	scheduler_impl& operator=(const scheduler_impl&) = delete;
	scheduler_impl& operator=(scheduler_impl&&) = delete;

	~scheduler_impl();

	void process_task_queue_event(const task_event& evt);
	void process_command_queue_event(const command_event& evt);

	void scheduling_loop();
	void thread_main();
};
193

194
/// Constructs both graph generators and, if `start_thread` is set, launches the scheduler thread running thread_main().
/// `start_thread == false` is used by the threadless test constructor, which drives scheduling_loop() manually instead.
scheduler_impl::scheduler_impl(const bool start_thread, const size_t num_nodes, const node_id local_node_id, const system_info& system,
    scheduler::delegate* const dlg, command_recorder* const crec, instruction_recorder* const irec, const scheduler::policy_set& policy)
    : cdag(), crec(crec), cggen(num_nodes, local_node_id, cdag, crec, policy.command_graph_generator), idag(), irec(irec),
      iggen(num_nodes, local_node_id, system, idag, dlg, irec, policy.instruction_graph_generator) {
	if(start_thread) {
		thread = std::thread(&scheduler_impl::thread_main, this);
		set_thread_name(thread.native_handle(), "cy-scheduler");
	}
}
246✔
203

204
scheduler_impl::~scheduler_impl() {
	// scheduling_loop() - and with it thread_main() - exits as soon as it has processed the shutdown epoch, so this join terminates
	if(thread.joinable()) { thread.join(); }
}
246✔
208

209
/// Applies a single event from the task queue: builds commands for new tasks, updates the command graph generator, and
/// either applies side effects immediately or forwards them to the command queue when in-order processing with commands is required.
/// Runs exclusively on the scheduler thread.
void scheduler_impl::process_task_queue_event(const task_event& evt) {
	matchbox::match(
	    evt,
	    [&](const event_task_available& e) {
		    assert(!shutdown_epoch_created && !shutdown_epoch_reached);
		    assert(e.tsk != nullptr);
		    auto& tsk = *e.tsk;

		    const auto commands = [&] {
			    CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("scheduler::build_task", WebMaroon, "T{} build", tsk.get_id());
			    CELERITY_DETAIL_TRACY_ZONE_TEXT(utils::make_task_debug_label(tsk.get_type(), tsk.get_id(), tsk.get_debug_name()));
			    return cggen.build_task(tsk);
		    }(); // IIFE

		    for(const auto cmd : commands) {
			    // If there are multiple commands, the shutdown epoch must come last. iggen's delegate must be considered dangling after receiving
			    // the corresponding instruction, as runtime will begin destroying the executor after it has observed the epoch to be reached.
			    assert(!shutdown_epoch_created);
			    if(tsk.get_type() == task_type::epoch && tsk.get_epoch_action() == epoch_action::shutdown) { shutdown_epoch_created = tsk.get_id(); }

			    // anticipate() produces the scheduling hint that command_queue uses for lookahead decisions; skipped entirely when lookahead is off
			    std::optional<instruction_graph_generator::scheduling_hint> hint;
			    if(lookahead != experimental::lookahead::none) { hint = iggen.anticipate(*cmd); }
			    command_queue.push(event_command_available{cmd, hint});
		    }
	    },
	    [&](const event_buffer_created& e) {
		    assert(!shutdown_epoch_created && !shutdown_epoch_reached);
		    CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("scheduler::buffer_created", DarkGreen, "B{} create", e.bid);
		    cggen.notify_buffer_created(e.bid, e.range, e.user_allocation_id != null_allocation_id);
		    // Buffer creation must be applied immediately (and out-of-order when necessary) so that instruction_graph_generator::anticipate() does not operate
		    // on unknown buffers. This is fine as buffer creation never has dependencies on other commands and we do not re-use buffer ids.
		    iggen.notify_buffer_created(e.bid, e.range, e.elem_size, e.elem_align, e.user_allocation_id);
	    },
	    [&](const event_buffer_debug_name_changed& e) {
		    assert(!shutdown_epoch_created && !shutdown_epoch_reached);
		    CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("scheduler::buffer_name_changed", DarkGreen, "B{} set name", e.bid);
		    cggen.notify_buffer_debug_name_changed(e.bid, e.debug_name);
		    // buffer-name changes are enqueued in-order to ensure that instruction records have the buffer names as they existed at task creation time.
		    command_queue.push(e);
	    },
	    [&](const event_buffer_destroyed& e) {
		    assert(!shutdown_epoch_created && !shutdown_epoch_reached);
		    CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("scheduler::buffer_destroyed", DarkGreen, "B{} destroy", e.bid);
		    cggen.notify_buffer_destroyed(e.bid);
		    // buffer destruction must happen in-order, otherwise iggen would need to compile commands on already-deleted buffers.
		    command_queue.push(e);
	    },
	    [&](const event_host_object_created& e) {
		    assert(!shutdown_epoch_created && !shutdown_epoch_reached);
		    CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("scheduler::host_object_created", DarkGreen, "H{} create", e.hoid);
		    cggen.notify_host_object_created(e.hoid);
		    // instruction_graph_generator::anticipate() does not examine host objects (unlike it does with buffers), but it doesn't hurt to create them early
		    // either since we don't re-use host object ids.
		    iggen.notify_host_object_created(e.hoid, e.owns_instance);
	    },
	    [&](const event_host_object_destroyed& e) {
		    assert(!shutdown_epoch_created && !shutdown_epoch_reached);
		    CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("scheduler::host_object_destroyed", DarkGreen, "H{} destroy", e.hoid);
		    cggen.notify_host_object_destroyed(e.hoid);
		    // host-object destruction must happen in-order, otherwise iggen would need to compile commands on already-deleted host objects.
		    command_queue.push(e);
	    },
	    [&](const event_epoch_reached& e) { //
		    CELERITY_DETAIL_TRACY_ZONE_SCOPED("scheduler::prune", Gray);
		    // Nodes preceding the completed epoch can no longer be referenced and are garbage-collected from both graphs
		    cdag.erase_before_epoch(e.tid);
		    idag.erase_before_epoch(e.tid);

		    // The scheduler will receive the shutdown-epoch completion event via the runtime even if executor destruction has already begun.
		    assert(!shutdown_epoch_reached);
		    if(shutdown_epoch_created && e.tid == *shutdown_epoch_created) { shutdown_epoch_reached = true; }
	    },
	    [&](const event_set_lookahead& e) { // forwarded: takes effect only after all queued commands (see process_command_queue_event)
		    command_queue.push(e);
	    },
	    [&](const event_flush_commands& e) { // forwarded: the flush counter in command_queue forces dequeueing of everything ahead of it
		    command_queue.push(e);
	    },
	    [&](const test_event_inspect& e) { // test-only: runs synchronously on the scheduler thread
		    e.inspect({.cdag = &cdag, .idag = &idag, .lookahead = lookahead});
	    });
}
7,092✔
290

291
/// Applies a single event from the command queue, most importantly compiling available commands into instructions.
/// Runs exclusively on the scheduler thread; events must be processed in queue order.
void scheduler_impl::process_command_queue_event(const command_event& evt) {
	matchbox::match(
	    evt, //
	    [&](const event_command_available& e) {
		    CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("scheduler::compile_command", MidnightBlue, "C{} compile", e.cmd->get_id());
		    CELERITY_DETAIL_TRACY_ZONE_TEXT("{}", print_command_type(*e.cmd));
		    iggen.compile(*e.cmd);
	    },
	    [&](const event_buffer_debug_name_changed& e) {
		    CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("scheduler::buffer_name_changed", DarkGreen, "B{} set name", e.bid);
		    iggen.notify_buffer_debug_name_changed(e.bid, e.debug_name);
	    },
	    [&](const event_buffer_destroyed& e) {
		    CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("scheduler::buffer_destroyed", DarkGreen, "B{} destroy", e.bid);
		    iggen.notify_buffer_destroyed(e.bid);
	    },
	    [&](const event_host_object_destroyed& e) {
		    CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("scheduler::host_object_destroyed", DarkGreen, "H{} destroy", e.hoid);
		    iggen.notify_host_object_destroyed(e.hoid);
	    },
	    [&](const event_set_lookahead& e) {
		    // setting the lookahead must happen in the command queue, not task queue, to make sure all previous commands are flushed first
		    this->lookahead = e.lookahead;
	    },
	    [&](const event_flush_commands&) {
		    // no-op, but must still reside in command_queue to ensure a correct num_flushes_in_queue count
	    });
}
5,768✔
319

320
/// Main loop of the scheduler: alternates between blocking on the task queue and draining the command queue as far as the
/// current lookahead setting permits. Returns once the shutdown epoch has been reached.
void scheduler_impl::scheduling_loop() {
	while(!shutdown_epoch_reached) {
		process_task_queue_event(task_queue.wait_and_pop());
		// should_dequeue() re-evaluates the (possibly just-changed) lookahead before every command is processed
		while(command_queue.should_dequeue(lookahead)) {
			process_command_queue_event(command_queue.pop());
		}
	}
	// The shutdown epoch is the last event ever submitted, so both queues must have run dry at this point
	task_queue.assert_empty();
	command_queue.assert_empty();
}
246✔
330

331
/// Entry point of the scheduler thread: sets up profiling / pinning, then runs the scheduling loop until shutdown.
void scheduler_impl::thread_main() {
	CELERITY_DETAIL_TRACY_SET_THREAD_NAME_AND_ORDER("cy-scheduler", tracy_detail::thread_order::scheduler)
	thread_pinning::pin_this_thread(thread_pinning::thread_type::scheduler); // TODO don't do this in benchmarks!
	try {
		scheduling_loop();
	}
	// LCOV_EXCL_START
	catch(const std::exception& e) {
		// An exception escaping the scheduler thread is fatal: log and abort instead of letting it reach std::terminate silently
		CELERITY_CRITICAL("[scheduler] {}", e.what());
		std::abort();
	}
	// LCOV_EXCL_STOP
}
246✔
344

345
} // namespace celerity::detail::scheduler_detail
346

347
using namespace celerity::detail::scheduler_detail;
348

349
namespace celerity::detail {

scheduler::scheduler(const size_t num_nodes, const node_id local_node_id, const system_info& system, delegate* const delegate, command_recorder* const crec,
    instruction_recorder* const irec, const policy_set& policy)
    : m_impl(std::make_unique<scheduler_impl>(true /* start_thread */, num_nodes, local_node_id, system, delegate, crec, irec, policy)) {}

scheduler::~scheduler() = default; // ~scheduler_impl joins the scheduler thread

// All notify_* / set_* / flush functions below are thin forwarders called from the application thread: each pushes an event into the
// cross-thread task queue, which is drained asynchronously by the scheduler thread (see scheduler_impl::scheduling_loop).

void scheduler::notify_task_created(const task* const tsk) { m_impl->task_queue.push(event_task_available{tsk}); }

void scheduler::notify_buffer_created(
    const buffer_id bid, const range<3>& range, const size_t elem_size, const size_t elem_align, const allocation_id user_allocation_id) {
	m_impl->task_queue.push(event_buffer_created{bid, range, elem_size, elem_align, user_allocation_id});
}

void scheduler::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& name) {
	m_impl->task_queue.push(event_buffer_debug_name_changed{bid, name});
}

void scheduler::notify_buffer_destroyed(const buffer_id bid) { m_impl->task_queue.push(event_buffer_destroyed{bid}); }

void scheduler::notify_host_object_created(const host_object_id hoid, const bool owns_instance) {
	m_impl->task_queue.push(event_host_object_created{hoid, owns_instance});
}

void scheduler::notify_host_object_destroyed(const host_object_id hoid) { m_impl->task_queue.push(event_host_object_destroyed{hoid}); }

void scheduler::notify_epoch_reached(const task_id tid) { m_impl->task_queue.push(event_epoch_reached{tid}); }

void scheduler::set_lookahead(const experimental::lookahead lookahead) { m_impl->task_queue.push(event_set_lookahead{lookahead}); }

void scheduler::flush_commands() { m_impl->task_queue.push(event_flush_commands{}); }

// LCOV_EXCL_START - this is test instrumentation used only in benchmarks and not covered in unit tests

scheduler::scheduler(const test_threadless_tag /* tag */, const size_t num_nodes, const node_id local_node_id, const system_info& system,
    delegate* const delegate, command_recorder* const crec, instruction_recorder* const irec, const policy_set& policy)
    : m_impl(std::make_unique<scheduler_impl>(false /* start_thread */, num_nodes, local_node_id, system, delegate, crec, irec, policy)) {}

void scheduler::test_scheduling_loop() { m_impl->scheduling_loop(); }

// LCOV_EXCL_STOP

void scheduler::test_inspect(scheduler_detail::test_inspector inspector) { m_impl->task_queue.push(test_event_inspect{std::move(inspector)}); }

} // namespace celerity::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc