• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

celerity / celerity-runtime / 15258506201

26 May 2025 04:31PM UTC coverage: 95.061% (+0.01%) from 95.05%
15258506201

Pull #323

github

web-flow
Merge aa6c8b8ce into 09f5469c8
Pull Request #323: Change split functions to work on box instead of chunk

3247 of 3682 branches covered (88.19%)

Branch coverage included in aggregate %.

64 of 64 new or added lines in 3 files covered. (100.0%)

19 existing lines in 7 files now uncovered.

7146 of 7251 relevant lines covered (98.55%)

1878177.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.33
/src/live_executor.cc
1
#include "live_executor.h"
2
#include "backend/backend.h"
3
#include "closure_hydrator.h"
4
#include "communicator.h"
5
#include "grid.h"
6
#include "host_object.h"
7
#include "instruction_graph.h"
8
#include "log.h"
9
#include "named_threads.h"
10
#include "out_of_order_engine.h"
11
#include "print_utils.h"
12
#include "print_utils_internal.h"
13
#include "receive_arbiter.h"
14
#include "system_info.h"
15
#include "tracy.h"
16
#include "types.h"
17
#include "utils.h"
18
#include "version.h"
19

20
#include <deque>
21
#include <memory>
22
#include <optional>
23
#include <string>
24
#include <unordered_map>
25
#include <vector>
26

27
#include <matchbox.hh>
28

29

30
using namespace celerity::detail::live_executor_detail;
31

32
namespace celerity::detail {
33

34
#if CELERITY_TRACY_SUPPORT
35

36
/// Bundles all executor-side state needed to emit Tracy profiler zones, fibers and plots for
/// instruction execution. Only compiled when CELERITY_TRACY_SUPPORT is enabled.
struct tracy_integration {
	/// Per-instruction metadata captured before dispatch so the zone can be opened / annotated later.
	struct instruction_info {
		instruction_id iid = -1;
		gch::small_vector<instruction_id> dependencies; ///< only filled under full tracing (see make_instruction_info)
		int priority = -1;                              ///< only filled under full tracing
		std::optional<size_t> bytes_processed;          ///< only filled under full tracing; enables throughput reporting
		/// Type-specific zone-opening function, selected per concrete instruction type in make_instruction_info().
		TracyCZoneCtx (*begin_zone)(const instruction_info& info, bool was_eagerly_submitted) = nullptr;
	};

	/// A queued (possibly not-yet-begun) zone on an async lane.
	struct async_zone {
		size_t submission_idx_on_lane;
		instruction_info info;
		std::string trace; ///< text collected via CELERITY_DETAIL_TRACE_INSTRUCTION, attached when the zone ends
	};

	/// References a position in an `async_lane_state::zone_queue` from within `live_executor::impl::async_instruction_state`
	struct async_lane_cursor {
		size_t global_lane_id = 0;
		size_t submission_idx_on_lane = 0;
	};

	/// Unique identifier for an `async_lane_state`.
	struct async_lane_id {
		out_of_order_engine::target target = out_of_order_engine::target::immediate;
		std::optional<device_id> device; ///< only set for device_queue lanes (see get_async_lane_cursor)
		size_t local_lane_id = 0;
	};

	/// State for an async (fiber) lane. Keeps the active (suspended) zone as well as the queue of eagerly submitted but not yet begun zones.
	struct async_lane_state {
		async_lane_id id;
		const char* fiber_name = nullptr; ///< leaked on purpose (tracy_detail::leak_name) - Tracy requires the name to outlive the fiber
		int32_t fiber_order = 0;          ///< display order of the fiber in the Tracy UI
		size_t next_submission_idx = 0;
		std::optional<TracyCZoneCtx> active_zone_ctx;
		std::deque<async_zone> zone_queue; ///< front(): currently active zone, front() + 1: zone to start immediately after front() has ended

		explicit async_lane_state(const async_lane_id& id);
	};

	std::vector<async_lane_state> async_lanes; // vector instead of map, because elements need to be referenced to by global lane id

	std::string last_instruction_trace; ///< written by `CELERITY_DETAIL_TRACE_INSTRUCTION()`, read by `live_executor::impl::dispatch()`

	tracy_detail::plot<int64_t> assigned_instructions_plot{"assigned instructions"};
	tracy_detail::plot<int64_t> assignment_queue_length_plot{"assignment queue length"};

	static instruction_info make_instruction_info(const instruction& instr);

	/// Open a Tracy zone, setting tag, color - and name, if full tracing is enabled.
	static TracyCZoneCtx begin_instruction_zone(const instruction_info& info, bool was_eagerly_submitted);

	/// Close a Tracy zone - after emitting the instruction trace and generic instruction info if full tracing is enabled.
	static void end_instruction_zone(const TracyCZoneCtx& ctx, const instruction_info& info, const std::string& trace, const async_event* opt_event = nullptr);

	/// Picks the (optionally) pre-existing lane for an in-order queue submission, or an arbitrary free lane for async unordered send/receive instructions.
	async_lane_cursor get_async_lane_cursor(const out_of_order_engine::assignment& assignment);

	/// Adds an async instruction to its designated lane queue; beginning a Tracy zone immediately if it is the only instruction in the queue
	async_lane_cursor issue_async_instruction(instruction_info&& info, const out_of_order_engine::assignment& assignment, std::string&& trace);

