
celerity / celerity-runtime · build 11609936308 (push, via github)

31 Oct 2024 10:08AM UTC · coverage: 95.249% (+0.05%) from 95.198%
Commit by fknorr: "Update benchmark results for new TDAG structure"

3034 of 3420 branches covered (88.71%); branch coverage is included in the aggregate percentage.
6730 of 6831 relevant lines covered (98.52%)
1524046.16 hits per line
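As a quick check of how the aggregate folds in branch coverage, combining the covered branches and covered lines over their respective totals reproduces the reported figure:

(6730 + 3034) / (6831 + 3420) = 9764 / 10251 ≈ 95.249%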

Source File: /src/runtime.cc · 91.94% covered
#include "runtime.h"

#include <limits>
#include <string>

#ifdef _MSC_VER
#include <process.h>
#else
#include <unistd.h>
#endif

#if CELERITY_USE_MIMALLOC
// override default new/delete operators to use the mimalloc memory allocator
#include <mimalloc-new-delete.h>
#endif

#include "affinity.h"
#include "backend/sycl_backend.h"
#include "cgf_diagnostics.h"
#include "command_graph_generator.h"
#include "device_selection.h"
#include "dry_run_executor.h"
#include "host_object.h"
#include "instruction_graph_generator.h"
#include "live_executor.h"
#include "log.h"
#include "print_graph.h"
#include "reduction.h"
#include "scheduler.h"
#include "system_info.h"
#include "task_manager.h"
#include "tracy.h"
#include "types.h"
#include "version.h"

#if CELERITY_ENABLE_MPI
#include "mpi_communicator.h"
#include <mpi.h>
#else
#include "local_communicator.h"
#endif


namespace celerity {
namespace detail {

        std::unique_ptr<runtime> runtime::s_instance = nullptr;

        void runtime::mpi_initialize_once(int* argc, char*** argv) {
#if CELERITY_ENABLE_MPI
                CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("mpi::init", LightSkyBlue, "MPI_Init");
                assert(!s_mpi_initialized);
                int provided;
                MPI_Init_thread(argc, argv, MPI_THREAD_MULTIPLE, &provided);
                assert(provided == MPI_THREAD_MULTIPLE);
#endif // CELERITY_ENABLE_MPI
                s_mpi_initialized = true;
        }

        void runtime::mpi_finalize_once() {
#if CELERITY_ENABLE_MPI
                CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("mpi::finalize", LightSkyBlue, "MPI_Finalize");
                assert(s_mpi_initialized && !s_mpi_finalized && (!s_test_mode || !s_instance));
                MPI_Finalize();
#endif // CELERITY_ENABLE_MPI
                s_mpi_finalized = true;
        }

        void runtime::init(int* argc, char** argv[], const devices_or_selector& user_devices_or_selector) {
                assert(!s_instance);
                s_instance = std::unique_ptr<runtime>(new runtime(argc, argv, user_devices_or_selector));
                if(!s_test_mode) { atexit(shutdown); }
        }

        runtime& runtime::get_instance() {
                if(s_instance == nullptr) { throw std::runtime_error("Runtime has not been initialized"); }
                return *s_instance;
        }

        void runtime::shutdown() { s_instance.reset(); }

        static auto get_pid() {
#ifdef _MSC_VER
                return _getpid();
#else
                return getpid();
#endif
        }

        static std::string get_version_string() {
                using namespace celerity::version;
                return fmt::format("{}.{}.{} {}{}", major, minor, patch, git_revision, git_dirty ? "-dirty" : "");
        }

        static const char* get_build_type() {
#if CELERITY_DETAIL_ENABLE_DEBUG
                return "debug";
#else
                return "release";
#endif
        }

        static const char* get_mimalloc_string() {
#if CELERITY_USE_MIMALLOC
                return "using mimalloc";
#else
                return "using the default allocator";
#endif
        }

        static std::string get_sycl_version() {
#if CELERITY_SYCL_IS_ACPP
                return fmt::format("AdaptiveCpp {}.{}.{}", HIPSYCL_VERSION_MAJOR, HIPSYCL_VERSION_MINOR, HIPSYCL_VERSION_PATCH);
#elif CELERITY_SYCL_IS_DPCPP
                return "DPC++ / Clang " __clang_version__;
#elif CELERITY_SYCL_IS_SIMSYCL
                return "SimSYCL " SIMSYCL_VERSION;
#else
#error "unknown SYCL implementation"
#endif
        }

        static std::string get_mpi_version() {
#if CELERITY_ENABLE_MPI
                char version[MPI_MAX_LIBRARY_VERSION_STRING];
                int len = -1;
                MPI_Get_library_version(version, &len);
                // try shortening the human-readable version string (so far tested on OpenMPI)
                if(const auto brk = /* find last of */ strpbrk(version, ",;")) { len = static_cast<int>(brk - version); }
                return std::string(version, static_cast<size_t>(len));
#else
                return "single node";
#endif
        }

        static host_config get_mpi_host_config() {
#if CELERITY_ENABLE_MPI
                // Determine the "host config", i.e., how many nodes are spawned on this host,
                // and what this node's local rank is. We do this by finding all world-ranks
                // that can use a shared-memory transport (if running on OpenMPI, use the
                // per-host split instead).
#ifdef OPEN_MPI
#define SPLIT_TYPE OMPI_COMM_TYPE_HOST
#else
                // TODO: Assert that shared memory is available (i.e. not explicitly disabled)
#define SPLIT_TYPE MPI_COMM_TYPE_SHARED
#endif
                MPI_Comm host_comm = nullptr;
                MPI_Comm_split_type(MPI_COMM_WORLD, SPLIT_TYPE, 0, MPI_INFO_NULL, &host_comm);

                int local_rank = 0;
                MPI_Comm_rank(host_comm, &local_rank);

                int node_count = 0;
                MPI_Comm_size(host_comm, &node_count);

                host_config host_cfg;
                host_cfg.local_rank = local_rank;
                host_cfg.node_count = node_count;

                MPI_Comm_free(&host_comm);

                return host_cfg;
#else  // CELERITY_ENABLE_MPI
                return host_config{1, 0};
#endif // CELERITY_ENABLE_MPI
        }

        runtime::runtime(int* argc, char** argv[], const devices_or_selector& user_devices_or_selector) {
                m_application_thread = std::this_thread::get_id();

                m_cfg = std::make_unique<config>(argc, argv);

                CELERITY_DETAIL_IF_TRACY_SUPPORTED(tracy_detail::g_tracy_mode = m_cfg->get_tracy_mode());
                CELERITY_DETAIL_TRACY_ZONE_SCOPED("runtime::startup", DarkGray);

                if(s_test_mode) {
                        assert(s_test_active && "initializing the runtime from a test without a runtime_fixture");
                        s_test_runtime_was_instantiated = true;
                } else {
                        mpi_initialize_once(argc, argv);
                }

                int world_size = 1;
                int world_rank = 0;
#if CELERITY_ENABLE_MPI
                MPI_Comm_size(MPI_COMM_WORLD, &world_size);
                MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
#endif

                host_config host_cfg;
                if(m_cfg->is_dry_run()) {
                        if(world_size != 1) throw std::runtime_error("In order to run with CELERITY_DRY_RUN_NODES a single MPI process/rank must be used.");
                        m_num_nodes = static_cast<size_t>(m_cfg->get_dry_run_nodes());
                        m_local_nid = 0;
                        host_cfg.node_count = 1;
                        host_cfg.local_rank = 0;
                } else {
                        m_num_nodes = static_cast<size_t>(world_size);
                        m_local_nid = static_cast<node_id>(world_rank);
                        host_cfg = get_mpi_host_config();
                }

                // Do not touch logger settings in tests, where the full (trace) logs are captured
                if(!s_test_mode) {
                        spdlog::set_level(m_cfg->get_log_level());
                        spdlog::set_pattern(fmt::format("[%Y-%m-%d %H:%M:%S.%e] [{:0{}}] [%^%l%$] %v", m_local_nid, int(ceil(log10(double(m_num_nodes))))));
                }

                CELERITY_INFO("Celerity runtime version {} running on {} / {}. PID = {}, build type = {}, {}", get_version_string(), get_sycl_version(),
                    get_mpi_version(), get_pid(), get_build_type(), get_mimalloc_string());

#ifndef __APPLE__
                if(const uint32_t cores = affinity_cores_available(); cores < min_cores_needed) {
                        CELERITY_WARN("Celerity has detected that only {} logical cores are available to this process. It is recommended to assign at least {} "
                                      "logical cores. Performance may be negatively impacted.",
                            cores, min_cores_needed);
                }
#endif

                if(!s_test_mode && m_cfg->get_tracy_mode() != tracy_mode::off) {
                        if constexpr(CELERITY_TRACY_SUPPORT) {
                                CELERITY_WARN("Profiling with Tracy is enabled. Performance may be negatively impacted.");
                        } else {
                                CELERITY_WARN("CELERITY_TRACY is set, but Celerity was compiled without Tracy support. Ignoring.");
                        }
                }

                cgf_diagnostics::make_available();

                std::vector<sycl::device> devices;
                {
                        CELERITY_DETAIL_TRACY_ZONE_SCOPED("runtime::pick_devices", PaleVioletRed);
                        devices = std::visit([&](const auto& value) { return pick_devices(host_cfg, value, sycl::platform::get_platforms()); }, user_devices_or_selector);
                        assert(!devices.empty()); // postcondition of pick_devices
                }

                auto backend = make_sycl_backend(select_backend(sycl_backend_enumerator{}, devices), devices, m_cfg->should_enable_device_profiling());
                const auto system = backend->get_system_info(); // backend is about to be moved

                if(m_cfg->is_dry_run()) {
                        m_exec = std::make_unique<dry_run_executor>(static_cast<executor::delegate*>(this));
                } else {
#if CELERITY_ENABLE_MPI
                        auto comm = std::make_unique<mpi_communicator>(collective_clone_from, MPI_COMM_WORLD);
#else
                        auto comm = std::make_unique<local_communicator>();
#endif
                        m_exec = std::make_unique<live_executor>(std::move(backend), std::move(comm), static_cast<executor::delegate*>(this));
                }

                if(m_cfg->should_record()) {
                        m_task_recorder = std::make_unique<task_recorder>();
                        m_command_recorder = std::make_unique<command_recorder>();
                        m_instruction_recorder = std::make_unique<instruction_recorder>();
                }

                task_manager::policy_set task_mngr_policy;
                // Merely _declaring_ an uninitialized read is legitimate as long as the kernel does not actually perform the read at runtime - this might happen in the
                // first iteration of a submit-loop. We could get rid of this case by making access-modes a runtime property of accessors (cf
                // https://github.com/celerity/meta/issues/74).
                task_mngr_policy.uninitialized_read_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_warning : error_policy::ignore;

                m_task_mngr = std::make_unique<task_manager>(m_num_nodes, m_task_recorder.get(), static_cast<task_manager::delegate*>(this), task_mngr_policy);
                if(m_cfg->get_horizon_step()) m_task_mngr->set_horizon_step(m_cfg->get_horizon_step().value());
                if(m_cfg->get_horizon_max_parallelism()) m_task_mngr->set_horizon_max_parallelism(m_cfg->get_horizon_max_parallelism().value());

                scheduler::policy_set schdlr_policy;
                // Any uninitialized read that is observed on CDAG generation was already logged on task generation, unless we have a bug.
                schdlr_policy.command_graph_generator.uninitialized_read_error = error_policy::ignore;
                schdlr_policy.instruction_graph_generator.uninitialized_read_error = error_policy::ignore;
                schdlr_policy.command_graph_generator.overlapping_write_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_error : error_policy::ignore;
                schdlr_policy.instruction_graph_generator.overlapping_write_error =
                    CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_error : error_policy::ignore;
                schdlr_policy.instruction_graph_generator.unsafe_oversubscription_error = error_policy::log_warning;

                // The scheduler references tasks by pointer, so we make sure its lifetime is shorter than the task_manager's.
                m_schdlr = std::make_unique<scheduler>(m_num_nodes, m_local_nid, system, static_cast<abstract_scheduler::delegate*>(this), m_command_recorder.get(),
                    m_instruction_recorder.get(), schdlr_policy);

                // task_manager will pass generated tasks through its delegate, so generate the init epoch only after the scheduler has been initialized
                m_task_mngr->generate_epoch_task(epoch_action::init);

                m_num_local_devices = system.devices.size();
        }

        void runtime::require_call_from_application_thread() const {
                if(std::this_thread::get_id() != m_application_thread) {
                        utils::panic("Celerity runtime, queue, handler, buffer and host_object types must only be constructed, used, and destroyed from the "
                                     "application thread. Make sure that you did not accidentally capture one of these types in a host_task.");
                }
        }

        runtime::~runtime() {
                // LCOV_EXCL_START
                if(!is_unreferenced()) {
                        // this call might originate from static destruction - we cannot assume spdlog to still be around
                        utils::panic("Detected an attempt to destroy runtime while at least one queue, buffer or host_object was still alive. This likely means "
                                     "that one of these objects was leaked, or at least its lifetime extended beyond the scope of main(). This is undefined.");
                }
                // LCOV_EXCL_STOP

                require_call_from_application_thread();

                CELERITY_DETAIL_TRACY_ZONE_SCOPED("runtime::shutdown", DimGray);

                // Create and await the shutdown epoch
                sync(epoch_action::shutdown);

                // The shutdown epoch is, by definition, the last task (and command / instruction) issued. Since it has now completed, no more scheduler -> executor
                // traffic will occur, and `runtime` can stop functioning as a scheduler_delegate (which would require m_exec to be live).
                m_exec.reset();

                // ~executor() joins its thread after notifying the scheduler that the shutdown epoch has been reached, which means that this notification is
                // sequenced-before the destructor return, and `runtime` can now stop functioning as an executor_delegate (which would require m_schdlr to be live).
                // m_schdlr references the task instances managed m_task_mngr, so we destroy it first. m_task_mngr uses m_schdlr as a delegate, but does not call to the
                // delegate from its destructor.
                m_schdlr.reset();

                // Since the executor thread is gone, task_manager::epoch_monitor will not be accessed by horizon_reached / epoch_reached across threads anymore.
                m_task_mngr.reset();

                // With scheduler and executor threads gone, all recorders can be safely accessed from the runtime / application thread
                if(spdlog::should_log(log_level::info) && m_cfg->should_print_graphs()) {
                        if(m_local_nid == 0) { // It's the same across all nodes
                                assert(m_task_recorder.get() != nullptr);
                                const auto tdag_str = detail::print_task_graph(*m_task_recorder);
                                CELERITY_INFO("Task graph:\n\n{}\n", tdag_str);
                        }

                        assert(m_command_recorder.get() != nullptr);
                        auto cdag_str = print_command_graph(m_local_nid, *m_command_recorder);
                        if(!is_dry_run()) { cdag_str = gather_command_graph(cdag_str, m_num_nodes, m_local_nid); } // must be called on all nodes

                        if(m_local_nid == 0) {
                                // Avoid racing on stdout with other nodes (funneled through mpirun)
                                if(!is_dry_run()) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); }
                                CELERITY_INFO("Command graph:\n\n{}\n", cdag_str);
                        }

                        // IDAGs become unreadable when all nodes print them at the same time - TODO attempt gathering them as well?
                        if(m_local_nid == 0) {
                                // we are allowed to deref m_instruction_recorder / m_command_recorder because the scheduler thread has exited at this point
                                const auto idag_str = detail::print_instruction_graph(*m_instruction_recorder, *m_command_recorder, *m_task_recorder);
                                CELERITY_INFO("Instruction graph on node 0:\n\n{}\n", idag_str);
                        }
                }

                m_instruction_recorder.reset();
                m_command_recorder.reset();
                m_task_recorder.reset();

                cgf_diagnostics::teardown();

                if(!s_test_mode) { mpi_finalize_once(); }
        }

        task_id runtime::sync(epoch_action action) {
                require_call_from_application_thread();

                const auto epoch = m_task_mngr->generate_epoch_task(action);
                m_task_mngr->await_epoch(epoch);
                return epoch;
        }

        task_manager& runtime::get_task_manager() const {
                require_call_from_application_thread();
                return *m_task_mngr;
        }

        std::string gather_command_graph(const std::string& graph_str, const size_t num_nodes, const node_id local_nid) {
#if CELERITY_ENABLE_MPI
                const auto comm = MPI_COMM_WORLD;
                const int tag = 0xCDA6; // aka 'CDAG' - Celerity does not perform any other peer-to-peer communication over MPI_COMM_WORLD

                // Send local graph to rank 0 on all other nodes
                if(local_nid != 0) {
                        const uint64_t usize = graph_str.size();
                        assert(usize < std::numeric_limits<int32_t>::max());
                        const int32_t size = static_cast<int32_t>(usize);
                        MPI_Send(&size, 1, MPI_INT32_T, 0, tag, comm);
                        if(size > 0) MPI_Send(graph_str.data(), static_cast<int32_t>(size), MPI_BYTE, 0, tag, comm);
                        return "";
                }
                // On node 0, receive and combine
                std::vector<std::string> graphs;
                graphs.push_back(graph_str);
                for(node_id peer = 1; peer < num_nodes; ++peer) {
                        int32_t size = 0;
                        MPI_Recv(&size, 1, MPI_INT32_T, static_cast<int>(peer), tag, comm, MPI_STATUS_IGNORE);
                        if(size > 0) {
                                std::string graph;
                                graph.resize(size);
                                MPI_Recv(graph.data(), size, MPI_BYTE, static_cast<int>(peer), tag, comm, MPI_STATUS_IGNORE);
                                graphs.push_back(std::move(graph));
                        }
                }
                return combine_command_graphs(graphs);
#else  // CELERITY_ENABLE_MPI
                assert(num_nodes == 1 && local_nid == 0);
                return graph_str;
#endif // CELERITY_ENABLE_MPI
        }

        // task_manager::delegate

        void runtime::task_created(const task* tsk) {
                assert(m_schdlr != nullptr);
                m_schdlr->notify_task_created(tsk);
        }

        // scheduler::delegate

        void runtime::flush(std::vector<const instruction*> instructions, std::vector<outbound_pilot> pilots) {
                // thread-safe
                assert(m_exec != nullptr);
                m_exec->submit(std::move(instructions), std::move(pilots));
        }

        // executor::delegate

        void runtime::horizon_reached(const task_id horizon_tid) {
                assert(m_task_mngr != nullptr);
                m_task_mngr->notify_horizon_reached(horizon_tid); // thread-safe

                // The two-horizon logic is duplicated from task_manager::notify_horizon_reached. TODO move epoch_monitor from task_manager to runtime.
                assert(m_schdlr != nullptr);
                if(m_latest_horizon_reached.has_value()) { m_schdlr->notify_epoch_reached(*m_latest_horizon_reached); }
                m_latest_horizon_reached = horizon_tid;
        }

        void runtime::epoch_reached(const task_id epoch_tid) {
                assert(m_task_mngr != nullptr);
                m_task_mngr->notify_epoch_reached(epoch_tid); // thread-safe

                assert(m_schdlr != nullptr);
                m_schdlr->notify_epoch_reached(epoch_tid);
                m_latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself
        }

        void runtime::create_queue() {
                require_call_from_application_thread();
                ++m_num_live_queues;
        }

        void runtime::destroy_queue() {
                require_call_from_application_thread();

                assert(m_num_live_queues > 0);
                --m_num_live_queues;
        }

        allocation_id runtime::create_user_allocation(void* const ptr) {
                require_call_from_application_thread();
                const auto aid = allocation_id(user_memory_id, m_next_user_allocation_id++);
                m_exec->track_user_allocation(aid, ptr);
                return aid;
        }

        buffer_id runtime::create_buffer(const range<3>& range, const size_t elem_size, const size_t elem_align, const allocation_id user_aid) {
                require_call_from_application_thread();

                const auto bid = m_next_buffer_id++;
                m_live_buffers.emplace(bid);
                m_task_mngr->notify_buffer_created(bid, range, user_aid != null_allocation_id);
                m_schdlr->notify_buffer_created(bid, range, elem_size, elem_align, user_aid);
                return bid;
        }

        void runtime::set_buffer_debug_name(const buffer_id bid, const std::string& debug_name) {
                require_call_from_application_thread();

                assert(utils::contains(m_live_buffers, bid));
                m_task_mngr->notify_buffer_debug_name_changed(bid, debug_name);
                m_schdlr->notify_buffer_debug_name_changed(bid, debug_name);
        }

        void runtime::destroy_buffer(const buffer_id bid) {
                require_call_from_application_thread();

                assert(utils::contains(m_live_buffers, bid));
                m_schdlr->notify_buffer_destroyed(bid);
                m_task_mngr->notify_buffer_destroyed(bid);
                m_live_buffers.erase(bid);
        }

        host_object_id runtime::create_host_object(std::unique_ptr<host_object_instance> instance) {
                require_call_from_application_thread();

                const auto hoid = m_next_host_object_id++;
                m_live_host_objects.emplace(hoid);
                const bool owns_instance = instance != nullptr;
                if(owns_instance) { m_exec->track_host_object_instance(hoid, std::move(instance)); }
                m_task_mngr->notify_host_object_created(hoid);
                m_schdlr->notify_host_object_created(hoid, owns_instance);
                return hoid;
        }

        void runtime::destroy_host_object(const host_object_id hoid) {
                require_call_from_application_thread();

                assert(utils::contains(m_live_host_objects, hoid));
                m_schdlr->notify_host_object_destroyed(hoid);
                m_task_mngr->notify_host_object_destroyed(hoid);
                m_live_host_objects.erase(hoid);
        }


        reduction_id runtime::create_reduction(std::unique_ptr<reducer> reducer) {
                require_call_from_application_thread();

                const auto rid = m_next_reduction_id++;
                m_exec->track_reducer(rid, std::move(reducer));
                return rid;
        }

        bool runtime::is_unreferenced() const { return m_num_live_queues == 0 && m_live_buffers.empty() && m_live_host_objects.empty(); }

} // namespace detail
} // namespace celerity