• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

celerity / celerity-runtime / 10304230986

08 Aug 2024 02:40PM UTC coverage: 95.07% (+0.1%) from 94.946%
10304230986

push

github

fknorr
Fix executor-scheduler race between instruction retirement and pruning

live_executor previously dereferenced instruction pointers on retirement,
which raced with their deletion on IDAG pruning when the executing
instruction was a horizon or epoch. To avoid similar issues in the future,
we now strictly regard the pointer to any instruction that has been issued
as dangling and work with instruction ids instead in those cases.

out_of_order_engine also had potentially hazardous uses of instruction
pointers that might - under the right circumstances - incorrectly regard
two instructions as identical when their pointers happen to alias because
the old instruction was freed before the new one was allocated in the same
memory location.

2979 of 3378 branches covered (88.19%)

Branch coverage included in aggregate %.

22 of 23 new or added lines in 2 files covered. (95.65%)

14 existing lines in 5 files now uncovered.

6547 of 6642 relevant lines covered (98.57%)

1554450.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.77
/src/live_executor.cc
1
#include "live_executor.h"
2
#include "backend/backend.h"
3
#include "closure_hydrator.h"
4
#include "communicator.h"
5
#include "host_object.h"
6
#include "instruction_graph.h"
7
#include "named_threads.h"
8
#include "out_of_order_engine.h"
9
#include "receive_arbiter.h"
10
#include "system_info.h"
11
#include "types.h"
12
#include "utils.h"
13

14
#include <memory>
15
#include <optional>
16
#include <string>
17
#include <unordered_map>
18
#include <vector>
19

20
#include <matchbox.hh>
21

22
namespace celerity::detail::live_executor_detail {
23

24
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
/// Metadata kept alive for the duration of an asynchronous kernel / host task so that out-of-bounds accessor
/// errors can be reported when the instruction is retired (see retire_async_instruction).
struct boundary_check_info {
	/// Identifies one accessor of the task together with the box it was allowed to access.
	struct accessor_info {
		detail::buffer_id buffer_id = 0;
		std::string buffer_name;
		box<3> accessible_box;
	};

	// identifies the task in error messages
	detail::task_type task_type;
	detail::task_id task_id;
	std::string task_name;

	// one oob_bounding_box per accessor, written while the task executes and inspected on retirement;
	// allocated via the backend and released with backend->debug_free (nullptr when the task has no accessors)
	oob_bounding_box* illegal_access_bounding_boxes = nullptr;
	std::vector<accessor_info> accessors;

	boundary_check_info(detail::task_type tt, detail::task_id tid, const std::string& task_name) : task_type(tt), task_id(tid), task_name(task_name) {}
};
#endif
42

43
/// Tracking state for one instruction that has been issued asynchronously but not yet retired. Only the
/// instruction id is stored - the instruction pointer must be considered dangling once issued.
struct async_instruction_state {
	instruction_id iid = -1;
	allocation_id alloc_aid = null_allocation_id; ///< non-null iff instruction is an alloc_instruction
	async_event event; ///< completion is polled via event.is_complete()
	CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(std::unique_ptr<boundary_check_info> oob_info;) // unique_ptr: oob_info is optional and rather large
};
49

50
/// All state of the live executor thread. Submissions arrive through `submission_queue`, instructions are ordered
/// by `engine` and either completed synchronously (`issue`) or tracked in `in_flight_async_instructions`
/// (`issue_async`) until their async_event completes.
struct executor_impl {
	const std::unique_ptr<detail::backend> backend;
	communicator* const root_communicator;
	double_buffered_queue<submission>* const submission_queue;
	live_executor::delegate* const delegate;
	const live_executor::policy_set policy;

	receive_arbiter recv_arbiter{*root_communicator};
	out_of_order_engine engine{backend->get_system_info()};

	bool expecting_more_submissions = true; ///< shutdown epoch has not been executed yet
	std::vector<async_instruction_state> in_flight_async_instructions;
	std::unordered_map<allocation_id, void*> allocations{{null_allocation_id, nullptr}}; ///< obtained from alloc_instruction or track_user_allocation
	std::unordered_map<host_object_id, std::unique_ptr<host_object_instance>> host_object_instances; ///< passed in through track_host_object_instance
	std::unordered_map<collective_group_id, std::unique_ptr<communicator>> cloned_communicators;     ///< transitive clones of root_communicator
	std::unordered_map<reduction_id, std::unique_ptr<reducer>> reducers; ///< passed in through track_reducer, erased on epochs / horizons

	std::optional<std::chrono::steady_clock::time_point> last_progress_timestamp; ///< last successful call to check_progress
	bool made_progress = false;                                                   ///< progress was made since `last_progress_timestamp`
	bool progress_warning_emitted = false;                                        ///< no progress was made since warning was emitted