	/// Closes the tracy zone for an active async instruction; beginning the next queued zone in the same lane, if any.
	void retire_async_instruction(const async_lane_cursor& cursor, const async_event& event);
};
100

101

102
/// Derives the fiber display name and ordering for a lane from its id. Names for numbered lanes are
/// intentionally leaked (tracy_detail::leak_name) because Tracy requires fiber names to stay alive.
tracy_integration::async_lane_state::async_lane_state(const async_lane_id& id) : id(id) {
	const auto lane_offset = static_cast<int32_t>(id.local_lane_id);
	if(id.target == out_of_order_engine::target::immediate) {
		// immediate-but-async lanes carry p2p send / receive instructions
		fiber_name = tracy_detail::leak_name(fmt::format("cy-async-p2p #{}", id.local_lane_id));
		fiber_order = tracy_detail::lane_order::send_receive_first_lane + lane_offset;
	} else if(id.target == out_of_order_engine::target::alloc_queue) {
		// there is exactly one alloc lane, so no per-lane numbering is needed
		fiber_name = "cy-async-alloc";
		fiber_order = tracy_detail::lane_order::alloc_lane;
	} else if(id.target == out_of_order_engine::target::host_queue) {
		fiber_name = tracy_detail::leak_name(fmt::format("cy-async-host #{}", id.local_lane_id));
		fiber_order = tracy_detail::lane_order::host_first_lane + lane_offset;
	} else if(id.target == out_of_order_engine::target::device_queue) {
		fiber_name = tracy_detail::leak_name(fmt::format("cy-async-device D{} #{}", id.device.value(), id.local_lane_id));
		// device lanes are grouped per device in the Tracy UI
		fiber_order = tracy_detail::lane_order::first_device_first_lane
		              + static_cast<int32_t>(id.device.value()) * tracy_detail::lane_order::num_lanes_per_device + lane_offset;
	} else {
		utils::unreachable();
	}
}
125

126
/// Maps an out-of-order-engine assignment to a (possibly newly created) async lane and reserves the
/// next submission index on that lane.
tracy_integration::async_lane_cursor tracy_integration::get_async_lane_cursor(const out_of_order_engine::assignment& assignment) {
	const auto target = assignment.target;
	// on alloc_queue, assignment.device signals on which device to allocate memory, not on which device to queue the instruction
	const auto device = assignment.target == out_of_order_engine::target::device_queue ? assignment.device : std::nullopt;
	// out_of_order_engine does not assign a lane for alloc_queue, but there exists a single (in-order) one which we identify as `0`,
	// to continue using `nullopt` to pick an arbitrary empty lane in the code below for the immediate-but-async send / receive instruction types.
	const auto local_lane_id = assignment.target == out_of_order_engine::target::alloc_queue ? std::optional<size_t>(0) : assignment.lane;

	size_t next_local_lane_id = 0;
	// the predicate deliberately has a side effect: it counts existing lanes with the same (target, device)
	auto lane_it = std::find_if(async_lanes.begin(), async_lanes.end(), [&](const async_lane_state& lane) {
		if(lane.id.target != target || lane.id.device != device) return false;
		++next_local_lane_id; // if lambda never returns true, this will identify an unused local lane id for insertion below
		return local_lane_id.has_value() ? lane.id.local_lane_id == *local_lane_id /* exact match */ : lane.zone_queue.empty() /* arbitrary empty lane */;
	});
	if(lane_it == async_lanes.end()) {
		// no matching (or free) lane exists yet - create one; its global id is simply its index in async_lanes
		lane_it = async_lanes.emplace(async_lanes.end(), async_lane_id{target, device, local_lane_id.value_or(next_local_lane_id)});
	}
	const auto global_lane_id = static_cast<size_t>(lane_it - async_lanes.begin());
	return async_lane_cursor{global_lane_id, lane_it->next_submission_idx++};
}
146

147
/// Captures everything Tracy needs to know about an instruction before dispatch: its id, a
/// type-specific zone-opening function, and - only under full tracing - dependencies, priority and
/// an optional processed-byte count for throughput reporting.
tracy_integration::instruction_info tracy_integration::make_instruction_info(const instruction& instr) {
	tracy_integration::instruction_info info;
	info.iid = instr.get_id();

	// Tracy stages zone tag and color in a static local, so we can't move TracyCZoneNC out of match statement
#define CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(INSTR, COLOR)                                                                                                   \
	[&](const INSTR##_instruction& /* instr */) {                                                                                                              \
		return [](const instruction_info& info, bool was_eagerly_submitted) {                                                                                  \
			TracyCZoneNC(ctx, "executor::" #INSTR, tracy::Color::COLOR, true /* active */);                                                                    \
			if(tracy_detail::is_enabled_full()) {                                                                                                              \
				const auto name = fmt::format("{}I{} " #INSTR, was_eagerly_submitted ? "+" : "", info.iid);                                                    \
				TracyCZoneName(ctx, name.data(), name.size());                                                                                                 \
			}                                                                                                                                                  \
			return ctx;                                                                                                                                        \
		};                                                                                                                                                     \
	}

	// pick the zone-opening function (and thereby tag / color) for the concrete instruction type
	info.begin_zone = matchbox::match(instr,                                   //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(clone_collective_group, Brown), //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(alloc, Turquoise),              //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(free, Turquoise),               //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(copy, Lime),                    //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(device_kernel, Orange),         //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(host_task, Orange),             //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(send, Violet),                  //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(receive, DarkViolet),           //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(split_receive, DarkViolet),     //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(await_receive, DarkViolet),     //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(gather_receive, DarkViolet),    //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(fill_identity, Blue),           //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(reduce, Blue),                  //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(fence, Blue),                   //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(destroy_host_object, Gray),     //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(horizon, Gray),                 //
	    CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE(epoch, Gray));

#undef CELERITY_DETAIL_BEGIN_INSTRUCTION_ZONE

	// the remaining metadata is only shown in full-tracing mode, so skip collecting it otherwise
	CELERITY_DETAIL_IF_TRACY_ENABLED_FULL({
		info.dependencies = instr.get_dependencies();
		info.priority = instr.get_priority();
		// byte counts are only known for instruction types that move or allocate a measurable amount of data
		info.bytes_processed = matchbox::match<std::optional<size_t>>(
		    instr,                                                                                                          //
		    [](const alloc_instruction& ainstr) { return ainstr.get_size_bytes(); },                                        //
		    [](const copy_instruction& cinstr) { return cinstr.get_copy_region().get_area() * cinstr.get_element_size(); }, //
		    [](const send_instruction& sinstr) { return sinstr.get_send_range().size() * sinstr.get_element_size(); },      //
		    [](const device_kernel_instruction& dkinstr) { return dkinstr.get_estimated_global_memory_traffic_bytes(); },   //
		    [](const auto& /* other */) { return std::nullopt; });
	})

	return info;
}
199

200
/// Opens the Tracy zone for an instruction by delegating to the type-specific function selected in
/// make_instruction_info().
TracyCZoneCtx tracy_integration::begin_instruction_zone(const instruction_info& info, bool was_eagerly_submitted) {
	const auto begin = info.begin_zone;
	assert(begin != nullptr); // make_instruction_info() must have been called on this info
	return begin(info, was_eagerly_submitted);
}
204

205
/// Closes a Tracy instruction zone. Under full tracing, first attaches the zone text: the collected
/// instruction trace, native execution time / throughput (if `opt_event` is provided), the
/// dependency list and the priority.
void tracy_integration::end_instruction_zone(const TracyCZoneCtx& ctx, const instruction_info& info, const std::string& trace, const async_event* opt_event) {
	if(tracy_detail::is_enabled_full()) {
		std::string text;
		text.reserve(512); // Observation: typical size for kernel instructions is 200 - 300 characters

		// Dump the trace collected from CELERITY_DETAIL_TRACE_INSTRUCTION, replacing each `;` and the spaces
		// following it with '\n' for better legibility
		for(size_t trace_line_start = 0; trace_line_start < trace.size();) {
			const auto trace_line_end = trace.find(';', trace_line_start);
			// if trace_line_end == npos, `npos - trace_line_start` exceeds the string length, which append() clamps
			text.append(trace, trace_line_start, trace_line_end - trace_line_start);
			if(trace_line_end == std::string::npos) break;
			text.push_back('\n');
			trace_line_start = trace.find_first_not_of(' ', trace_line_end + 1);
		}

		// Dump time and throughput measures if available. We pass an `async_event*` instead of a `optional<duration>` to this function because querying
		// execution time of SYCL events is comparatively costly (~1µs) and can be skipped when CELERITY_TRACE=fast.
		if(opt_event != nullptr) {
			if(const auto native_execution_time = opt_event->get_native_execution_time(); native_execution_time.has_value()) {
				fmt::format_to(std::back_inserter(text), "\nnative execution time: {:.2f}", as_sub_second(*native_execution_time));
				if(info.bytes_processed.has_value()) {
					fmt::format_to(std::back_inserter(text), "\nthroughput: {:.2f}", as_decimal_throughput(*info.bytes_processed, *native_execution_time));
				}
			}
		}

		// emit all dependencies on a single "depends:" line, comma-separated
		for(size_t i = 0; i < info.dependencies.size(); ++i) {
			text += i == 0 ? "\ndepends: " : ", ";
			fmt::format_to(std::back_inserter(text), "I{}", info.dependencies[i]);
		}
		fmt::format_to(std::back_inserter(text), "\npriority: {}", info.priority);

		TracyCZoneText(ctx, text.data(), text.size());
	}

	TracyCZoneEnd(ctx);
}
241

242
/// Enqueues the Tracy zone for an asynchronous instruction on its designated lane. If the lane was
/// empty, the zone begins immediately; otherwise it will be begun by retire_async_instruction()
/// once its predecessors complete, and under full tracing a marker records the time of issue.
tracy_integration::async_lane_cursor tracy_integration::issue_async_instruction(
    instruction_info&& info, const out_of_order_engine::assignment& assignment, std::string&& trace) //
{
	const auto cursor = get_async_lane_cursor(assignment);

	auto& lane = async_lanes[cursor.global_lane_id];
	lane.zone_queue.push_back({cursor.submission_idx_on_lane, std::move(info), std::move(trace)});
	auto& info_enqueued = lane.zone_queue.back().info; // `info` was moved from above - refer to the enqueued copy

	if(lane.zone_queue.size() == 1) {
		// zone_queue.back() == zone_queue.front(): The instruction starts immediately
		assert(!lane.active_zone_ctx.has_value());
		TracyFiberEnterHint(lane.fiber_name, lane.fiber_order);
		lane.active_zone_ctx = begin_instruction_zone(info_enqueued, false /* eager */);
		TracyFiberLeave;
	} else if(tracy_detail::is_enabled_full()) {
		// The instruction zone will be started as soon as its predecessor is retired - indicate when it was issued
		const auto mark = fmt::format("I{} issued", info_enqueued.iid);
		TracyFiberEnterHint(lane.fiber_name, lane.fiber_order);
		TracyMessageC(mark.data(), mark.size(), tracy::Color::DarkGray);
		TracyFiberLeave;
	}

	return cursor;
}
267

268
/// Ends the active Tracy zone(s) for a completed async instruction on its lane, then immediately
/// begins the next eagerly-queued zone on the same lane, if one exists.
void tracy_integration::retire_async_instruction(const async_lane_cursor& cursor, const async_event& event) {
	auto& lane = async_lanes.at(cursor.global_lane_id);

	TracyFiberEnterHint(lane.fiber_name, lane.fiber_order);
	// NOTE(review): `<=` also retires earlier queued zones in one sweep, and attaches `event` (the
	// retiring instruction's event) to each of them - presumably sound because lanes are in-order,
	// so predecessors must have completed; confirm against out_of_order_engine's lane semantics.
	while(!lane.zone_queue.empty() && lane.zone_queue.front().submission_idx_on_lane <= cursor.submission_idx_on_lane) {
		// Complete the front() == active zone in the lane.
		{
			auto& completed_zone = lane.zone_queue.front();
			assert(lane.active_zone_ctx.has_value());
			end_instruction_zone(*lane.active_zone_ctx, completed_zone.info, completed_zone.trace, &event);
			lane.active_zone_ctx.reset();
			lane.zone_queue.pop_front();
		}
		// If there remains another (eagerly issued) instruction in the queue after popping the active one, show it as having started immediately.
		if(!lane.zone_queue.empty()) {
			auto& eagerly_following_zone = lane.zone_queue.front();
			assert(!lane.active_zone_ctx.has_value());
			lane.active_zone_ctx = begin_instruction_zone(eagerly_following_zone.info, true /* eager */);
		}
	}
	TracyFiberLeave;
}
290

291
#endif // CELERITY_TRACY_SUPPORT
292

293

294
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
295

296
/// Metadata recorded per kernel / host-task launch so out-of-bounds accessor accesses can be
/// reported with buffer and task names. Only compiled when CELERITY_ACCESSOR_BOUNDARY_CHECK is on.
struct boundary_check_info {
	/// Describes one accessor of the launch together with the box it is allowed to touch.
	struct accessor_info {
		detail::buffer_id buffer_id = 0;
		std::string buffer_name;
		box<3> accessible_box;
	};

	detail::task_type task_type;
	detail::task_id task_id;
	std::string task_name;

	// non-owning; presumably points into an allocation written by the kernel when an illegal
	// access is detected - TODO confirm ownership and lifetime at the allocation site
	oob_bounding_box* illegal_access_bounding_boxes = nullptr;
	std::vector<accessor_info> accessors;

	boundary_check_info(detail::task_type tt, detail::task_id tid, const std::string& task_name) : task_type(tt), task_id(tid), task_name(task_name) {}
};
312

313
#endif // CELERITY_ACCESSOR_BOUNDARY_CHECK
314

315

316
// Logs an "I<id>:"-prefixed trace message for an instruction and, under full Tracy tracing, stores
// the formatted message in tracy->last_instruction_trace so dispatch() can attach it to the zone.
// NOTE(review): expands to two statements, so it is not safe as the sole body of an unbraced `if` -
// confirm all call sites use braces.
#define CELERITY_DETAIL_TRACE_INSTRUCTION(INSTR, FMT_STRING, ...)                                                                                              \
	CELERITY_TRACE("[executor] I{}: " FMT_STRING, INSTR.get_id(), ##__VA_ARGS__);                                                                              \
	CELERITY_DETAIL_IF_TRACY_ENABLED_FULL(tracy->last_instruction_trace = fmt::format(FMT_STRING, ##__VA_ARGS__))
319

320

321
/// Tracks one instruction that completes asynchronously, outside the executor loop; polled for
/// completion via `event` in poll_in_flight_async_instructions().
struct async_instruction_state {
	allocation_id alloc_aid = null_allocation_id; ///< non-null iff instruction is an alloc_instruction
	async_event event;
	CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(std::unique_ptr<boundary_check_info> oob_info;) // unique_ptr: oob_info is optional and rather large
	CELERITY_DETAIL_IF_TRACY_SUPPORTED(std::optional<tracy_integration::async_lane_cursor> tracy_lane_cursor;)
};
327

328
/// Private state and logic of the live executor; run() executes the main loop on the executor thread.
struct live_executor::impl {
	const std::unique_ptr<detail::backend> backend;
	communicator* const root_communicator;
	double_buffered_queue<submission>* const submission_queue;
	executor::delegate* const delegate;
	const live_executor::policy_set policy;

	receive_arbiter recv_arbiter{*root_communicator};
	out_of_order_engine engine{backend->get_system_info()};

	bool expecting_more_submissions = true; ///< shutdown epoch has not been executed yet
	std::unordered_map<instruction_id, async_instruction_state> in_flight_async_instructions;
	std::unordered_map<allocation_id, void*> allocations{{null_allocation_id, nullptr}}; ///< obtained from alloc_instruction or track_user_allocation
	std::unordered_map<host_object_id, std::unique_ptr<host_object_instance>> host_object_instances; ///< passed in through track_host_object_instance
	std::unordered_map<collective_group_id, std::unique_ptr<communicator>> cloned_communicators;     ///< transitive clones of root_communicator
	std::unordered_map<reduction_id, std::unique_ptr<reducer>> reducers; ///< passed in through track_reducer, erased on epochs / horizons

	std::optional<std::chrono::steady_clock::time_point> last_progress_timestamp; ///< last successful call to check_progress
	bool made_progress = false;                                                   ///< progress was made since `last_progress_timestamp`
	bool progress_warning_emitted = false;                                        ///< no progress was made since warning was emitted

	bool scheduler_is_idle = true;                  ///< scheduler is currently not producing new instructions
	std::atomic_uint64_t total_starvation_time = 0; ///< total time spent waiting for new instructions while scheduler was busy, in nanoseconds
	std::atomic_uint64_t total_active_time = 0;     ///< total time spent processing instructions, in nanoseconds

	CELERITY_DETAIL_IF_TRACY_SUPPORTED(std::unique_ptr<tracy_integration> tracy;) ///< only allocated when Tracy tracing is enabled (see constructor)

	impl(std::unique_ptr<detail::backend> backend, communicator* root_comm, double_buffered_queue<submission>& submission_queue, executor::delegate* dlg,
	    const live_executor::policy_set& policy);

	/// Main executor loop; returns after the shutdown epoch has been executed and all work drained.
	void run();
	/// Retires every in-flight async instruction whose event has completed.
	void poll_in_flight_async_instructions();
	/// Drains the submission queue, forwarding instructions, pilots and tracked objects to their destinations.
	void poll_submission_queue();
	void try_issue_one_instruction();
	void retire_async_instruction(instruction_id iid, async_instruction_state& async);
	void check_progress();

	// Instruction types that complete synchronously within the executor.
	void issue(const clone_collective_group_instruction& ccginstr);
	void issue(const split_receive_instruction& srinstr);
	void issue(const fill_identity_instruction& fiinstr);
	void issue(const reduce_instruction& rinstr);
	void issue(const fence_instruction& finstr);
	void issue(const destroy_host_object_instruction& dhoinstr);
	void issue(const horizon_instruction& hinstr);
	void issue(const epoch_instruction& einstr);

	template <typename Instr>
	auto dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
	    // SFINAE: there is a (synchronous) `issue` overload above for the concrete Instr type
	    -> decltype(issue(instr));

	// Instruction types that complete asynchronously via async_event, outside the executor.
	void issue_async(const alloc_instruction& ainstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const free_instruction& finstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const copy_instruction& cinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const device_kernel_instruction& dkinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const host_task_instruction& htinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const send_instruction& sinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const receive_instruction& rinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const await_receive_instruction& arinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const gather_receive_instruction& grinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);

	template <typename Instr>
	auto dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
	    // SFINAE: there is an `issue_async` overload above for the concrete Instr type
	    -> decltype(issue_async(instr, assignment, std::declval<async_instruction_state&>()));

	std::vector<closure_hydrator::accessor_info> make_accessor_infos(const buffer_access_allocation_map& amap) const;

#if CELERITY_ACCESSOR_BOUNDARY_CHECK
	std::unique_ptr<boundary_check_info> attach_boundary_check_info(std::vector<closure_hydrator::accessor_info>& accessor_infos,
	    const buffer_access_allocation_map& amap, task_type tt, task_id tid, const std::string& task_name) const;
#endif

	/// Frees resources (reducers etc.) listed in the garbage attached to horizons / epochs.
	void collect(const instruction_garbage& garbage);
};
405

406
/// Stores the injected dependencies; Tracy integration state is only allocated when tracing is enabled.
live_executor::impl::impl(std::unique_ptr<detail::backend> backend, communicator* const root_comm, double_buffered_queue<submission>& submission_queue,
    executor::delegate* const dlg, const live_executor::policy_set& policy)
    : backend(std::move(backend)), root_communicator(root_comm), submission_queue(&submission_queue), delegate(dlg), policy(policy) //
{
	CELERITY_DETAIL_IF_TRACY_ENABLED(tracy = std::make_unique<tracy_integration>();)
}
250✔
412

413
/// Main loop of the executor thread. Each iteration polls the communicator, completed async
/// instructions and the submission queue, then issues at most one new instruction. When the engine
/// is idle, the thread suspends until new submissions arrive, tracking idle vs. starvation time.
/// Returns after the shutdown epoch has been executed and all work has drained.
void live_executor::impl::run() {
	// this closure hydrator instantiation is not necessary in normal execution iff device submission threads are enabled,
	// but it is still required for testing purposes, so always making it available on this thread is the simplest solution
	closure_hydrator::make_available();
	backend->init();

	uint8_t check_overflow_counter = 0; // uint8_t: wraps to 0 every 256 increments, see bottom of loop
	std::optional<std::chrono::steady_clock::time_point> active_since; // set while the executor is active (not suspended)
	for(;;) {
		if(engine.is_idle()) {
			// account the time since we last resumed as active time before (potentially) suspending
			if(active_since.has_value()) {
				total_active_time += std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - *active_since).count();
			}
			if(!expecting_more_submissions) break; // shutdown complete

			if(scheduler_is_idle) {
				CELERITY_DETAIL_TRACY_ZONE_SCOPED("executor::idle", executor_idle);
				submission_queue->wait_while_empty(); // scheduler is idle, suspend thread until new tasks are submitted
			} else {
				CELERITY_DETAIL_TRACY_ZONE_SCOPED("executor::starve", executor_starve);
				const auto before = std::chrono::steady_clock::now();
				submission_queue->wait_while_empty(); // we are being starved by scheduler, suspend thread
				const auto after = std::chrono::steady_clock::now();
				total_starvation_time += std::chrono::duration_cast<std::chrono::nanoseconds>(after - before).count();
			}
			last_progress_timestamp.reset(); // do not treat suspension as being stuck
			active_since = std::chrono::steady_clock::now();
		}

		recv_arbiter.poll_communicator();
		poll_in_flight_async_instructions();
		poll_submission_queue();
		try_issue_one_instruction(); // potentially expensive, so only issue one per loop to continue checking for async completion in between

		if(++check_overflow_counter == 0) { // once every 256 iterations
			backend->check_async_errors();
			check_progress();
		}
	}

	assert(in_flight_async_instructions.empty());
	// check that for each alloc_instruction, we executed a corresponding free_instruction
	assert(std::all_of(allocations.begin(), allocations.end(),
	    [](const std::pair<allocation_id, void*>& p) { return p.first == null_allocation_id || p.first.get_memory_id() == user_memory_id; }));
	// check that for each track_host_object_instance, we executed a destroy_host_object_instruction
	assert(host_object_instances.empty());

	closure_hydrator::teardown();
}
462

463
/// Checks every instruction on the engine's execution front for completion of its async event and
/// retires the completed ones, recording that progress was made for check_progress().
void live_executor::impl::poll_in_flight_async_instructions() {
	// collect completed instruction ids up-front, since retire_async_instruction would alter the execution front
	std::vector<instruction_id> completed_now; // std::vector because it will be empty in the common case
	for(const auto iid : engine.get_execution_front()) {
		if(in_flight_async_instructions.at(iid).event.is_complete()) { completed_now.push_back(iid); }
	}
	for(const auto iid : completed_now) {
		retire_async_instruction(iid, in_flight_async_instructions.at(iid));
		in_flight_async_instructions.erase(iid);
		made_progress = true;
	}

	CELERITY_DETAIL_IF_TRACY_ENABLED(tracy->assigned_instructions_plot.update(in_flight_async_instructions.size()));
}
477

478
// Drains all pending submissions from the application/scheduler side and applies them to executor state:
// instruction batches go to the out-of-order engine, outbound pilots to the root communicator, and
// tracked objects (user allocations, host objects, reducers) into their respective registries.
void live_executor::impl::poll_submission_queue() {
	for(auto& submission : submission_queue->pop_all()) {
		CELERITY_DETAIL_TRACY_ZONE_SCOPED("executor::fetch", executor_fetch);
		matchbox::match(
		    submission,
		    [&](const instruction_pilot_batch& batch) {
			    // hand every instruction to the engine; it decides when and where each one executes
			    for(const auto incoming_instr : batch.instructions) {
				    engine.submit(incoming_instr);
			    }
			    // pilots announce upcoming peer-to-peer transfers to remote nodes
			    for(const auto& pilot : batch.pilots) {
				    root_communicator->send_outbound_pilot(pilot);
			    }
			    CELERITY_DETAIL_IF_TRACY_ENABLED(tracy->assignment_queue_length_plot.update(engine.get_assignment_queue_length()));
		    },
		    [&](const user_allocation_transfer& uat) {
			    // user allocations are owned by the application; we only track the pointer
			    assert(uat.aid != null_allocation_id);
			    assert(uat.aid.get_memory_id() == user_memory_id);
			    assert(allocations.count(uat.aid) == 0);
			    allocations.emplace(uat.aid, uat.ptr);
		    },
		    [&](host_object_transfer& hot) {
			    // take ownership of the host object instance until a destroy_host_object_instruction arrives
			    assert(host_object_instances.count(hot.hoid) == 0);
			    host_object_instances.emplace(hot.hoid, std::move(hot.instance));
		    },
		    [&](reducer_transfer& rt) {
			    // reducers are kept until an epoch / horizon garbage-collects them (see collect())
			    assert(reducers.count(rt.rid) == 0);
			    reducers.emplace(rt.rid, std::move(rt.reduction));
		    },
		    [&](const scheduler_idle_state_change& state) { //
			    scheduler_is_idle = state.is_idle;
		    });
	}
}
511

512
void live_executor::impl::retire_async_instruction(const instruction_id iid, async_instruction_state& async) {
7,165✔
513
        CELERITY_DETAIL_TRACY_ZONE_SCOPED("executor::retire", executor_retire);
514

515
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
516
        if(async.oob_info != nullptr) {
7,165✔
517
                CELERITY_DETAIL_TRACY_ZONE_SCOPED_V("executor::oob_check", executor_oob_check, "I{} bounds check", iid);
518
                const auto& oob_info = *async.oob_info;
1,994✔
519
                for(size_t i = 0; i < oob_info.accessors.size(); ++i) {
4,658✔
520
                        if(const auto oob_box = oob_info.illegal_access_bounding_boxes[i].into_box(); !oob_box.empty()) {
2,664✔
521
                                const auto& accessor_info = oob_info.accessors[i];
12✔
522
                                CELERITY_ERROR("Out-of-bounds access detected in {}: accessor {} attempted to access buffer {} indicies between {} and outside the "
12✔
523
                                               "declared range {}.",
524
                                    utils::make_task_debug_label(oob_info.task_type, oob_info.task_id, oob_info.task_name), i,
525
                                    utils::make_buffer_debug_label(accessor_info.buffer_id, accessor_info.buffer_name), oob_box, accessor_info.accessible_box);
526
                        }
527
                }
528
                if(oob_info.illegal_access_bounding_boxes != nullptr /* i.e. there was at least one accessor */) {
1,994!
529
                        backend->debug_free(oob_info.illegal_access_bounding_boxes);
1,994✔
530
                }
531
        }
532
#endif
533

534
        if(spdlog::should_log(spdlog::level::trace)) {
7,165✔
535
                if(const auto native_time = async.event.get_native_execution_time(); native_time.has_value()) {
3,642!
UNCOV
536
                        CELERITY_TRACE("[executor] retired I{} after {:.2f}", iid, as_sub_second(*native_time));
×
537
                } else {
538
                        CELERITY_TRACE("[executor] retired I{}", iid);
3,642!
539
                }
540
        }
541

542
        CELERITY_DETAIL_IF_TRACY_ENABLED(tracy->retire_async_instruction(*async.tracy_lane_cursor, async.event));
543

544
        if(async.alloc_aid != null_allocation_id) {
7,165✔
545
                const auto ptr = async.event.get_result();
933✔
546
                assert(ptr != nullptr && "backend allocation returned nullptr");
933✔
547
                CELERITY_TRACE("[executor] {} allocated as {}", async.alloc_aid, ptr);
933!
548
                assert(allocations.count(async.alloc_aid) == 0);
933✔
549
                allocations.emplace(async.alloc_aid, ptr);
933✔
550
        }
551

552
        engine.complete_assigned(iid);
7,165✔
553

554
        CELERITY_DETAIL_IF_TRACY_ENABLED(tracy->assignment_queue_length_plot.update(engine.get_assignment_queue_length()));
555
}
7,165✔
556

557
// Dispatch overload for instructions that execute synchronously on the executor thread (selected via
// SFINAE on the presence of a matching `issue` overload). Issues the instruction inline and immediately
// reports completion to the out-of-order engine.
template <typename Instr>
auto live_executor::impl::dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
    // SFINAE: there is a (synchronous) `issue` overload above for the concrete Instr type
    -> decltype(issue(instr)) //
{
	assert(assignment.target == out_of_order_engine::target::immediate);
	assert(!assignment.lane.has_value());

	const auto iid = instr.get_id(); // instr may dangle after issue()

	CELERITY_DETAIL_IF_TRACY_SUPPORTED(TracyCZoneCtx ctx;)
	CELERITY_DETAIL_IF_TRACY_SUPPORTED(tracy_integration::instruction_info info);
	CELERITY_DETAIL_IF_TRACY_ENABLED({
		// capture instruction metadata before issue(), while `instr` is still guaranteed alive
		info = tracy_integration::make_instruction_info(instr);
		TracyFiberEnterHint("cy-immediate", tracy_detail::lane_order::immediate_lane);
		ctx = tracy->begin_instruction_zone(info, false /* eager */);
		tracy->assigned_instructions_plot.update(in_flight_async_instructions.size() + 1);
	})

	issue(instr); // completes immediately - instr may now dangle

	CELERITY_DETAIL_IF_TRACY_ENABLED({
		tracy->end_instruction_zone(ctx, info, tracy->last_instruction_trace);
		TracyFiberLeave;
	})

	// unblocks dependent instructions in the engine; must use the id saved before issue()
	engine.complete_assigned(iid);

	CELERITY_DETAIL_IF_TRACY_ENABLED({
		tracy->assigned_instructions_plot.update(in_flight_async_instructions.size());
		tracy->assignment_queue_length_plot.update(engine.get_assignment_queue_length());
	})
}
590

591
// Dispatch overload for instructions that execute asynchronously (selected via SFINAE on the presence of
// a matching `issue_async` overload). Registers an in-flight state entry, then lets the issue_async
// overload store the backend event; completion is later detected by poll_in_flight_async_instructions().
template <typename Instr>
auto live_executor::impl::dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
    // SFINAE: there is an `issue_async` overload above for the concrete Instr type
    -> decltype(issue_async(instr, assignment, std::declval<async_instruction_state&>())) //
{
	CELERITY_DETAIL_IF_TRACY_SUPPORTED(tracy_integration::instruction_info info);
	CELERITY_DETAIL_IF_TRACY_ENABLED(info = tracy_integration::make_instruction_info(instr));

	// the in-flight entry must exist before issue_async() so the event can be stored into it
	auto& async = in_flight_async_instructions.emplace(assignment.instruction->get_id(), async_instruction_state{}).first->second;
	issue_async(instr, assignment, async); // stores event in `async` and completes asynchronously
	// instr may now dangle

	CELERITY_DETAIL_IF_TRACY_ENABLED({
		async.tracy_lane_cursor = tracy->issue_async_instruction(std::move(info), assignment, std::move(tracy->last_instruction_trace));
		tracy->assigned_instructions_plot.update(in_flight_async_instructions.size());
	})
}
608

609
void live_executor::impl::try_issue_one_instruction() {
45,501,393✔
610
        auto assignment = engine.assign_one();
45,501,393✔
611
        if(!assignment.has_value()) return;
45,501,393✔
612

613
        CELERITY_DETAIL_IF_TRACY_ENABLED(tracy->assignment_queue_length_plot.update(engine.get_assignment_queue_length()));
614

615
        CELERITY_DETAIL_TRACY_ZONE_SCOPED("executor::issue", executor_issue);
616
        matchbox::match(*assignment->instruction, [&](const auto& instr) { dispatch(instr, *assignment); });
17,616✔
617
        made_progress = true;
8,808✔
618
}
619

620
// Watchdog: warns (once per stall) if no instruction has been issued or retired for longer than the
// configured progress_warning_timeout. Called periodically from the main loop, not on every iteration.
void live_executor::impl::check_progress() {
	// progress tracking is entirely disabled when no timeout is configured
	if(!policy.progress_warning_timeout.has_value()) return;

	if(made_progress) {
		// reset the stall detector; made_progress is set by issue / retire paths
		last_progress_timestamp = std::chrono::steady_clock::now();
		progress_warning_emitted = false;
		made_progress = false;
	} else if(last_progress_timestamp.has_value()) {
		// being stuck either means a deadlock in the user application, or a bug in Celerity.
		const auto elapsed_since_last_progress = std::chrono::steady_clock::now() - *last_progress_timestamp;
		if(elapsed_since_last_progress > *policy.progress_warning_timeout && !progress_warning_emitted) {
			// list the instructions we are still waiting on, e.g. "I12, I15"
			std::string instr_list;
			for(auto& [iid, async] : in_flight_async_instructions) {
				if(!instr_list.empty()) instr_list += ", ";
				fmt::format_to(std::back_inserter(instr_list), "I{}", iid);
			}
			CELERITY_WARN("[executor] no progress for {:.0f}, might be stuck. Active instructions: {}", as_sub_second(elapsed_since_last_progress),
			    in_flight_async_instructions.empty() ? "none" : instr_list);
			// emit the warning only once per stall; reset when progress resumes
			progress_warning_emitted = true;
		}
	}
}
642

643
void live_executor::impl::issue(const clone_collective_group_instruction& ccginstr) {
33✔
644
        const auto original_cgid = ccginstr.get_original_collective_group_id();
33✔
645
        assert(original_cgid != non_collective_group_id);
33✔
646
        assert(original_cgid == root_collective_group_id || cloned_communicators.count(original_cgid) != 0);
33✔
647

648
        const auto new_cgid = ccginstr.get_new_collective_group_id();
33✔
649
        assert(new_cgid != non_collective_group_id && new_cgid != root_collective_group_id);
33✔
650
        assert(cloned_communicators.count(new_cgid) == 0);
33✔
651

652
        CELERITY_DETAIL_TRACE_INSTRUCTION(ccginstr, "clone collective group CG{} -> CG{}", original_cgid, new_cgid);
33✔
653

654
        const auto original_communicator = original_cgid == root_collective_group_id ? root_communicator : cloned_communicators.at(original_cgid).get();
33✔
655
        cloned_communicators.emplace(new_cgid, original_communicator->collective_clone());
33✔
656
}
33✔
657

658

659
void live_executor::impl::issue(const split_receive_instruction& srinstr) {
20✔
660
        CELERITY_DETAIL_TRACE_INSTRUCTION(srinstr, "split receive {} {}x{} bytes into {} ({}),", srinstr.get_transfer_id(), srinstr.get_requested_region(),
20✔
661
            srinstr.get_element_size(), srinstr.get_dest_allocation_id(), srinstr.get_allocated_box());
662

663
        const auto allocation = allocations.at(srinstr.get_dest_allocation_id());
20✔
664
        recv_arbiter.begin_split_receive(
20✔
665
            srinstr.get_transfer_id(), srinstr.get_requested_region(), allocation, srinstr.get_allocated_box(), srinstr.get_element_size());
666
}
20✔
667

668
void live_executor::impl::issue(const fill_identity_instruction& fiinstr) {
19✔
669
        CELERITY_DETAIL_TRACE_INSTRUCTION(
19✔
670
            fiinstr, "fill identity {} x{} values for R{}", fiinstr.get_allocation_id(), fiinstr.get_num_values(), fiinstr.get_reduction_id());
671

672
        const auto allocation = allocations.at(fiinstr.get_allocation_id());
19✔
673
        const auto& reduction = *reducers.at(fiinstr.get_reduction_id());
19✔
674
        reduction.fill_identity(allocation, fiinstr.get_num_values());
19✔
675
}
19✔
676

677
void live_executor::impl::issue(const reduce_instruction& rinstr) {
47✔
678
        CELERITY_DETAIL_TRACE_INSTRUCTION(rinstr, "reduce {} x{} values into {} for R{}", rinstr.get_source_allocation_id(), rinstr.get_num_source_values(),
47✔
679
            rinstr.get_dest_allocation_id(), rinstr.get_reduction_id());
680

681
        const auto gather_allocation = allocations.at(rinstr.get_source_allocation_id());
47✔
682
        const auto dest_allocation = allocations.at(rinstr.get_dest_allocation_id());
47✔
683
        const auto& reduction = *reducers.at(rinstr.get_reduction_id());
47✔
684
        reduction.reduce(dest_allocation, gather_allocation, rinstr.get_num_source_values());
47✔
685
}
47✔
686

687
// Executes a fence synchronously by fulfilling its promise, making the fenced state observable to
// whichever party awaits the promise (presumably the application thread — no backend work is involved).
void live_executor::impl::issue(
    const fence_instruction& finstr) { // NOLINT(readability-make-member-function-const, readability-convert-member-functions-to-static)
	CELERITY_DETAIL_TRACE_INSTRUCTION(finstr, "fence");

	finstr.get_promise()->fulfill();
}
693

694
void live_executor::impl::issue(const destroy_host_object_instruction& dhoinstr) {
30✔
695
        assert(host_object_instances.count(dhoinstr.get_host_object_id()) != 0);
30✔
696
        CELERITY_DETAIL_TRACE_INSTRUCTION(dhoinstr, "destroy H{}", dhoinstr.get_host_object_id());
30✔
697

698
        host_object_instances.erase(dhoinstr.get_host_object_id());
30✔
699
}
30✔
700

701
// Executes a horizon: reports the reached horizon task to the delegate (if any) and releases the
// garbage (reducers, user allocations) whose lifetime ends at this horizon.
void live_executor::impl::issue(const horizon_instruction& hinstr) {
	CELERITY_DETAIL_TRACE_INSTRUCTION(hinstr, "horizon");

	if(delegate != nullptr) { delegate->horizon_reached(hinstr.get_horizon_task_id()); }
	collect(hinstr.get_garbage());

	CELERITY_DETAIL_IF_TRACY_ENABLED(FrameMarkNamed("Horizon"));
}
709

710
// Executes an epoch: performs the epoch's action (none / init / collective barrier / shutdown),
// notifies the delegate, fulfills any attached promise, and collects the epoch's garbage.
// A shutdown epoch makes the executor's main loop terminate by clearing expecting_more_submissions.
void live_executor::impl::issue(const epoch_instruction& einstr) {
	switch(einstr.get_epoch_action()) {
	case epoch_action::none: //
		CELERITY_DETAIL_TRACE_INSTRUCTION(einstr, "epoch");
		break;
	case epoch_action::init: //
		CELERITY_DETAIL_TRACE_INSTRUCTION(einstr, "epoch (init)");
		break;
	case epoch_action::barrier: //
		CELERITY_DETAIL_TRACE_INSTRUCTION(einstr, "epoch (barrier)");
		// synchronize all nodes through the root communicator before continuing
		root_communicator->collective_barrier();
		break;
	case epoch_action::shutdown: //
		CELERITY_DETAIL_TRACE_INSTRUCTION(einstr, "epoch (shutdown)");
		// no submissions can legally follow a shutdown epoch; the run loop exits once all work drains
		expecting_more_submissions = false;
		break;
	}

	// Update the runtime last-epoch *before* fulfilling the promise to ensure that the new state can be observed as soon as runtime::sync returns.
	// This in turn allows the TDAG to be pruned before any new work is submitted after the epoch.
	if(delegate != nullptr) { delegate->epoch_reached(einstr.get_epoch_task_id()); }

	if(einstr.get_promise() != nullptr) { einstr.get_promise()->fulfill(); }
	collect(einstr.get_garbage());

	CELERITY_DETAIL_IF_TRACY_ENABLED(FrameMarkNamed("Horizon"));
	CELERITY_DETAIL_IF_TRACY_ENABLED(FrameMark); // top-level "Frame"
}
738

739
// Enqueues an asynchronous host or device allocation. The resulting pointer is published to
// `allocations` by retire_async_instruction() once the backend event completes.
void live_executor::impl::issue_async(const alloc_instruction& ainstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	assert(ainstr.get_allocation_id().get_memory_id() != user_memory_id);
	assert(assignment.target == out_of_order_engine::target::alloc_queue);
	assert(!assignment.lane.has_value());
	assert(assignment.device.has_value() == (ainstr.get_allocation_id().get_memory_id() > host_memory_id));

	CELERITY_DETAIL_TRACE_INSTRUCTION(ainstr, "alloc {}, {} % {} bytes", ainstr.get_allocation_id(), ainstr.get_size_bytes(), ainstr.get_alignment_bytes());

	const auto size_bytes = ainstr.get_size_bytes();
	const auto alignment_bytes = ainstr.get_alignment_bytes();
	if(assignment.device.has_value()) {
		async.event = backend->enqueue_device_alloc(*assignment.device, size_bytes, alignment_bytes);
	} else {
		async.event = backend->enqueue_host_alloc(size_bytes, alignment_bytes);
	}
	// setting alloc_aid != null will make `retire_async_instruction` insert the result into `allocations`
	async.alloc_aid = ainstr.get_allocation_id();
}
754

755
// Enqueues an asynchronous host or device free. The allocation is unregistered immediately so that no
// later instruction can look it up, while the backend releases the memory in the background.
void live_executor::impl::issue_async(const free_instruction& finstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	const auto entry = allocations.find(finstr.get_allocation_id());
	assert(entry != allocations.end());
	void* const ptr = entry->second;
	allocations.erase(entry);

	CELERITY_DETAIL_TRACE_INSTRUCTION(finstr, "free {}", finstr.get_allocation_id());

	if(assignment.device.has_value()) {
		async.event = backend->enqueue_device_free(*assignment.device, ptr);
	} else {
		async.event = backend->enqueue_host_free(ptr);
	}
}
769

770
// Enqueues an asynchronous region copy on the assigned host or device lane.
void live_executor::impl::issue_async(const copy_instruction& cinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	CELERITY_DETAIL_TRACY_ZONE_SCOPED("executor::issue_copy", executor_issue_copy);

	assert(assignment.target == out_of_order_engine::target::host_queue || assignment.target == out_of_order_engine::target::device_queue);
	assert((assignment.target == out_of_order_engine::target::device_queue) == assignment.device.has_value());
	assert(assignment.lane.has_value());

	CELERITY_DETAIL_TRACE_INSTRUCTION(cinstr, "copy {} ({}) -> {} ({}); {}x{} bytes, {} bytes total", cinstr.get_source_allocation_id(),
	    cinstr.get_source_layout(), cinstr.get_dest_allocation_id(), cinstr.get_dest_layout(), cinstr.get_copy_region(), cinstr.get_element_size(),
	    cinstr.get_copy_region().get_area() * cinstr.get_element_size());

	void* const src_ptr = allocations.at(cinstr.get_source_allocation_id());
	void* const dst_ptr = allocations.at(cinstr.get_dest_allocation_id());
	const auto lane = *assignment.lane;

	if(assignment.device.has_value()) {
		async.event = backend->enqueue_device_copy(*assignment.device, lane, src_ptr, dst_ptr, cinstr.get_source_layout(), cinstr.get_dest_layout(),
		    cinstr.get_copy_region(), cinstr.get_element_size());
	} else {
		async.event =
		    backend->enqueue_host_copy(lane, src_ptr, dst_ptr, cinstr.get_source_layout(), cinstr.get_dest_layout(), cinstr.get_copy_region(), cinstr.get_element_size());
	}
}
792

793
std::string format_access_log(const buffer_access_allocation_map& map) {
1,495✔
794
        std::string acc_log;
1,495✔
795
        for(size_t i = 0; i < map.size(); ++i) {
3,053✔
796
                auto& aa = map[i];
1,558✔
797
                const auto accessed_bounding_box_in_allocation = box(aa.accessed_bounding_box_in_buffer.get_min() - aa.allocated_box_in_buffer.get_offset(),
4,674✔
798
                    aa.accessed_bounding_box_in_buffer.get_max() - aa.allocated_box_in_buffer.get_offset());
4,674✔
799
                fmt::format_to(std::back_inserter(acc_log), "{} {} {}", i == 0 ? "; accessing" : ",", aa.allocation_id, accessed_bounding_box_in_allocation);
1,558✔
800
        }
801
        return acc_log;
1,495✔
UNCOV
802
}
×
803

804
// Enqueues a device kernel on its assigned device and lane: hydrates accessor pointers from tracked
// allocations, optionally attaches out-of-bounds check buffers, resolves reduction output pointers, and
// launches through the backend.
void live_executor::impl::issue_async(
    const device_kernel_instruction& dkinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	CELERITY_DETAIL_TRACY_ZONE_SCOPED("executor::issue_device_kernel", executor_issue_device_kernel);

	assert(assignment.target == out_of_order_engine::target::device_queue);
	assert(assignment.device == dkinstr.get_device_id());
	assert(assignment.lane.has_value());

	CELERITY_DETAIL_TRACE_INSTRUCTION(dkinstr, "device kernel on D{}, {}{}; estimated global memory traffic: {:.2f}", dkinstr.get_device_id(),
	    dkinstr.get_execution_range(), format_access_log(dkinstr.get_access_allocations()),
	    as_decimal_size(dkinstr.get_estimated_global_memory_traffic_bytes()));

	// resolve each accessed allocation id to its live pointer for closure hydration
	auto accessor_infos = make_accessor_infos(dkinstr.get_access_allocations());
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
	// wires per-accessor oob-tracking buffers into accessor_infos; evaluated on retire
	async.oob_info = attach_boundary_check_info(
	    accessor_infos, dkinstr.get_access_allocations(), dkinstr.get_oob_task_type(), dkinstr.get_oob_task_id(), dkinstr.get_oob_task_name());
#endif

	const auto& reduction_allocs = dkinstr.get_reduction_allocations();
	std::vector<void*> reduction_ptrs(reduction_allocs.size());
	for(size_t i = 0; i < reduction_allocs.size(); ++i) {
		reduction_ptrs[i] = allocations.at(reduction_allocs[i].allocation_id);
	}

	async.event = backend->enqueue_device_kernel(
	    dkinstr.get_device_id(), *assignment.lane, dkinstr.get_launcher(), std::move(accessor_infos), dkinstr.get_execution_range(), reduction_ptrs);
}
831

832
// Enqueues a host task on its assigned host lane: hydrates accessor pointers, optionally attaches
// out-of-bounds check buffers, and passes the collective-group communicator when the task is collective.
void live_executor::impl::issue_async(const host_task_instruction& htinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	assert(assignment.target == out_of_order_engine::target::host_queue);
	assert(!assignment.device.has_value());
	assert(assignment.lane.has_value());

	CELERITY_DETAIL_TRACE_INSTRUCTION(htinstr, "host task, {}{}", htinstr.get_execution_range(), format_access_log(htinstr.get_access_allocations()));

	auto accessor_infos = make_accessor_infos(htinstr.get_access_allocations());
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
	// wires per-accessor oob-tracking buffers into accessor_infos; evaluated on retire
	async.oob_info = attach_boundary_check_info(
	    accessor_infos, htinstr.get_access_allocations(), htinstr.get_oob_task_type(), htinstr.get_oob_task_id(), htinstr.get_oob_task_name());
#endif

	// non-collective host tasks receive no communicator
	const auto collective_comm =
	    htinstr.get_collective_group_id() != non_collective_group_id ? cloned_communicators.at(htinstr.get_collective_group_id()).get() : nullptr;

	async.event = backend->enqueue_host_task(
	    *assignment.lane, htinstr.get_launcher(), std::move(accessor_infos), htinstr.get_global_range(), htinstr.get_execution_range(), collective_comm);
}
851

852
// Starts an asynchronous strided send of a sub-box of a source allocation to a peer node.
// The message id was previously announced to the receiver via an outbound pilot.
void live_executor::impl::issue_async(
    const send_instruction& sinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
{
	assert(assignment.target == out_of_order_engine::target::immediate);

	CELERITY_DETAIL_TRACE_INSTRUCTION(sinstr, "send {}+{}, {}x{} bytes to N{} (MSG{})", sinstr.get_source_allocation_id(),
	    sinstr.get_offset_in_source_allocation(), sinstr.get_send_range(), sinstr.get_element_size(), sinstr.get_dest_node_id(), sinstr.get_message_id());

	const auto allocation_base = allocations.at(sinstr.get_source_allocation_id());
	// describes the sent sub-box relative to the full source allocation
	const communicator::stride stride{
	    sinstr.get_source_allocation_range(),
	    subrange<3>{sinstr.get_offset_in_source_allocation(), sinstr.get_send_range()},
	    sinstr.get_element_size(),
	};
	async.event = root_communicator->send_payload(sinstr.get_dest_node_id(), sinstr.get_message_id(), allocation_base, stride);
}
868

869
void live_executor::impl::issue_async(
306✔
870
    const receive_instruction& rinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
871
{
872
        assert(assignment.target == out_of_order_engine::target::immediate);
306✔
873

874
        CELERITY_DETAIL_TRACE_INSTRUCTION(rinstr, "receive {} {}x{} bytes into {} ({})", rinstr.get_transfer_id(), rinstr.get_requested_region(),
306✔
875
            rinstr.get_element_size(), rinstr.get_dest_allocation_id(), rinstr.get_allocated_box());
876

877
        const auto allocation = allocations.at(rinstr.get_dest_allocation_id());
306✔
878
        async.event =
306✔
879
            recv_arbiter.receive(rinstr.get_transfer_id(), rinstr.get_requested_region(), allocation, rinstr.get_allocated_box(), rinstr.get_element_size());
306✔
880
}
306✔
881

882
// Produces an event that completes once the given sub-region of a previously issued split receive
// (see issue(split_receive_instruction)) has fully arrived.
void live_executor::impl::issue_async(
    const await_receive_instruction& arinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
{
	assert(assignment.target == out_of_order_engine::target::immediate);

	CELERITY_DETAIL_TRACE_INSTRUCTION(arinstr, "await receive {} {}", arinstr.get_transfer_id(), arinstr.get_received_region());

	async.event = recv_arbiter.await_split_receive_subregion(arinstr.get_transfer_id(), arinstr.get_received_region());
}
891

892
void live_executor::impl::issue_async(
18✔
893
    const gather_receive_instruction& grinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
894
{
895
        assert(assignment.target == out_of_order_engine::target::immediate);
18✔
896

897
        CELERITY_DETAIL_TRACE_INSTRUCTION(
18✔
898
            grinstr, "gather receive {} into {}, {} bytes / node", grinstr.get_transfer_id(), grinstr.get_dest_allocation_id(), grinstr.get_node_chunk_size());
899

900
        const auto allocation = allocations.at(grinstr.get_dest_allocation_id());
18✔
901
        async.event = recv_arbiter.gather_receive(grinstr.get_transfer_id(), allocation, grinstr.get_node_chunk_size());
18✔
902
}
18✔
903

904
void live_executor::impl::collect(const instruction_garbage& garbage) {
1,428✔
905
        for(const auto rid : garbage.reductions) {
1,495✔
906
                assert(reducers.count(rid) != 0);
67✔
907
                reducers.erase(rid);
67✔
908
        }
909
        for(const auto aid : garbage.user_allocations) {
1,541✔
910
                assert(aid.get_memory_id() == user_memory_id);
113✔
911
                assert(allocations.count(aid) != 0);
113✔
912
                allocations.erase(aid);
113✔
913
        }
914
}
1,428✔
915

916
std::vector<closure_hydrator::accessor_info> live_executor::impl::make_accessor_infos(const buffer_access_allocation_map& amap) const {
2,100✔
917
        CELERITY_DETAIL_TRACY_ZONE_SCOPED("executor::make_accessor_info", executor_make_accessor_info);
918

919
        std::vector<closure_hydrator::accessor_info> accessor_infos(amap.size());
6,300✔
920
        for(size_t i = 0; i < amap.size(); ++i) {
4,764✔
921
                const auto ptr = allocations.at(amap[i].allocation_id);
2,664✔
922
                accessor_infos[i] = closure_hydrator::accessor_info{ptr, amap[i].allocated_box_in_buffer, amap[i].accessed_bounding_box_in_buffer};
2,664✔
923
        }
924
        return accessor_infos;
2,100✔
UNCOV
925
}
×
926

927
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
// Allocates one oob_bounding_box per accessor through the backend's debug allocator and wires a pointer
// to it into the corresponding accessor_info, so kernels can record out-of-bounds accesses. The returned
// info is stored on the in-flight instruction and evaluated / freed in retire_async_instruction().
// Returns nullptr when the task has no accessors.
std::unique_ptr<boundary_check_info> live_executor::impl::attach_boundary_check_info(std::vector<closure_hydrator::accessor_info>& accessor_infos,
    const buffer_access_allocation_map& amap, task_type tt, task_id tid, const std::string& task_name) const //
{
	if(amap.empty()) return nullptr;

	CELERITY_DETAIL_TRACY_ZONE_SCOPED("executor::oob_init", executor_oob_init);
	auto oob_info = std::make_unique<boundary_check_info>(tt, tid, task_name);

	// debug_alloc returns raw storage; default-construct one box per accessor in place
	oob_info->illegal_access_bounding_boxes = static_cast<oob_bounding_box*>(backend->debug_alloc(amap.size() * sizeof(oob_bounding_box)));
	std::uninitialized_default_construct_n(oob_info->illegal_access_bounding_boxes, amap.size());

	oob_info->accessors.resize(amap.size());
	for(size_t i = 0; i < amap.size(); ++i) {
		oob_info->accessors[i] = boundary_check_info::accessor_info{amap[i].oob_buffer_id, amap[i].oob_buffer_name, amap[i].accessed_bounding_box_in_buffer};
		// accessor i records its illegal accesses into the i-th box of the debug allocation
		accessor_infos[i].out_of_bounds_indices = oob_info->illegal_access_bounding_boxes + i;
	}
	return oob_info;
}
#endif // CELERITY_ACCESSOR_BOUNDARY_CHECK
947

948
// Takes ownership of the backend and root communicator, constructs the executor implementation, and
// starts the executor thread. Initialization order matters: m_root_comm must be initialized before
// m_impl (which captures its raw pointer), and the thread is started last.
live_executor::live_executor(std::unique_ptr<backend> backend, std::unique_ptr<communicator> root_comm, executor::delegate* const dlg, const policy_set& policy)
    : m_root_comm(std::move(root_comm)), m_impl(std::make_unique<impl>(std::move(backend), m_root_comm.get(), m_submission_queue, dlg, policy)),
      m_thread(&live_executor::thread_main, this) {}
951

952
live_executor::~live_executor() {
	m_thread.join(); // thread_main will exit only after executing shutdown epoch
}
955

956
// Forwards an application-owned allocation to the executor thread, which records it under `aid`.
void live_executor::track_user_allocation(const allocation_id aid, void* const ptr) {
	m_submission_queue.push(live_executor_detail::user_allocation_transfer{aid, ptr});
}
959

960
// Transfers ownership of a host object instance to the executor thread; it is destroyed when the
// corresponding destroy_host_object_instruction executes.
void live_executor::track_host_object_instance(const host_object_id hoid, std::unique_ptr<host_object_instance> instance) {
	assert(instance != nullptr);
	m_submission_queue.push(live_executor_detail::host_object_transfer{hoid, std::move(instance)});
}
964

965
// Transfers ownership of a reducer to the executor thread; it is used by fill_identity / reduce
// instructions and released via instruction garbage collection.
void live_executor::track_reducer(const reduction_id rid, std::unique_ptr<reducer> reducer) {
	assert(reducer != nullptr);
	m_submission_queue.push(live_executor_detail::reducer_transfer{rid, std::move(reducer)});
}
969

970
// Enqueues a batch of instructions and outbound pilots for the executor thread to pick up.
void live_executor::submit(std::vector<const instruction*> instructions, std::vector<outbound_pilot> pilots) {
	m_submission_queue.push(live_executor_detail::instruction_pilot_batch{std::move(instructions), std::move(pilots)});
}
973

974
// Relays the scheduler's idle/busy state to the executor thread via the submission queue.
void live_executor::notify_scheduler_idle(const bool is_idle) {
	m_submission_queue.push(live_executor_detail::scheduler_idle_state_change{.is_idle = is_idle});
}
977

978
// Total accumulated starvation time reported by the executor loop (atomic read, safe from any thread).
std::chrono::nanoseconds live_executor::get_starvation_time() const { return std::chrono::nanoseconds(m_impl->total_starvation_time.load()); }
979

980
// Total accumulated active time reported by the executor loop (atomic read, safe from any thread).
std::chrono::nanoseconds live_executor::get_active_time() const { return std::chrono::nanoseconds(m_impl->total_active_time.load()); }
981

982
// Entry point of the executor thread: applies naming / pinning / ordering configuration, then runs the
// implementation's main loop until the shutdown epoch has been executed.
void live_executor::thread_main() {
	name_and_pin_and_order_this_thread(named_threads::thread_type::executor);
	m_impl->run();
}
986

987
} // namespace celerity::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc