• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

celerity / celerity-runtime / 10216674169

02 Aug 2024 01:45PM UTC coverage: 94.951% (+2.1%) from 92.884%
10216674169

push

github

fknorr
Remove experimental::user_benchmarker

user_benchmarker has been obsolete ever since we moved away from
structured logging as the profiler (CPAT) interface.

2978 of 3372 branches covered (88.32%)

Branch coverage included in aggregate %.

6557 of 6670 relevant lines covered (98.31%)

1534446.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.66
/src/live_executor.cc
1
#include "live_executor.h"
2
#include "backend/backend.h"
3
#include "closure_hydrator.h"
4
#include "communicator.h"
5
#include "host_object.h"
6
#include "instruction_graph.h"
7
#include "named_threads.h"
8
#include "out_of_order_engine.h"
9
#include "receive_arbiter.h"
10
#include "system_info.h"
11
#include "types.h"
12
#include "utils.h"
13

14
#include <memory>
15
#include <optional>
16
#include <string>
17
#include <unordered_map>
18
#include <vector>
19

20
#include <matchbox.hh>
21

22
namespace celerity::detail::live_executor_detail {
23

24
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
/// Metadata carried alongside an asynchronous kernel / host-task instruction so that
/// out-of-bounds accessor accesses can be reported once the instruction completes
/// (see executor_impl::retire_async_instruction).
struct boundary_check_info {
	/// Identifies one accessor together with the buffer box it was declared to access.
	struct accessor_info {
		detail::buffer_id buffer_id = 0;
		std::string buffer_name;
		box<3> accessible_box;
	};

	// Task identity, used only to label the error message.
	detail::task_type task_type;
	detail::task_id task_id;
	std::string task_name;

	// One oob_bounding_box per accessor; remains nullptr when the task has no accessors.
	// Read back and freed via backend->debug_free in retire_async_instruction — presumably
	// filled in during kernel execution (allocation site not visible in this file chunk).
	oob_bounding_box* illegal_access_bounding_boxes = nullptr;
	std::vector<accessor_info> accessors;

	boundary_check_info(detail::task_type tt, detail::task_id tid, const std::string& task_name) : task_type(tt), task_id(tid), task_name(task_name) {}
};
#endif
42

43
/// Book-keeping entry for an instruction that has been issued but has not completed yet;
/// polled via `event` in executor_impl::poll_in_flight_async_instructions.
struct async_instruction_state {
	const instruction* instr = nullptr; ///< the issued instruction; used for logging and completion notification
	async_event event;                  ///< signals completion (and, for allocations, carries the result pointer)
	CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(std::unique_ptr<boundary_check_info> oob_info;) // unique_ptr: oob_info is optional and rather large
};
48

49
/// All state of the live executor's main loop: the tables mapping allocation / host-object /
/// communicator / reducer ids to live instances, the out-of-order engine that decides which
/// instruction to run next, and the receive arbiter for incoming transfers.
/// Accessed only from the executor thread that calls run().
struct executor_impl {
	const std::unique_ptr<detail::backend> backend;
	communicator* const root_communicator;
	double_buffered_queue<submission>* const submission_queue;
	live_executor::delegate* const delegate;
	const live_executor::policy_set policy;

	receive_arbiter recv_arbiter{*root_communicator};
	out_of_order_engine engine{backend->get_system_info()};

	bool expecting_more_submissions = true; ///< shutdown epoch has not been executed yet
	std::vector<async_instruction_state> in_flight_async_instructions;
	std::unordered_map<allocation_id, void*> allocations{{null_allocation_id, nullptr}}; ///< obtained from alloc_instruction or track_user_allocation
	std::unordered_map<host_object_id, std::unique_ptr<host_object_instance>> host_object_instances; ///< passed in through track_host_object_instance
	std::unordered_map<collective_group_id, std::unique_ptr<communicator>> cloned_communicators;     ///< transitive clones of root_communicator
	std::unordered_map<reduction_id, std::unique_ptr<reducer>> reducers; ///< passed in through track_reducer, erased on epochs / horizons

	std::optional<std::chrono::steady_clock::time_point> last_progress_timestamp; ///< last successful call to check_progress
	bool made_progress = false;                                                   ///< progress was made since `last_progress_timestamp`
	bool progress_warning_emitted = false;                                        ///< no progress was made since warning was emitted

	executor_impl(std::unique_ptr<detail::backend> backend, communicator* root_comm, double_buffered_queue<submission>& submission_queue,
	    live_executor::delegate* dlg, const live_executor::policy_set& policy);

	/// Main loop; returns after the shutdown epoch has executed and the engine has drained.
	void run();
	/// Retires every in-flight async instruction whose event has completed.
	void poll_in_flight_async_instructions();
	/// Drains the scheduler-facing submission queue into the engine / tracking tables.
	void poll_submission_queue();
	/// Asks the engine for at most one ready instruction and dispatches it.
	void try_issue_one_instruction();
	/// Post-completion cleanup: OOB reporting, result publication, engine notification.
	void retire_async_instruction(async_instruction_state& async);
	/// Emits a warning if no progress has been made within policy.progress_warning_timeout.
	void check_progress();

	// Instruction types that complete synchronously within the executor.
	void issue(const clone_collective_group_instruction& ccginstr);
	void issue(const split_receive_instruction& srinstr);
	void issue(const fill_identity_instruction& fiinstr);
	void issue(const reduce_instruction& rinstr);
	void issue(const fence_instruction& finstr);
	void issue(const destroy_host_object_instruction& dhoinstr);
	void issue(const horizon_instruction& hinstr);
	void issue(const epoch_instruction& einstr);

	template <typename Instr>
	auto dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
	    // SFINAE: there is a (synchronous) `issue` overload above for the concrete Instr type
	    -> decltype(issue(instr));

	// Instruction types that complete asynchronously via async_event, outside the executor.
	void issue_async(const alloc_instruction& ainstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const free_instruction& finstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const copy_instruction& cinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const device_kernel_instruction& dkinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const host_task_instruction& htinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const send_instruction& sinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const receive_instruction& rinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const await_receive_instruction& arinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);
	void issue_async(const gather_receive_instruction& grinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async);

	template <typename Instr>
	auto dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
	    // SFINAE: there is an `issue_async` overload above for the concrete Instr type
	    -> decltype(issue_async(instr, assignment, std::declval<async_instruction_state&>()));

	/// Resolves the allocation ids of an access map to live pointers for closure hydration.
	std::vector<closure_hydrator::accessor_info> make_accessor_infos(const buffer_access_allocation_map& amap) const;

#if CELERITY_ACCESSOR_BOUNDARY_CHECK
	/// Builds the boundary_check_info that retire_async_instruction later inspects for the
	/// given access map (implementation not visible in this chunk).
	std::unique_ptr<boundary_check_info> attach_boundary_check_info(std::vector<closure_hydrator::accessor_info>& accessor_infos,
	    const buffer_access_allocation_map& amap, task_type tt, task_id tid, const std::string& task_name) const;
#endif

