celerity / celerity-runtime — build 12009876465 (push, via github)
fknorr: [RM] fixup includes
25 Nov 2024 12:19PM UTC — coverage: 94.911% (remained the same)

3189 of 3626 branches covered (87.95%); branch coverage is included in the aggregate percentage
7049 of 7161 relevant lines covered (98.44%)
1541661.11 hits per line

Source file: /src/runtime.cc — 93.07% covered
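The aggregate percentage is consistent with the note that branch coverage is included: (7049 + 3189) covered out of (7161 + 3626) relevant lines and branches gives 10238 / 10787 ≈ 94.911%.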
#include "runtime.h"

#include "affinity.h"
#include "backend/sycl_backend.h"
#include "cgf_diagnostics.h"
#include "command_graph_generator.h"
#include "config.h"
#include "device_selection.h"
#include "dry_run_executor.h"
#include "executor.h"
#include "host_object.h"
#include "instruction_graph_generator.h"
#include "live_executor.h"
#include "log.h"
#include "print_graph.h"
#include "ranges.h"
#include "reduction.h"
#include "scheduler.h"
#include "system_info.h"
#include "task.h"
#include "task_manager.h"
#include "tracy.h"
#include "types.h"
#include "utils.h"
#include "version.h"

#include <atomic>
#include <cassert>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <future>
#include <limits>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <variant>
#include <vector>

#include <fmt/format.h>
#include <spdlog/spdlog.h>
#include <sycl/sycl.hpp>

#ifdef _MSC_VER
#include <process.h>
#else
#include <unistd.h>
#endif

#if CELERITY_USE_MIMALLOC
// override default new/delete operators to use the mimalloc memory allocator
#include <mimalloc-new-delete.h>
#endif

#if CELERITY_ENABLE_MPI
#include "mpi_communicator.h"
#include <mpi.h>
#else
#include "local_communicator.h"
#endif


namespace celerity {
namespace detail {

	class epoch_promise final : public task_promise {
	  public:
		std::future<void> get_future() { return m_promise.get_future(); }

		void fulfill() override { m_promise.set_value(); }

		allocation_id get_user_allocation_id() override { utils::panic("epoch_promise::get_user_allocation_id"); }

	  private:
		std::promise<void> m_promise;
	};

	std::unique_ptr<runtime> runtime::s_instance = nullptr;

	void runtime::mpi_initialize_once(int* argc, char*** argv) {
#if CELERITY_ENABLE_MPI
		CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("mpi::init", LightSkyBlue, "MPI_Init");
		assert(!s_mpi_initialized);
		int provided;
		MPI_Init_thread(argc, argv, MPI_THREAD_MULTIPLE, &provided);
		assert(provided == MPI_THREAD_MULTIPLE);
#endif // CELERITY_ENABLE_MPI
		s_mpi_initialized = true;
	}

	void runtime::mpi_finalize_once() {
#if CELERITY_ENABLE_MPI
		CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("mpi::finalize", LightSkyBlue, "MPI_Finalize");
		assert(s_mpi_initialized && !s_mpi_finalized && (!s_test_mode || !s_instance));
		MPI_Finalize();
#endif // CELERITY_ENABLE_MPI
		s_mpi_finalized = true;
	}

	void runtime::init(int* argc, char** argv[], const devices_or_selector& user_devices_or_selector) {
		assert(!s_instance);
		s_instance = std::unique_ptr<runtime>(new runtime(argc, argv, user_devices_or_selector));
		if(!s_test_mode) { atexit(shutdown); }
	}

	runtime& runtime::get_instance() {
		if(s_instance == nullptr) { throw std::runtime_error("Runtime has not been initialized"); }
		return *s_instance;
	}

	void runtime::shutdown() { s_instance.reset(); }

	static auto get_pid() {
#ifdef _MSC_VER
		return _getpid();
#else
		return getpid();
#endif
	}

	static std::string get_version_string() {
		using namespace celerity::version;
		return fmt::format("{}.{}.{} {}{}", major, minor, patch, git_revision, git_dirty ? "-dirty" : "");
	}

	static const char* get_build_type() {
#if CELERITY_DETAIL_ENABLE_DEBUG
		return "debug";
#else
		return "release";
#endif
	}

	static const char* get_mimalloc_string() {
#if CELERITY_USE_MIMALLOC
		return "using mimalloc";
#else
		return "using the default allocator";
#endif
	}

	static std::string get_sycl_version() {
#if CELERITY_SYCL_IS_ACPP
		return fmt::format("AdaptiveCpp {}.{}.{}", HIPSYCL_VERSION_MAJOR, HIPSYCL_VERSION_MINOR, HIPSYCL_VERSION_PATCH);
#elif CELERITY_SYCL_IS_DPCPP
		return "DPC++ / Clang " __clang_version__;
#elif CELERITY_SYCL_IS_SIMSYCL
		return "SimSYCL " SIMSYCL_VERSION;
#else
#error "unknown SYCL implementation"
#endif
	}

	static std::string get_mpi_version() {
#if CELERITY_ENABLE_MPI
		char version[MPI_MAX_LIBRARY_VERSION_STRING];
		int len = -1;
		MPI_Get_library_version(version, &len);
		// try shortening the human-readable version string (so far tested on OpenMPI)
		if(const auto brk = /* find last of */ strpbrk(version, ",;")) { len = static_cast<int>(brk - version); }
		return std::string(version, static_cast<size_t>(len));
#else
		return "single node";
#endif
	}

	static host_config get_mpi_host_config() {
#if CELERITY_ENABLE_MPI
		// Determine the "host config", i.e., how many nodes are spawned on this host,
		// and what this node's local rank is. We do this by finding all world-ranks
		// that can use a shared-memory transport (if running on OpenMPI, use the
		// per-host split instead).
#ifdef OPEN_MPI
#define SPLIT_TYPE OMPI_COMM_TYPE_HOST
#else
		// TODO: Assert that shared memory is available (i.e. not explicitly disabled)
#define SPLIT_TYPE MPI_COMM_TYPE_SHARED
#endif
		MPI_Comm host_comm = nullptr;
		MPI_Comm_split_type(MPI_COMM_WORLD, SPLIT_TYPE, 0, MPI_INFO_NULL, &host_comm);

		int local_rank = 0;
		MPI_Comm_rank(host_comm, &local_rank);

		int node_count = 0;
		MPI_Comm_size(host_comm, &node_count);

		host_config host_cfg;
		host_cfg.local_rank = local_rank;
		host_cfg.node_count = node_count;

		MPI_Comm_free(&host_comm);

		return host_cfg;
#else  // CELERITY_ENABLE_MPI
		return host_config{1, 0};
#endif // CELERITY_ENABLE_MPI
	}

	runtime::runtime(int* argc, char** argv[], const devices_or_selector& user_devices_or_selector) {
		m_application_thread = std::this_thread::get_id();

		m_cfg = std::make_unique<config>(argc, argv);

		CELERITY_DETAIL_IF_TRACY_SUPPORTED(tracy_detail::g_tracy_mode = m_cfg->get_tracy_mode());
		CELERITY_DETAIL_TRACY_ZONE_SCOPED("runtime::startup", DarkGray);

		if(s_test_mode) {
			assert(s_test_active && "initializing the runtime from a test without a runtime_fixture");
			s_test_runtime_was_instantiated = true;
		} else {
			mpi_initialize_once(argc, argv);
		}

		int world_size = 1;
		int world_rank = 0;
#if CELERITY_ENABLE_MPI
		MPI_Comm_size(MPI_COMM_WORLD, &world_size);
		MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
#endif

		host_config host_cfg;
		if(m_cfg->is_dry_run()) {
			if(world_size != 1) throw std::runtime_error("In order to run with CELERITY_DRY_RUN_NODES a single MPI process/rank must be used.");
			m_num_nodes = static_cast<size_t>(m_cfg->get_dry_run_nodes());
			m_local_nid = 0;
			host_cfg.node_count = 1;
			host_cfg.local_rank = 0;
		} else {
			m_num_nodes = static_cast<size_t>(world_size);
			m_local_nid = static_cast<node_id>(world_rank);
			host_cfg = get_mpi_host_config();
		}

		// Do not touch logger settings in tests, where the full (trace) logs are captured
		if(!s_test_mode) {
			spdlog::set_level(m_cfg->get_log_level());
			spdlog::set_pattern(fmt::format("[%Y-%m-%d %H:%M:%S.%e] [{:0{}}] [%^%l%$] %v", m_local_nid, int(ceil(log10(double(m_num_nodes))))));
		}

		CELERITY_INFO("Celerity runtime version {} running on {} / {}. PID = {}, build type = {}, {}", get_version_string(), get_sycl_version(),
		    get_mpi_version(), get_pid(), get_build_type(), get_mimalloc_string());

		if(!s_test_mode && m_cfg->get_tracy_mode() != tracy_mode::off) {
			if constexpr(CELERITY_TRACY_SUPPORT) {
				CELERITY_WARN("Profiling with Tracy is enabled. Performance may be negatively impacted.");
			} else {
				CELERITY_WARN("CELERITY_TRACY is set, but Celerity was compiled without Tracy support. Ignoring.");
			}
		}

		cgf_diagnostics::make_available();

		std::vector<sycl::device> devices;
		{
			CELERITY_DETAIL_TRACY_ZONE_SCOPED("runtime::pick_devices", PaleVioletRed);
			devices = std::visit([&](const auto& value) { return pick_devices(host_cfg, value, sycl::platform::get_platforms()); }, user_devices_or_selector);
			assert(!devices.empty()); // postcondition of pick_devices
		}

		{
			const auto& pin_cfg = m_cfg->get_thread_pinning_config();
			const thread_pinning::runtime_configuration thread_pinning_cfg{
			    .enabled = pin_cfg.enabled,
			    .num_devices = static_cast<uint32_t>(devices.size()),
			    .use_backend_device_submission_threads = m_cfg->should_use_backend_device_submission_threads(),
			    .num_legacy_processes = static_cast<uint32_t>(host_cfg.node_count),
			    .legacy_process_index = static_cast<uint32_t>(host_cfg.local_rank),
			    .standard_core_start_id = pin_cfg.starting_from_core,
			    .hardcoded_core_ids = pin_cfg.hardcoded_core_ids,
			};
			m_thread_pinner = std::make_unique<thread_pinning::thread_pinner>(thread_pinning_cfg);
			thread_pinning::pin_this_thread(thread_pinning::thread_type::application);
		}

		const sycl_backend::configuration backend_config = {
		    .per_device_submission_threads = m_cfg->should_use_backend_device_submission_threads(), .profiling = m_cfg->should_enable_device_profiling()};
		auto backend = make_sycl_backend(select_backend(sycl_backend_enumerator{}, devices), devices, backend_config);
		const auto system = backend->get_system_info(); // backend is about to be moved

		if(m_cfg->is_dry_run()) {
			m_exec = std::make_unique<dry_run_executor>(static_cast<executor::delegate*>(this));
		} else {
#if CELERITY_ENABLE_MPI
			auto comm = std::make_unique<mpi_communicator>(collective_clone_from, MPI_COMM_WORLD);
#else
			auto comm = std::make_unique<local_communicator>();
#endif
			m_exec = std::make_unique<live_executor>(std::move(backend), std::move(comm), static_cast<executor::delegate*>(this));
		}

		if(m_cfg->should_record()) {
			m_task_recorder = std::make_unique<task_recorder>();
			m_command_recorder = std::make_unique<command_recorder>();
			m_instruction_recorder = std::make_unique<instruction_recorder>();
		}

		task_manager::policy_set task_mngr_policy;
		// Merely _declaring_ an uninitialized read is legitimate as long as the kernel does not actually perform the read at runtime - this might happen in the
		// first iteration of a submit-loop. We could get rid of this case by making access-modes a runtime property of accessors (cf
		// https://github.com/celerity/meta/issues/74).
		task_mngr_policy.uninitialized_read_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_warning : error_policy::ignore;

		m_task_mngr = std::make_unique<task_manager>(m_num_nodes, m_tdag, m_task_recorder.get(), static_cast<task_manager::delegate*>(this), task_mngr_policy);
		if(m_cfg->get_horizon_step()) m_task_mngr->set_horizon_step(m_cfg->get_horizon_step().value());
		if(m_cfg->get_horizon_max_parallelism()) m_task_mngr->set_horizon_max_parallelism(m_cfg->get_horizon_max_parallelism().value());

		scheduler::policy_set schdlr_policy;
		// Any uninitialized read that is observed on CDAG generation was already logged on task generation, unless we have a bug.
		schdlr_policy.command_graph_generator.uninitialized_read_error = error_policy::ignore;
		schdlr_policy.instruction_graph_generator.uninitialized_read_error = error_policy::ignore;
		schdlr_policy.command_graph_generator.overlapping_write_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_error : error_policy::ignore;
		schdlr_policy.instruction_graph_generator.overlapping_write_error =
		    CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_error : error_policy::ignore;
		schdlr_policy.instruction_graph_generator.unsafe_oversubscription_error = error_policy::log_warning;

		// The scheduler references tasks by pointer, so we make sure its lifetime is shorter than the task_manager's.
		m_schdlr = std::make_unique<scheduler>(
		    m_num_nodes, m_local_nid, system, static_cast<scheduler::delegate*>(this), m_command_recorder.get(), m_instruction_recorder.get(), schdlr_policy);
		if(m_cfg->get_lookahead() != experimental::lookahead::automatic) { m_schdlr->set_lookahead(m_cfg->get_lookahead()); }

		// task_manager will pass generated tasks through its delegate, so generate the init epoch only after the scheduler has been initialized
		m_task_mngr->generate_epoch_task(epoch_action::init);

		m_num_local_devices = system.devices.size();
	}

	void runtime::require_call_from_application_thread() const {
		if(std::this_thread::get_id() != m_application_thread) {
			utils::panic("Celerity runtime, queue, handler, buffer and host_object types must only be constructed, used, and destroyed from the "
			             "application thread. Make sure that you did not accidentally capture one of these types in a host_task.");
		}
	}

	runtime::~runtime() {
		// LCOV_EXCL_START
		if(!is_unreferenced()) {
			// this call might originate from static destruction - we cannot assume spdlog to still be around
			utils::panic("Detected an attempt to destroy runtime while at least one queue, buffer or host_object was still alive. This likely means "
			             "that one of these objects was leaked, or at least its lifetime extended beyond the scope of main(). This is undefined.");
		}
		// LCOV_EXCL_STOP

		require_call_from_application_thread();

		CELERITY_DETAIL_TRACY_ZONE_SCOPED("runtime::shutdown", DimGray);

		// Create and await the shutdown epoch
		sync(epoch_action::shutdown);

		// The shutdown epoch is, by definition, the last task (and command / instruction) issued. Since it has now completed, no more scheduler -> executor
		// traffic will occur, and `runtime` can stop functioning as a scheduler_delegate (which would require m_exec to be live).
		m_exec.reset();

		// task_manager references the scheduler as its delegate, so we destroy it first.
		m_task_mngr.reset();

		// ~executor() joins its thread after notifying the scheduler that the shutdown epoch has been reached, which means that this notification is
		// sequenced-before the destructor return, and `runtime` can now stop functioning as an executor_delegate (which would require m_schdlr to be live).
		m_schdlr.reset();

		// With scheduler and executor threads gone, all recorders can be safely accessed from the runtime / application thread
		if(spdlog::should_log(log_level::info) && m_cfg->should_print_graphs()) {
			if(m_local_nid == 0) { // It's the same across all nodes
				assert(m_task_recorder.get() != nullptr);
				const auto tdag_str = detail::print_task_graph(*m_task_recorder);
				CELERITY_INFO("Task graph:\n\n{}\n", tdag_str);
			}

			assert(m_command_recorder.get() != nullptr);
			auto cdag_str = print_command_graph(m_local_nid, *m_command_recorder);
			if(!is_dry_run()) { cdag_str = gather_command_graph(cdag_str, m_num_nodes, m_local_nid); } // must be called on all nodes

			if(m_local_nid == 0) {
				// Avoid racing on stdout with other nodes (funneled through mpirun)
				if(!is_dry_run()) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); }
				CELERITY_INFO("Command graph:\n\n{}\n", cdag_str);
			}

			// IDAGs become unreadable when all nodes print them at the same time - TODO attempt gathering them as well?
			if(m_local_nid == 0) {
				// we are allowed to deref m_instruction_recorder / m_command_recorder because the scheduler thread has exited at this point
				const auto idag_str = detail::print_instruction_graph(*m_instruction_recorder, *m_command_recorder, *m_task_recorder);
				CELERITY_INFO("Instruction graph on node 0:\n\n{}\n", idag_str);
			}
		}

		m_instruction_recorder.reset();
		m_command_recorder.reset();
		m_task_recorder.reset();

		cgf_diagnostics::teardown();

		if(!s_test_mode) { mpi_finalize_once(); }
	}

	task_id runtime::fence(buffer_access_map access_map, side_effect_map side_effects, std::unique_ptr<task_promise> fence_promise) {
		require_call_from_application_thread();
		maybe_prune_task_graph();
		return m_task_mngr->generate_fence_task(std::move(access_map), std::move(side_effects), std::move(fence_promise));
	}

	task_id runtime::sync(epoch_action action) {
		require_call_from_application_thread();

		maybe_prune_task_graph();
		auto promise = std::make_unique<epoch_promise>();
		const auto future = promise->get_future();
		const auto epoch = m_task_mngr->generate_epoch_task(action, std::move(promise));
		future.wait();
		return epoch;
	}

	void runtime::maybe_prune_task_graph() {
		require_call_from_application_thread();

		const auto current_epoch = m_latest_epoch_reached.load(std::memory_order_relaxed);
		if(current_epoch > m_last_epoch_pruned_before) {
			m_tdag.erase_before_epoch(current_epoch);
			m_last_epoch_pruned_before = current_epoch;
		}
	}

	std::string gather_command_graph(const std::string& graph_str, const size_t num_nodes, const node_id local_nid) {
#if CELERITY_ENABLE_MPI
		const auto comm = MPI_COMM_WORLD;
		const int tag = 0xCDA6; // aka 'CDAG' - Celerity does not perform any other peer-to-peer communication over MPI_COMM_WORLD

		// Send local graph to rank 0 on all other nodes
		if(local_nid != 0) {
			const uint64_t usize = graph_str.size();
			assert(usize < std::numeric_limits<int32_t>::max());
			const int32_t size = static_cast<int32_t>(usize);
			MPI_Send(&size, 1, MPI_INT32_T, 0, tag, comm);
			if(size > 0) MPI_Send(graph_str.data(), static_cast<int32_t>(size), MPI_BYTE, 0, tag, comm);
			return "";
		}
		// On node 0, receive and combine
		std::vector<std::string> graphs;
		graphs.push_back(graph_str);
		for(node_id peer = 1; peer < num_nodes; ++peer) {
			int32_t size = 0;
			MPI_Recv(&size, 1, MPI_INT32_T, static_cast<int>(peer), tag, comm, MPI_STATUS_IGNORE);
			if(size > 0) {
				std::string graph;
				graph.resize(size);
				MPI_Recv(graph.data(), size, MPI_BYTE, static_cast<int>(peer), tag, comm, MPI_STATUS_IGNORE);
				graphs.push_back(std::move(graph));
			}
		}
		return combine_command_graphs(graphs);
#else  // CELERITY_ENABLE_MPI
		assert(num_nodes == 1 && local_nid == 0);
		return graph_str;
#endif // CELERITY_ENABLE_MPI
	}

	// task_manager::delegate

	void runtime::task_created(const task* tsk) {
		assert(m_schdlr != nullptr);
		m_schdlr->notify_task_created(tsk);
	}

	// scheduler::delegate

	void runtime::flush(std::vector<const instruction*> instructions, std::vector<outbound_pilot> pilots) {
		// thread-safe
		assert(m_exec != nullptr);
		m_exec->submit(std::move(instructions), std::move(pilots));
	}

	// executor::delegate

	void runtime::horizon_reached(const task_id horizon_tid) {
		assert(!m_latest_horizon_reached || *m_latest_horizon_reached < horizon_tid);
		assert(m_latest_epoch_reached.load(std::memory_order::relaxed) < horizon_tid); // relaxed: written only by this thread

		if(m_latest_horizon_reached.has_value()) {
			m_latest_epoch_reached.store(*m_latest_horizon_reached, std::memory_order_relaxed);
			m_schdlr->notify_epoch_reached(*m_latest_horizon_reached);
		}
		m_latest_horizon_reached = horizon_tid;
	}

	void runtime::epoch_reached(const task_id epoch_tid) {
		// m_latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.
		assert(!m_latest_horizon_reached || *m_latest_horizon_reached < epoch_tid);
		assert(epoch_tid == 0 || m_latest_epoch_reached.load(std::memory_order_relaxed) < epoch_tid);

		m_latest_epoch_reached.store(epoch_tid, std::memory_order_relaxed);
		m_schdlr->notify_epoch_reached(epoch_tid);
		m_latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself
	}

	void runtime::create_queue() {
		require_call_from_application_thread();
		++m_num_live_queues;
	}

	void runtime::destroy_queue() {
		require_call_from_application_thread();

		assert(m_num_live_queues > 0);
		--m_num_live_queues;
	}

	allocation_id runtime::create_user_allocation(void* const ptr) {
		require_call_from_application_thread();
		const auto aid = allocation_id(user_memory_id, m_next_user_allocation_id++);
		m_exec->track_user_allocation(aid, ptr);
		return aid;
	}

	buffer_id runtime::create_buffer(const range<3>& range, const size_t elem_size, const size_t elem_align, const allocation_id user_aid) {
		require_call_from_application_thread();

		const auto bid = m_next_buffer_id++;
		m_live_buffers.emplace(bid);
		m_task_mngr->notify_buffer_created(bid, range, user_aid != null_allocation_id);
		m_schdlr->notify_buffer_created(bid, range, elem_size, elem_align, user_aid);
		return bid;
	}

	void runtime::set_buffer_debug_name(const buffer_id bid, const std::string& debug_name) {
		require_call_from_application_thread();

		assert(utils::contains(m_live_buffers, bid));
		m_task_mngr->notify_buffer_debug_name_changed(bid, debug_name);
		m_schdlr->notify_buffer_debug_name_changed(bid, debug_name);
	}

	void runtime::destroy_buffer(const buffer_id bid) {
		require_call_from_application_thread();

		assert(utils::contains(m_live_buffers, bid));
		m_schdlr->notify_buffer_destroyed(bid);
		m_task_mngr->notify_buffer_destroyed(bid);
		m_live_buffers.erase(bid);
	}

	host_object_id runtime::create_host_object(std::unique_ptr<host_object_instance> instance) {
		require_call_from_application_thread();

		const auto hoid = m_next_host_object_id++;
		m_live_host_objects.emplace(hoid);
		const bool owns_instance = instance != nullptr;
		if(owns_instance) { m_exec->track_host_object_instance(hoid, std::move(instance)); }
		m_task_mngr->notify_host_object_created(hoid);
		m_schdlr->notify_host_object_created(hoid, owns_instance);
		return hoid;
	}

	void runtime::destroy_host_object(const host_object_id hoid) {
		require_call_from_application_thread();

		assert(utils::contains(m_live_host_objects, hoid));
		m_schdlr->notify_host_object_destroyed(hoid);
		m_task_mngr->notify_host_object_destroyed(hoid);
		m_live_host_objects.erase(hoid);
	}

	reduction_id runtime::create_reduction(std::unique_ptr<reducer> reducer) {
		require_call_from_application_thread();

		const auto rid = m_next_reduction_id++;
		m_exec->track_reducer(rid, std::move(reducer));
		return rid;
	}

	void runtime::set_scheduler_lookahead(const experimental::lookahead lookahead) {
		require_call_from_application_thread();
		m_schdlr->set_lookahead(lookahead);
	}

	void runtime::flush_scheduler() {
		require_call_from_application_thread();
		m_schdlr->flush_commands();
	}

	bool runtime::is_unreferenced() const { return m_num_live_queues == 0 && m_live_buffers.empty() && m_live_host_objects.empty(); }

} // namespace detail
} // namespace celerity
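For context, applications never call the runtime entry points above directly; they are reached through the public Celerity front end (queue and buffer construction, submission, and synchronization). The following is a minimal, hypothetical sketch of such a driver program, assuming the documented celerity::queue / celerity::buffer interface; the buffer size and kernel body are illustrative only and not part of runtime.cc.

// Hypothetical usage sketch - not part of runtime.cc. Assumes the public Celerity API.
#include <celerity.h>

int main() {
	celerity::queue q;                                       // first queue triggers runtime initialization (init / create_queue)
	celerity::buffer<int, 1> buf{celerity::range<1>{1024}};  // registered through runtime::create_buffer

	q.submit([&](celerity::handler& cgh) {
		celerity::accessor acc{buf, cgh, celerity::access::one_to_one{}, celerity::write_only, celerity::no_init};
		cgh.parallel_for(celerity::range<1>{1024}, [=](celerity::item<1> it) { acc[it] = static_cast<int>(it.get_id(0)); });
	});

	q.wait();  // blocks on an epoch task, analogous to runtime::sync()
	return 0;  // destroying queue and buffer unreferences the runtime; shutdown runs via the atexit hook installed in runtime::init
}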