celerity / celerity-runtime — build 15253684557 (push, via github, by PeterTh)
Commit: Update benchmark results for executor starvation tracking
26 May 2025 12:15PM UTC — coverage: 95.066% (-0.04% from 95.104%)
3248 of 3682 branches covered (88.21%); branch coverage is included in the aggregate percentage.
7137 of 7242 relevant lines covered (98.55%), 1937355.0 hits per line.
Source File: /src/runtime.cc — 92.29% covered
#include "runtime.h"

#include "affinity.h"
#include "backend/sycl_backend.h"
#include "cgf.h"
#include "cgf_diagnostics.h"
#include "command_graph_generator.h"
#include "dry_run_executor.h"
#include "host_object.h"
#include "instruction_graph_generator.h"
#include "live_executor.h"
#include "log.h"
#include "named_threads.h"
#include "print_graph.h"
#include "print_utils.h"
#include "print_utils_internal.h"
#include "ranges.h"
#include "reduction.h"
#include "scheduler.h"
#include "select_devices.h"
#include "system_info.h"
#include "task.h"
#include "task_manager.h"
#include "testspy/runtime_testspy.h"
#include "tracy.h"
#include "types.h"
#include "utils.h"
#include "version.h"

#include <atomic>
#include <cassert>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <future>
#include <limits>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_set> // added: std::unordered_set is used for the live buffer / host-object tracking below
#include <utility>
#include <variant>
#include <vector>

#include <fmt/format.h>
#include <spdlog/spdlog.h>
#include <sycl/sycl.hpp>


#ifdef _MSC_VER
#include <process.h>
#else
#include <unistd.h>
#endif

#if CELERITY_USE_MIMALLOC
// override default new/delete operators to use the mimalloc memory allocator
#include <mimalloc-new-delete.h>
#endif

#if CELERITY_ENABLE_MPI
#include "mpi_communicator.h"
#include <mpi.h>
#else
#include "local_communicator.h"
#endif


namespace celerity {
namespace detail {

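        // Bridges completion of an epoch task to a std::future: the executor calls fulfill() once the epoch has been
        // reached, and runtime::impl::sync() blocks on the corresponding future until then.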
        class epoch_promise final : public task_promise {
          public:
                std::future<void> get_future() { return m_promise.get_future(); }

                void fulfill() override { m_promise.set_value(); }

                allocation_id get_user_allocation_id() override { utils::panic("epoch_promise::get_user_allocation_id"); }

          private:
                std::promise<void> m_promise;
        };

        class runtime::impl final : public runtime, private task_manager::delegate, private scheduler::delegate, private executor::delegate {
          public:
                impl(int* argc, char** argv[], const devices_or_selector& user_devices_or_selector);

                impl(const runtime::impl&) = delete;
                impl(runtime::impl&&) = delete;
                impl& operator=(const runtime::impl&) = delete;
                impl& operator=(runtime::impl&&) = delete;

                ~impl();

                task_id submit(raw_command_group&& cg);

                task_id fence(buffer_access access, std::unique_ptr<task_promise> fence_promise);

                task_id fence(host_object_effect effect, std::unique_ptr<task_promise> fence_promise);

                task_id sync(detail::epoch_action action);

                void create_queue();

                void destroy_queue();

                allocation_id create_user_allocation(void* ptr);

                buffer_id create_buffer(const range<3>& range, size_t elem_size, size_t elem_align, allocation_id user_aid);

                void set_buffer_debug_name(buffer_id bid, const std::string& debug_name);

                void destroy_buffer(buffer_id bid);

                host_object_id create_host_object(std::unique_ptr<host_object_instance> instance /* optional */);

                void destroy_host_object(host_object_id hoid);

                reduction_id create_reduction(std::unique_ptr<reducer> reducer);

                bool is_dry_run() const;

                void set_scheduler_lookahead(experimental::lookahead lookahead);

                void flush_scheduler();

          private:
                friend struct runtime_testspy;

                // `runtime` is not thread safe except for its delegate implementations, so we store the id of the thread where it was instantiated (the application
                // thread) in order to throw if the user attempts to issue a runtime operation from any other thread. One case where this may happen unintentionally
                // is capturing a buffer into a host-task by value, where this capture is the last reference to the buffer: The runtime would attempt to destroy itself
                // from a thread that it also needs to await, which would at least cause a deadlock. This variable is immutable, so reading it from a different thread
                // for the purpose of the check is safe.
                std::thread::id m_application_thread;

                std::unique_ptr<config> m_cfg;
                size_t m_num_nodes = 0;
                node_id m_local_nid = 0;
                size_t m_num_local_devices = 0;

                // track all instances of celerity::queue, celerity::buffer and celerity::host_object to sanity-check runtime destruction
                size_t m_num_live_queues = 0;
                std::unordered_set<buffer_id> m_live_buffers;
                std::unordered_set<host_object_id> m_live_host_objects;

                buffer_id m_next_buffer_id = 0;
                raw_allocation_id m_next_user_allocation_id = 1;
                host_object_id m_next_host_object_id = 0;
                reduction_id m_next_reduction_id = no_reduction_id + 1;

                task_graph m_tdag;
                std::unique_ptr<task_manager> m_task_mngr;
                std::unique_ptr<scheduler> m_schdlr;
                std::unique_ptr<executor> m_exec;

                std::optional<task_id> m_latest_horizon_reached; // only accessed by executor thread
                std::atomic<size_t> m_latest_epoch_reached;      // task_id, but cast to size_t to work with std::atomic
                task_id m_last_epoch_pruned_before = 0;

                std::unique_ptr<detail::task_recorder> m_task_recorder;               // accessed by task manager (application thread)
                std::unique_ptr<detail::command_recorder> m_command_recorder;         // accessed only by scheduler thread (until shutdown)
                std::unique_ptr<detail::instruction_recorder> m_instruction_recorder; // accessed only by scheduler thread (until shutdown)

                std::unique_ptr<detail::thread_pinning::thread_pinner> m_thread_pinner; // thread safe, manages lifetime of thread pinning machinery

                /// Panic when not called from m_application_thread (see that variable for more info on the matter). Since there are thread-safe and non thread-safe
                /// member functions, we call this check at the beginning of all the non-safe ones.
                void require_call_from_application_thread() const;

                void maybe_prune_task_graph();

                // task_manager::delegate
                void task_created(const task* tsk) override;

                // scheduler::delegate
                void flush(std::vector<const instruction*> instructions, std::vector<outbound_pilot> pilot) override;
                void on_scheduler_idle() override;
                void on_scheduler_busy() override;

                // executor::delegate
                void horizon_reached(task_id horizon_tid) override;
                void epoch_reached(task_id epoch_tid) override;

                /// True when no buffers, host objects or queues are live that keep the runtime alive.
                bool is_unreferenced() const;
        };

        static auto get_pid() {
#ifdef _MSC_VER
                return _getpid();
#else
                return getpid();
#endif
        }

        static std::string get_version_string() {
                using namespace celerity::version;
                return fmt::format("{}.{}.{} {}{}", major, minor, patch, git_revision, git_dirty ? "-dirty" : "");
        }

        static const char* get_build_type() {
#if CELERITY_DETAIL_ENABLE_DEBUG
                return "debug";
#else
                return "release";
#endif
        }

        static const char* get_mimalloc_string() {
#if CELERITY_USE_MIMALLOC
                return "using mimalloc";
#else
                return "using the default allocator";
#endif
        }

        static std::string get_sycl_version() {
#if CELERITY_SYCL_IS_ACPP
                return fmt::format("AdaptiveCpp {}.{}.{}", HIPSYCL_VERSION_MAJOR, HIPSYCL_VERSION_MINOR, HIPSYCL_VERSION_PATCH);
#elif CELERITY_SYCL_IS_DPCPP
                return "DPC++ / Clang " __clang_version__;
#elif CELERITY_SYCL_IS_SIMSYCL
                return "SimSYCL " SIMSYCL_VERSION;
#else
#error "unknown SYCL implementation"
#endif
        }

        static std::string get_mpi_version() {
#if CELERITY_ENABLE_MPI
                char version[MPI_MAX_LIBRARY_VERSION_STRING];
                int len = -1;
                MPI_Get_library_version(version, &len);
                // try shortening the human-readable version string (so far tested on OpenMPI)
                if(const auto brk = /* find last of */ strpbrk(version, ",;")) { len = static_cast<int>(brk - version); }
                return std::string(version, static_cast<size_t>(len));
#else
                return "single node";
#endif
        }

        static host_config get_mpi_host_config() {
#if CELERITY_ENABLE_MPI
                // Determine the "host config", i.e., how many nodes are spawned on this host,
                // and what this node's local rank is. We do this by finding all world-ranks
                // that can use a shared-memory transport (if running on OpenMPI, use the
                // per-host split instead).
#ifdef OPEN_MPI
#define SPLIT_TYPE OMPI_COMM_TYPE_HOST
#else
                // TODO: Assert that shared memory is available (i.e. not explicitly disabled)
#define SPLIT_TYPE MPI_COMM_TYPE_SHARED
#endif
                MPI_Comm host_comm = nullptr;
                MPI_Comm_split_type(MPI_COMM_WORLD, SPLIT_TYPE, 0, MPI_INFO_NULL, &host_comm);

                int local_rank = 0;
                MPI_Comm_rank(host_comm, &local_rank);

                int node_count = 0;
                MPI_Comm_size(host_comm, &node_count);

                host_config host_cfg;
                host_cfg.local_rank = local_rank;
                host_cfg.node_count = node_count;

                MPI_Comm_free(&host_comm);

                return host_cfg;
#else  // CELERITY_ENABLE_MPI
                return host_config{1, 0};
#endif // CELERITY_ENABLE_MPI
        }

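        // Runtime startup: parse the configuration, initialize MPI (unless running under a test fixture), select devices,
        // set up thread pinning, construct the SYCL backend and the executor, then the task manager and the scheduler,
        // and finally generate the initial epoch task.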
        runtime::impl::impl(int* argc, char** argv[], const devices_or_selector& user_devices_or_selector) {
                m_application_thread = std::this_thread::get_id();

                m_cfg = std::make_unique<config>(argc, argv);

                CELERITY_DETAIL_IF_TRACY_SUPPORTED(tracy_detail::g_tracy_mode = m_cfg->get_tracy_mode());
                CELERITY_DETAIL_TRACY_ZONE_SCOPED("runtime::startup", runtime_startup);

                if(s_test_mode) {
                        assert(s_test_active && "initializing the runtime from a test without a runtime_fixture");
                        s_test_runtime_was_instantiated = true;
                } else {
                        mpi_initialize_once(argc, argv);
                }

                int world_size = 1;
                int world_rank = 0;
#if CELERITY_ENABLE_MPI
                MPI_Comm_size(MPI_COMM_WORLD, &world_size);
                MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
#endif

                host_config host_cfg;
                if(m_cfg->is_dry_run()) {
                        if(world_size != 1) throw std::runtime_error("In order to run with CELERITY_DRY_RUN_NODES a single MPI process/rank must be used.");
                        m_num_nodes = static_cast<size_t>(m_cfg->get_dry_run_nodes());
                        m_local_nid = 0;
                        host_cfg.node_count = 1;
                        host_cfg.local_rank = 0;
                } else {
                        m_num_nodes = static_cast<size_t>(world_size);
                        m_local_nid = static_cast<node_id>(world_rank);
                        host_cfg = get_mpi_host_config();
                }

                // Do not touch logger settings in tests, where the full (trace) logs are captured
                if(!s_test_mode) {
                        spdlog::set_level(m_cfg->get_log_level());
                        spdlog::set_pattern(fmt::format("[%Y-%m-%d %H:%M:%S.%e] [{:0{}}] [%^%l%$] %v", m_local_nid, int(ceil(log10(double(m_num_nodes))))));
                }

                CELERITY_INFO("Celerity runtime version {} running on {} / {}. PID = {}, build type = {}, {}", get_version_string(), get_sycl_version(),
                    get_mpi_version(), get_pid(), get_build_type(), get_mimalloc_string());

                if(!s_test_mode && m_cfg->get_tracy_mode() != tracy_mode::off) {
                        if constexpr(CELERITY_TRACY_SUPPORT) {
                                CELERITY_WARN("Profiling with Tracy is enabled. Performance may be negatively impacted.");
                        } else {
                                CELERITY_WARN("CELERITY_TRACY is set, but Celerity was compiled without Tracy support. Ignoring.");
                        }
                }

                cgf_diagnostics::make_available();

                std::vector<sycl::device> devices;
                {
                        CELERITY_DETAIL_TRACY_ZONE_SCOPED("runtime::select_devices", runtime_select_devices);
                        devices = std::visit([&](const auto& value) { return select_devices(host_cfg, value, sycl::platform::get_platforms()); }, user_devices_or_selector);
                        assert(!devices.empty()); // postcondition of select_devices
                }

                {
                        const auto& pin_cfg = m_cfg->get_thread_pinning_config();
                        const thread_pinning::runtime_configuration thread_pinning_cfg{
                            .enabled = pin_cfg.enabled,
                            .num_devices = static_cast<uint32_t>(devices.size()),
                            .use_backend_device_submission_threads = m_cfg->should_use_backend_device_submission_threads(),
                            .num_legacy_processes = static_cast<uint32_t>(host_cfg.node_count),
                            .legacy_process_index = static_cast<uint32_t>(host_cfg.local_rank),
                            .standard_core_start_id = pin_cfg.starting_from_core,
                            .hardcoded_core_ids = pin_cfg.hardcoded_core_ids,
                        };
                        m_thread_pinner = std::make_unique<thread_pinning::thread_pinner>(thread_pinning_cfg);
                        name_and_pin_and_order_this_thread(named_threads::thread_type::application);
                }

                const sycl_backend::configuration backend_config = {
                    .per_device_submission_threads = m_cfg->should_use_backend_device_submission_threads(), .profiling = m_cfg->should_enable_device_profiling()};
                auto backend = make_sycl_backend(select_backend(sycl_backend_enumerator{}, devices), devices, backend_config);
                const auto system = backend->get_system_info(); // backend is about to be moved

                if(m_cfg->is_dry_run()) {
                        m_exec = std::make_unique<dry_run_executor>(static_cast<executor::delegate*>(this));
                } else {
#if CELERITY_ENABLE_MPI
                        auto comm = std::make_unique<mpi_communicator>(collective_clone_from, MPI_COMM_WORLD);
#else
                        auto comm = std::make_unique<local_communicator>();
#endif
                        m_exec = std::make_unique<live_executor>(std::move(backend), std::move(comm), static_cast<executor::delegate*>(this));
                }

                if(m_cfg->should_record()) {
                        m_task_recorder = std::make_unique<task_recorder>();
                        m_command_recorder = std::make_unique<command_recorder>();
                        m_instruction_recorder = std::make_unique<instruction_recorder>();
                }

                task_manager::policy_set task_mngr_policy;
                // Merely _declaring_ an uninitialized read is legitimate as long as the kernel does not actually perform the read at runtime - this might happen in the
                // first iteration of a submit-loop. We could get rid of this case by making access-modes a runtime property of accessors (cf
                // https://github.com/celerity/meta/issues/74).
                task_mngr_policy.uninitialized_read_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_warning : error_policy::ignore;

                m_task_mngr = std::make_unique<task_manager>(m_num_nodes, m_tdag, m_task_recorder.get(), static_cast<task_manager::delegate*>(this), task_mngr_policy);
                if(m_cfg->get_horizon_step()) m_task_mngr->set_horizon_step(m_cfg->get_horizon_step().value());
                if(m_cfg->get_horizon_max_parallelism()) m_task_mngr->set_horizon_max_parallelism(m_cfg->get_horizon_max_parallelism().value());

                scheduler::policy_set schdlr_policy;
                // Any uninitialized read that is observed on CDAG generation was already logged on task generation, unless we have a bug.
                schdlr_policy.command_graph_generator.uninitialized_read_error = error_policy::ignore;
                schdlr_policy.instruction_graph_generator.uninitialized_read_error = error_policy::ignore;
                schdlr_policy.command_graph_generator.overlapping_write_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_error : error_policy::ignore;
                schdlr_policy.instruction_graph_generator.overlapping_write_error =
                    CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_error : error_policy::ignore;
                schdlr_policy.instruction_graph_generator.unsafe_oversubscription_error = error_policy::log_warning;

                // The scheduler references tasks by pointer, so we make sure its lifetime is shorter than the task_manager's.
                m_schdlr = std::make_unique<scheduler>(
                    m_num_nodes, m_local_nid, system, static_cast<scheduler::delegate*>(this), m_command_recorder.get(), m_instruction_recorder.get(), schdlr_policy);
                if(m_cfg->get_lookahead() != experimental::lookahead::automatic) { m_schdlr->set_lookahead(m_cfg->get_lookahead()); }

                // task_manager will pass generated tasks through its delegate, so generate the init epoch only after the scheduler has been initialized
                m_task_mngr->generate_epoch_task(epoch_action::init);

                m_num_local_devices = system.devices.size();
        }

        void runtime::impl::require_call_from_application_thread() const {
                if(std::this_thread::get_id() != m_application_thread) {
                        utils::panic("Celerity runtime, queue, handler, buffer and host_object types must only be constructed, used, and destroyed from the "
                                     "application thread. Make sure that you did not accidentally capture one of these types in a host_task.");
                }
        }

        runtime::impl::~impl() {
                // LCOV_EXCL_START
                if(m_num_live_queues != 0 || !m_live_buffers.empty() || !m_live_host_objects.empty()) {
                        // this call might originate from static destruction - we cannot assume spdlog to still be around
                        utils::panic("Detected an attempt to destroy runtime while at least one queue, buffer or host_object was still alive. This likely means "
                                     "that one of these objects was leaked, or at least its lifetime extended beyond the scope of main(). This is undefined.");
                }
                // LCOV_EXCL_STOP

                require_call_from_application_thread();

                CELERITY_DETAIL_TRACY_ZONE_SCOPED("runtime::shutdown", runtime_shutdown);

                // Create and await the shutdown epoch
                sync(epoch_action::shutdown);

                const auto starvation_time = m_exec->get_starvation_time();
                const auto active_time = m_exec->get_active_time();
                const auto ratio = static_cast<double>(starvation_time.count()) / static_cast<double>(active_time.count());
                CELERITY_DEBUG(
                    "Executor active time: {:.1f}. Starvation time: {:.1f} ({:.1f}%).", as_sub_second(active_time), as_sub_second(starvation_time), 100.0 * ratio);
                if(active_time > std::chrono::milliseconds(5) && ratio > 0.2) {
                        CELERITY_WARN("The executor was starved for instructions for {:.1f}, or {:.1f}% of the total active time of {:.1f}. This may indicate that "
                                      "your application is scheduler-bound. If you are interleaving Celerity tasks with other work, try flushing the queue.",
                            as_sub_second(starvation_time), 100.0 * ratio, as_sub_second(active_time));
                }

                // The shutdown epoch is, by definition, the last task (and command / instruction) issued. Since it has now completed, no more scheduler -> executor
                // traffic will occur, and `runtime` can stop functioning as a scheduler_delegate (which would require m_exec to be live).
                m_exec.reset();

                // task_manager references the scheduler as its delegate, so we destroy it first.
                m_task_mngr.reset();

                // ~executor() joins its thread after notifying the scheduler that the shutdown epoch has been reached, which means that this notification is
                // sequenced-before the destructor return, and `runtime` can now stop functioning as an executor_delegate (which would require m_schdlr to be live).
                m_schdlr.reset();

                // With scheduler and executor threads gone, all recorders can be safely accessed from the runtime / application thread
                if(spdlog::should_log(log_level::info) && m_cfg->should_print_graphs()) {
                        if(m_local_nid == 0) { // It's the same across all nodes
                                assert(m_task_recorder.get() != nullptr);
                                const auto tdag_str = detail::print_task_graph(*m_task_recorder);
                                CELERITY_INFO("Task graph:\n\n{}\n", tdag_str);
                        }

                        assert(m_command_recorder.get() != nullptr);
                        auto cdag_str = print_command_graph(m_local_nid, *m_command_recorder);
                        if(!is_dry_run()) { cdag_str = gather_command_graph(cdag_str, m_num_nodes, m_local_nid); } // must be called on all nodes

                        if(m_local_nid == 0) {
                                // Avoid racing on stdout with other nodes (funneled through mpirun)
                                if(!is_dry_run()) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); }
                                CELERITY_INFO("Command graph:\n\n{}\n", cdag_str);
                        }

                        // IDAGs become unreadable when all nodes print them at the same time - TODO attempt gathering them as well?
                        if(m_local_nid == 0) {
                                // we are allowed to deref m_instruction_recorder / m_command_recorder because the scheduler thread has exited at this point
                                const auto idag_str = detail::print_instruction_graph(*m_instruction_recorder, *m_command_recorder, *m_task_recorder);
                                CELERITY_INFO("Instruction graph on node 0:\n\n{}\n", idag_str);
                        }
                }

                m_instruction_recorder.reset();
                m_command_recorder.reset();
                m_task_recorder.reset();

                cgf_diagnostics::teardown();

                if(!s_test_mode) { mpi_finalize_once(); }
        }

        task_id runtime::impl::submit(raw_command_group&& cg) {
                require_call_from_application_thread();
                maybe_prune_task_graph();
                return m_task_mngr->generate_command_group_task(std::move(cg));
        }

        task_id runtime::impl::fence(buffer_access access, std::unique_ptr<task_promise> fence_promise) {
                require_call_from_application_thread();
                maybe_prune_task_graph();
                return m_task_mngr->generate_fence_task(std::move(access), std::move(fence_promise));
        }

        task_id runtime::impl::fence(host_object_effect effect, std::unique_ptr<task_promise> fence_promise) {
                require_call_from_application_thread();
                maybe_prune_task_graph();
                return m_task_mngr->generate_fence_task(effect, std::move(fence_promise));
        }

        task_id runtime::impl::sync(epoch_action action) {
                require_call_from_application_thread();

                maybe_prune_task_graph();
                auto promise = std::make_unique<epoch_promise>();
                const auto future = promise->get_future();
                const auto epoch = m_task_mngr->generate_epoch_task(action, std::move(promise));
                future.wait();
                return epoch;
        }

        void runtime::impl::maybe_prune_task_graph() {
                require_call_from_application_thread();

                const auto current_epoch = m_latest_epoch_reached.load(std::memory_order_relaxed);
                if(current_epoch > m_last_epoch_pruned_before) {
                        m_tdag.erase_before_epoch(current_epoch);
                        m_last_epoch_pruned_before = current_epoch;
                }
        }

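        // Gathers the per-node command-graph printouts on node 0 so they can be printed as one combined graph: every
        // other node sends the size of its graph string followed by the payload, and node 0 receives them in rank order
        // and merges them with its own.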
        std::string gather_command_graph(const std::string& graph_str, const size_t num_nodes, const node_id local_nid) {
#if CELERITY_ENABLE_MPI
                const auto comm = MPI_COMM_WORLD;
                const int tag = 0xCDA6; // aka 'CDAG' - Celerity does not perform any other peer-to-peer communication over MPI_COMM_WORLD

                // Send local graph to rank 0 on all other nodes
                if(local_nid != 0) {
                        const uint64_t usize = graph_str.size();
                        assert(usize < std::numeric_limits<int32_t>::max());
                        const int32_t size = static_cast<int32_t>(usize);
                        MPI_Send(&size, 1, MPI_INT32_T, 0, tag, comm);
                        if(size > 0) MPI_Send(graph_str.data(), static_cast<int32_t>(size), MPI_BYTE, 0, tag, comm);
                        return "";
                }
                // On node 0, receive and combine
                std::vector<std::string> graphs;
                graphs.push_back(graph_str);
                for(node_id peer = 1; peer < num_nodes; ++peer) {
                        int32_t size = 0;
                        MPI_Recv(&size, 1, MPI_INT32_T, static_cast<int>(peer), tag, comm, MPI_STATUS_IGNORE);
                        if(size > 0) {
                                std::string graph;
                                graph.resize(size);
                                MPI_Recv(graph.data(), size, MPI_BYTE, static_cast<int>(peer), tag, comm, MPI_STATUS_IGNORE);
                                graphs.push_back(std::move(graph));
                        }
                }
                return combine_command_graphs(graphs);
#else  // CELERITY_ENABLE_MPI
                assert(num_nodes == 1 && local_nid == 0);
                return graph_str;
#endif // CELERITY_ENABLE_MPI
        }

        // task_manager::delegate

        void runtime::impl::task_created(const task* tsk) {
                assert(m_schdlr != nullptr);
                m_schdlr->notify_task_created(tsk);
        }

        // scheduler::delegate

        void runtime::impl::flush(std::vector<const instruction*> instructions, std::vector<outbound_pilot> pilots) {
                // thread-safe
                assert(m_exec != nullptr);
                m_exec->submit(std::move(instructions), std::move(pilots));
        }

        void runtime::impl::on_scheduler_idle() {
                CELERITY_TRACE("Scheduler is idle");
                // The executor may have already been destroyed if we are currently shutting down
                if(m_exec != nullptr) { m_exec->notify_scheduler_idle(true); }
        }

        void runtime::impl::on_scheduler_busy() {
                CELERITY_TRACE("Scheduler is busy");
                // The executor may have already been destroyed if we are currently shutting down
                if(m_exec != nullptr) { m_exec->notify_scheduler_idle(false); }
        }

        // executor::delegate

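        // Horizons complete in order; whenever a new horizon is reached, the previously reached horizon is promoted to
        // the effective epoch, which in turn allows the application thread to prune the task graph up to that point.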
        void runtime::impl::horizon_reached(const task_id horizon_tid) {
                assert(!m_latest_horizon_reached || *m_latest_horizon_reached < horizon_tid);
                assert(m_latest_epoch_reached.load(std::memory_order::relaxed) < horizon_tid); // relaxed: written only by this thread

                if(m_latest_horizon_reached.has_value()) {
                        m_latest_epoch_reached.store(*m_latest_horizon_reached, std::memory_order_relaxed);
                        m_schdlr->notify_epoch_reached(*m_latest_horizon_reached);
                }
                m_latest_horizon_reached = horizon_tid;
        }

        void runtime::impl::epoch_reached(const task_id epoch_tid) {
                // m_latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.
                assert(!m_latest_horizon_reached || *m_latest_horizon_reached < epoch_tid);
                assert(epoch_tid == 0 || m_latest_epoch_reached.load(std::memory_order_relaxed) < epoch_tid);

                m_latest_epoch_reached.store(epoch_tid, std::memory_order_relaxed);
                m_schdlr->notify_epoch_reached(epoch_tid);
                m_latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself
        }

        void runtime::impl::create_queue() {
                require_call_from_application_thread();
                ++m_num_live_queues;
        }

        void runtime::impl::destroy_queue() {
                require_call_from_application_thread();

                assert(m_num_live_queues > 0);
                --m_num_live_queues;
        }

        bool runtime::impl::is_dry_run() const { return m_cfg->is_dry_run(); }

        allocation_id runtime::impl::create_user_allocation(void* const ptr) {
                require_call_from_application_thread();
                const auto aid = allocation_id(user_memory_id, m_next_user_allocation_id++);
                m_exec->track_user_allocation(aid, ptr);
                return aid;
        }

        buffer_id runtime::impl::create_buffer(const range<3>& range, const size_t elem_size, const size_t elem_align, const allocation_id user_aid) {
                require_call_from_application_thread();

                const auto bid = m_next_buffer_id++;
                m_live_buffers.emplace(bid);
                m_task_mngr->notify_buffer_created(bid, range, user_aid != null_allocation_id);
                m_schdlr->notify_buffer_created(bid, range, elem_size, elem_align, user_aid);
                return bid;
        }

        void runtime::impl::set_buffer_debug_name(const buffer_id bid, const std::string& debug_name) {
                require_call_from_application_thread();

                assert(utils::contains(m_live_buffers, bid));
                m_task_mngr->notify_buffer_debug_name_changed(bid, debug_name);
                m_schdlr->notify_buffer_debug_name_changed(bid, debug_name);
        }

        void runtime::impl::destroy_buffer(const buffer_id bid) {
                require_call_from_application_thread();

                assert(utils::contains(m_live_buffers, bid));
                m_schdlr->notify_buffer_destroyed(bid);
                m_task_mngr->notify_buffer_destroyed(bid);
                m_live_buffers.erase(bid);
        }

        host_object_id runtime::impl::create_host_object(std::unique_ptr<host_object_instance> instance) {
                require_call_from_application_thread();

                const auto hoid = m_next_host_object_id++;
                m_live_host_objects.emplace(hoid);
                const bool owns_instance = instance != nullptr;
                if(owns_instance) { m_exec->track_host_object_instance(hoid, std::move(instance)); }
                m_task_mngr->notify_host_object_created(hoid);
                m_schdlr->notify_host_object_created(hoid, owns_instance);
                return hoid;
        }

        void runtime::impl::destroy_host_object(const host_object_id hoid) {
                require_call_from_application_thread();

                assert(utils::contains(m_live_host_objects, hoid));
                m_schdlr->notify_host_object_destroyed(hoid);
                m_task_mngr->notify_host_object_destroyed(hoid);
                m_live_host_objects.erase(hoid);
        }

        reduction_id runtime::impl::create_reduction(std::unique_ptr<reducer> reducer) {
                require_call_from_application_thread();

                const auto rid = m_next_reduction_id++;
                m_exec->track_reducer(rid, std::move(reducer));
                return rid;
        }

        void runtime::impl::set_scheduler_lookahead(const experimental::lookahead lookahead) {
                require_call_from_application_thread();
                m_schdlr->set_lookahead(lookahead);
        }

        void runtime::impl::flush_scheduler() {
                require_call_from_application_thread();
                m_schdlr->flush_commands();
        }

        bool runtime::s_mpi_initialized = false;
        bool runtime::s_mpi_finalized = false;

        runtime runtime::s_instance; // definition of static member

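        // MPI_THREAD_MULTIPLE is requested (and asserted) because MPI calls are not confined to the application thread;
        // the live executor communicates from its own thread.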
        void runtime::mpi_initialize_once(int* argc, char*** argv) {
#if CELERITY_ENABLE_MPI
                CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("mpi::init", mpi_init, "MPI_Init");
                assert(!s_mpi_initialized);
                int provided = -1;
                MPI_Init_thread(argc, argv, MPI_THREAD_MULTIPLE, &provided);
                assert(provided == MPI_THREAD_MULTIPLE);
#endif // CELERITY_ENABLE_MPI
                s_mpi_initialized = true;
        }

        void runtime::mpi_finalize_once() {
#if CELERITY_ENABLE_MPI
                CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("mpi::finalize", mpi_finalize, "MPI_Finalize");
                assert(s_mpi_initialized && !s_mpi_finalized && (!s_test_mode || !has_instance()));
                MPI_Finalize();
#endif // CELERITY_ENABLE_MPI
                s_mpi_finalized = true;
        }

        void runtime::init(int* argc, char** argv[], const devices_or_selector& user_devices_or_selector) {
                assert(!has_instance());
                s_instance.m_impl = std::make_unique<runtime::impl>(argc, argv, user_devices_or_selector);
                if(!s_test_mode) { atexit(shutdown); }
        }

        runtime& runtime::get_instance() {
                if(!has_instance()) { throw std::runtime_error("Runtime has not been initialized"); }
                return s_instance;
        }

        void runtime::shutdown() { s_instance.m_impl.reset(); }

        task_id runtime::submit(raw_command_group&& cg) { return m_impl->submit(std::move(cg)); }

        task_id runtime::fence(buffer_access access, std::unique_ptr<task_promise> fence_promise) {
                return m_impl->fence(std::move(access), std::move(fence_promise));
        }

        task_id runtime::fence(host_object_effect effect, std::unique_ptr<task_promise> fence_promise) { return m_impl->fence(effect, std::move(fence_promise)); }

        task_id runtime::sync(detail::epoch_action action) { return m_impl->sync(action); }

        void runtime::create_queue() { m_impl->create_queue(); }

        void runtime::destroy_queue() { m_impl->destroy_queue(); }

        allocation_id runtime::create_user_allocation(void* const ptr) { return m_impl->create_user_allocation(ptr); }

        buffer_id runtime::create_buffer(const range<3>& range, const size_t elem_size, const size_t elem_align, const allocation_id user_aid) {
                return m_impl->create_buffer(range, elem_size, elem_align, user_aid);
        }

        void runtime::set_buffer_debug_name(const buffer_id bid, const std::string& debug_name) { m_impl->set_buffer_debug_name(bid, debug_name); }

        void runtime::destroy_buffer(const buffer_id bid) { m_impl->destroy_buffer(bid); }

        host_object_id runtime::create_host_object(std::unique_ptr<host_object_instance> instance) { return m_impl->create_host_object(std::move(instance)); }

        void runtime::destroy_host_object(const host_object_id hoid) { m_impl->destroy_host_object(hoid); }

        reduction_id runtime::create_reduction(std::unique_ptr<reducer> reducer) { return m_impl->create_reduction(std::move(reducer)); }

        bool runtime::is_dry_run() const { return m_impl->is_dry_run(); }

        void runtime::set_scheduler_lookahead(const experimental::lookahead lookahead) { m_impl->set_scheduler_lookahead(lookahead); }

        void runtime::flush_scheduler() { m_impl->flush_scheduler(); }

        bool runtime::s_test_mode = false;
        bool runtime::s_test_active = false;
        bool runtime::s_test_runtime_was_instantiated = false;

} // namespace detail
} // namespace celerity


#define CELERITY_DETAIL_TAIL_INCLUDE
#include "testspy/runtime_testspy.inl"