	/// Erases reducers and user allocations named in `garbage` (called on horizons / epochs).
	void collect(const instruction_garbage& garbage);
};
120

121
/// Wires up the executor with its backend, root communicator, scheduler-facing submission
/// queue, completion delegate and policy. recv_arbiter / engine members are initialized from
/// these in their in-class initializers.
executor_impl::executor_impl(std::unique_ptr<detail::backend> backend, communicator* const root_comm, double_buffered_queue<submission>& submission_queue,
    live_executor::delegate* const dlg, const live_executor::policy_set& policy)
    : backend(std::move(backend)), root_communicator(root_comm), submission_queue(&submission_queue), delegate(dlg), policy(policy) {}
124

125
/// Executor main loop. Each iteration polls the communicator, retires completed async
/// instructions, drains the submission queue and issues at most one new instruction.
/// Returns once the shutdown epoch has been executed and the engine is drained.
void executor_impl::run() {
	closure_hydrator::make_available();

	uint8_t check_overflow_counter = 0;
	for(;;) {
		if(engine.is_idle()) {
			if(!expecting_more_submissions) break; // shutdown complete
			submission_queue->wait_while_empty();  // we are stalled on the scheduler, suspend thread
			last_progress_timestamp.reset();       // do not treat suspension as being stuck
		}

		recv_arbiter.poll_communicator();
		poll_in_flight_async_instructions();
		poll_submission_queue();
		try_issue_one_instruction(); // potentially expensive, so only issue one per loop to continue checking for async completion in between

		// uint8_t wrap-around throttles the (comparatively expensive) checks below
		if(++check_overflow_counter == 0) { // once every 256 iterations
			backend->check_async_errors();
			check_progress();
		}
	}

	assert(in_flight_async_instructions.empty());
	// check that for each alloc_instruction, we executed a corresponding free_instruction
	assert(std::all_of(allocations.begin(), allocations.end(),
	    [](const std::pair<allocation_id, void*>& p) { return p.first == null_allocation_id || p.first.get_memory_id() == user_memory_id; }));
	// check that for each track_host_object_instance, we executed a destroy_host_object_instruction
	assert(host_object_instances.empty());

	closure_hydrator::teardown();
}
156

157
void executor_impl::poll_in_flight_async_instructions() {
6,657,731✔
158
        utils::erase_if(in_flight_async_instructions, [&](async_instruction_state& async) {
6,657,731✔
159
                if(!async.event.is_complete()) return false;
10,720,719✔
160
                retire_async_instruction(async);
9,707✔
161
                made_progress = true;
9,707✔
162
                return true;
9,707✔
163
        });
164
}
6,657,731✔
165

166
/// Drains everything the scheduler thread has pushed onto the submission queue:
/// instruction/pilot batches go to the out-of-order engine and the communicator, while
/// tracking submissions register user allocations, host objects and reducers.
void executor_impl::poll_submission_queue() {
	for(auto& submission : submission_queue->pop_all()) {
		matchbox::match(
		    submission,
		    [&](const instruction_pilot_batch& batch) {
			    // instructions enter the engine first; pilots are then announced to peers
			    for(const auto incoming_instr : batch.instructions) {
				    engine.submit(incoming_instr);
			    }
			    for(const auto& pilot : batch.pilots) {
				    root_communicator->send_outbound_pilot(pilot);
			    }
		    },
		    [&](const user_allocation_transfer& uat) {
			    // user allocations are owned by the application; we only track the pointer
			    assert(uat.aid != null_allocation_id);
			    assert(uat.aid.get_memory_id() == user_memory_id);
			    assert(allocations.count(uat.aid) == 0);
			    allocations.emplace(uat.aid, uat.ptr);
		    },
		    [&](host_object_transfer& hot) {
			    // takes ownership; released again by destroy_host_object_instruction
			    assert(host_object_instances.count(hot.hoid) == 0);
			    host_object_instances.emplace(hot.hoid, std::move(hot.instance));
		    },
		    [&](reducer_transfer& rt) {
			    // takes ownership; released via instruction_garbage on horizons / epochs
			    assert(reducers.count(rt.rid) == 0);
			    reducers.emplace(rt.rid, std::move(rt.reduction));
		    });
	}
}
194

195
/// Performs all cleanup for a completed asynchronous instruction: reports any out-of-bounds
/// accessor accesses recorded during execution, logs execution time at trace level, publishes
/// the result pointer of alloc_instructions into the allocation table, and finally notifies
/// the out-of-order engine that the instruction has completed.
void executor_impl::retire_async_instruction(async_instruction_state& async) {
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
	if(async.oob_info != nullptr) {
		const auto& oob_info = *async.oob_info;
		for(size_t i = 0; i < oob_info.accessors.size(); ++i) {
			if(const auto oob_box = oob_info.illegal_access_bounding_boxes[i].into_box(); !oob_box.empty()) {
				const auto& accessor_info = oob_info.accessors[i];
				// bug fix: the diagnostic previously misspelled "indices" as "indicies"
				CELERITY_ERROR("Out-of-bounds access detected in {}: accessor {} attempted to access buffer {} indices between {} and outside the "
				               "declared range {}.",
				    utils::make_task_debug_label(oob_info.task_type, oob_info.task_id, oob_info.task_name), i,
				    utils::make_buffer_debug_label(accessor_info.buffer_id, accessor_info.buffer_name), oob_box, accessor_info.accessible_box);
			}
		}
		if(oob_info.illegal_access_bounding_boxes != nullptr /* i.e. there was at least one accessor */) {
			backend->debug_free(oob_info.illegal_access_bounding_boxes);
		}
	}
#endif

	if(spdlog::should_log(spdlog::level::trace)) {
		if(const auto native_time = async.event.get_native_execution_time(); native_time.has_value()) {
			CELERITY_TRACE("[executor] retired I{} after {:.2f}", async.instr->get_id(), as_sub_second(*native_time));
		} else {
			CELERITY_TRACE("[executor] retired I{}", async.instr->get_id());
		}
	}

	// alloc_instruction is the only instruction type with an asynchronous result: the pointer
	// to the new allocation, which later instructions resolve through `allocations`.
	if(utils::isa<alloc_instruction>(async.instr)) {
		const auto ainstr = utils::as<alloc_instruction>(async.instr);
		const auto ptr = async.event.get_result();
		assert(ptr != nullptr && "backend allocation returned nullptr");
		const auto aid = ainstr->get_allocation_id();
		CELERITY_TRACE("[executor] {} allocated as {}", aid, ptr);
		assert(allocations.count(aid) == 0);
		allocations.emplace(aid, ptr);
	}

	engine.complete_assigned(async.instr);
}
234

235
/// Dispatch path for instruction types that have a synchronous `issue` overload: the
/// instruction runs inline on the executor thread and its completion is reported to the
/// engine immediately. Overload resolution picks this template via expression SFINAE.
template <typename Instr>
auto executor_impl::dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
    // SFINAE: there is a (synchronous) `issue` overload above for the concrete Instr type
    -> decltype(issue(instr)) //
{
	// synchronous instructions are never assigned to a backend queue / lane
	assert(assignment.target == out_of_order_engine::target::immediate);
	assert(!assignment.lane.has_value());
	issue(instr); // completes immediately
	engine.complete_assigned(&instr);
}

246
/// Dispatch path for instruction types that have an `issue_async` overload: creates an
/// in-flight tracking entry and lets issue_async store the async_event that
/// poll_in_flight_async_instructions later polls for completion.
template <typename Instr>
auto executor_impl::dispatch(const Instr& instr, const out_of_order_engine::assignment& assignment)
    // SFINAE: there is an `issue_async` overload above for the concrete Instr type
    -> decltype(issue_async(instr, assignment, std::declval<async_instruction_state&>())) //
{
	auto& async = in_flight_async_instructions.emplace_back();
	async.instr = assignment.instruction;
	issue_async(instr, assignment, async); // stores event in `async` and completes asynchronously
}
255

256
void executor_impl::try_issue_one_instruction() {
6,657,731✔
257
        auto assignment = engine.assign_one();
6,657,731✔
258
        if(!assignment.has_value()) return;
6,657,731✔
259

260
        matchbox::match(*assignment->instruction, [&](const auto& instr) { dispatch(instr, *assignment); });
22,660✔
261
        made_progress = true;
11,330✔
262
}
263

264
/// Emits a one-shot warning (listing all in-flight instructions) when no instruction has been
/// issued or retired for longer than policy.progress_warning_timeout. Disabled when the policy
/// carries no timeout.
void executor_impl::check_progress() {
	if(!policy.progress_warning_timeout.has_value()) return;

	if(made_progress) {
		// reset the stall clock and re-arm the warning
		last_progress_timestamp = std::chrono::steady_clock::now();
		progress_warning_emitted = false;
		made_progress = false;
	} else if(last_progress_timestamp.has_value()) {
		// being stuck either means a deadlock in the user application, or a bug in Celerity.
		const auto elapsed_since_last_progress = std::chrono::steady_clock::now() - *last_progress_timestamp;
		if(elapsed_since_last_progress > *policy.progress_warning_timeout && !progress_warning_emitted) {
			// build a comma-separated list of the instruction ids we are still waiting on
			std::string instr_list;
			for(auto& in_flight : in_flight_async_instructions) {
				if(!instr_list.empty()) instr_list += ", ";
				fmt::format_to(std::back_inserter(instr_list), "I{}", in_flight.instr->get_id());
			}
			CELERITY_WARN("[executor] no progress for {:.2f}, might be stuck. Active instructions: {}", as_sub_second(elapsed_since_last_progress),
			    in_flight_async_instructions.empty() ? "none" : instr_list);
			progress_warning_emitted = true;
		}
	}
}
286

287
void executor_impl::issue(const clone_collective_group_instruction& ccginstr) {
33✔
288
        const auto original_cgid = ccginstr.get_original_collective_group_id();
33✔
289
        assert(original_cgid != non_collective_group_id);
33✔
290
        assert(original_cgid == root_collective_group_id || cloned_communicators.count(original_cgid) != 0);
33✔
291

292
        const auto new_cgid = ccginstr.get_new_collective_group_id();
33✔
293
        assert(new_cgid != non_collective_group_id && new_cgid != root_collective_group_id);
33✔
294
        assert(cloned_communicators.count(new_cgid) == 0);
33✔
295

296
        CELERITY_TRACE("[executor] I{}: clone collective group CG{} -> CG{}", ccginstr.get_id(), original_cgid, new_cgid);
33✔
297

298
        const auto original_communicator = original_cgid == root_collective_group_id ? root_communicator : cloned_communicators.at(original_cgid).get();
33✔
299
        cloned_communicators.emplace(new_cgid, original_communicator->collective_clone());
33✔
300
}
33✔
301

302

303
void executor_impl::issue(const split_receive_instruction& srinstr) {
20✔
304
        CELERITY_TRACE("[executor] I{}: split receive {} {}x{} bytes into {} ({}),", srinstr.get_id(), srinstr.get_transfer_id(), srinstr.get_requested_region(),
20✔
305
            srinstr.get_element_size(), srinstr.get_dest_allocation_id(), srinstr.get_allocated_box());
306

307
        const auto allocation = allocations.at(srinstr.get_dest_allocation_id());
20✔
308
        recv_arbiter.begin_split_receive(
20✔
309
            srinstr.get_transfer_id(), srinstr.get_requested_region(), allocation, srinstr.get_allocated_box(), srinstr.get_element_size());
310
}
20✔
311

312
void executor_impl::issue(const fill_identity_instruction& fiinstr) {
19✔
313
        CELERITY_TRACE("[executor] I{}: fill identity {} x{} values for R{}", fiinstr.get_id(), fiinstr.get_allocation_id(), fiinstr.get_num_values(),
19✔
314
            fiinstr.get_reduction_id());
315

316
        const auto allocation = allocations.at(fiinstr.get_allocation_id());
19✔
317
        const auto& reduction = *reducers.at(fiinstr.get_reduction_id());
19✔
318
        reduction.fill_identity(allocation, fiinstr.get_num_values());
19✔
319
}
19✔
320

321
void executor_impl::issue(const reduce_instruction& rinstr) {
47✔
322
        CELERITY_TRACE("[executor] I{}: reduce {} x{} values into {} for R{}", rinstr.get_id(), rinstr.get_source_allocation_id(), rinstr.get_num_source_values(),
47✔
323
            rinstr.get_dest_allocation_id(), rinstr.get_reduction_id());
324

325
        const auto gather_allocation = allocations.at(rinstr.get_source_allocation_id());
47✔
326
        const auto dest_allocation = allocations.at(rinstr.get_dest_allocation_id());
47✔
327
        const auto& reduction = *reducers.at(rinstr.get_reduction_id());
47✔
328
        reduction.reduce(dest_allocation, gather_allocation, rinstr.get_num_source_values());
47✔
329
}
47✔
330

331
void executor_impl::issue(const fence_instruction& finstr) { // NOLINT(readability-convert-member-functions-to-static)
63✔
332
        CELERITY_TRACE("[executor] I{}: fence", finstr.get_id());
63✔
333

334
        finstr.get_promise()->fulfill();
63✔
335
}
63✔
336

337
void executor_impl::issue(const destroy_host_object_instruction& dhoinstr) {
30✔
338
        assert(host_object_instances.count(dhoinstr.get_host_object_id()) != 0);
30✔
339
        CELERITY_TRACE("[executor] I{}: destroy H{}", dhoinstr.get_id(), dhoinstr.get_host_object_id());
30✔
340

341
        host_object_instances.erase(dhoinstr.get_host_object_id());
30✔
342
}
30✔
343

344
void executor_impl::issue(const horizon_instruction& hinstr) {
840✔
345
        CELERITY_TRACE("[executor] I{}: horizon", hinstr.get_id());
840✔
346

347
        if(delegate != nullptr) { delegate->horizon_reached(hinstr.get_horizon_task_id()); }
840!
348
        collect(hinstr.get_garbage());
840✔
349
}
840✔
350

351
/// Synchronously processes an epoch: performs the epoch action (none / collective barrier /
/// shutdown), notifies the delegate, and collects the attached garbage. A shutdown epoch ends
/// the main loop once the engine drains (see run()).
void executor_impl::issue(const epoch_instruction& einstr) {
	switch(einstr.get_epoch_action()) {
	case epoch_action::none: //
		CELERITY_TRACE("[executor] I{}: epoch", einstr.get_id());
		break;
	case epoch_action::barrier: //
		CELERITY_TRACE("[executor] I{}: epoch (barrier)", einstr.get_id());
		root_communicator->collective_barrier();
		break;
	case epoch_action::shutdown: //
		CELERITY_TRACE("[executor] I{}: epoch (shutdown)", einstr.get_id());
		// after this, run() exits as soon as the engine becomes idle
		expecting_more_submissions = false;
		break;
	}
	if(delegate != nullptr && einstr.get_epoch_task_id() != 0 /* TODO task_manager doesn't expect us to actually execute the init epoch */) {
		delegate->epoch_reached(einstr.get_epoch_task_id());
	}
	collect(einstr.get_garbage());
}
370

371
/// Enqueues a device or host allocation on the backend's alloc queue. The resulting pointer is
/// delivered through the async_event and published into `allocations` by
/// retire_async_instruction once the event completes.
void executor_impl::issue_async(const alloc_instruction& ainstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	// user allocations are tracked via the submission queue, never allocated here
	assert(ainstr.get_allocation_id().get_memory_id() != user_memory_id);
	assert(assignment.target == out_of_order_engine::target::alloc_queue);
	assert(!assignment.lane.has_value());
	// a device is assigned exactly for device-memory allocations (memory ids above host_memory_id)
	assert(assignment.device.has_value() == (ainstr.get_allocation_id().get_memory_id() > host_memory_id));

	CELERITY_TRACE(
	    "[executor] I{}: alloc {}, {} % {} bytes", ainstr.get_id(), ainstr.get_allocation_id(), ainstr.get_size_bytes(), ainstr.get_alignment_bytes());

	if(assignment.device.has_value()) {
		async.event = backend->enqueue_device_alloc(*assignment.device, ainstr.get_size_bytes(), ainstr.get_alignment_bytes());
	} else {
		async.event = backend->enqueue_host_alloc(ainstr.get_size_bytes(), ainstr.get_alignment_bytes());
	}
}
386

387
/// Removes the allocation from the tracking table and enqueues the matching device or host
/// free on the backend. The table entry is erased up-front; the backend operation completes
/// asynchronously via async.event.
void executor_impl::issue_async(const free_instruction& finstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	const auto entry = allocations.find(finstr.get_allocation_id());
	assert(entry != allocations.end());
	void* const ptr = entry->second;
	allocations.erase(entry);

	CELERITY_TRACE("[executor] I{}: free {}", finstr.get_id(), finstr.get_allocation_id());

	if(assignment.device.has_value()) {
		async.event = backend->enqueue_device_free(*assignment.device, ptr);
	} else {
		async.event = backend->enqueue_host_free(ptr);
	}
}
401

402
/// Enqueues an n-dimensional copy between two allocations on the assigned host or device
/// queue lane. Source / destination base pointers are computed from the tracked allocation
/// pointers plus the per-instruction byte offsets.
void executor_impl::issue_async(const copy_instruction& cinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	assert(assignment.target == out_of_order_engine::target::host_queue || assignment.target == out_of_order_engine::target::device_queue);
	assert((assignment.target == out_of_order_engine::target::device_queue) == assignment.device.has_value());
	assert(assignment.lane.has_value());

	CELERITY_TRACE("[executor] I{}: copy {} ({}) -> {} ({}), {}x{} bytes", cinstr.get_id(), cinstr.get_source_allocation(), cinstr.get_source_box(),
	    cinstr.get_dest_allocation(), cinstr.get_dest_box(), cinstr.get_copy_region(), cinstr.get_element_size());

	// resolve allocation ids to raw pointers and apply the allocation-relative byte offsets
	const auto source_base = static_cast<const std::byte*>(allocations.at(cinstr.get_source_allocation().id)) + cinstr.get_source_allocation().offset_bytes;
	const auto dest_base = static_cast<std::byte*>(allocations.at(cinstr.get_dest_allocation().id)) + cinstr.get_dest_allocation().offset_bytes;

	if(assignment.device.has_value()) {
		async.event = backend->enqueue_device_copy(*assignment.device, *assignment.lane, source_base, dest_base, cinstr.get_source_box(), cinstr.get_dest_box(),
		    cinstr.get_copy_region(), cinstr.get_element_size());
	} else {
		async.event = backend->enqueue_host_copy(
		    *assignment.lane, source_base, dest_base, cinstr.get_source_box(), cinstr.get_dest_box(), cinstr.get_copy_region(), cinstr.get_element_size());
	}
}
421

422
std::string format_access_log(const buffer_access_allocation_map& map) {
3,523✔
423
        std::string acc_log;
3,523✔
424
        for(size_t i = 0; i < map.size(); ++i) {
5,982✔
425
                auto& aa = map[i];
2,459✔
426
                const auto accessed_box_in_allocation = box(aa.accessed_box_in_buffer.get_min() - aa.allocated_box_in_buffer.get_offset(),
7,377✔
427
                    aa.accessed_box_in_buffer.get_max() - aa.allocated_box_in_buffer.get_offset());
7,377✔
428
                fmt::format_to(std::back_inserter(acc_log), "{} {} {}", i == 0 ? ", accessing" : ",", aa.allocation_id, accessed_box_in_allocation);
4,918✔
429
        }
430
        return acc_log;
3,523✔
431
}
×
432

433
/// Enqueues a device kernel on the assigned device queue lane: hydrates accessor infos,
/// optionally attaches out-of-bounds check metadata, resolves reduction output pointers, and
/// submits the launcher to the backend.
void executor_impl::issue_async(const device_kernel_instruction& dkinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	assert(assignment.target == out_of_order_engine::target::device_queue);
	assert(assignment.device == dkinstr.get_device_id());
	assert(assignment.lane.has_value());

	CELERITY_TRACE("[executor] I{}: launch device kernel on D{}, {}{}", dkinstr.get_id(), dkinstr.get_device_id(), dkinstr.get_execution_range(),
	    format_access_log(dkinstr.get_access_allocations()));

	auto accessor_infos = make_accessor_infos(dkinstr.get_access_allocations());
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
	// oob_info is examined (and its buffers freed) in retire_async_instruction
	async.oob_info = attach_boundary_check_info(
	    accessor_infos, dkinstr.get_access_allocations(), dkinstr.get_oob_task_type(), dkinstr.get_oob_task_id(), dkinstr.get_oob_task_name());
#endif

	// resolve reduction output allocation ids to raw pointers for the launcher
	const auto& reduction_allocs = dkinstr.get_reduction_allocations();
	std::vector<void*> reduction_ptrs(reduction_allocs.size());
	for(size_t i = 0; i < reduction_allocs.size(); ++i) {
		reduction_ptrs[i] = allocations.at(reduction_allocs[i].allocation_id);
	}

	async.event = backend->enqueue_device_kernel(
	    dkinstr.get_device_id(), *assignment.lane, dkinstr.get_launcher(), std::move(accessor_infos), dkinstr.get_execution_range(), reduction_ptrs);
}
456

457
/// Enqueues a host task on the assigned host queue lane: hydrates accessor infos, optionally
/// attaches out-of-bounds check metadata, resolves the collective communicator (if the task
/// belongs to a collective group), and submits the launcher to the backend.
void executor_impl::issue_async(const host_task_instruction& htinstr, const out_of_order_engine::assignment& assignment, async_instruction_state& async) {
	assert(assignment.target == out_of_order_engine::target::host_queue);
	assert(!assignment.device.has_value());
	assert(assignment.lane.has_value());

	CELERITY_TRACE(
	    "[executor] I{}: launch host task, {}{}", htinstr.get_id(), htinstr.get_execution_range(), format_access_log(htinstr.get_access_allocations()));

	auto accessor_infos = make_accessor_infos(htinstr.get_access_allocations());
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
	// oob_info is examined (and its buffers freed) in retire_async_instruction
	async.oob_info = attach_boundary_check_info(
	    accessor_infos, htinstr.get_access_allocations(), htinstr.get_oob_task_type(), htinstr.get_oob_task_id(), htinstr.get_oob_task_name());
#endif

	const auto& execution_range = htinstr.get_execution_range();
	// non-collective host tasks receive a nullptr communicator
	const auto collective_comm =
	    htinstr.get_collective_group_id() != non_collective_group_id ? cloned_communicators.at(htinstr.get_collective_group_id()).get() : nullptr;

	async.event = backend->enqueue_host_task(*assignment.lane, htinstr.get_launcher(), std::move(accessor_infos), execution_range, collective_comm);
}
477

478
void executor_impl::issue_async(
662✔
479
    const send_instruction& sinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
480
{
481
        assert(assignment.target == out_of_order_engine::target::immediate);
662✔
482

483
        CELERITY_TRACE("[executor] I{}: send {}+{}, {}x{} bytes to N{} (MSG{})", sinstr.get_id(), sinstr.get_source_allocation_id(),
662✔
484
            sinstr.get_offset_in_source_allocation(), sinstr.get_send_range(), sinstr.get_element_size(), sinstr.get_dest_node_id(), sinstr.get_message_id());
485

486
        const auto allocation_base = allocations.at(sinstr.get_source_allocation_id());
662✔
487
        const communicator::stride stride{
662✔
488
            sinstr.get_source_allocation_range(),
662✔
489
            subrange<3>{sinstr.get_offset_in_source_allocation(), sinstr.get_send_range()},
490
            sinstr.get_element_size(),
662✔
491
        };
662✔
492
        async.event = root_communicator->send_payload(sinstr.get_dest_node_id(), sinstr.get_message_id(), allocation_base, stride);
662✔
493
}
662✔
494

495
void executor_impl::issue_async(
370✔
496
    const receive_instruction& rinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
497
{
498
        assert(assignment.target == out_of_order_engine::target::immediate);
370✔
499

500
        CELERITY_TRACE("[executor] I{}: receive {} {}x{} bytes into {} ({})", rinstr.get_id(), rinstr.get_transfer_id(), rinstr.get_requested_region(),
370✔
501
            rinstr.get_element_size(), rinstr.get_dest_allocation_id(), rinstr.get_allocated_box());
502

503
        const auto allocation = allocations.at(rinstr.get_dest_allocation_id());
370✔
504
        async.event =
370✔
505
            recv_arbiter.receive(rinstr.get_transfer_id(), rinstr.get_requested_region(), allocation, rinstr.get_allocated_box(), rinstr.get_element_size());
370✔
506
}
370✔
507

508
void executor_impl::issue_async(
48✔
509
    const await_receive_instruction& arinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
510
{
511
        assert(assignment.target == out_of_order_engine::target::immediate);
48✔
512

513
        CELERITY_TRACE("[executor] I{}: await receive {} {}", arinstr.get_id(), arinstr.get_transfer_id(), arinstr.get_received_region());
48✔
514

515
        async.event = recv_arbiter.await_split_receive_subregion(arinstr.get_transfer_id(), arinstr.get_received_region());
48✔
516
}
48✔
517

518
void executor_impl::issue_async(
18✔
519
    const gather_receive_instruction& grinstr, [[maybe_unused]] const out_of_order_engine::assignment& assignment, async_instruction_state& async) //
520
{
521
        assert(assignment.target == out_of_order_engine::target::immediate);
18✔
522

523
        CELERITY_TRACE("[executor] I{}: gather receive {} into {}, {} bytes / node", grinstr.get_id(), grinstr.get_transfer_id(), grinstr.get_dest_allocation_id(),
18✔
524
            grinstr.get_node_chunk_size());
525

526
        const auto allocation = allocations.at(grinstr.get_dest_allocation_id());
18✔
527
        async.event = recv_arbiter.gather_receive(grinstr.get_transfer_id(), allocation, grinstr.get_node_chunk_size());
18✔
528
}
18✔
529

530
void executor_impl::collect(const instruction_garbage& garbage) {
1,411✔
531
        for(const auto rid : garbage.reductions) {
1,478✔
532
                assert(reducers.count(rid) != 0);
67✔
533
                reducers.erase(rid);
67✔
534
        }
535
        for(const auto aid : garbage.user_allocations) {
1,518✔
536
                assert(aid.get_memory_id() == user_memory_id);
107✔
537
                assert(allocations.count(aid) != 0);
107✔
538
                allocations.erase(aid);
107✔
539
        }
540
}
1,411✔
541

542
std::vector<closure_hydrator::accessor_info> executor_impl::make_accessor_infos(const buffer_access_allocation_map& amap) const {
4,128✔
543
        std::vector<closure_hydrator::accessor_info> accessor_infos(amap.size());
12,384✔
544
        for(size_t i = 0; i < amap.size(); ++i) {
7,693✔
545
                const auto ptr = allocations.at(amap[i].allocation_id);
3,565✔
546
                accessor_infos[i] = closure_hydrator::accessor_info{ptr, amap[i].allocated_box_in_buffer, amap[i].accessed_box_in_buffer};
3,565✔
547
        }
548
        return accessor_infos;
4,128✔
549
}
×
550

551
#if CELERITY_ACCESSOR_BOUNDARY_CHECK
552
/// Creates per-task out-of-bounds tracking state and wires each accessor_info up to one
/// oob_bounding_box slot. Returns nullptr when the task has no accessors to check.
std::unique_ptr<boundary_check_info> executor_impl::attach_boundary_check_info(std::vector<closure_hydrator::accessor_info>& accessor_infos,
    const buffer_access_allocation_map& amap, task_type tt, task_id tid, const std::string& task_name) const //
{
	if(amap.empty()) return nullptr;

	auto info = std::make_unique<boundary_check_info>(tt, tid, task_name);

	// one bounding box per accessor, allocated through the backend's debug allocator
	// (presumably so kernels can write to it regardless of target memory — see backend::debug_alloc)
	const auto num_accessors = amap.size();
	info->illegal_access_bounding_boxes = static_cast<oob_bounding_box*>(backend->debug_alloc(num_accessors * sizeof(oob_bounding_box)));
	std::uninitialized_default_construct_n(info->illegal_access_bounding_boxes, num_accessors);

	info->accessors.resize(num_accessors);
	for(size_t i = 0; i < num_accessors; ++i) {
		info->accessors[i] = boundary_check_info::accessor_info{amap[i].oob_buffer_id, amap[i].oob_buffer_name, amap[i].accessed_box_in_buffer};
		// each accessor records any illegal access into its dedicated bounding box
		accessor_infos[i].out_of_bounds_indices = &info->illegal_access_bounding_boxes[i];
	}
	return info;
}
2,959✔
569
#endif // CELERITY_ACCESSOR_BOUNDARY_CHECK
570

571
} // namespace celerity::detail::live_executor_detail
572

573
namespace celerity::detail {
574

575
/// Starts the executor thread immediately. The backend is moved into the thread; the root
/// communicator is retained as a member so thread_main can hand out a non-owning pointer to it.
/// @param dlg delegate forwarded to the executor implementation (see thread_main); may be observed asynchronously.
/// @param policy error-handling policy forwarded to the executor implementation.
live_executor::live_executor(std::unique_ptr<backend> backend, std::unique_ptr<communicator> root_comm, delegate* const dlg, const policy_set& policy)
    : m_root_comm(std::move(root_comm)), m_thread(&live_executor::thread_main, this, std::move(backend), dlg, policy) //
{
	// label the thread so it is identifiable in debuggers and profilers
	set_thread_name(m_thread.native_handle(), "cy-executor");
}
247✔
580

581
/// Blocks until the executor thread has finished. No stop signal is sent here — the thread
/// terminates on its own once it has executed the shutdown epoch, so that epoch must have been
/// submitted before destruction.
live_executor::~live_executor() {
	m_thread.join(); // thread_main will exit only after executing shutdown epoch
}
494✔
584

585
void live_executor::track_user_allocation(const allocation_id aid, void* const ptr) {
107✔
586
        m_submission_queue.push(live_executor_detail::user_allocation_transfer{aid, ptr});
107✔
587
}
107✔
588

589
void live_executor::track_host_object_instance(const host_object_id hoid, std::unique_ptr<host_object_instance> instance) {
30✔
590
        assert(instance != nullptr);
30✔
591
        m_submission_queue.push(live_executor_detail::host_object_transfer{hoid, std::move(instance)});
30✔
592
}
30✔
593

594
void live_executor::track_reducer(const reduction_id rid, std::unique_ptr<reducer> reducer) {
68✔
595
        assert(reducer != nullptr);
68✔
596
        m_submission_queue.push(live_executor_detail::reducer_transfer{rid, std::move(reducer)});
68✔
597
}
68✔
598

599
/// Enqueues a batch of instructions together with their outbound pilot messages for execution.
void live_executor::submit(std::vector<const instruction*> instructions, std::vector<outbound_pilot> pilots) {
	live_executor_detail::instruction_pilot_batch batch{std::move(instructions), std::move(pilots)};
	m_submission_queue.push(std::move(batch));
}
6,250✔
602

603
/// Entry point of the executor thread: constructs the executor implementation and runs its main
/// loop, which returns only after the shutdown epoch has been executed (see ~live_executor).
void live_executor::thread_main(std::unique_ptr<backend> backend, delegate* const dlg, const policy_set& policy) {
	try {
		live_executor_detail::executor_impl(std::move(backend), m_root_comm.get(), m_submission_queue, dlg, policy).run();
	}
	// LCOV_EXCL_START
	catch(const std::exception& e) {
		// An exception must not escape the thread entry point — log it and abort so the message is
		// visible instead of letting std::terminate fire without context.
		CELERITY_CRITICAL("[executor] {}", e.what());
		std::abort();
	}
	// LCOV_EXCL_STOP
}
247✔
614

615
} // namespace celerity::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc