celerity / celerity-runtime / build 10216674169 (push, github)

02 Aug 2024 01:45PM UTC coverage: 94.951% (+2.1%) from 92.884%

fknorr: Remove experimental::user_benchmarker

user_benchmarker has been obsolete ever since we moved away from
structured logging as the profiler (CPAT) interface.

2978 of 3372 branches covered (88.32%); branch coverage is included in the aggregate percentage.
6557 of 6670 relevant lines covered (98.31%)
1534446.4 hits per line

Source File: /src/runtime.cc (93.46% covered)
#include "runtime.h"

#include <limits>
#include <string>

#ifdef _MSC_VER
#include <process.h>
#else
#include <unistd.h>
#endif

#include <mpi.h>

#if CELERITY_USE_MIMALLOC
// override default new/delete operators to use the mimalloc memory allocator
#include <mimalloc-new-delete.h>
#endif

#include "affinity.h"
#include "backend/sycl_backend.h"
#include "cgf_diagnostics.h"
#include "device_selection.h"
#include "distributed_graph_generator.h"
#include "dry_run_executor.h"
#include "host_object.h"
#include "instruction_graph_generator.h"
#include "live_executor.h"
#include "log.h"
#include "mpi_communicator.h"
#include "print_graph.h"
#include "reduction.h"
#include "scheduler.h"
#include "system_info.h"
#include "task_manager.h"
#include "version.h"

namespace celerity {
namespace detail {

        std::unique_ptr<runtime> runtime::s_instance = nullptr;

        void runtime::mpi_initialize_once(int* argc, char*** argv) {
                assert(!s_mpi_initialized);
                int provided;
                MPI_Init_thread(argc, argv, MPI_THREAD_MULTIPLE, &provided);
                assert(provided == MPI_THREAD_MULTIPLE);
                s_mpi_initialized = true;
        }

        void runtime::mpi_finalize_once() {
                assert(s_mpi_initialized && !s_mpi_finalized && (!s_test_mode || !s_instance));
                MPI_Finalize();
                s_mpi_finalized = true;
        }

        void runtime::init(int* argc, char** argv[], const devices_or_selector& user_devices_or_selector) {
                assert(!s_instance);
                s_instance = std::unique_ptr<runtime>(new runtime(argc, argv, user_devices_or_selector));
        }

        runtime& runtime::get_instance() {
                if(s_instance == nullptr) { throw std::runtime_error("Runtime has not been initialized"); }
                return *s_instance;
        }

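        // Not part of the original file: a minimal usage sketch of the singleton contract above, assuming
        // some hypothetical `user_selector` device selector. init() must run exactly once before any
        // get_instance() call:
        //
        //     runtime::init(&argc, &argv, user_selector);
        //     runtime& rt = runtime::get_instance(); // throws if init() was never called
        //
        // get_instance() deliberately throws instead of lazily initializing, so a missing init() is
        // reported at its first use site.
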
        static auto get_pid() {
#ifdef _MSC_VER
                return _getpid();
#else
                return getpid();
#endif
        }

        static std::string get_version_string() {
                using namespace celerity::version;
                return fmt::format("{}.{}.{} {}{}", major, minor, patch, git_revision, git_dirty ? "-dirty" : "");
        }

        static const char* get_build_type() {
#if defined(CELERITY_DETAIL_ENABLE_DEBUG)
                return "debug";
#else
                return "release";
#endif
        }

        static const char* get_mimalloc_string() {
#if CELERITY_USE_MIMALLOC
                return "using mimalloc";
#else
                return "using the default allocator";
#endif
        }

        static std::string get_sycl_version() {
#if defined(__HIPSYCL__) || defined(__HIPSYCL_TRANSFORM__)
                return fmt::format("hipSYCL {}.{}.{}", HIPSYCL_VERSION_MAJOR, HIPSYCL_VERSION_MINOR, HIPSYCL_VERSION_PATCH);
#elif CELERITY_DPCPP
                return "DPC++ / Clang " __clang_version__;
#elif CELERITY_SIMSYCL
                return "SimSYCL " SIMSYCL_VERSION;
#else
#error "unknown SYCL implementation"
#endif
        }

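        // The constructor below brings the runtime up in dependency order: initialize MPI (unless running
        // under the test harness), parse the configuration, determine the local node ID and node count (or
        // fake them for a dry run), configure logging, pick devices and construct the SYCL backend and the
        // executor on top of it, optionally set up recorders, and finally create the task manager and the
        // scheduler that connects the two.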
        runtime::runtime(int* argc, char** argv[], const devices_or_selector& user_devices_or_selector) {
                m_application_thread = std::this_thread::get_id();

                if(s_test_mode) {
                        assert(s_test_active && "initializing the runtime from a test without a runtime_fixture");
                        s_test_runtime_was_instantiated = true;
                } else {
                        mpi_initialize_once(argc, argv);
                }

                m_cfg = std::make_unique<config>(argc, argv);

                if(m_cfg->is_dry_run()) {
                        m_num_nodes = static_cast<size_t>(m_cfg->get_dry_run_nodes());
                        m_local_nid = 0;
                } else {
                        int world_size = -1;
                        MPI_Comm_size(MPI_COMM_WORLD, &world_size);
                        m_num_nodes = static_cast<size_t>(world_size);

                        int world_rank = -1;
                        MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
                        m_local_nid = static_cast<node_id>(world_rank);
                }

                if(!s_test_mode) { // do not touch logger settings in tests, where the full (trace) logs are captured
                        spdlog::set_level(m_cfg->get_log_level());
                        // TODO is the runtime ctor really the right place to set these globals?
                        spdlog::set_pattern(fmt::format("[%Y-%m-%d %H:%M:%S.%e] [{:0{}}] [%^%l%$] %v", m_local_nid, int(ceil(log10(double(m_num_nodes))))));
                }
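                // The node-ID field in the pattern above is zero-padded to the width of the highest rank:
                // with 256 nodes, int(ceil(log10(256))) = 3, so node 42 appears in logs as "[042]".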

                CELERITY_INFO("Celerity runtime version {} running on {}. PID = {}, build type = {}, {}", get_version_string(), get_sycl_version(), get_pid(),
                    get_build_type(), get_mimalloc_string());

#ifndef __APPLE__
                if(const uint32_t cores = affinity_cores_available(); cores < min_cores_needed) {
                        CELERITY_WARN("Celerity has detected that only {} logical cores are available to this process. It is recommended to assign at least {} "
                                      "logical cores. Performance may be negatively impacted.",
                            cores, min_cores_needed);
                }
#endif

                cgf_diagnostics::make_available();

                auto devices = std::visit(
                    [&](const auto& value) { return pick_devices(m_cfg->get_host_config(), value, sycl::platform::get_platforms()); }, user_devices_or_selector);
                assert(!devices.empty()); // postcondition of pick_devices

                const bool enable_profiling = m_cfg->get_enable_device_profiling().value_or(false);
                auto backend = make_sycl_backend(select_backend(sycl_backend_enumerator{}, devices), devices, enable_profiling);
                const auto system = backend->get_system_info(); // backend is about to be moved

                if(m_cfg->is_dry_run()) {
                        m_exec = std::make_unique<dry_run_executor>(static_cast<executor::delegate*>(this));
                } else {
                        auto comm = std::make_unique<mpi_communicator>(collective_clone_from, MPI_COMM_WORLD);
                        m_exec = std::make_unique<live_executor>(std::move(backend), std::move(comm), static_cast<executor::delegate*>(this));
                }

                if(m_cfg->should_record()) {
                        m_task_recorder = std::make_unique<task_recorder>();
                        m_command_recorder = std::make_unique<command_recorder>();
                        m_instruction_recorder = std::make_unique<instruction_recorder>();
                }

                task_manager::policy_set task_mngr_policy;
                // Merely _declaring_ an uninitialized read is legitimate as long as the kernel does not actually perform the read at runtime - this might happen in the
                // first iteration of a submit-loop. We could get rid of this case by making access-modes a runtime property of accessors (cf
                // https://github.com/celerity/meta/issues/74).
                task_mngr_policy.uninitialized_read_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_warning : error_policy::ignore;

                m_task_mngr = std::make_unique<task_manager>(m_num_nodes, m_task_recorder.get(), task_mngr_policy);
                if(m_cfg->get_horizon_step()) m_task_mngr->set_horizon_step(m_cfg->get_horizon_step().value());
                if(m_cfg->get_horizon_max_parallelism()) m_task_mngr->set_horizon_max_parallelism(m_cfg->get_horizon_max_parallelism().value());

                scheduler::policy_set schdlr_policy;
                // Any uninitialized read that is observed on CDAG generation was already logged on task generation, unless we have a bug.
                schdlr_policy.command_graph_generator.uninitialized_read_error = error_policy::ignore;
                schdlr_policy.instruction_graph_generator.uninitialized_read_error = error_policy::ignore;
                schdlr_policy.command_graph_generator.overlapping_write_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_error : error_policy::ignore;
                schdlr_policy.instruction_graph_generator.overlapping_write_error =
                    CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_error : error_policy::ignore;
                schdlr_policy.instruction_graph_generator.unsafe_oversubscription_error = error_policy::log_warning;

                m_schdlr = std::make_unique<scheduler>(m_num_nodes, m_local_nid, system, *m_task_mngr, static_cast<abstract_scheduler::delegate*>(this),
                    m_command_recorder.get(), m_instruction_recorder.get(), schdlr_policy);
                m_task_mngr->register_task_callback([this](const task* tsk) { m_schdlr->notify_task_created(tsk); });

                m_num_local_devices = system.devices.size();
        }

        void runtime::require_call_from_application_thread() const {
                if(std::this_thread::get_id() != m_application_thread) {
                        utils::panic("Celerity runtime, distr_queue, handler, buffer and host_object types must only be constructed, used, and destroyed from the "
                                     "application thread. Make sure that you did not accidentally capture one of these types in a host_task.");
                }
        }

        runtime::~runtime() {
                // LCOV_EXCL_START
                if(!is_unreferenced()) {
                        // this call might originate from static destruction - we cannot assume spdlog to still be around
                        utils::panic("Detected an attempt to destroy runtime while at least one distr_queue, buffer or host_object was still alive. This likely means "
                                     "that one of these objects was leaked, or at least its lifetime extended beyond the scope of main(). This is undefined.");
                }
                // LCOV_EXCL_STOP

                require_call_from_application_thread();

                // Create and await the shutdown epoch
                sync(epoch_action::shutdown);

                // The shutdown epoch is, by definition, the last task (and command / instruction) issued. Since it has now completed, no more scheduler -> executor
                // traffic will occur, and `runtime` can stop functioning as a scheduler_delegate (which would require m_exec to be live).
                m_exec.reset();

                // ~executor() joins its thread after notifying the scheduler that the shutdown epoch has been reached, which means that this notification is
                // sequenced-before the destructor return, and `runtime` can now stop functioning as an executor_delegate (which would require m_schdlr to be live).
                m_schdlr.reset();

                // Since scheduler and executor threads are gone, task_manager::epoch_monitor is not shared across threads anymore
                m_task_mngr.reset();

                // With scheduler and executor threads gone, all recorders can be safely accessed from the runtime / application thread
                if(spdlog::should_log(log_level::info) && m_cfg->should_print_graphs()) {
                        if(m_local_nid == 0) { // It's the same across all nodes
                                assert(m_task_recorder.get() != nullptr);
                                const auto tdag_str = detail::print_task_graph(*m_task_recorder);
                                CELERITY_INFO("Task graph:\n\n{}\n", tdag_str);
                        }

                        assert(m_command_recorder.get() != nullptr);
                        auto cdag_str = print_command_graph(m_local_nid, *m_command_recorder);
                        if(!is_dry_run()) { cdag_str = gather_command_graph(cdag_str, m_num_nodes, m_local_nid); } // must be called on all nodes

                        if(m_local_nid == 0) {
                                // Avoid racing on stdout with other nodes (funneled through mpirun)
                                if(!is_dry_run()) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); }
                                CELERITY_INFO("Command graph:\n\n{}\n", cdag_str);
                        }

                        // IDAGs become unreadable when all nodes print them at the same time - TODO attempt gathering them as well?
                        if(m_local_nid == 0) {
                                // we are allowed to deref m_instruction_recorder / m_command_recorder because the scheduler thread has exited at this point
                                const auto idag_str = detail::print_instruction_graph(*m_instruction_recorder, *m_command_recorder, *m_task_recorder);
                                CELERITY_INFO("Instruction graph on node 0:\n\n{}\n", idag_str);
                        }
                }

                m_instruction_recorder.reset();
                m_command_recorder.reset();
                m_task_recorder.reset();

                cgf_diagnostics::teardown();

                if(!s_test_mode) { mpi_finalize_once(); }
        }

        void runtime::sync(epoch_action action) {
                require_call_from_application_thread();
                const auto epoch = m_task_mngr->generate_epoch_task(action);
                m_task_mngr->await_epoch(epoch);
        }

        task_manager& runtime::get_task_manager() const {
                require_call_from_application_thread();
                return *m_task_mngr;
        }

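        // Wire format used by gather_command_graph below: every rank other than 0 sends its graph as an
        // int32 length followed by the raw bytes (the payload message is skipped for empty graphs), while
        // rank 0 receives that pair from each peer in turn and merges everything via combine_command_graphs.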
        std::string gather_command_graph(const std::string& graph_str, const size_t num_nodes, const node_id local_nid) {
                const auto comm = MPI_COMM_WORLD;
                const int tag = 0xCDA6; // aka 'CDAG' - Celerity does not perform any other peer-to-peer communication over MPI_COMM_WORLD

                // Send local graph to rank 0 on all other nodes
                if(local_nid != 0) {
                        const uint64_t usize = graph_str.size();
                        assert(usize < std::numeric_limits<int32_t>::max());
                        const int32_t size = static_cast<int32_t>(usize);
                        MPI_Send(&size, 1, MPI_INT32_T, 0, tag, comm);
                        if(size > 0) MPI_Send(graph_str.data(), static_cast<int32_t>(size), MPI_BYTE, 0, tag, comm);
                        return "";
                }
                // On node 0, receive and combine
                std::vector<std::string> graphs;
                graphs.push_back(graph_str);
                for(node_id peer = 1; peer < num_nodes; ++peer) {
                        int32_t size = 0;
                        MPI_Recv(&size, 1, MPI_INT32_T, static_cast<int>(peer), tag, comm, MPI_STATUS_IGNORE);
                        if(size > 0) {
                                std::string graph;
                                graph.resize(size);
                                MPI_Recv(graph.data(), size, MPI_BYTE, static_cast<int>(peer), tag, comm, MPI_STATUS_IGNORE);
                                graphs.push_back(std::move(graph));
                        }
                }
                return combine_command_graphs(graphs);
        }

        // scheduler::delegate

        void runtime::flush(std::vector<const instruction*> instructions, std::vector<outbound_pilot> pilots) {
                // thread-safe
                assert(m_exec != nullptr);
                m_exec->submit(std::move(instructions), std::move(pilots));
        }

        // executor::delegate

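        // Horizons are promoted to epochs with a delay of one: reaching horizon H(n) applies the previous
        // horizon H(n-1) as the new epoch. Once H2 is reached, for example, H1 becomes the effective epoch,
        // and H2 itself is applied only when H3 is reached - unless an explicit epoch arrives first, in
        // which case the pending horizon is discarded (see epoch_reached below).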
        void runtime::horizon_reached(const task_id horizon_tid) {
                assert(m_task_mngr != nullptr);
                m_task_mngr->notify_horizon_reached(horizon_tid); // thread-safe

                // The two-horizon logic is duplicated from task_manager::notify_horizon_reached. TODO move epoch_monitor from task_manager to runtime.
                assert(m_schdlr != nullptr);
                if(m_latest_horizon_reached.has_value()) { m_schdlr->notify_epoch_reached(*m_latest_horizon_reached); }
                m_latest_horizon_reached = horizon_tid;
        }

        void runtime::epoch_reached(const task_id epoch_tid) {
                assert(m_task_mngr != nullptr);
                m_task_mngr->notify_epoch_reached(epoch_tid); // thread-safe

                assert(m_schdlr != nullptr);
                m_schdlr->notify_epoch_reached(epoch_tid);
                m_latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself
        }

        void runtime::create_queue() {
                require_call_from_application_thread();

                if(m_has_live_queue) { throw std::runtime_error("Only one celerity::distr_queue can be created per process (but it can be copied!)"); }
                m_has_live_queue = true;
        }

        void runtime::destroy_queue() {
                require_call_from_application_thread();

                assert(m_has_live_queue);
                m_has_live_queue = false;
                destroy_instance_if_unreferenced();
        }

        allocation_id runtime::create_user_allocation(void* const ptr) {
                require_call_from_application_thread();
                const auto aid = allocation_id(user_memory_id, m_next_user_allocation_id++);
                m_exec->track_user_allocation(aid, ptr);
                return aid;
        }

        buffer_id runtime::create_buffer(const range<3>& range, const size_t elem_size, const size_t elem_align, const allocation_id user_aid) {
                require_call_from_application_thread();

                const auto bid = m_next_buffer_id++;
                m_live_buffers.emplace(bid);
                m_task_mngr->notify_buffer_created(bid, range, user_aid != null_allocation_id);
                m_schdlr->notify_buffer_created(bid, range, elem_size, elem_align, user_aid);
                return bid;
        }

        void runtime::set_buffer_debug_name(const buffer_id bid, const std::string& debug_name) {
                require_call_from_application_thread();

                assert(utils::contains(m_live_buffers, bid));
                m_task_mngr->notify_buffer_debug_name_changed(bid, debug_name);
                m_schdlr->notify_buffer_debug_name_changed(bid, debug_name);
        }

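        // Note that destruction below notifies the scheduler before the task manager, the reverse of the
        // creation order above.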
        void runtime::destroy_buffer(const buffer_id bid) {
                require_call_from_application_thread();

                assert(utils::contains(m_live_buffers, bid));
                m_schdlr->notify_buffer_destroyed(bid);
                m_task_mngr->notify_buffer_destroyed(bid);
                m_live_buffers.erase(bid);
                destroy_instance_if_unreferenced();
        }

        host_object_id runtime::create_host_object(std::unique_ptr<host_object_instance> instance) {
                require_call_from_application_thread();

                const auto hoid = m_next_host_object_id++;
                m_live_host_objects.emplace(hoid);
                const bool owns_instance = instance != nullptr;
                if(owns_instance) { m_exec->track_host_object_instance(hoid, std::move(instance)); }
                m_task_mngr->notify_host_object_created(hoid);
                m_schdlr->notify_host_object_created(hoid, owns_instance);
                return hoid;
        }

        void runtime::destroy_host_object(const host_object_id hoid) {
                require_call_from_application_thread();

                assert(utils::contains(m_live_host_objects, hoid));
                m_schdlr->notify_host_object_destroyed(hoid);
                m_task_mngr->notify_host_object_destroyed(hoid);
                m_live_host_objects.erase(hoid);
                destroy_instance_if_unreferenced();
        }

        reduction_id runtime::create_reduction(std::unique_ptr<reducer> reducer) {
                require_call_from_application_thread();

                const auto rid = m_next_reduction_id++;
                m_exec->track_reducer(rid, std::move(reducer));
                return rid;
        }

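        // Lifetime management: the runtime stays alive as long as a distr_queue, buffer or host_object
        // references it. Each destroy_* entry point above ends with destroy_instance_if_unreferenced(), so
        // the singleton tears itself down as soon as the last user-facing object goes away.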
        bool runtime::is_unreferenced() const { return !m_has_live_queue && m_live_buffers.empty() && m_live_host_objects.empty(); }

        void runtime::destroy_instance_if_unreferenced() {
                if(s_instance == nullptr) return;
                if(s_instance->is_unreferenced()) { s_instance.reset(); }
        }

} // namespace detail
} // namespace celerity