	executor_impl(std::unique_ptr<detail::backend> backend, communicator* root_comm, double_buffered_queue<submission>& submission_queue,
	    live_executor::delegate* dlg, const live_executor::policy_set& policy);

	// main loop; returns after the shutdown epoch has executed
	void run();
	void poll_in_flight_async_instructions();
	void poll_submission_queue();
	void try_issue_one_instruction();
	void retire_async_instruction(async_instruction_state& async);
	void check_progress();

	// Instruction types that complete synchronously within the executor.
	void issue(const clone_collective_group_instruction& ccginstr);
	void issue(const split_receive_instruction& srinstr);
	void issue(const fill_identity_instruction& fiinstr);
	void issue(const reduce_instruction& rinstr);
	void issue(const fence_instruction& finstr);
	void issue(const destroy_host_object_instruction& dhoinstr);
	void issue(const horizon_instruction& hinstr);
	void issue(const epoch_instruction& einstr);

	template <typename Instr>
	auto dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
	    // SFINAE: there is a (synchronous) `issue` overload above for the concrete Instr type
	    -> decltype(issue(instr));

	// Instruction types that complete asynchronously via async_event, outside the executor.
	void issue_async(const alloc_instruction& ainstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const free_instruction& finstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const copy_instruction& cinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const device_kernel_instruction& dkinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const host_task_instruction& htinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const send_instruction& sinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const receive_instruction& rinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const await_receive_instruction& arinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const gather_receive_instruction& grinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);

	template <typename Instr>
	auto dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
	    // SFINAE: there is an `issue_async` overload above for the concrete Instr type
	    -> decltype(issue_async(instr, assignment, std::declval<async_instruction_state&>()));

	std::vector<closure_hydrator::accessor_info> make_accessor_infos(const buffer_access_allocation_map& amap) const;

#if CELERITY_ACCESSOR_BOUNDARY_CHECK
	std::unique_ptr<boundary_check_info> attach_boundary_check_info(std::vector<closure_hydrator::accessor_info>& accessor_infos,
	    const buffer_access_allocation_map& amap, task_type tt, task_id tid, const std::string& task_name) const;
#endif

	// frees reducers / user allocations attached to a retired horizon or epoch
	void collect(const instruction_garbage& garbage);
};
121

122
/// Stores dependencies; `backend` ownership is taken over, all other pointers are borrowed for the executor's lifetime.
executor_impl::executor_impl(std::unique_ptr<detail::backend> backend, communicator* const root_comm, double_buffered_queue<submission>& submission_queue,
    live_executor::delegate* const dlg, const live_executor::policy_set& policy)
    : backend(std::move(backend)), root_communicator(root_comm), submission_queue(&submission_queue), delegate(dlg), policy(policy) {}
741✔
125

126
/// Executor main loop: polls the communicator, in-flight async instructions and the submission queue, and issues
/// at most one instruction per iteration. Returns once the shutdown epoch has executed and the engine is idle.
void executor_impl::run() {
	closure_hydrator::make_available();

	uint8_t check_overflow_counter = 0;
	for(;;) {
		if(engine.is_idle()) {
			if(!expecting_more_submissions) break; // shutdown complete
			submission_queue->wait_while_empty();  // we are stalled on the scheduler, suspend thread
			last_progress_timestamp.reset();       // do not treat suspension as being stuck
		}

		recv_arbiter.poll_communicator();
		poll_in_flight_async_instructions();
		poll_submission_queue();
		try_issue_one_instruction(); // potentially expensive, so only issue one per loop to continue checking for async completion in between

		if(++check_overflow_counter == 0) { // once every 256 iterations (uint8_t wrap-around)
			backend->check_async_errors();
			check_progress();
		}
	}

	assert(in_flight_async_instructions.empty());
	// check that for each alloc_instruction, we executed a corresponding free_instruction
	assert(std::all_of(allocations.begin(), allocations.end(),
	    [](const std::pair<allocation_id, void*>& p) { return p.first == null_allocation_id || p.first.get_memory_id() == user_memory_id; }));
	// check that for each track_host_object_instance, we executed a destroy_host_object_instruction
	assert(host_object_instances.empty());

	closure_hydrator::teardown();
}
247✔
157

158
void executor_impl::poll_in_flight_async_instructions() {
7,380,112✔
159
        utils::erase_if(in_flight_async_instructions, [&](async_instruction_state& async) {
7,380,112✔
160
                if(!async.event.is_complete()) return false;
13,917,128✔
161
                retire_async_instruction(async);
9,707✔
162
                made_progress = true;
9,707✔
163
                return true;
9,707✔
164
        });
165
}
7,380,112✔
166

167
/// Drains the scheduler -> executor submission queue, forwarding instructions to the out-of-order engine,
/// sending outbound pilots, and registering user allocations / host objects / reducers for later lookup.
void executor_impl::poll_submission_queue() {
	for(auto& submission : submission_queue->pop_all()) {
		matchbox::match(
		    submission,
		    [&](const instruction_pilot_batch& batch) {
			    for(const auto incoming_instr : batch.instructions) {
				    engine.submit(incoming_instr);
			    }
			    for(const auto& pilot : batch.pilots) {
				    root_communicator->send_outbound_pilot(pilot);
			    }
		    },
		    [&](const user_allocation_transfer& uat) {
			    assert(uat.aid != null_allocation_id);
			    assert(uat.aid.get_memory_id() == user_memory_id);
			    assert(allocations.count(uat.aid) == 0);
			    allocations.emplace(uat.aid, uat.ptr);
		    },
		    [&](host_object_transfer& hot) {
			    assert(host_object_instances.count(hot.hoid) == 0);
			    host_object_instances.emplace(hot.hoid, std::move(hot.instance));
		    },
		    [&](reducer_transfer& rt) {
			    assert(reducers.count(rt.rid) == 0);
			    reducers.emplace(rt.rid, std::move(rt.reduction));
		    });
	}
}
7,380,112✔
195

196
/// Finalizes a completed async instruction: reports any recorded out-of-bounds accesses, logs retirement,
/// publishes the result pointer of alloc_instructions into `allocations`, and notifies the engine.
/// Fix: corrected the spelling "indicies" -> "indices" in the out-of-bounds error message.
void executor_impl::retire_async_instruction(async_instruction_state& async) {
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
	if(async.oob_info != nullptr) {
		const auto& oob_info = *async.oob_info;
		// report one error per accessor whose recorded out-of-bounds bounding box is non-empty
		for(size_t i = 0; i < oob_info.accessors.size(); ++i) {
			if(const auto oob_box = oob_info.illegal_access_bounding_boxes[i].into_box(); !oob_box.empty()) {
				const auto& accessor_info = oob_info.accessors[i];
				CELERITY_ERROR("Out-of-bounds access detected in {}: accessor {} attempted to access buffer {} indices between {} and outside the "
				               "declared range {}.",
				    utils::make_task_debug_label(oob_info.task_type, oob_info.task_id, oob_info.task_name), i,
				    utils::make_buffer_debug_label(accessor_info.buffer_id, accessor_info.buffer_name), oob_box, accessor_info.accessible_box);
			}
		}
		if(oob_info.illegal_access_bounding_boxes != nullptr /* i.e. there was at least one accessor */) {
			backend->debug_free(oob_info.illegal_access_bounding_boxes);
		}
	}
#endif

	if(spdlog::should_log(spdlog::level::trace)) {
		if(const auto native_time = async.event.get_native_execution_time(); native_time.has_value()) {
			CELERITY_TRACE("[executor] retired I{} after {:.2f}", async.iid, as_sub_second(*native_time));
		} else {
			CELERITY_TRACE("[executor] retired I{}", async.iid);
		}
	}

	// alloc_instructions produce their pointer asynchronously; insert it into the allocation table now
	if(async.alloc_aid != null_allocation_id) {
		const auto ptr = async.event.get_result();
		assert(ptr != nullptr && "backend allocation returned nullptr");
		CELERITY_TRACE("[executor] {} allocated as {}", async.alloc_aid, ptr);
		assert(allocations.count(async.alloc_aid) == 0);
		allocations.emplace(async.alloc_aid, ptr);
	}

	// notify by id only - the instruction pointer must be considered dangling at this point
	engine.complete_assigned(async.iid);
}
9,707✔
233

234
/// Dispatches an instruction type that completes synchronously inside the executor.
template <typename Instr>
auto executor_impl::dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
    // SFINAE: there is a (synchronous) `issue` overload above for the concrete Instr type
    -> decltype(issue(instr)) //
{
	assert(assignment.target == out_of_order_engine::target::immediate);
	assert(!assignment.lane.has_value());

	// capture the id up front: issuing a horizon / epoch can trigger IDAG pruning which frees instructions,
	// so the reference must not be touched afterwards
	const auto iid = instr.get_id(); // instr may dangle after issue()
	issue(instr);                    // completes immediately
	engine.complete_assigned(iid);
}
1,623✔
246

247
template <typename Instr>
248
auto executor_impl::dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
9,707✔
249
    // SFINAE: there is an `issue_async` overload above for the concrete Instr type
250
    -> decltype(issue_async(instr, assignment, std::declval<async_instruction_state&>())) //
251
{
252
        auto& async = in_flight_async_instructions.emplace_back();
9,707✔
253
        async.iid = assignment.instruction->get_id();
9,707✔
254
        issue_async(instr, assignment, async); // stores event in `async` and completes asynchronously
9,707✔
255
}
9,707✔
256

257
void executor_impl::try_issue_one_instruction() {
7,380,112✔
258
        auto assignment = engine.assign_one();
7,380,112✔
259
        if(!assignment.has_value()) return;
7,380,112✔
260

261
        matchbox::match(*assignment->instruction, [&](const auto& instr) { dispatch(instr, *assignment); });
22,660✔
262
        made_progress = true;
11,330✔
263
}
264

265
/// Emits a single warning (until progress resumes) when no instruction has been issued or retired for longer
/// than the configured progress_warning_timeout. Only called periodically from the main loop.
void executor_impl::check_progress() {
	if(!policy.progress_warning_timeout.has_value()) return;

	if(made_progress) {
		last_progress_timestamp = std::chrono::steady_clock::now();
		progress_warning_emitted = false;
		made_progress = false;
	} else if(last_progress_timestamp.has_value()) {
		// being stuck either means a deadlock in the user application, or a bug in Celerity.
		const auto elapsed_since_last_progress = std::chrono::steady_clock::now() - *last_progress_timestamp;
		if(elapsed_since_last_progress > *policy.progress_warning_timeout && !progress_warning_emitted) {
			// list the ids of all currently in-flight instructions to aid debugging
			std::string instr_list;
			for(auto& in_flight : in_flight_async_instructions) {
				if(!instr_list.empty()) instr_list += ", ";
				fmt::format_to(std::back_inserter(instr_list), "I{}", in_flight.iid);
			}
			CELERITY_WARN("[executor] no progress for {:.0f}, might be stuck. Active instructions: {}", as_sub_second(elapsed_since_last_progress),
			    in_flight_async_instructions.empty() ? "none" : instr_list);
			progress_warning_emitted = true;
		}
	}
}
287

288
void executor_impl::issue(const clone_collective_group_instruction& ccginstr) {
33✔
289
        const auto original_cgid = ccginstr.get_original_collective_group_id();
33✔
290
        assert(original_cgid != non_collective_group_id);
33✔
291
        assert(original_cgid == root_collective_group_id || cloned_communicators.count(original_cgid) != 0);
33✔
292

293
        const auto new_cgid = ccginstr.get_new_collective_group_id();
33✔
294
        assert(new_cgid != non_collective_group_id && new_cgid != root_collective_group_id);
33✔
295
        assert(cloned_communicators.count(new_cgid) == 0);
33✔
296

297
        CELERITY_TRACE("[executor] I{}: clone collective group CG{} -> CG{}", ccginstr.get_id(), original_cgid, new_cgid);
33✔
298

299
        const auto original_communicator = original_cgid == root_collective_group_id ? root_communicator : cloned_communicators.at(original_cgid).get();
33✔
300
        cloned_communicators.emplace(new_cgid, original_communicator->collective_clone());
33✔
301
}
33✔
302

303

304
void executor_impl::issue(const split_receive_instruction& srinstr) {
20✔
305
        CELERITY_TRACE("[executor] I{}: split receive {} {}x{} bytes into {} ({}),", srinstr.get_id(), srinstr.get_transfer_id(), srinstr.get_requested_region(),
20✔
306
            srinstr.get_element_size(), srinstr.get_dest_allocation_id(), srinstr.get_allocated_box());
307

308
        const auto allocation = allocations.at(srinstr.get_dest_allocation_id());
20✔
309
        recv_arbiter.begin_split_receive(
20✔
310
            srinstr.get_transfer_id(), srinstr.get_requested_region(), allocation, srinstr.get_allocated_box(), srinstr.get_element_size());
311
}
20✔
312

313
void executor_impl::issue(const fill_identity_instruction& fiinstr) {
19✔
314
        CELERITY_TRACE("[executor] I{}: fill identity {} x{} values for R{}", fiinstr.get_id(), fiinstr.get_allocation_id(), fiinstr.get_num_values(),
19✔
315
            fiinstr.get_reduction_id());
316

317
        const auto allocation = allocations.at(fiinstr.get_allocation_id());
19✔
318
        const auto& reduction = *reducers.at(fiinstr.get_reduction_id());
19✔
319
        reduction.fill_identity(allocation, fiinstr.get_num_values());
19✔
320
}
19✔
321

322
void executor_impl::issue(const reduce_instruction& rinstr) {
47✔
323
        CELERITY_TRACE("[executor] I{}: reduce {} x{} values into {} for R{}", rinstr.get_id(), rinstr.get_source_allocation_id(), rinstr.get_num_source_values(),
47✔
324
            rinstr.get_dest_allocation_id(), rinstr.get_reduction_id());
325

326
        const auto gather_allocation = allocations.at(rinstr.get_source_allocation_id());
47✔
327
        const auto dest_allocation = allocations.at(rinstr.get_dest_allocation_id());
47✔
328
        const auto& reduction = *reducers.at(rinstr.get_reduction_id());
47✔
329
        reduction.reduce(dest_allocation, gather_allocation, rinstr.get_num_source_values());
47✔
330
}
47✔
331

332
void executor_impl::issue(const fence_instruction& finstr) { // NOLINT(readability-make-member-function-const, readability-convert-member-functions-to-static)
63✔
333
        CELERITY_TRACE("[executor] I{}: fence", finstr.get_id());
63✔
334

335
        finstr.get_promise()->fulfill();
63✔
336
}
63✔
337

338
void executor_impl::issue(const destroy_host_object_instruction& dhoinstr) {
30✔
339
        assert(host_object_instances.count(dhoinstr.get_host_object_id()) != 0);
30✔
340
        CELERITY_TRACE("[executor] I{}: destroy H{}", dhoinstr.get_id(), dhoinstr.get_host_object_id());
30✔
341

342
        host_object_instances.erase(dhoinstr.get_host_object_id());
30✔
343
}
30✔
344

345
void executor_impl::issue(const horizon_instruction& hinstr) {
840✔
346
        CELERITY_TRACE("[executor] I{}: horizon", hinstr.get_id());
840✔
347

348
        if(delegate != nullptr) { delegate->horizon_reached(hinstr.get_horizon_task_id()); }
840!
349
        collect(hinstr.get_garbage());
840✔
350
}
840✔
351

352
/// Executes an epoch: optionally performs a collective barrier, ends the main loop on shutdown,
/// notifies the delegate and releases attached garbage.
void executor_impl::issue(const epoch_instruction& einstr) {
	switch(einstr.get_epoch_action()) {
	case epoch_action::none: //
		CELERITY_TRACE("[executor] I{}: epoch", einstr.get_id());
		break;
	case epoch_action::barrier: //
		CELERITY_TRACE("[executor] I{}: epoch (barrier)", einstr.get_id());
		root_communicator->collective_barrier();
		break;
	case epoch_action::shutdown: //
		CELERITY_TRACE("[executor] I{}: epoch (shutdown)", einstr.get_id());
		// makes run() exit once the engine drains
		expecting_more_submissions = false;
		break;
	}
	if(delegate != nullptr && einstr.get_epoch_task_id() != 0 /* TODO task_manager doesn't expect us to actually execute the init epoch */) {
		delegate->epoch_reached(einstr.get_epoch_task_id());
	}
	collect(einstr.get_garbage());
}
571✔
371

372
/// Enqueues an asynchronous host or device allocation; the resulting pointer is published into
/// `allocations` by retire_async_instruction once the event completes.
void executor_impl::issue_async(const alloc_instruction& ainstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	const auto aid = ainstr.get_allocation_id();
	assert(aid.get_memory_id() != user_memory_id);
	assert(assignment.target == out_of_order_engine::target::alloc_queue);
	assert(!assignment.lane.has_value());
	assert(assignment.device.has_value() == (aid.get_memory_id() > host_memory_id));

	CELERITY_TRACE(
	    "[executor] I{}: alloc {}, {} % {} bytes", ainstr.get_id(), ainstr.get_allocation_id(), ainstr.get_size_bytes(), ainstr.get_alignment_bytes());

	async.event = assignment.device.has_value() //
	                  ? backend->enqueue_device_alloc(*assignment.device, ainstr.get_size_bytes(), ainstr.get_alignment_bytes())
	                  : backend->enqueue_host_alloc(ainstr.get_size_bytes(), ainstr.get_alignment_bytes());
	async.alloc_aid = aid; // setting alloc_aid != null will make `retire_async_instruction` insert the result into `allocations`
}
1,085✔
388

389
/// Removes the allocation from the tracking table and enqueues the asynchronous host / device free.
void executor_impl::issue_async(const free_instruction& finstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	// take the entry out of the table before the free is enqueued
	auto node = allocations.extract(finstr.get_allocation_id());
	assert(!node.empty());
	const auto ptr = node.mapped();

	CELERITY_TRACE("[executor] I{}: free {}", finstr.get_id(), finstr.get_allocation_id());

	if(assignment.device.has_value()) {
		async.event = backend->enqueue_device_free(*assignment.device, ptr);
	} else {
		async.event = backend->enqueue_host_free(ptr);
	}
}
1,085✔
403

404
/// Resolves source / destination base pointers and enqueues an asynchronous region copy on the assigned
/// host or device lane.
void executor_impl::issue_async(const copy_instruction& cinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	assert(assignment.target == out_of_order_engine::target::host_queue || assignment.target == out_of_order_engine::target::device_queue);
	assert((assignment.target == out_of_order_engine::target::device_queue) == assignment.device.has_value());
	assert(assignment.lane.has_value());

	CELERITY_TRACE("[executor] I{}: copy {} ({}) -> {} ({}), {}x{} bytes", cinstr.get_id(), cinstr.get_source_allocation(), cinstr.get_source_box(),
	    cinstr.get_dest_allocation(), cinstr.get_dest_box(), cinstr.get_copy_region(), cinstr.get_element_size());

	// base pointers are the tracked allocation plus the instruction's byte offset
	const auto& src_alloc = cinstr.get_source_allocation();
	const auto& dst_alloc = cinstr.get_dest_allocation();
	const auto src_base = static_cast<const std::byte*>(allocations.at(src_alloc.id)) + src_alloc.offset_bytes;
	const auto dst_base = static_cast<std::byte*>(allocations.at(dst_alloc.id)) + dst_alloc.offset_bytes;

	if(assignment.device.has_value()) {
		async.event = backend->enqueue_device_copy(*assignment.device, *assignment.lane, src_base, dst_base, cinstr.get_source_box(), cinstr.get_dest_box(),
		    cinstr.get_copy_region(), cinstr.get_element_size());
	} else {
		async.event = backend->enqueue_host_copy(
		    *assignment.lane, src_base, dst_base, cinstr.get_source_box(), cinstr.get_dest_box(), cinstr.get_copy_region(), cinstr.get_element_size());
	}
}
2,311✔
423

424
/// Renders the accessed allocation-relative boxes of all accessors as a trace-log suffix
/// (empty string when the map has no entries).
std::string format_access_log(const buffer_access_allocation_map& map) {
	std::string log;
	for(size_t idx = 0; idx < map.size(); ++idx) {
		auto& access = map[idx];
		// translate the buffer-relative accessed box into allocation-relative coordinates
		const auto alloc_offset = access.allocated_box_in_buffer.get_offset();
		const auto box_in_allocation = box(access.accessed_box_in_buffer.get_min() - alloc_offset, access.accessed_box_in_buffer.get_max() - alloc_offset);
		fmt::format_to(std::back_inserter(log), "{} {} {}", idx == 0 ? ", accessing" : ",", access.allocation_id, box_in_allocation);
	}
	return log;
}
×
434

435
/// Hydrates accessors, collects reduction output pointers and launches the kernel asynchronously on the
/// assigned device lane. Boundary-check info (if enabled) is attached to the in-flight state and evaluated
/// on retirement.
void executor_impl::issue_async(const device_kernel_instruction& dkinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	assert(assignment.target == out_of_order_engine::target::device_queue);
	assert(assignment.device == dkinstr.get_device_id());
	assert(assignment.lane.has_value());

	CELERITY_TRACE("[executor] I{}: launch device kernel on D{}, {}{}", dkinstr.get_id(), dkinstr.get_device_id(), dkinstr.get_execution_range(),
	    format_access_log(dkinstr.get_access_allocations()));

	auto accessor_infos = make_accessor_infos(dkinstr.get_access_allocations());
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
	async.oob_info = attach_boundary_check_info(
	    accessor_infos, dkinstr.get_access_allocations(), dkinstr.get_oob_task_type(), dkinstr.get_oob_task_id(), dkinstr.get_oob_task_name());
#endif

	// resolve each reduction scratch allocation to its pointer
	const auto& reduction_allocs = dkinstr.get_reduction_allocations();
	std::vector<void*> reduction_ptrs(reduction_allocs.size());
	for(size_t i = 0; i < reduction_allocs.size(); ++i) {
		reduction_ptrs[i] = allocations.at(reduction_allocs[i].allocation_id);
	}

	async.event = backend->enqueue_device_kernel(
	    dkinstr.get_device_id(), *assignment.lane, dkinstr.get_launcher(), std::move(accessor_infos), dkinstr.get_execution_range(), reduction_ptrs);
}
1,788✔
458

459
/// Hydrates accessors, resolves the collective communicator (if the task belongs to a collective group)
/// and launches the host task asynchronously on the assigned host lane.
void executor_impl::issue_async(const host_task_instruction& htinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	assert(assignment.target == out_of_order_engine::target::host_queue);
	assert(!assignment.device.has_value());
	assert(assignment.lane.has_value());

	CELERITY_TRACE(
	    "[executor] I{}: launch host task, {}{}", htinstr.get_id(), htinstr.get_execution_range(), format_access_log(htinstr.get_access_allocations()));

	auto accessor_infos = make_accessor_infos(htinstr.get_access_allocations());
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
	async.oob_info = attach_boundary_check_info(
	    accessor_infos, htinstr.get_access_allocations(), htinstr.get_oob_task_type(), htinstr.get_oob_task_id(), htinstr.get_oob_task_name());
#endif

	const auto& execution_range = htinstr.get_execution_range();
	// non-collective tasks receive a nullptr communicator
	const auto collective_comm =
	    htinstr.get_collective_group_id() != non_collective_group_id ? cloned_communicators.at(htinstr.get_collective_group_id()).get() : nullptr;

	async.event = backend->enqueue_host_task(*assignment.lane, htinstr.get_launcher(), std::move(accessor_infos), execution_range, collective_comm);
}
6,468✔
479

480
void executor_impl::issue_async(
662✔
481
    const send_instruction& sinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
482
{
483
        assert(assignment.target == out_of_order_engine::target::immediate);
662✔
484

485
        CELERITY_TRACE("[executor] I{}: send {}+{}, {}x{} bytes to N{} (MSG{})", sinstr.get_id(), sinstr.get_source_allocation_id(),
662✔
486
            sinstr.get_offset_in_source_allocation(), sinstr.get_send_range(), sinstr.get_element_size(), sinstr.get_dest_node_id(), sinstr.get_message_id());
487

488
        const auto allocation_base = allocations.at(sinstr.get_source_allocation_id());
662✔
489
        const communicator::stride stride{
662✔
490
            sinstr.get_source_allocation_range(),
662✔
491
            subrange<3>{sinstr.get_offset_in_source_allocation(), sinstr.get_send_range()},
492
            sinstr.get_element_size(),
662✔
493
        };
662✔
494
        async.event = root_communicator->send_payload(sinstr.get_dest_node_id(), sinstr.get_message_id(), allocation_base, stride);
662✔
495
}
662✔
496

497
void executor_impl::issue_async(
370✔
498
    const receive_instruction& rinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
499
{
500
        assert(assignment.target == out_of_order_engine::target::immediate);
370✔
501

502
        CELERITY_TRACE("[executor] I{}: receive {} {}x{} bytes into {} ({})", rinstr.get_id(), rinstr.get_transfer_id(), rinstr.get_requested_region(),
370✔
503
            rinstr.get_element_size(), rinstr.get_dest_allocation_id(), rinstr.get_allocated_box());
504

505
        const auto allocation = allocations.at(rinstr.get_dest_allocation_id());
370✔
506
        async.event =
370✔
507
            recv_arbiter.receive(rinstr.get_transfer_id(), rinstr.get_requested_region(), allocation, rinstr.get_allocated_box(), rinstr.get_element_size());
370✔
508
}
370✔
509

510
void executor_impl::issue_async(
48✔
511
    const await_receive_instruction& arinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
512
{
513
        assert(assignment.target == out_of_order_engine::target::immediate);
48✔
514

515
        CELERITY_TRACE("[executor] I{}: await receive {} {}", arinstr.get_id(), arinstr.get_transfer_id(), arinstr.get_received_region());
48✔
516

517
        async.event = recv_arbiter.await_split_receive_subregion(arinstr.get_transfer_id(), arinstr.get_received_region());
48✔
518
}
48✔
519

520
void executor_impl::issue_async(
18✔
521
    const gather_receive_instruction& grinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
522
{
523
        assert(assignment.target == out_of_order_engine::target::immediate);
18✔
524

525
        CELERITY_TRACE("[executor] I{}: gather receive {} into {}, {} bytes / node", grinstr.get_id(), grinstr.get_transfer_id(), grinstr.get_dest_allocation_id(),
18✔
526
            grinstr.get_node_chunk_size());
527

528
        const auto allocation = allocations.at(grinstr.get_dest_allocation_id());
18✔
529
        async.event = recv_arbiter.gather_receive(grinstr.get_transfer_id(), allocation, grinstr.get_node_chunk_size());
18✔
530
}
18✔
531

532
void executor_impl::collect(const instruction_garbage& garbage) {
1,411✔
533
        for(const auto rid : garbage.reductions) {
1,478✔
534
                assert(reducers.count(rid) != 0);
67✔
535
                reducers.erase(rid);
67✔
536
        }
537
        for(const auto aid : garbage.user_allocations) {
1,518✔
538
                assert(aid.get_memory_id() == user_memory_id);
107✔
539
                assert(allocations.count(aid) != 0);
107✔
540
                allocations.erase(aid);
107✔
541
        }
542
}
1,411✔
543

544
std::vector<closure_hydrator::accessor_info> executor_impl::make_accessor_infos(const buffer_access_allocation_map& amap) const {
4,128✔
545
        std::vector<closure_hydrator::accessor_info> accessor_infos(amap.size());
12,384✔
546
        for(size_t i = 0; i < amap.size(); ++i) {
7,693✔
547
                const auto ptr = allocations.at(amap[i].allocation_id);
3,565✔
548
                accessor_infos[i] = closure_hydrator::accessor_info{ptr, amap[i].allocated_box_in_buffer, amap[i].accessed_box_in_buffer};
3,565✔
549
        }
550
        return accessor_infos;
4,128✔
551
}
×
552

553
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
// Wires each accessor up with an out-of-bounds bounding box in backend debug memory, which the
// kernel records illegal accesses into. Returns the info block that is later inspected on completion.
std::unique_ptr<boundary_check_info> executor_impl::attach_boundary_check_info(std::vector<closure_hydrator::accessor_info>& accessor_infos,
    const buffer_access_allocation_map& amap, task_type tt, task_id tid, const std::string& task_name) const //
{
	// nothing to check if the task does not access any buffer allocations
	if(amap.empty()) return nullptr;

	auto oob_info = std::make_unique<boundary_check_info>(tt, tid, task_name);

	// one bounding-box slot per accessor, allocated in backend debug memory so device kernels can write to it
	oob_info->illegal_access_bounding_boxes = static_cast<oob_bounding_box*>(backend->debug_alloc(amap.size() * sizeof(oob_bounding_box)));
	std::uninitialized_default_construct_n(oob_info->illegal_access_bounding_boxes, amap.size());

	oob_info->accessors.resize(amap.size());
	for(size_t i = 0; i < amap.size(); ++i) {
		const auto& access = amap[i];
		oob_info->accessors[i] = boundary_check_info::accessor_info{access.oob_buffer_id, access.oob_buffer_name, access.accessed_box_in_buffer};
		// point the hydrated accessor at its bounding-box slot
		accessor_infos[i].out_of_bounds_indices = oob_info->illegal_access_bounding_boxes + i;
	}
	return oob_info;
}
#endif // CELERITY_ACCESSOR_BOUNDARY_CHECK

} // namespace celerity::detail::live_executor_detail
574

575
namespace celerity::detail {
576

577
// Spawns the executor thread immediately; it begins consuming the submission queue right away.
live_executor::live_executor(std::unique_ptr<backend> backend, std::unique_ptr<communicator> root_comm, delegate* const dlg, const policy_set& policy)
    : m_root_comm(std::move(root_comm)), m_thread(&live_executor::thread_main, this, std::move(backend), dlg, policy) //
{
	// label the executor thread for debuggers and profilers
	set_thread_name(m_thread.native_handle(), "cy-executor");
}

live_executor::~live_executor() {
	// thread_main returns only after it has executed the shutdown epoch, so this join cannot block indefinitely
	m_thread.join();
}

void live_executor::track_user_allocation(const allocation_id aid, void* const ptr) {
107✔
588
        m_submission_queue.push(live_executor_detail::user_allocation_transfer{aid, ptr});
107✔
589
}
107✔
590

591
void live_executor::track_host_object_instance(const host_object_id hoid, std::unique_ptr<host_object_instance> instance) {
30✔
592
        assert(instance != nullptr);
30✔
593
        m_submission_queue.push(live_executor_detail::host_object_transfer{hoid, std::move(instance)});
30✔
594
}
30✔
595

596
void live_executor::track_reducer(const reduction_id rid, std::unique_ptr<reducer> reducer) {
68✔
597
        assert(reducer != nullptr);
68✔
598
        m_submission_queue.push(live_executor_detail::reducer_transfer{rid, std::move(reducer)});
68✔
599
}
68✔
600

601
void live_executor::submit(std::vector<const instruction*> instructions, std::vector<outbound_pilot> pilots) {
	// instructions and pilots travel together as one batch, keeping queue synchronization overhead per submission constant
	m_submission_queue.push(live_executor_detail::instruction_pilot_batch{std::move(instructions), std::move(pilots)});
}

void live_executor::thread_main(std::unique_ptr<backend> backend, delegate* const dlg, const policy_set& policy) {
247✔
606
        try {
607
                live_executor_detail::executor_impl(std::move(backend), m_root_comm.get(), m_submission_queue, dlg, policy).run();
247✔
608
        }
609
        // LCOV_EXCL_START
610
        catch(const std::exception& e) {
611
                CELERITY_CRITICAL("[executor] {}", e.what());
612
                std::abort();
613
        }
614
        // LCOV_EXCL_STOP
615
}
247✔
616

617
} // namespace celerity::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc