celerity / celerity-runtime / 9945915519

15 Jul 2024 08:07PM UTC coverage: 93.077% (-1.3%) from 94.362%

push / github / fknorr

Rename existing backend / executor -> legacy_backend / legacy_executor

Names 'backend' and 'executor' will be re-used, but we want to keep the
old APIs around in the meantime to keep changesets small.

3188 of 3687 branches covered (86.47%)

Branch coverage included in aggregate %.

17 of 23 new or added lines in 6 files covered. (73.91%)

95 existing lines in 8 files now uncovered.

7232 of 7508 relevant lines covered (96.32%)

169246.64 hits per line

Source File: /src/instruction_graph_generator.cc (94.74% covered)
1
#include "instruction_graph_generator.h"
2

3
#include "access_modes.h"
4
#include "command.h"
5
#include "grid.h"
6
#include "instruction_graph.h"
7
#include "recorders.h"
8
#include "region_map.h"
9
#include "split.h"
10
#include "system_info.h"
11
#include "task.h"
12
#include "task_manager.h"
13
#include "types.h"
14

15
#include <unordered_map>
16
#include <unordered_set>
17
#include <vector>
18

19

20
namespace celerity::detail::instruction_graph_generator_detail {
21

22
/// Helper for split_into_communicator_compatible_boxes().
23
void split_into_communicator_compatible_boxes_recurse(
4,190✔
24
    box_vector<3>& compatible_boxes, const box<3>& send_box, id<3> min, id<3> max, const int compatible_starting_from_dim, const int dim) {
25
        assert(dim <= compatible_starting_from_dim);
4,190✔
26
        const auto& full_box_min = send_box.get_min();
4,190✔
27
        const auto& full_box_max = send_box.get_max();
4,190✔
28

29
        if(dim == compatible_starting_from_dim) {
4,190✔
30
                // There are no incompatible strides in faster dimensions, so we simply split along this dimension into communicator_max_coordinate-sized chunks
31
                for(min[dim] = full_box_min[dim]; min[dim] < full_box_max[dim]; min[dim] += communicator_max_coordinate) {
8,384✔
32
                        max[dim] = std::min(full_box_max[dim], min[dim] + communicator_max_coordinate);
4,200✔
33
                        compatible_boxes.emplace_back(min, max);
4,200✔
34
                }
35
        } else {
36
                // A fast dimension (> 0) has incompatible strides - we can't do better than iterating over the slow dimension
37
                for(min[dim] = full_box_min[dim]; min[dim] < full_box_max[dim]; ++min[dim]) {
4,124✔
38
                        max[dim] = min[dim] + 1;
4,118✔
39
                        split_into_communicator_compatible_boxes_recurse(compatible_boxes, send_box, min, max, compatible_starting_from_dim, dim + 1);
4,118✔
40
                }
41
        }
42
}
4,190✔
43

44
/// Computes a decomposition of `send_box` into boxes that are compatible with every communicator. Note that this takes `buffer_range` rather than
45
/// `allocation_range` as a parameter, because a box that is compatible with the sender allocation might not be compatible with the allocation on the receiver
46
/// side (which we don't know anything about), but splitting for `buffer_range` will guarantee both.
47
///
48
/// On the MPI side, we implement buffer transfers as peer-to-peer operations with sub-array datatypes. These are implemented with 32-bit signed integer
49
/// strides, so transfers on buffers with a range > 2^31 in any dimension might have to be split to work around the coordinate limit by adjusting the base
50
/// pointer, using the stride as an offset. For 1D buffers this has no practical performance consequences because of the implied transfer sizes, but in
51
/// degenerate higher-dimensional cases we might end up transferring individual buffer elements per send instruction.
52
///
53
/// TODO The proper solution to this is to explicitly implement linearization / delinearization in the IDAG, which is also required to enable RDMA transfers.
54
box_vector<3> split_into_communicator_compatible_boxes(const range<3>& buffer_range, const box<3>& send_box) {
72✔
55
        assert(box(subrange<3>(zeros, buffer_range)).covers(send_box));
72✔
56

57
        int compatible_starting_from_dim = 0;
72✔
58
        for(int d = 1; d < 3; ++d) {
216✔
59
                if(buffer_range[d] > communicator_max_coordinate) { compatible_starting_from_dim = d; }
144✔
60
        }
61

62
        // There are pathological patterns like single-element columns in a 2D buffer with y-extent > communicator_max_coordinate that generate a huge number of
63
        // individual transfers with a small payload each. This might take very long and / or derail the instruction graph generator, so we log a warning before
64
        // computing the actual set.
65
        size_t max_compatible_box_area = 1;
72✔
66
        size_t min_num_compatible_boxes = 1;
72✔
67
        for(int d = 0; d < 3; ++d) {
288✔
68
                (d < compatible_starting_from_dim ? min_num_compatible_boxes : max_compatible_box_area) *= send_box.get_range()[d];
216✔
69
        }
70
        if(min_num_compatible_boxes > 256 && max_compatible_box_area < 65536) {
72!
71
                CELERITY_WARN("Celerity is generating an excessive amount of small transfers to keep strides representable as 32-bit integers for MPI compatibility. "
2✔
72
                              "This might be very slow and / or exhaust system memory. Consider transposing your buffer layout to remedy this.");
73
        }
74

75
        box_vector<3> compatible_boxes;
72✔
76
        split_into_communicator_compatible_boxes_recurse(compatible_boxes, send_box, send_box.get_min(), send_box.get_max(), compatible_starting_from_dim, 0);
72✔
77
        return compatible_boxes;
72✔
78
}
×
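// Illustrative example (communicator_max_coordinate assumed to be INT_MAX): for a 2d buffer of range {4, 2^32}, only dimension 1 exceeds the coordinate
// limit, so compatible_starting_from_dim becomes 1. The recursion then visits each of the 4 rows individually (dimension 0) and splits every row's
// 2^32-element extent into communicator_max_coordinate-sized chunks, i.e. 3 chunks per row and 12 compatible boxes in total.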
79

80
/// Determines whether two boxes are either overlapping or touching on edges (not corners). This amounts to 2-connectivity for 1d boxes, 4-connectivity for 2d
81
/// boxes and 6-connectivity for 3d boxes. For 0-dimensional boxes, always returns true.
82
template <int Dims>
83
bool boxes_edge_connected(const box<Dims>& box1, const box<Dims>& box2) {
94✔
84
        if(box1.empty() || box2.empty()) return false;
94✔
85

86
        // boxes can be 2/4/6 connected by either fully overlapping, or by overlapping in Dims-1 dimensions and touching (a.max[d] == b.min[d]) in the remaining one
87
        bool disconnected = false;
92✔
88
        int n_dims_touching = 0;
92✔
89
        for(int d = 0; d < Dims; ++d) {
303✔
90
                // compute the intersection but without normalizing the box to distinguish the "disconnected" from the "touching" case
91
                const auto min = std::max(box1.get_min()[d], box2.get_min()[d]);
211✔
92
                const auto max = std::min(box1.get_max()[d], box2.get_max()[d]);
211✔
93
                if(min < max) {
211✔
94
                        // boxes overlap in this dimension
95
                } else if(min == max) {
118✔
96
                        n_dims_touching += 1;
80✔
97
                } else /* min > max */ {
98
                        disconnected = true;
38✔
99
                }
100
        }
101
        return !disconnected && n_dims_touching <= 1;
92✔
102
}
103

104
// explicit instantiations for tests
105
template bool boxes_edge_connected(const box<0>& box1, const box<0>& box2);
106
template bool boxes_edge_connected(const box<1>& box1, const box<1>& box2);
107
template bool boxes_edge_connected(const box<2>& box1, const box<2>& box2);
108
template bool boxes_edge_connected(const box<3>& box1, const box<3>& box2);
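// Illustrative usage (box construction from min/max coordinates assumed, as used elsewhere in this file):
//   const box<2> a({0, 0}, {2, 2});
//   const box<2> b({2, 0}, {4, 2}); // shares the full edge x == 2 with `a`
//   const box<2> c({2, 2}, {4, 4}); // meets `a` only in the corner (2, 2)
//   assert(boxes_edge_connected(a, b));  // touching in exactly one dimension
//   assert(!boxes_edge_connected(a, c)); // touching in two dimensions is corner contact, not edge contact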
109

110
/// Subdivide a region into connected partitions (where connectivity is established by `boxes_edge_connected`) and return the bounding box of each partition.
111
/// Note that the returned boxes are not necessarily disjoint even though the partitions always are.
112
///
113
/// This logic is employed to find connected subregions in a pending-receive that might be satisfied by a peer through a single send operation and thus require
114
/// a contiguous backing allocation.
115
template <int Dims>
116
box_vector<Dims> connected_subregion_bounding_boxes(const region<Dims>& region) {
32✔
117
        auto boxes = region.get_boxes();
32✔
118
        auto begin = boxes.begin();
32✔
119
        auto end = boxes.end();
32✔
120
        box_vector<Dims> bounding_boxes;
32✔
121
        while(begin != end) {
71✔
122
                auto connected_end = std::next(begin);
39✔
123
                auto connected_bounding_box = *begin; // optimization: skip connectivity checks if bounding box is disconnected
39✔
124
                for(; connected_end != end; ++connected_end) {
43✔
125
                        const auto next_connected = std::find_if(connected_end, end, [&](const auto& candidate) {
31✔
126
                                return boxes_edge_connected(connected_bounding_box, candidate)
19✔
127
                                       && std::any_of(begin, connected_end, [&](const auto& box) { return boxes_edge_connected(candidate, box); });
36!
128
                        });
129
                        if(next_connected == end) break;
12✔
130
                        connected_bounding_box = bounding_box(connected_bounding_box, *next_connected);
4✔
131
                        std::swap(*next_connected, *connected_end);
4✔
132
                }
133
                bounding_boxes.push_back(connected_bounding_box);
39✔
134
                begin = connected_end;
39✔
135
        }
136
        return bounding_boxes;
64✔
137
}
32✔
138

139
// explicit instantiations for tests
140
template box_vector<0> connected_subregion_bounding_boxes(const region<0>& region);
141
template box_vector<1> connected_subregion_bounding_boxes(const region<1>& region);
142
template box_vector<2> connected_subregion_bounding_boxes(const region<2>& region);
143
template box_vector<3> connected_subregion_bounding_boxes(const region<3>& region);
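// Illustrative example (1d): for a region made up of the boxes [0,2), [2,4) and [10,12), the first two boxes are edge-connected and collapse into the single
// bounding box [0,4), while the third box forms its own partition, so the function returns {[0,4), [10,12)}.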
144

145
/// Iteratively replaces each pair of overlapping boxes by their bounding box such that in the modified set of boxes, no two boxes overlap, but all original
146
/// input boxes are covered.
147
///
148
/// When a kernel or host task has multiple accessors into a single allocation, each must be backed by a contiguous allocation. This makes it necessary to
149
/// contiguously allocate the bounding box of all overlapping accesses.
150
template <int Dims>
151
void merge_overlapping_bounding_boxes(box_vector<Dims>& boxes) {
334✔
152
restart:
367✔
153
        for(auto first = boxes.begin(); first != boxes.end(); ++first) {
808✔
154
                const auto last = std::remove_if(std::next(first), boxes.end(), [&](const auto& box) {
948✔
155
                        const auto overlapping = !box_intersection(*first, box).empty();
214✔
156
                        if(overlapping) { *first = bounding_box(*first, box); }
214✔
157
                        return overlapping;
214✔
158
                });
159
                if(last != boxes.end()) {
474✔
160
                        boxes.erase(last, boxes.end());
33✔
161
                        goto restart; // NOLINT(cppcoreguidelines-avoid-goto)
33✔
162
                }
163
        }
164
}
334✔
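// Illustrative example (1d): for the input {[0,4), [2,6), [8,10)} the first two boxes overlap and are replaced by their bounding box, yielding
// {[0,6), [8,10)}. Boxes that merely touch are left alone, since their intersection is empty.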
165

166
/// In a set of potentially overlapping regions, removes the overlap between any two regions {A, B} by replacing them with {A ∖ B, A ∩ B, B ∖ A}.
167
///
168
/// This is used when generating copy and receive-instructions such that every data item is only copied or received once, but the following device kernels /
169
/// host tasks have their dependencies satisfied as soon as their subset of input data is available.
170
template <int Dims>
171
void symmetrically_split_overlapping_regions(std::vector<region<Dims>>& regions) {
192✔
172
        for(size_t i = 0; i < regions.size(); ++i) {
473✔
173
                for(size_t j = i + 1; j < regions.size(); ++j) {
987✔
174
                        auto intersection = region_intersection(regions[i], regions[j]);
353✔
175
                        if(!intersection.empty()) {
353✔
176
                                // merely shrinking regions will not introduce new intersections downstream, so we do not need to restart the loop
177
                                regions[i] = region_difference(regions[i], intersection);
42✔
178
                                regions[j] = region_difference(regions[j], intersection);
42✔
179
                                regions.push_back(std::move(intersection));
42✔
180
                        }
181
                }
182
        }
183
        // if any of the intersections above are actually subsets, we will end up with empty regions
184
        regions.erase(std::remove_if(regions.begin(), regions.end(), std::mem_fn(&region<Dims>::empty)), regions.end());
192✔
185
}
192✔
186

187
// explicit instantiations for tests
188
template void symmetrically_split_overlapping_regions(std::vector<region<0>>& regions);
189
template void symmetrically_split_overlapping_regions(std::vector<region<1>>& regions);
190
template void symmetrically_split_overlapping_regions(std::vector<region<2>>& regions);
191
template void symmetrically_split_overlapping_regions(std::vector<region<3>>& regions);
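// Illustrative example (1d): for the input regions {[0,4), [2,6)} the overlap [2,4) is split out, leaving {[0,2), [4,6), [2,4)}. An instruction that only
// reads [2,4) then depends on a single copy / receive of the intersection rather than on transfers of both larger regions.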
192

193
/// Returns whether an iterator range of instruction pointers is topologically sorted, i.e. sequential execution would satisfy all internal dependencies.
194
template <typename Iterator>
195
bool is_topologically_sorted(Iterator begin, Iterator end) {
970✔
196
        for(auto check = begin; check != end; ++check) {
3,089✔
197
                for(const auto dep : (*check)->get_dependencies()) {
5,047✔
198
                        if(std::find_if(std::next(check), end, [dep](const auto& node) { return node->get_id() == dep; }) != end) return false;
12,142!
199
                }
200
        }
201
        return true;
970✔
202
}
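// Illustrative note: instructions are created with monotonically increasing ids and only ever depend on previously created instructions, so a range in
// creation order passes this check; moving an instruction in front of one of its dependencies makes it fail.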
203

204
/// Starting from `first` (inclusive), selects the next memory_id which is set in `location`.
205
memory_id next_location(const memory_mask& location, memory_id first) {
23✔
206
        for(size_t i = 0; i < max_num_memories; ++i) {
23!
207
                const memory_id mem = (first + i) % max_num_memories;
23✔
208
                if(location[mem]) { return mem; }
46!
209
        }
210
        utils::panic("data is requested to be read, but not located in any memory");
×
211
}
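// Illustrative example: if `location` has memories 0 and 2 set, next_location(location, 1) returns 2, and next_location(location, 3) wraps around past the
// unset entries and returns 0.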
212

213
/// `allocation_id`s are "namespaced" to their memory ID, so we maintain the next `raw_allocation_id` for each memory separately.
214
struct memory_state {
215
        raw_allocation_id next_raw_aid = 1; // 0 is reserved for null_allocation_id
216
};
217

218
/// Maintains a set of concurrent instructions that are accessing a subrange of a buffer allocation.
219
/// Instruction pointers are ordered by id to allow equality comparison on the internal vector structure.
220
class access_front {
221
  public:
222
        enum mode { none, allocate, read, write };
223

224
        access_front() = default;
24✔
225
        explicit access_front(const mode mode) : m_mode(mode) {}
536✔
226
        explicit access_front(instruction* const instr, const mode mode) : m_instructions{instr}, m_mode(mode) {}
5,457✔
227

228
        void add_instruction(instruction* const instr) {
398✔
229
                // we insert instructions as soon as they are generated, so inserting at the end keeps the vector sorted
230
                m_instructions.push_back(instr);
398✔
231
                assert(std::is_sorted(m_instructions.begin(), m_instructions.end(), instruction_id_less()));
398✔
232
        }
398✔
233

234
        [[nodiscard]] access_front apply_epoch(instruction* const epoch) const {
8✔
235
                const auto first_retained = std::upper_bound(m_instructions.begin(), m_instructions.end(), epoch, instruction_id_less());
8✔
236
                const auto last_retained = m_instructions.end();
8✔
237

238
                // only include the new epoch in the access front if it in fact subsumes another instruction
239
                if(first_retained == m_instructions.begin()) return *this;
8!
240

241
                access_front pruned(m_mode);
8✔
242
                pruned.m_instructions.resize(1 + static_cast<size_t>(std::distance(first_retained, last_retained)));
8✔
243
                pruned.m_instructions.front() = epoch;
8✔
244
                std::copy(first_retained, last_retained, std::next(pruned.m_instructions.begin()));
16✔
245
                assert(std::is_sorted(pruned.m_instructions.begin(), pruned.m_instructions.end(), instruction_id_less()));
8✔
246
                return pruned;
8✔
247
        };
8✔
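        // Illustrative example: if the front currently holds instructions with ids {3, 7, 9}, applying an epoch with id 5 subsumes instruction 3 and yields
        // the pruned front {epoch(5), 7, 9}, whereas an epoch with id 2 subsumes nothing and returns the front unchanged.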
248

249
        const gch::small_vector<instruction*>& get_instructions() const { return m_instructions; }
1,723✔
250
        mode get_mode() const { return m_mode; }
2,614✔
251

252
        friend bool operator==(const access_front& lhs, const access_front& rhs) { return lhs.m_instructions == rhs.m_instructions && lhs.m_mode == rhs.m_mode; }
3,195✔
253
        friend bool operator!=(const access_front& lhs, const access_front& rhs) { return !(lhs == rhs); }
702✔
254

255
  private:
256
        gch::small_vector<instruction*> m_instructions; // ordered by id to allow equality comparison
257
        mode m_mode = none;
258
};
259

260
/// Per-allocation state for a single buffer. This is where we track last-writer instructions and access fronts.
261
struct buffer_allocation_state {
262
        allocation_id aid;
263
        detail::box<3> box;                                ///< in buffer coordinates
264
        region_map<access_front> last_writers;             ///< in buffer coordinates
265
        region_map<access_front> last_concurrent_accesses; ///< in buffer coordinates
266

267
        explicit buffer_allocation_state(const allocation_id aid, alloc_instruction* const ainstr /* optional: null for user allocations */,
347✔
268
            const detail::box<3>& allocated_box, const range<3>& buffer_range)
269
            : aid(aid), box(allocated_box), //
347✔
270
              last_writers(allocated_box, ainstr != nullptr ? access_front(ainstr, access_front::allocate) : access_front()),
347✔
271
              last_concurrent_accesses(allocated_box, ainstr != nullptr ? access_front(ainstr, access_front::allocate) : access_front()) {}
347✔
272

273
        /// Add `instr` to the active set of concurrent reads, or replace the current access front if the last access was not a read.
274
        void track_concurrent_read(const region<3>& region, instruction* const instr) {
839✔
275
                if(region.empty()) return;
839✔
276
                for(auto& [box, front] : last_concurrent_accesses.get_region_values(region)) {
1,174✔
277
                        if(front.get_mode() == access_front::read) {
621✔
278
                                front.add_instruction(instr);
128✔
279
                        } else {
280
                                front = access_front(instr, access_front::read);
493✔
281
                        }
282
                        last_concurrent_accesses.update_region(box, front);
621✔
283
                }
553✔
284
        }
285

286
        /// Replace the current access front with a write. The write is treated as "atomic" in the sense that there is never a second, concurrent write operation
287
        /// happening simultaneously. This is true for all writes except those from device kernels and host tasks, which might specify overlapping write-accessors.
288
        void track_atomic_write(const region<3>& region, instruction* const instr) {
328✔
289
                if(region.empty()) return;
328!
290
                last_writers.update_region(region, access_front(instr, access_front::write));
328✔
291
                last_concurrent_accesses.update_region(region, access_front(instr, access_front::write));
328✔
292
        }
293

294
        /// Replace the current access front with an empty write-front. This is done in preparation for writes from device kernels and host tasks.
295
        void begin_concurrent_writes(const region<3>& region) {
457✔
296
                if(region.empty()) return;
457✔
297
                last_writers.update_region(region, access_front(access_front::write));
264✔
298
                last_concurrent_accesses.update_region(region, access_front(access_front::write));
264✔
299
        }
300

301
        /// Add an instruction to the current set of concurrent writes. This is used to track writes from device kernels and host tasks and requires
302
        /// begin_concurrent_writes to be called beforehand. Multiple concurrent writes will only occur when a task declares overlapping writes and
303
        /// overlapping-write detection is disabled via the error policy. In order to still produce an executable (albeit racy) instruction graph in that case, we
304
        /// track multiple last-writers for the same buffer element.
305
        void track_concurrent_write(const region<3>& region, instruction* const instr) {
457✔
306
                if(region.empty()) return;
457✔
307
                for(auto& [box, front] : last_writers.get_region_values(region)) {
534✔
308
                        assert(front.get_mode() == access_front::write && "must call begin_concurrent_writes first");
270✔
309
                        front.add_instruction(instr);
270✔
310
                        last_writers.update_region(box, front);
270✔
311
                        last_concurrent_accesses.update_region(box, front);
270✔
312
                }
264✔
313
        }
314

315
        /// Replace all tracked instructions that are older than `epoch` with `epoch`.
316
        void apply_epoch(instruction* const epoch) {
2✔
317
                last_writers.apply_to_values([epoch](const access_front& front) { return front.apply_epoch(epoch); });
6✔
318
                last_concurrent_accesses.apply_to_values([epoch](const access_front& front) { return front.apply_epoch(epoch); });
6✔
319
        }
2✔
320
};
321

322
/// Per-memory state for a single buffer. Dependencies and last writers are tracked on the contained allocations.
323
struct buffer_memory_state {
324
        // TODO bound the number of allocations per buffer in order to avoid runaway tracking overhead (similar to horizons)
325
        // TODO evaluate if it ever makes sense to use a region_map here, or if we're better off expecting very few allocations and sticking to a vector here
326
        std::vector<buffer_allocation_state> allocations; // disjoint
327

328
        const buffer_allocation_state& get_allocation(const allocation_id aid) const {
329
                const auto it = std::find_if(allocations.begin(), allocations.end(), [=](const buffer_allocation_state& a) { return a.aid == aid; });
330
                assert(it != allocations.end());
331
                return *it;
332
        }
333

334
        buffer_allocation_state& get_allocation(const allocation_id aid) { return const_cast<buffer_allocation_state&>(std::as_const(*this).get_allocation(aid)); }
335

336
        /// Returns the (unique) allocation covering `box` if such an allocation exists, otherwise nullptr.
337
        const buffer_allocation_state* find_contiguous_allocation(const box<3>& box) const {
1,161✔
338
                const auto it = std::find_if(allocations.begin(), allocations.end(), [&](const buffer_allocation_state& a) { return a.box.covers(box); });
2,094✔
339
                return it != allocations.end() ? &*it : nullptr;
2,322✔
340
        }
341

342
        buffer_allocation_state* find_contiguous_allocation(const box<3>& box) {
343
                return const_cast<buffer_allocation_state*>(std::as_const(*this).find_contiguous_allocation(box));
344
        }
345

346
        /// Returns the (unique) allocation covering `box`.
347
        const buffer_allocation_state& get_contiguous_allocation(const box<3>& box) const {
661✔
348
                const auto alloc = find_contiguous_allocation(box);
661✔
349
                assert(alloc != nullptr);
661✔
350
                return *alloc;
661✔
351
        }
352

353
        buffer_allocation_state& get_contiguous_allocation(const box<3>& box) {
227✔
354
                return const_cast<buffer_allocation_state&>(std::as_const(*this).get_contiguous_allocation(box));
227✔
355
        }
356

357
        bool is_allocated_contiguously(const box<3>& box) const { return find_contiguous_allocation(box) != nullptr; }
500✔
358

359
        /// Replace all tracked instructions that are older than `epoch` with `epoch`.
360
        void apply_epoch(instruction* const epoch) {
6✔
361
                for(auto& alloc : allocations) {
8✔
362
                        alloc.apply_epoch(epoch);
2✔
363
                }
364
        }
6✔
365
};
366

367
/// State for a single buffer.
368
struct buffer_state {
369
        /// Tracks a pending non-reduction await-push that will be compiled into receive_instructions as soon as a command reads from its region.
370
        struct region_receive {
371
                task_id consumer_tid;
372
                region<3> received_region;
373
                box_vector<3> required_contiguous_allocations;
374

375
                region_receive(const task_id consumer_tid, region<3> received_region, box_vector<3> required_contiguous_allocations)
27✔
376
                    : consumer_tid(consumer_tid), received_region(std::move(received_region)),
27✔
377
                      required_contiguous_allocations(std::move(required_contiguous_allocations)) {}
27✔
378
        };
379

380
        /// Tracks a pending reduction await-push, which will be compiled into gather_receive_instructions and reduce_instructions when read from.
381
        struct gather_receive {
382
                task_id consumer_tid;
383
                reduction_id rid;
384
                box<3> gather_box;
385

386
                gather_receive(const task_id consumer_tid, const reduction_id rid, const box<3> gather_box)
25✔
387
                    : consumer_tid(consumer_tid), rid(rid), gather_box(gather_box) {}
25✔
388
        };
389

390
        std::string debug_name;
391
        celerity::range<3> range;
392
        size_t elem_size;  ///< in bytes
393
        size_t elem_align; ///< in bytes
394

395
        /// Per-memory and per-allocation state of this buffer.
396
        dense_map<memory_id, buffer_memory_state> memories;
397

398
        /// Contains a mask for every memory_id that either was written to by the original-producer instruction or that has already been made coherent previously.
399
        region_map<memory_mask> up_to_date_memories;
400

401
        /// Tracks the instruction that initially produced each buffer element on the local node - this can be a kernel, host task, region-receive or
402
        /// reduce-instruction, or - in case of a host-initialized buffer - an epoch. It is different from `buffer_allocation_state::last_writers` in that it never
403
        /// contains copy instructions. Copy- and send source regions are split on their original producer instructions to facilitate computation-communication
404
        /// overlap when different producers finish at different times.
405
        region_map<instruction*> original_writers;
406

407
        /// Tracks the memory to which the original_writer instruction wrote each buffer element. `original_write_memories[box]` is meaningless when
408
        /// `up_to_date_memories[box]` is empty (i.e. the buffer is not up-to-date on the local node due to being uninitialized or await-pushed without being
409
        /// consumed).
410
        region_map<memory_id> original_write_memories;
411

412
        // We store pending receives (await push regions) in a vector instead of a region map since we must process their entire regions en-bloc rather than on
413
        // a per-element basis.
414
        std::vector<region_receive> pending_receives;
415
        std::vector<gather_receive> pending_gathers;
416

417
        explicit buffer_state(const celerity::range<3>& range, const size_t elem_size, const size_t elem_align, const size_t n_memories)
161✔
418
            : range(range), elem_size(elem_size), elem_align(elem_align), memories(n_memories), up_to_date_memories(range), original_writers(range),
161✔
419
              original_write_memories(range) {}
322✔
420

421
        void track_original_write(const region<3>& region, instruction* const instr, const memory_id mid) {
488✔
422
                original_writers.update_region(region, instr);
488✔
423
                original_write_memories.update_region(region, mid);
488✔
424
                up_to_date_memories.update_region(region, memory_mask().set(mid));
488✔
425
        }
488✔
426

427
        /// Replace all tracked instructions that are older than `epoch` with `epoch`.
428
        void apply_epoch(instruction* const epoch) {
2✔
429
                for(auto& memory : memories) {
8✔
430
                        memory.apply_epoch(epoch);
6✔
431
                }
432
                original_writers.apply_to_values([epoch](instruction* const instr) { return instr != nullptr && instr->get_id() < epoch->get_id() ? epoch : instr; });
6!
433

434
                // This is an opportune point to verify that all await-pushes are fully consumed by the commands that require them.
435
                // TODO Assumes that the command graph generator issues await-pushes immediately before the commands that need them.
436
                assert(pending_receives.empty());
2✔
437
                assert(pending_gathers.empty());
2✔
438
        }
2✔
439
};
440

441
struct host_object_state {
442
        bool owns_instance; ///< If true, `destroy_host_object_instruction` will be emitted when `destroy_host_object` is called (false for `host_object<T&/void>`)
443
        instruction* last_side_effect; ///< Host tasks with side effects will be serialized wrt/ the host object.
444

445
        explicit host_object_state(const bool owns_instance, instruction* const last_epoch) : owns_instance(owns_instance), last_side_effect(last_epoch) {}
20✔
446

447
        /// If the last side-effect instruction was older than `epoch`, replaces it with `epoch`.
448
        void apply_epoch(instruction* const epoch) {
3✔
449
                if(last_side_effect != nullptr && last_side_effect->get_id() < epoch->get_id()) { last_side_effect = epoch; }
3!
450
        }
3✔
451
};
452

453
struct collective_group_state {
454
        /// Collective host tasks will be serialized wrt/ the collective group to ensure that the user can freely submit MPI collectives on their communicator.
455
        instruction* last_collective_operation;
456

457
        explicit collective_group_state(instruction* const last_host_task) : last_collective_operation(last_host_task) {}
172✔
458

459
        /// If the last host-task instruction was older than `epoch`, replaces it with `epoch`.
460
        void apply_epoch(instruction* const epoch) {
163✔
461
                if(last_collective_operation != nullptr && last_collective_operation->get_id() < epoch->get_id()) { last_collective_operation = epoch; }
163!
462
        }
163✔
463
};
464

465
/// We submit the set of instructions and pilots generated within a call to compile() en-bloc to relieve contention on the executor queue lock. To collect all
466
/// instructions that are generated in the call stack without polluting internal state, we pass a `batch&` output parameter to any function that transitively
467
/// generates instructions or pilots.
468
struct batch { // NOLINT(cppcoreguidelines-special-member-functions) (do not complain about the asserting destructor)
469
        std::vector<const instruction*> generated_instructions;
470
        std::vector<outbound_pilot> generated_pilots;
471

472
        /// The base priority of a batch adds to the priority per instruction type to transitively prioritize dependencies of important instructions.
473
        int base_priority = 0;
474

475
#ifndef NDEBUG
476
        ~batch() {
980✔
477
                if(std::uncaught_exceptions() == 0) { assert(generated_instructions.empty() && generated_pilots.empty() && "unflushed batch detected"); }
980!
478
        }
980✔
479
#endif
480
};
481

482
// We assign instruction priorities heuristically by deciding on a batch::base_priority at the beginning of a compile_* function and offsetting it by the
483
// instruction-type specific values defined here. This aims to perform low-latency submits of long-running instructions first to maximize overlap.
484
// clang-format off
485
template <typename Instruction> constexpr int instruction_type_priority = 0; // higher means more urgent
486
template <> constexpr int instruction_type_priority<free_instruction> = -1; // only free when forced to - nothing except an epoch or horizon will depend on this
487
template <> constexpr int instruction_type_priority<alloc_instruction> = 1; // allocations are synchronous and slow, so we postpone them as much as possible
488
template <> constexpr int instruction_type_priority<copy_instruction> = 2;
489
template <> constexpr int instruction_type_priority<await_receive_instruction> = 2;
490
template <> constexpr int instruction_type_priority<split_receive_instruction> = 2;
491
template <> constexpr int instruction_type_priority<receive_instruction> = 2;
492
template <> constexpr int instruction_type_priority<send_instruction> = 2;
493
template <> constexpr int instruction_type_priority<fence_instruction> = 3;
494
template <> constexpr int instruction_type_priority<host_task_instruction> = 4; // we expect kernel launches to have low latency but comparatively long run time
495
template <> constexpr int instruction_type_priority<device_kernel_instruction> = 4; 
496
template <> constexpr int instruction_type_priority<epoch_instruction> = 5; // epochs and horizons are low-latency and stop the task buffers from reaching capacity
497
template <> constexpr int instruction_type_priority<horizon_instruction> = 5;
498
// clang-format on
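// Illustrative reading: inside a batch with base_priority 0, a device_kernel_instruction is created with an effective priority of 4 and an independent
// alloc_instruction with 1, so an executor that picks the highest-priority ready instruction first launches the kernel before performing the allocation;
// free_instructions (priority -1) come last.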
499

500
/// A chunk of a task's execution space that will be assigned to a device (or the host) and thus knows which memory its instructions will operate on.
501
struct localized_chunk {
502
        detail::memory_id memory_id = host_memory_id;
503
        std::optional<detail::device_id> device_id;
504
        box<3> execution_range;
505
};
506

507
/// Transient state for a node-local eager reduction that is emitted around kernels that explicitly include a reduction. Tracks the gather-allocation that is
508
/// created early to rescue the current buffer value across the kernel invocation in case the reduction is not `initialize_to_identity` and the command graph
509
/// designates the local node to be the reduction initializer.
510
struct local_reduction {
511
        bool include_local_buffer_value = false; ///< If true, the local node is the one to include its current version of the existing buffer in the reduction.
512
        size_t first_kernel_chunk_offset = 0;    ///< If the local reduction includes the current buffer value, we add an additional reduction-chunk at the front.
513
        size_t num_input_chunks = 1;             ///< One per participating local chunk, plus one if the local node includes the current buffer value.
514
        size_t chunk_size_bytes = 0;
515
        allocation_id gather_aid = null_allocation_id;
516
        alloc_instruction* gather_alloc_instr = nullptr;
517
};
518

519
/// Maps instruction DAG types to their record type.
520
template <typename Instruction>
521
using record_type_for_t = utils::type_switch_t<Instruction, clone_collective_group_instruction(clone_collective_group_instruction_record),
522
    alloc_instruction(alloc_instruction_record), free_instruction(free_instruction_record), copy_instruction(copy_instruction_record),
523
    device_kernel_instruction(device_kernel_instruction_record), host_task_instruction(host_task_instruction_record), send_instruction(send_instruction_record),
524
    receive_instruction(receive_instruction_record), split_receive_instruction(split_receive_instruction_record),
525
    await_receive_instruction(await_receive_instruction_record), gather_receive_instruction(gather_receive_instruction_record),
526
    fill_identity_instruction(fill_identity_instruction_record), reduce_instruction(reduce_instruction_record), fence_instruction(fence_instruction_record),
527
    destroy_host_object_instruction(destroy_host_object_instruction_record), horizon_instruction(horizon_instruction_record),
528
    epoch_instruction(epoch_instruction_record)>;
529

530
class generator_impl {
531
  public:
532
        generator_impl(const task_manager& tm, size_t num_nodes, node_id local_nid, const system_info& system, instruction_graph& idag,
533
            instruction_graph_generator::delegate* dlg, instruction_recorder* recorder, const instruction_graph_generator::policy_set& policy);
534

535
        void notify_buffer_created(buffer_id bid, const range<3>& range, size_t elem_size, size_t elem_align, allocation_id user_aid = null_allocation_id);
536
        void notify_buffer_debug_name_changed(buffer_id bid, const std::string& name);
537
        void notify_buffer_destroyed(buffer_id bid);
538
        void notify_host_object_created(host_object_id hoid, bool owns_instance);
539
        void notify_host_object_destroyed(host_object_id hoid);
540
        void compile(const abstract_command& cmd);
541

542
  private:
543
        inline static const box<3> scalar_reduction_box{zeros, ones};
544

545
        // construction parameters (immutable)
546
        instruction_graph* m_idag;
547
        const task_manager* m_tm; // TODO commands should reference tasks by pointer, not id - then we wouldn't need this member.
548
        size_t m_num_nodes;
549
        node_id m_local_nid;
550
        system_info m_system;
551
        instruction_graph_generator::delegate* m_delegate;
552
        instruction_recorder* m_recorder;
553
        instruction_graph_generator::policy_set m_policy;
554

555
        instruction_id m_next_instruction_id = 0;
556
        message_id m_next_message_id = 0;
557

558
        instruction* m_last_horizon = nullptr;
559
        instruction* m_last_epoch = nullptr; // set once the initial epoch instruction is generated in the constructor
560

561
        /// The set of all instructions that are not yet depended upon by other instructions. These are collected by collapse_execution_front_to() as part of
562
        /// horizon / epoch generation.
563
        std::unordered_set<instruction_id> m_execution_front;
564

565
        dense_map<memory_id, memory_state> m_memories;
566
        std::unordered_map<buffer_id, buffer_state> m_buffers;
567
        std::unordered_map<host_object_id, host_object_state> m_host_objects;
568
        std::unordered_map<collective_group_id, collective_group_state> m_collective_groups;
569

570
        /// The instruction executor maintains a mapping of allocation_id -> USM pointer. For IDAG-managed memory, these entries are deleted after executing a
571
        /// `free_instruction`, but since user allocations are not deallocated by us, we notify the executor on each horizon or epoch via the `instruction_garbage`
572
        /// struct about entries that will no longer be used and can therefore be collected. We include user allocations for buffer fences immediately after
573
        /// emitting the fence, and buffer host-initialization user allocations after the buffer has been destroyed.
574
        std::vector<allocation_id> m_unreferenced_user_allocations;
575

576
        /// True if a recorder is present and create() will call the `record_with` lambda passed as its last parameter.
577
        bool is_recording() const { return m_recorder != nullptr; }
6,225✔
578

579
        allocation_id new_allocation_id(const memory_id mid);
580

581
        template <typename Instruction, typename... CtorParamsAndRecordWithFn, size_t... CtorParamIndices, size_t RecordWithFnIndex>
582
        Instruction* create_internal(batch& batch, const std::tuple<CtorParamsAndRecordWithFn...>& ctor_args_and_record_with,
583
            std::index_sequence<CtorParamIndices...> /* ctor_param_indices*/, std::index_sequence<RecordWithFnIndex> /* record_with_fn_index */);
584

585
        /// Create an instruction, insert it into the IDAG and the current execution front, and record it if a recorder is present.
586
        ///
587
        /// Invoke as
588
        /// ```
589
        /// create<instruction-type>(instruction-ctor-params...,
590
        ///         [&](const auto record_debug_info) { return record_debug_info(instruction-record-additional-ctor-params)})
591
        /// ```
592
        template <typename Instruction, typename... CtorParamsAndRecordWithFn>
593
        Instruction* create(batch& batch, CtorParamsAndRecordWithFn&&... ctor_args_and_record_with);
594

595
        message_id create_outbound_pilot(batch& batch, node_id target, const transfer_id& trid, const box<3>& box);
596

597
        /// Inserts a graph dependency and removes `to` from the execution front (if present). The `record_origin` is debug information.
598
        void add_dependency(instruction* const from, instruction* const to, const instruction_dependency_origin record_origin);
599

600
        void add_dependencies_on_last_writers(instruction* const accessing_instruction, buffer_allocation_state& allocation, const region<3>& region);
601

602
        /// Add dependencies to the last writer of a region, and track the instruction as the new last (concurrent) reader.
603
        void perform_concurrent_read_from_allocation(instruction* const reading_instruction, buffer_allocation_state& allocation, const region<3>& region);
604

605
        void add_dependencies_on_last_concurrent_accesses(instruction* const accessing_instruction, buffer_allocation_state& allocation, const region<3>& region,
606
            const instruction_dependency_origin origin_for_read_write_front);
607

608
        /// Add dependencies to the last concurrent accesses of a region, and track the instruction as the new last (unique) writer.
609
        void perform_atomic_write_to_allocation(instruction* const writing_instruction, buffer_allocation_state& allocation, const region<3>& region);
610

611
        /// Replace all tracked instructions that are older than `epoch` with `epoch`.
612
        void apply_epoch(instruction* const epoch);
613

614
        /// Add dependencies from `horizon_or_epoch` to all instructions in `m_execution_front` and clear the set.
615
        void collapse_execution_front_to(instruction* const horizon_or_epoch);
616

617
        /// Ensure that all boxes in `required_contiguous_boxes` have a contiguous allocation on `mid`.
618
        /// Since re-allocation of one buffer on one memory never interacts with other buffers or other memories backing the same buffer, this function can be called
619
        /// in any order of allocation requirements without generating additional dependencies.
620
        void allocate_contiguously(batch& batch, buffer_id bid, memory_id mid, box_vector<3>&& required_contiguous_boxes);
621

622
        /// Insert one or more receive instructions in order to fulfil a pending receive, making the received data available in host_memory_id. This may entail
623
        /// receiving a region that is larger than the union of all regions read.
624
        void commit_pending_region_receive_to_host_memory(
625
            batch& batch, buffer_id bid, const buffer_state::region_receive& receives, const std::vector<region<3>>& concurrent_reads);
626

627
        /// Insert coherence copy instructions where necessary to make `dest_mid` coherent for all `concurrent_reads`. Requires the necessary allocations in
628
        /// `dest_mid` to already be present. We deliberately allow overlapping read-regions to avoid aggregated copies introducing synchronization points between
629
        /// otherwise independent instructions.
630
        void establish_coherence_between_buffer_memories(batch& batch, buffer_id bid, memory_id dest_mid, const std::vector<region<3>>& concurrent_reads);
631

632
        /// Issue instructions to create any collective group required by a task.
633
        void create_task_collective_groups(batch& command_batch, const task& tsk);
634

635
        /// Split a task's local execution range (given by execution_command) into chunks according to device configuration and a possible oversubscription hint.
636
        std::vector<localized_chunk> split_task_execution_range(const execution_command& ecmd, const task& tsk);
637

638
        /// Detect overlapping writes between local chunks of a task and report them according to m_policy.
639
        void report_task_overlapping_writes(const task& tsk, const std::vector<localized_chunk>& concurrent_chunks) const;
640

641
        /// Allocate memory, apply any pending receives, and issue resize- and coherence copies to prepare all buffer memories for a task's execution.
642
        void satisfy_task_buffer_requirements(batch& batch, buffer_id bid, const task& tsk, const subrange<3>& local_execution_range, bool is_reduction_initializer,
643
            const std::vector<localized_chunk>& concurrent_chunks_after_split);
644

645
        /// Create a gather allocation and optionally save the current buffer value before creating partial reduction results in any kernel.
646
        local_reduction prepare_task_local_reduction(
647
            batch& command_batch, const reduction_info& rinfo, const execution_command& ecmd, const task& tsk, const size_t num_concurrent_chunks);
648

649
        /// Combine any partial reduction results computed by local chunks and write the result to buffer host memory.
650
        void finish_task_local_reduction(batch& command_batch, const local_reduction& red, const reduction_info& rinfo, const execution_command& ecmd,
651
            const task& tsk, const std::vector<localized_chunk>& concurrent_chunks);
652

653
        /// Launch a device kernel for each local chunk of a task, passing the relevant buffer allocations in place of accessors and reduction descriptors.
654
        instruction* launch_task_kernel(batch& command_batch, const execution_command& ecmd, const task& tsk, const localized_chunk& chunk);
655

656
        /// Add dependencies for all buffer accesses and reductions of a task, then update tracking structures accordingly.
657
        void perform_task_buffer_accesses(
658
            const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions);
659

660
        /// If a task has side effects, serialize it with respect to the last task that shares a host object.
661
        void perform_task_side_effects(
662
            const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions);
663

664
        /// If a task is part of a collective group, serialize it with respect to the last host task in that group.
665
        void perform_task_collective_operations(
666
            const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions);
667

668
        void compile_execution_command(batch& batch, const execution_command& ecmd);
669
        void compile_push_command(batch& batch, const push_command& pcmd);
670
        void defer_await_push_command(const await_push_command& apcmd);
671
        void compile_reduction_command(batch& batch, const reduction_command& rcmd);
672
        void compile_fence_command(batch& batch, const fence_command& fcmd);
673
        void compile_horizon_command(batch& batch, const horizon_command& hcmd);
674
        void compile_epoch_command(batch& batch, const epoch_command& ecmd);
675

676
        /// Passes all instructions and outbound pilots that have been accumulated in `batch` to the delegate (if any). Called after compiling a command, creating
677
        /// or destroying a buffer or host object, and also in our constructor for the creation of the initial epoch.
678
        void flush_batch(batch&& batch);
679

680
        std::string print_buffer_debug_label(buffer_id bid) const;
681
};
682

683
generator_impl::generator_impl(const task_manager& tm, const size_t num_nodes, const node_id local_nid, const system_info& system, instruction_graph& idag,
163✔
684
    instruction_graph_generator::delegate* const dlg, instruction_recorder* const recorder, const instruction_graph_generator::policy_set& policy)
163✔
685
    : m_idag(&idag), m_tm(&tm), m_num_nodes(num_nodes), m_local_nid(local_nid), m_system(system), m_delegate(dlg), m_recorder(recorder), m_policy(policy),
163✔
686
      m_memories(m_system.memories.size()) //
163✔
687
{
688
#ifndef NDEBUG
689
        assert(m_system.memories.size() <= max_num_memories);
163✔
690
        assert(std::all_of(
404✔
691
            m_system.devices.begin(), m_system.devices.end(), [&](const device_info& device) { return device.native_memory < m_system.memories.size(); }));
692
        for(memory_id mid_a = 0; mid_a < m_system.memories.size(); ++mid_a) {
730✔
693
                assert(m_system.memories[mid_a].copy_peers[mid_a]);
567✔
694
                for(memory_id mid_b = mid_a + 1; mid_b < m_system.memories.size(); ++mid_b) {
1,323✔
695
                        assert(m_system.memories[mid_a].copy_peers[mid_b] == m_system.memories[mid_b].copy_peers[mid_a]
756✔
696
                               && "system_info::memories::copy_peers must be reflexive");
697
                }
698
        }
699
#endif
700

701
        batch init_epoch_batch;
163✔
702
        m_idag->begin_epoch(task_manager::initial_epoch_task);
163✔
703
        const auto init_epoch = create<epoch_instruction>(init_epoch_batch, task_manager::initial_epoch_task, epoch_action::none, instruction_garbage{},
489✔
704
            [](const auto& record_debug_info) { record_debug_info(command_id(0 /* or so we assume */)); });
652✔
705
        m_last_epoch = init_epoch;
163✔
706
        flush_batch(std::move(init_epoch_batch));
163✔
707

708
        // The root collective group already exists in the runtime, but we must still equip it with a meaningful last_host_task.
709
        m_collective_groups.emplace(root_collective_group_id, init_epoch);
163✔
710
}
326✔
711

712
void generator_impl::notify_buffer_created(
161✔
713
    const buffer_id bid, const range<3>& range, const size_t elem_size, const size_t elem_align, allocation_id user_aid) //
714
{
715
        const auto [iter, inserted] =
322✔
716
            m_buffers.emplace(std::piecewise_construct, std::tuple(bid), std::tuple(range, elem_size, elem_align, m_system.memories.size()));
161✔
717
        assert(inserted);
161✔
718

719
        if(user_aid != null_allocation_id) {
161✔
720
                // The buffer was host-initialized through a user-specified pointer, which we consider a fully coherent allocation in user_memory_id.
721
                assert(user_aid.get_memory_id() == user_memory_id);
12✔
722
                const box entire_buffer = subrange({}, range);
36✔
723

724
                auto& buffer = iter->second;
12✔
725
                auto& memory = buffer.memories[user_memory_id];
12✔
726
                auto& allocation = memory.allocations.emplace_back(user_aid, nullptr /* alloc_instruction */, entire_buffer, buffer.range);
12✔
727

728
                allocation.track_atomic_write(entire_buffer, m_last_epoch);
12✔
729
                buffer.track_original_write(entire_buffer, m_last_epoch, user_memory_id);
12✔
730
        }
731
}
161✔
732

733
void generator_impl::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& name) { m_buffers.at(bid).debug_name = name; }
×
734

735
void generator_impl::notify_buffer_destroyed(const buffer_id bid) {
151✔
736
        const auto iter = m_buffers.find(bid);
151✔
737
        assert(iter != m_buffers.end());
151✔
738
        auto& buffer = iter->second;
151✔
739

740
        batch free_batch;
151✔
741
        for(memory_id mid = 0; mid < buffer.memories.size(); ++mid) {
678✔
742
                auto& memory = buffer.memories[mid];
527✔
743
                if(mid == user_memory_id) {
527✔
744
                        // When the buffer is gone, we can also drop the user allocation from executor tracking (there currently are either 0 or 1 of them).
745
                        for(const auto& user_alloc : memory.allocations) {
163✔
746
                                m_unreferenced_user_allocations.push_back(user_alloc.aid);
12✔
747
                        }
748
                } else {
749
                        for(auto& allocation : memory.allocations) {
694✔
750
                                const auto free_instr = create<free_instruction>(free_batch, allocation.aid, [&](const auto& record_debug_info) {
318✔
751
                                        record_debug_info(allocation.box.get_area() * buffer.elem_size, buffer_allocation_record{bid, buffer.debug_name, allocation.box});
636✔
752
                                });
954!
753
                                add_dependencies_on_last_concurrent_accesses(free_instr, allocation, allocation.box, instruction_dependency_origin::allocation_lifetime);
318✔
754
                                // no need to modify the access front - we're removing the buffer altogether!
755
                        }
756
                }
757
        }
758
        flush_batch(std::move(free_batch));
151✔
759

760
        m_buffers.erase(iter);
151✔
761
}
302✔
762

763
void generator_impl::notify_host_object_created(const host_object_id hoid, const bool owns_instance) {
20✔
764
        assert(m_host_objects.count(hoid) == 0);
20✔
765
        m_host_objects.emplace(std::piecewise_construct, std::tuple(hoid), std::tuple(owns_instance, m_last_epoch));
20✔
766
        // The host object is created in "userspace" and no instructions need to be emitted.
767
}
20✔
768

769
void generator_impl::notify_host_object_destroyed(const host_object_id hoid) {
16✔
770
        const auto iter = m_host_objects.find(hoid);
16✔
771
        assert(iter != m_host_objects.end());
16✔
772
        auto& obj = iter->second;
16✔
773

774
        if(obj.owns_instance) { // this is false for host_object<T&> and host_object<void>
16✔
775
                batch destroy_batch;
15✔
776
                const auto destroy_instr = create<destroy_host_object_instruction>(destroy_batch, hoid, [](const auto& record_debug_info) { record_debug_info(); });
30✔
777
                add_dependency(destroy_instr, obj.last_side_effect, instruction_dependency_origin::side_effect);
15✔
778
                flush_batch(std::move(destroy_batch));
15✔
779
        }
15✔
780

781
        m_host_objects.erase(iter);
16✔
782
}
16✔
783

784
allocation_id generator_impl::new_allocation_id(const memory_id mid) {
375✔
785
        assert(mid < m_memories.size());
375✔
786
        assert(mid != user_memory_id && "user allocation ids are not managed by the instruction graph generator");
375✔
787
        return allocation_id(mid, m_memories[mid].next_raw_aid++);
375✔
788
}
789

790
template <typename Instruction, typename... CtorParamsAndRecordWithFn, size_t... CtorParamIndices, size_t RecordWithFnIndex>
791
Instruction* generator_impl::create_internal(batch& batch, const std::tuple<CtorParamsAndRecordWithFn...>& ctor_args_and_record_with,
2,120✔
792
    std::index_sequence<CtorParamIndices...> /* ctor_param_indices*/, std::index_sequence<RecordWithFnIndex> /* record_with_fn_index */) {
793
        const auto iid = m_next_instruction_id++;
2,120✔
794
        const auto priority = batch.base_priority + instruction_type_priority<Instruction>;
2,120✔
795
        auto unique_instr = std::make_unique<Instruction>(iid, priority, std::get<CtorParamIndices>(ctor_args_and_record_with)...);
2,120✔
796
        const auto instr = unique_instr.get(); // we need to access the raw pointer after moving unique_ptr
2,120✔
797

798
        m_idag->push_instruction(std::move(unique_instr));
2,120✔
799
        m_execution_front.insert(iid);
2,120✔
800
        batch.generated_instructions.push_back(instr);
2,120✔
801

802
        if(is_recording()) {
2,120!
803
                const auto& record_with = std::get<RecordWithFnIndex>(ctor_args_and_record_with);
2,120✔
804
#ifndef NDEBUG
805
                bool recorded = false;
2,120✔
806
#endif
807
                record_with([&](auto&&... debug_info) {
6,360✔
808
                        m_recorder->record_instruction(
4,240✔
809
                            std::make_unique<record_type_for_t<Instruction>>(std::as_const(*instr), std::forward<decltype(debug_info)>(debug_info)...));
2,120✔
810
#ifndef NDEBUG
811
                        recorded = true;
2,120✔
812
#endif
813
                });
814
                assert(recorded && "record_debug_info() not called within recording functor");
2,120✔
815
        }
816

817
        return instr;
2,120✔
818
}
2,120✔
819

820
template <typename Instruction, typename... CtorParamsAndRecordWithFn>
821
Instruction* generator_impl::create(batch& batch, CtorParamsAndRecordWithFn&&... ctor_args_and_record_with) {
2,120✔
822
        constexpr auto n_args = sizeof...(CtorParamsAndRecordWithFn);
2,120✔
823
        static_assert(n_args > 0);
824
        return create_internal<Instruction>(batch, std::forward_as_tuple(std::forward<CtorParamsAndRecordWithFn>(ctor_args_and_record_with)...),
2,120✔
825
            std::make_index_sequence<n_args - 1>(), std::index_sequence<n_args - 1>());
4,240✔
826
}
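// Illustrative usage sketch (hypothetical arguments, mirroring the call sites further below): create<I>(batch, ctor_args..., record_fn)
// forwards everything except the trailing functor to I's constructor (prefixed with a fresh instruction id and the batch priority) and
// invokes the functor only while recording is enabled, e.g.
//
//   const auto instr = create<alloc_instruction>(current_batch, aid, size_bytes, alignment_bytes,
//       [&](const auto& record_debug_info) { record_debug_info(/* debug-only payload */); });
//
// The two index sequences split the parameter pack at compile time: make_index_sequence<n_args - 1> selects the constructor
// arguments, index_sequence<n_args - 1> selects the recording functor.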
827

828
message_id generator_impl::create_outbound_pilot(batch& current_batch, const node_id target, const transfer_id& trid, const box<3>& box) {
89✔
829
        // message ids (IDAG equivalent of MPI send/receive tags) tie send / receive instructions to their respective pilots.
830
        const message_id msgid = m_next_message_id++;
89✔
831
        const outbound_pilot pilot{target, pilot_message{msgid, trid, box}};
89✔
832
        current_batch.generated_pilots.push_back(pilot);
89✔
833
        if(is_recording()) { m_recorder->record_outbound_pilot(pilot); }
89!
834
        return msgid;
178✔
835
}
836

837
void generator_impl::add_dependency(instruction* const from, instruction* const to, const instruction_dependency_origin record_origin) {
2,595✔
838
        from->add_dependency(to->get_id());
2,595✔
839
        if(is_recording()) { m_recorder->record_dependency(instruction_dependency_record(to->get_id(), from->get_id(), record_origin)); }
2,595!
840
        m_execution_front.erase(to->get_id());
2,595✔
841
}
2,595✔
842

843
void generator_impl::add_dependencies_on_last_writers(instruction* const accessing_instruction, buffer_allocation_state& allocation, const region<3>& region) {
839✔
844
        for(const auto& [box, front] : allocation.last_writers.get_region_values(region)) {
1,462✔
845
                const auto record_origin = front.get_mode() == access_front::allocate ? instruction_dependency_origin::allocation_lifetime
623✔
846
                                                                                      : instruction_dependency_origin::read_from_allocation;
623✔
847
                for(const auto writer : front.get_instructions()) {
1,247✔
848
                        add_dependency(accessing_instruction, writer, record_origin);
624✔
849
                }
850
        }
839✔
851
}
839✔
852

853
void generator_impl::perform_concurrent_read_from_allocation(
382✔
854
    instruction* const reading_instruction, buffer_allocation_state& allocation, const region<3>& region) {
855
        add_dependencies_on_last_writers(reading_instruction, allocation, region);
382✔
856
        allocation.track_concurrent_read(region, reading_instruction);
382✔
857
}
382✔
858

859
void generator_impl::add_dependencies_on_last_concurrent_accesses(instruction* const accessing_instruction, buffer_allocation_state& allocation,
1,081✔
860
    const region<3>& region, const instruction_dependency_origin origin_for_read_write_front) {
861
        for(const auto& [box, front] : allocation.last_concurrent_accesses.get_region_values(region)) {
2,181✔
862
                const auto record_origin =
863
                    front.get_mode() == access_front::allocate ? instruction_dependency_origin::allocation_lifetime : origin_for_read_write_front;
1,100✔
864
                for(const auto dep_instr : front.get_instructions()) {
2,328✔
865
                        add_dependency(accessing_instruction, dep_instr, record_origin);
1,228✔
866
                }
867
        }
1,081✔
868
}
1,081✔
869

870
void generator_impl::perform_atomic_write_to_allocation(instruction* const writing_instruction, buffer_allocation_state& allocation, const region<3>& region) {
281✔
871
        add_dependencies_on_last_concurrent_accesses(writing_instruction, allocation, region, instruction_dependency_origin::write_to_allocation);
281✔
872
        allocation.track_atomic_write(region, writing_instruction);
281✔
873
}
281✔
874

875
void generator_impl::apply_epoch(instruction* const epoch) {
155✔
876
        for(auto& [_, buffer] : m_buffers) {
157✔
877
                buffer.apply_epoch(epoch);
2✔
878
        }
879
        for(auto& [_, host_object] : m_host_objects) {
158✔
880
                host_object.apply_epoch(epoch);
3✔
881
        }
882
        for(auto& [_, collective_group] : m_collective_groups) {
318✔
883
                collective_group.apply_epoch(epoch);
163✔
884
        }
885
        m_last_epoch = epoch;
155✔
886
}
155✔
887

888
void generator_impl::collapse_execution_front_to(instruction* const horizon_or_epoch) {
169✔
889
        for(const auto iid : m_execution_front) {
799✔
890
                if(iid == horizon_or_epoch->get_id()) continue;
630✔
891
                // we can't use instruction_graph_generator::add_dependency since it modifies the m_execution_front which we're iterating over here
892
                horizon_or_epoch->add_dependency(iid);
461✔
893
                if(is_recording()) {
461!
894
                        m_recorder->record_dependency(instruction_dependency_record(iid, horizon_or_epoch->get_id(), instruction_dependency_origin::execution_front));
461✔
895
                }
896
        }
897
        m_execution_front.clear();
169✔
898
        m_execution_front.insert(horizon_or_epoch->get_id());
169✔
899
}
169✔
900

901
void generator_impl::allocate_contiguously(batch& current_batch, const buffer_id bid, const memory_id mid, box_vector<3>&& required_contiguous_boxes) //
1,090✔
902
{
903
        if(required_contiguous_boxes.empty()) return;
1,090✔
904

905
        auto& buffer = m_buffers.at(bid);
498✔
906
        auto& memory = buffer.memories[mid];
498✔
907

908
        assert(std::all_of(required_contiguous_boxes.begin(), required_contiguous_boxes.end(),
1,676✔
909
            [&](const box<3>& box) { return !box.empty() && detail::box<3>::full_range(buffer.range).covers(box); }));
910

911
        if(std::all_of(required_contiguous_boxes.begin(), required_contiguous_boxes.end(), //
498✔
912
               [&](const box<3>& box) { return memory.is_allocated_contiguously(box); })) {
500✔
913
                return;
164✔
914
        }
915

916
        // We currently only ever *grow* the allocation of buffers on each memory, which means that we must merge (re-size) existing allocations that overlap with
917
        // but do not fully contain one of the required contiguous boxes. *Overlapping* here strictly means having a non-empty intersection; two allocations whose
918
        // boxes merely touch can continue to co-exist.
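        // Illustrative 1D example (hypothetical boxes): if [0,8) is already allocated and a kernel now requires [4,12) to be
        // contiguous, the two boxes overlap, so their bounding box [0,12) becomes the new allocation; the old [0,8) allocation is
        // resize-copied into it (for its still up-to-date elements) and then freed below. Had the requirement been [8,12) instead,
        // the boxes would merely touch and both allocations could coexist.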
919
        auto&& contiguous_boxes_after_realloc = std::move(required_contiguous_boxes);
334✔
920
        for(auto& alloc : memory.allocations) {
392✔
921
                contiguous_boxes_after_realloc.push_back(alloc.box);
58✔
922
        }
923
        merge_overlapping_bounding_boxes(contiguous_boxes_after_realloc);
334✔
924

925
        // Allocations that are now fully contained in (but not equal to) one of the newly contiguous bounding boxes will be freed at the end of the reallocation
926
        // step, because we currently disallow overlapping allocations for simplicity. These will function as sources for resize-copies below.
927
        const auto resize_from_begin = std::partition(memory.allocations.begin(), memory.allocations.end(), [&](const buffer_allocation_state& allocation) {
334✔
928
                return std::find(contiguous_boxes_after_realloc.begin(), contiguous_boxes_after_realloc.end(), allocation.box) != contiguous_boxes_after_realloc.end();
58✔
929
        });
930
        const auto resize_from_end = memory.allocations.end();
334✔
931

932
        // Derive the set of new boxes to allocate by removing all existing boxes from the set of contiguous boxes.
933
        auto&& new_alloc_boxes = std::move(contiguous_boxes_after_realloc);
334✔
934
        const auto last_new_allocation = std::remove_if(new_alloc_boxes.begin(), new_alloc_boxes.end(),
334✔
935
            [&](auto& box) { return std::any_of(memory.allocations.begin(), memory.allocations.end(), [&](auto& alloc) { return alloc.box == box; }); });
547✔
936
        new_alloc_boxes.erase(last_new_allocation, new_alloc_boxes.end());
334✔
937
        assert(!new_alloc_boxes.empty()); // otherwise we would have returned early
334✔
938

939
        // Opportunistically merge connected boxes to keep the number of allocations and the tracking overhead low. This will not introduce artificial
940
        // synchronization points because resize-copies are still rooted on the original last-writers.
941
        // TODO consider over-allocating to avoid future reallocations, i.e. by using bounding boxes of boxes that have a common boundary but are not "connected" in
942
        // the sense that they can simply be merged.
943
        merge_connected_boxes(new_alloc_boxes);
334✔
944

945
        // We collect new allocations in a vector *separate* from memory.allocations so as not to invalidate iterators (and to avoid resize-copying from them).
946
        std::vector<buffer_allocation_state> new_allocations;
334✔
947
        new_allocations.reserve(new_alloc_boxes.size());
334✔
948

949
        // Create new allocations and initialize them via resize-copies if necessary.
950
        for(const auto& new_box : new_alloc_boxes) {
669✔
951
                const auto aid = new_allocation_id(mid);
335✔
952
                const auto alloc_instr =
335✔
953
                    create<alloc_instruction>(current_batch, aid, new_box.get_area() * buffer.elem_size, buffer.elem_align, [&](const auto& record_debug_info) {
670✔
954
                            record_debug_info(alloc_instruction_record::alloc_origin::buffer, buffer_allocation_record{bid, buffer.debug_name, new_box}, std::nullopt);
670✔
955
                    });
1,340!
956
                add_dependency(alloc_instr, m_last_epoch, instruction_dependency_origin::last_epoch);
335✔
957

958
                auto& new_alloc = new_allocations.emplace_back(aid, alloc_instr, new_box, buffer.range);
335✔
959

960
                // Since allocations don't overlap, we copy from those that are about to be freed
961
                for(auto source_it = resize_from_begin; source_it != resize_from_end; ++source_it) {
350✔
962
                        auto& resize_source_alloc = *source_it;
15✔
963

964
                        // Only copy those boxes to the new allocation that are still up-to-date in the old allocation. The caller of allocate_contiguously should remove
965
                        // any region from up_to_date_memories that they intend to discard / overwrite immediately to avoid dead resize copies.
966
                        // TODO investigate a garbage-collection heuristic that omits these copies if there are other up-to-date memories and we do not expect the region to
967
                        // be read again on this memory.
968
                        const auto full_copy_box = box_intersection(new_alloc.box, resize_source_alloc.box);
15✔
969
                        if(full_copy_box.empty()) continue; // not every previous allocation necessarily intersects with every new allocation
15!
970

971
                        box_vector<3> live_copy_boxes;
15✔
972
                        for(const auto& [copy_box, location] : buffer.up_to_date_memories.get_region_values(full_copy_box)) {
35✔
973
                                if(location.test(mid)) { live_copy_boxes.push_back(copy_box); }
20✔
974
                        }
15✔
975
                        // even if allocations intersect, the entire intersection might be overwritten by the task that requested reallocation - in which case the caller
976
                        // would have reset up_to_date_memories for the corresponding elements
977
                        if(live_copy_boxes.empty()) continue;
15✔
978

979
                        region<3> live_copy_region(std::move(live_copy_boxes));
13✔
980
                        const auto copy_instr = create<copy_instruction>(current_batch, resize_source_alloc.aid, new_alloc.aid, resize_source_alloc.box, new_alloc.box,
26✔
981
                            live_copy_region, buffer.elem_size,
13✔
982
                            [&](const auto& record_debug_info) { record_debug_info(copy_instruction_record::copy_origin::resize, bid, buffer.debug_name); });
26✔
983

984
                        perform_concurrent_read_from_allocation(copy_instr, resize_source_alloc, live_copy_region);
13✔
985
                        perform_atomic_write_to_allocation(copy_instr, new_alloc, live_copy_region);
13✔
986
                }
15✔
987
        }
988

989
        // Free old allocations now that all required resize-copies have been issued.
990
        // TODO consider keeping old allocations around until their box is written to (or at least until the end of the current instruction batch) in order to
991
        // resolve "buffer-locking" anti-dependencies
992
        for(auto it = resize_from_begin; it != resize_from_end; ++it) {
349✔
993
                auto& old_alloc = *it;
15✔
994

995
                const auto free_instr = create<free_instruction>(current_batch, old_alloc.aid, [&](const auto& record_debug_info) {
15✔
996
                        record_debug_info(old_alloc.box.get_area() * buffer.elem_size, buffer_allocation_record{bid, buffer.debug_name, old_alloc.box});
30✔
997
                });
45!
998
                add_dependencies_on_last_concurrent_accesses(free_instr, old_alloc, old_alloc.box, instruction_dependency_origin::allocation_lifetime);
15✔
999
        }
1000

1001
        // TODO garbage-collect allocations that are not up-to-date and not written to in this task
1002

1003
        memory.allocations.erase(resize_from_begin, memory.allocations.end());
334✔
1004
        memory.allocations.insert(memory.allocations.end(), std::make_move_iterator(new_allocations.begin()), std::make_move_iterator(new_allocations.end()));
334✔
1005
}
334✔
1006

1007
void generator_impl::commit_pending_region_receive_to_host_memory(
27✔
1008
    batch& current_batch, const buffer_id bid, const buffer_state::region_receive& receive, const std::vector<region<3>>& concurrent_reads) //
1009
{
1010
        const auto trid = transfer_id(receive.consumer_tid, bid, no_reduction_id);
27✔
1011

1012
        // For simplicity of the initial IDAG implementation, we choose to receive directly into host-buffer allocations. This saves us from juggling
1013
        // staging-buffers, but comes at a price in performance since the communicator needs to linearize and de-linearize transfers from and to regions that have
1014
        // non-zero strides within their host allocation.
1015
        //
1016
        // TODO 1) maintain staging allocations and move (de-)linearization to the device in order to profit from higher memory bandwidths
1017
        //      2) explicitly support communicators that can send and receive directly to and from device memory (NVIDIA GPUDirect RDMA)
1018

1019
        auto& buffer = m_buffers.at(bid);
27✔
1020
        auto& host_memory = buffer.memories[host_memory_id];
27✔
1021

1022
        std::vector<buffer_allocation_state*> allocations;
27✔
1023
        for(const auto& min_contiguous_box : receive.required_contiguous_allocations) {
55✔
1024
                // The caller (aka satisfy_task_buffer_requirements) must ensure that all received boxes are allocated contiguously
1025
                auto& alloc = host_memory.get_contiguous_allocation(min_contiguous_box);
28✔
1026
                if(std::find(allocations.begin(), allocations.end(), &alloc) == allocations.end()) { allocations.push_back(&alloc); }
28!
1027
        }
1028

1029
        for(const auto alloc : allocations) {
55✔
1030
                const auto region_received_into_alloc = region_intersection(alloc->box, receive.received_region);
28✔
1031
                std::vector<region<3>> independent_await_regions;
28✔
1032
                for(const auto& read_region : concurrent_reads) {
72✔
1033
                        const auto await_region = region_intersection(read_region, region_received_into_alloc);
44✔
1034
                        if(!await_region.empty()) { independent_await_regions.push_back(await_region); }
44!
1035
                }
44✔
1036
                assert(!independent_await_regions.empty());
28✔
1037

1038
                // Ensure that receive-instructions inserted for concurrent readers are themselves concurrent.
1039
                symmetrically_split_overlapping_regions(independent_await_regions);
28✔
1040

1041
                if(independent_await_regions.size() > 1) {
28✔
1042
                        // If there are multiple concurrent readers requiring different parts of the received region, we emit independent await_receive_instructions so as
1043
                        // to not introduce artificial synchronization points (and facilitate computation-communication overlap). Since the (remote) sender might still
1044
                        // choose to perform the entire transfer en-bloc, we must inform the receive_arbiter of the target allocation and the full transfer region via a
1045
                        // split_receive_instruction.
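                        // Illustrative example (hypothetical boxes): if readers A and B consume the left and right halves of a received
                        // region, the generated sub-graph is
                        //
                        //     split_receive(whole region) --> await_receive(left half)  --> reader A
                        //                                 \-> await_receive(right half) --> reader B
                        //
                        // so each reader only waits for the part it actually consumes, even if the sender transmits everything at once.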
1046
                        const auto split_recv_instr = create<split_receive_instruction>(current_batch, trid, region_received_into_alloc, alloc->aid, alloc->box,
20✔
1047
                            buffer.elem_size, [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
20✔
1048

1049
                        // We add dependencies to the split_receive_instruction as if it were a writer, but update the last_writers only at the await_receive_instruction.
1050
                        // The actual write happens somewhere in-between these instructions as orchestrated by the receive_arbiter, so no other access may depend on
1051
                        // split_receive_instruction directly.
1052
                        add_dependencies_on_last_concurrent_accesses(
10✔
1053
                            split_recv_instr, *alloc, region_received_into_alloc, instruction_dependency_origin::write_to_allocation);
1054

1055
                        for(const auto& await_region : independent_await_regions) {
45✔
1056
                                const auto await_instr = create<await_receive_instruction>(
70✔
1057
                                    current_batch, trid, await_region, [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
70✔
1058

1059
                                add_dependency(await_instr, split_recv_instr, instruction_dependency_origin::split_receive);
35✔
1060

1061
                                alloc->track_atomic_write(await_region, await_instr);
35✔
1062
                                buffer.original_writers.update_region(await_region, await_instr);
35✔
1063
                        }
1064
                } else {
1065
                        // A receive_instruction is equivalent to a spit_receive_instruction followed by a single await_receive_instruction, but (as the common case) has
1066
                        // less tracking overhead in the instruction graph.
1067
                        const auto recv_instr = create<receive_instruction>(current_batch, trid, region_received_into_alloc, alloc->aid, alloc->box, buffer.elem_size,
36✔
1068
                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
36✔
1069

1070
                        perform_atomic_write_to_allocation(recv_instr, *alloc, region_received_into_alloc);
18✔
1071
                        buffer.original_writers.update_region(region_received_into_alloc, recv_instr);
18✔
1072
                }
1073
        }
28✔
1074

1075
        buffer.original_write_memories.update_region(receive.received_region, host_memory_id);
27✔
1076
        buffer.up_to_date_memories.update_region(receive.received_region, memory_mask().set(host_memory_id));
27✔
1077
}
54✔
1078

1079
void generator_impl::establish_coherence_between_buffer_memories(
1,065✔
1080
    batch& current_batch, const buffer_id bid, const memory_id dest_mid, const std::vector<region<3>>& concurrent_reads) //
1081
{
1082
        auto& buffer = m_buffers.at(bid);
1,065✔
1083

1084
        // Given the full region to be read, find all regions not up to date in `dest_mid`.
1085
        std::vector<region<3>> unsatisfied_concurrent_reads;
1,065✔
1086
        for(const auto& read_region : concurrent_reads) {
1,325✔
1087
                box_vector<3> unsatisfied_boxes;
260✔
1088
                for(const auto& [box, location] : buffer.up_to_date_memories.get_region_values(read_region)) {
545✔
1089
                        if(location.any() /* gracefully handle uninitialized read */ && !location.test(dest_mid)) { unsatisfied_boxes.push_back(box); }
285✔
1090
                }
260✔
1091
                if(!unsatisfied_boxes.empty()) { unsatisfied_concurrent_reads.emplace_back(std::move(unsatisfied_boxes)); }
260✔
1092
        }
260✔
1093
        if(unsatisfied_concurrent_reads.empty()) return;
1,065✔
1094

1095
        // Ensure that coherence-copies inserted for concurrent readers are themselves concurrent.
1096
        symmetrically_split_overlapping_regions(unsatisfied_concurrent_reads);
159✔
1097

1098
        // Collect the regions to be copied from each memory. As long as we don't touch `buffer.up_to_date_memories` we could also create copy_instructions directly
1099
        // within this loop, but separating these two steps keeps code more readable at the expense of allocating one more vector.
1100
        std::vector<std::pair<memory_id, region<3>>> concurrent_copies_from_source;
159✔
1101
        for(auto& unsatisfied_region : unsatisfied_concurrent_reads) {
345✔
1102
                // Split the region on original writers to enable concurrency between the write of one region and a copy on another, already written region.
1103
                std::unordered_map<instruction_id, box_vector<3>> unsatisfied_boxes_by_writer;
186✔
1104
                for(const auto& [writer_box, original_writer] : buffer.original_writers.get_region_values(unsatisfied_region)) {
399✔
1105
                        assert(original_writer != nullptr);
213✔
1106
                        unsatisfied_boxes_by_writer[original_writer->get_id()].push_back(writer_box);
213✔
1107
                }
186✔
1108

1109
                for(auto& [_, unsatisfied_boxes] : unsatisfied_boxes_by_writer) {
396✔
1110
                        const region unsatisfied_region(std::move(unsatisfied_boxes));
210✔
1111
                        // There can be multiple original-writer memories if the original writer has been subsumed by an epoch or a horizon.
1112
                        dense_map<memory_id, box_vector<3>> copy_from_source(buffer.memories.size());
210✔
1113

1114
                        for(const auto& [copy_box, original_write_mid] : buffer.original_write_memories.get_region_values(unsatisfied_region)) {
423✔
1115
                                if(m_system.memories[original_write_mid].copy_peers.test(dest_mid)) {
213✔
1116
                                        // Prefer copying from the original writer's memory to avoid introducing copy-chains between the instructions of multiple commands.
1117
                                        copy_from_source[original_write_mid].push_back(copy_box);
203✔
1118
                                } else {
1119
                                        // If this is not possible, we expect that an earlier call to locally_satisfy_read_requirements has made the host memory coherent, meaning
1120
                                        // that we are in the second hop of a host-staged copy.
1121
#ifndef NDEBUG
1122
                                        assert(m_system.memories[dest_mid].copy_peers.test(host_memory_id));
10✔
1123
                                        for(const auto& [_, location] : buffer.up_to_date_memories.get_region_values(copy_box)) {
20✔
1124
                                                assert(location.test(host_memory_id));
10✔
1125
                                        }
10✔
1126
#endif
1127
                                        copy_from_source[host_memory_id].push_back(copy_box);
10✔
1128
                                }
1129
                        }
210✔
1130
                        for(memory_id source_mid = 0; source_mid < buffer.memories.size(); ++source_mid) {
1,050✔
1131
                                auto& copy_boxes = copy_from_source[source_mid];
840✔
1132
                                if(!copy_boxes.empty()) { concurrent_copies_from_source.emplace_back(source_mid, region(std::move(copy_boxes))); }
840✔
1133
                        }
1134
                }
210✔
1135
        }
186✔
1136

1137
        // Create, between pairs of source and dest allocations, the copy instructions according to the previously collected regions.
1138
        for(auto& [source_mid, copy_from_memory_region] : concurrent_copies_from_source) {
369✔
1139
                assert(dest_mid != source_mid);
210✔
1140
                for(auto& source_alloc : buffer.memories[source_mid].allocations) {
445✔
1141
                        const auto read_from_allocation_region = region_intersection(copy_from_memory_region, source_alloc.box);
235✔
1142
                        if(read_from_allocation_region.empty()) continue;
235✔
1143

1144
                        for(auto& dest_alloc : buffer.memories[dest_mid].allocations) {
457✔
1145
                                const auto copy_between_allocations_region = region_intersection(read_from_allocation_region, dest_alloc.box);
247✔
1146
                                if(copy_between_allocations_region.empty()) continue;
247✔
1147

1148
                                const auto copy_instr = create<copy_instruction>(current_batch, source_alloc.aid, dest_alloc.aid, source_alloc.box, dest_alloc.box,
420✔
1149
                                    copy_between_allocations_region, buffer.elem_size,
210✔
1150
                                    [&](const auto& record_debug_info) { record_debug_info(copy_instruction_record::copy_origin::coherence, bid, buffer.debug_name); });
420✔
1151

1152
                                perform_concurrent_read_from_allocation(copy_instr, source_alloc, copy_between_allocations_region);
210✔
1153
                                perform_atomic_write_to_allocation(copy_instr, dest_alloc, copy_between_allocations_region);
210✔
1154
                        }
247✔
1155
                }
235✔
1156
        }
1157

1158
        // Update buffer.up_to_date_memories for the entire updated region at once.
1159
        for(const auto& region : unsatisfied_concurrent_reads) {
345✔
1160
                for(auto& [box, location] : buffer.up_to_date_memories.get_region_values(region)) {
384✔
1161
                        buffer.up_to_date_memories.update_region(box, memory_mask(location).set(dest_mid));
198✔
1162
                }
186✔
1163
        }
1164
}
1,065✔
1165

1166
void generator_impl::create_task_collective_groups(batch& command_batch, const task& tsk) {
327✔
1167
        const auto cgid = tsk.get_collective_group_id();
327✔
1168
        if(cgid == non_collective_group_id) return;
327✔
1169
        if(m_collective_groups.count(cgid) != 0) return;
15✔
1170

1171
        // New collective groups are created by cloning the root collective group (aka MPI_COMM_WORLD).
1172
        auto& root_cg = m_collective_groups.at(root_collective_group_id);
9✔
1173
        const auto clone_cg_instr = create<clone_collective_group_instruction>(
18✔
1174
            command_batch, root_collective_group_id, tsk.get_collective_group_id(), [](const auto& record_debug_info) { record_debug_info(); });
27✔
1175

1176
        m_collective_groups.emplace(cgid, clone_cg_instr);
9✔
1177

1178
        // Cloning itself is a collective operation and must be serialized as such.
1179
        add_dependency(clone_cg_instr, root_cg.last_collective_operation, instruction_dependency_origin::collective_group_order);
9✔
1180
        root_cg.last_collective_operation = clone_cg_instr;
9✔
1181
}
1182

1183
std::vector<localized_chunk> generator_impl::split_task_execution_range(const execution_command& ecmd, const task& tsk) {
327✔
1184
        if(tsk.get_execution_target() == execution_target::device && m_system.devices.empty()) { utils::panic("no device on which to execute device kernel"); }
327!
1185

1186
        const bool is_splittable_locally =
1187
            tsk.has_variable_split() && tsk.get_side_effect_map().empty() && tsk.get_collective_group_id() == non_collective_group_id;
327!
1188
        const auto split = tsk.get_hint<experimental::hints::split_2d>() != nullptr ? split_2d : split_1d;
327✔
1189

1190
        const auto command_sr = ecmd.get_execution_range();
327✔
1191
        const auto command_chunk = chunk<3>(command_sr.offset, command_sr.range, tsk.get_global_size());
327✔
1192

1193
        // As a heuristic to keep inter-device communication to a minimum, we split the execution range twice when oversubscription is active: Once to obtain
1194
        // contiguous chunks per device, and once more (below) to subdivide the ranges on each device (which can help with computation-communication overlap).
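        // Illustrative example (hypothetical values, assuming split_1d with unit granularity): with 2 devices, an oversubscribe factor
        // of 2 and a 1D range of 1024 items, the first split yields coarse chunks [0,512) for D0 and [512,1024) for D1; the second
        // split below subdivides these into fine chunks [0,256), [256,512) on D0 and [512,768), [768,1024) on D1.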
1195
        std::vector<chunk<3>> coarse_chunks;
327✔
1196
        if(is_splittable_locally && tsk.get_execution_target() == execution_target::device) {
327✔
1197
                coarse_chunks = split(command_chunk, tsk.get_granularity(), m_system.devices.size());
218✔
1198
        } else {
1199
                coarse_chunks = {command_chunk};
109✔
1200
        }
1201

1202
        size_t oversubscribe_factor = 1;
327✔
1203
        if(const auto oversubscribe = tsk.get_hint<experimental::hints::oversubscribe>(); oversubscribe != nullptr) {
327✔
1204
                // Our local reduction setup uses the normal per-device backing buffer allocation as the reduction output of each device. Since we can't track
1205
                // overlapping allocations at the moment, we have no way of oversubscribing reduction kernels without introducing a data race between multiple "fine
1206
                // chunks" on the final write. This could be solved by creating separate reduction-output allocations for each device chunk and not touching the
1207
                // actual buffer allocation. This is left as *future work* for a general overhaul of reductions.
1208
                if(is_splittable_locally && tsk.get_reductions().empty()) {
25✔
1209
                        oversubscribe_factor = oversubscribe->get_factor();
17✔
1210
                } else if(m_policy.unsafe_oversubscription_error != error_policy::ignore) {
8✔
1211
                        utils::report_error(m_policy.unsafe_oversubscription_error, "Refusing to oversubscribe {}{}.", print_task_debug_label(tsk),
12✔
1212
                            !tsk.get_reductions().empty()                              ? " because it performs a reduction"
11✔
1213
                            : !tsk.get_side_effect_map().empty()                       ? " because it has side effects"
5✔
1214
                            : tsk.get_collective_group_id() != non_collective_group_id ? " because it participates in a collective group"
7!
1215
                            : !tsk.has_variable_split()                                ? " because its iteration space cannot be split"
1!
1216
                                                                                       : "");
1217
                }
1218
        }
1219

1220
        // Split a second time (if oversubscribed) and assign native memory and devices (if the task is a device kernel).
1221
        std::vector<localized_chunk> concurrent_chunks;
323✔
1222
        for(size_t coarse_idx = 0; coarse_idx < coarse_chunks.size(); ++coarse_idx) {
737✔
1223
                for(const auto& fine_chunk : split(coarse_chunks[coarse_idx], tsk.get_granularity(), oversubscribe_factor)) {
892✔
1224
                        auto& localized_chunk = concurrent_chunks.emplace_back();
478✔
1225
                        localized_chunk.execution_range = box(subrange(fine_chunk.offset, fine_chunk.range));
478✔
1226
                        if(tsk.get_execution_target() == execution_target::device) {
478✔
1227
                                assert(coarse_idx < m_system.devices.size());
354✔
1228
                                localized_chunk.memory_id = m_system.devices[coarse_idx].native_memory;
354✔
1229
                                localized_chunk.device_id = device_id(coarse_idx);
354✔
1230
                        } else {
1231
                                localized_chunk.memory_id = host_memory_id;
124✔
1232
                        }
1233
                }
414✔
1234
        }
1235
        return concurrent_chunks;
646✔
1236
}
327✔
1237

1238
void generator_impl::report_task_overlapping_writes(const task& tsk, const std::vector<localized_chunk>& concurrent_chunks) const {
319✔
1239
        box_vector<3> concurrent_execution_ranges(concurrent_chunks.size(), box<3>());
2,233✔
1240
        std::transform(concurrent_chunks.begin(), concurrent_chunks.end(), concurrent_execution_ranges.begin(),
319✔
1241
            [](const localized_chunk& chunk) { return chunk.execution_range; });
472✔
1242

1243
        if(const auto overlapping_writes = detect_overlapping_writes(tsk, concurrent_execution_ranges); !overlapping_writes.empty()) {
319✔
1244
                auto error = fmt::format("{} has overlapping writes on N{} in", print_task_debug_label(tsk, true /* title case */), m_local_nid);
4✔
1245
                for(const auto& [bid, overlap] : overlapping_writes) {
4✔
1246
                        fmt::format_to(std::back_inserter(error), " {} {}", print_buffer_debug_label(bid), overlap);
4✔
1247
                }
1248
                error += ". Choose a non-overlapping range mapper for this write access or constrain the split via experimental::constrain_split to make the access "
1249
                         "non-overlapping.";
2✔
1250
                utils::report_error(m_policy.overlapping_write_error, "{}", error);
2✔
1251
        }
321✔
1252
}
636✔
1253

1254
void generator_impl::satisfy_task_buffer_requirements(batch& current_batch, const buffer_id bid, const task& tsk, const subrange<3>& local_execution_range,
299✔
1255
    const bool local_node_is_reduction_initializer, const std::vector<localized_chunk>& concurrent_chunks_after_split) //
1256
{
1257
        assert(!concurrent_chunks_after_split.empty());
299✔
1258

1259
        auto& buffer = m_buffers.at(bid);
299✔
1260

1261
        dense_map<memory_id, box_vector<3>> required_contiguous_allocations(m_memories.size());
299✔
1262

1263
        box_vector<3> accessed_boxes; // which elements are accessed (to figure out applying receives)
299✔
1264
        box_vector<3> consumed_boxes; // which elements are accessed with a consuming access (these need to be preserved across resizes)
299✔
1265

1266
        const auto& bam = tsk.get_buffer_access_map();
299✔
1267
        for(const auto mode : bam.get_access_modes(bid)) {
560✔
1268
                const auto req = bam.get_mode_requirements(bid, mode, tsk.get_dimensions(), local_execution_range, tsk.get_global_size());
261✔
1269
                accessed_boxes.append(req.get_boxes());
261✔
1270
                if(access::mode_traits::is_consumer(mode)) { consumed_boxes.append(req.get_boxes()); }
261✔
1271
        }
560✔
1272

1273
        // reductions can introduce buffer reads if they do not initialize_to_identity (but they cannot be split), so we evaluate them first
1274
        assert(std::count_if(tsk.get_reductions().begin(), tsk.get_reductions().end(), [=](const reduction_info& r) { return r.bid == bid; }) <= 1
340✔
1275
               && "task defines multiple reductions on the same buffer");
1276
        const auto reduction = std::find_if(tsk.get_reductions().begin(), tsk.get_reductions().end(), [=](const reduction_info& r) { return r.bid == bid; });
340✔
1277
        if(reduction != tsk.get_reductions().end()) {
299✔
1278
                for(const auto& chunk : concurrent_chunks_after_split) {
103✔
1279
                        required_contiguous_allocations[chunk.memory_id].push_back(scalar_reduction_box);
63✔
1280
                }
1281
                const auto include_current_value = local_node_is_reduction_initializer && reduction->init_from_buffer;
40!
1282
                if(concurrent_chunks_after_split.size() > 1 || include_current_value) {
40✔
1283
                        // We insert a host-side reduce-instruction in the multi-chunk scenario; its result will end up in the host buffer allocation.
1284
                        // If the user did not specify `initialize_to_identity`, we treat the existing buffer contents as an additional reduction chunk, so we can always
1285
                        // perform SYCL reductions with `initialize_to_identity` semantics. This is unavoidable since hipSYCL does not support reduction properties.
1286
                        required_contiguous_allocations[host_memory_id].push_back(scalar_reduction_box);
16✔
1287
                }
1288
                accessed_boxes.push_back(scalar_reduction_box);
40✔
1289
                if(include_current_value) {
40✔
1290
                        // scalar_reduction_box will be copied into the local-reduction gather buffer ahead of the kernel instruction
1291
                        consumed_boxes.push_back(scalar_reduction_box);
4✔
1292
                }
1293
        }
1294

1295
        const region accessed_region(std::move(accessed_boxes));
299✔
1296
        const region consumed_region(std::move(consumed_boxes));
299✔
1297

1298
        // Boxes that are accessed but not consumed do not need to be preserved across resizes. This set operation is not equivalent to accumulating all
1299
        // non-consumer mode accesses above, since a kernel can have both a read_only and a discard_write access for the same buffer element, and Celerity must
1300
        // treat the overlap as-if it were a read_write access according to the SYCL spec.
1301
        // We maintain a box_vector here because we also add all received boxes, as these are overwritten by a receive_instruction before being read by the kernel.
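        // Illustrative 1D example (hypothetical boxes): a kernel with a read_only access to [0,10) and a discard_write access to
        // [5,15) consumes [0,10) and accesses [0,15), so only [10,15) is discarded; the overlap [5,10) must still be preserved as if
        // it were accessed via read_write.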
1302
        box_vector<3> discarded_boxes = region_difference(accessed_region, consumed_region).into_boxes();
299✔
1303

1304
        // Collect all pending receives (await-push commands) that we must apply before executing this task.
1305
        std::vector<buffer_state::region_receive> applied_receives;
299✔
1306
        {
1307
                const auto first_applied_receive = std::partition(buffer.pending_receives.begin(), buffer.pending_receives.end(),
299✔
1308
                    [&](const buffer_state::region_receive& r) { return region_intersection(consumed_region, r.received_region).empty(); });
27✔
1309
                const auto last_applied_receive = buffer.pending_receives.end();
299✔
1310
                for(auto it = first_applied_receive; it != last_applied_receive; ++it) {
326✔
1311
                        // we (re) allocate before receiving, but there's no need to preserve previous data at the receive location
1312
                        discarded_boxes.append(it->received_region.get_boxes());
27✔
1313
                        // split_receive_instruction needs contiguous allocations for the bounding boxes of potentially received fragments
1314
                        required_contiguous_allocations[host_memory_id].insert(
135✔
1315
                            required_contiguous_allocations[host_memory_id].end(), it->required_contiguous_allocations.begin(), it->required_contiguous_allocations.end());
108✔
1316
                }
1317

1318
                if(first_applied_receive != last_applied_receive) {
299✔
1319
                        applied_receives.assign(first_applied_receive, last_applied_receive);
27✔
1320
                        buffer.pending_receives.erase(first_applied_receive, last_applied_receive);
27✔
1321
                }
1322
        }
1323

1324
        if(reduction != tsk.get_reductions().end()) {
299✔
1325
                assert(std::all_of(buffer.pending_receives.begin(), buffer.pending_receives.end(), [&](const buffer_state::region_receive& r) {
40✔
1326
                        return region_intersection(r.received_region, scalar_reduction_box).empty();
1327
                }) && std::all_of(buffer.pending_gathers.begin(), buffer.pending_gathers.end(), [&](const buffer_state::gather_receive& r) {
1328
                        return box_intersection(r.gather_box, scalar_reduction_box).empty();
1329
                }) && "buffer has an unprocessed await-push into a region that is going to be used as a reduction output");
1330
        }
1331

1332
        const region discarded_region = region(std::move(discarded_boxes));
299✔
1333

1334
        // Detect and report uninitialized reads
1335
        if(m_policy.uninitialized_read_error != error_policy::ignore) {
299✔
1336
                box_vector<3> uninitialized_reads;
296✔
1337
                const auto locally_required_region = region_difference(consumed_region, discarded_region);
296✔
1338
                for(const auto& [box, location] : buffer.up_to_date_memories.get_region_values(locally_required_region)) {
422✔
1339
                        if(!location.any()) { uninitialized_reads.push_back(box); }
126✔
1340
                }
296✔
1341
                if(!uninitialized_reads.empty()) {
296✔
1342
                        // Observing an uninitialized read that is not visible in the TDAG means we have a bug.
1343
                        utils::report_error(m_policy.uninitialized_read_error,
16✔
1344
                            "Instructions for {} are trying to read {} {}, which is neither found locally nor has been await-pushed before.", print_task_debug_label(tsk),
12✔
1345
                            print_buffer_debug_label(bid), detail::region(std::move(uninitialized_reads)));
24✔
1346
                }
1347
        }
300✔
1348

1349
        // Do not preserve any received or overwritten region across receives or buffer resizes later on: allocate_contiguously will insert resize-copy instructions
1350
        // for all up_to_date regions of allocations that it replaces with larger ones.
1351
        buffer.up_to_date_memories.update_region(discarded_region, memory_mask());
295✔
1352

1353
        // Collect chunk-reads by memory to establish local coherence later
1354
        dense_map<memory_id, std::vector<region<3>>> concurrent_reads_from_memory(m_memories.size());
295✔
1355
        for(const auto& chunk : concurrent_chunks_after_split) {
742✔
1356
                required_contiguous_allocations[chunk.memory_id].append(
894✔
1357
                    bam.get_required_contiguous_boxes(bid, tsk.get_dimensions(), chunk.execution_range.get_subrange(), tsk.get_global_size()));
894✔
1358

1359
                box_vector<3> chunk_read_boxes;
447✔
1360
                for(const auto mode : access::consumer_modes) {
2,235✔
1361
                        const auto req = bam.get_mode_requirements(bid, mode, tsk.get_dimensions(), chunk.execution_range.get_subrange(), tsk.get_global_size());
1,788✔
1362
                        chunk_read_boxes.append(req.get_boxes());
1,788✔
1363
                }
1,788✔
1364
                if(!chunk_read_boxes.empty()) { concurrent_reads_from_memory[chunk.memory_id].push_back(region(std::move(chunk_read_boxes))); }
447✔
1365
        }
447✔
1366
        if(local_node_is_reduction_initializer && reduction != tsk.get_reductions().end() && reduction->init_from_buffer) {
295!
1367
                concurrent_reads_from_memory[host_memory_id].emplace_back(scalar_reduction_box);
3✔
1368
        }
1369

1370
        // If the system_info indicates a pair of (device-native) memory_ids between which no direct peer-to-peer copy is possible but data still must be
1371
        // transferred between them, we stage the copy by making the host memory coherent first. To generate the desired copy-chain, it is sufficient to treat each
1372
        // such buffer subregion as also being read from host memory (see the call to `establish_coherence_between_buffer_memories` below).
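        // Illustrative example (hypothetical memory ids): if device memory M1 holds the only up-to-date copy of a box that a chunk on
        // device memory M2 needs to read, and the system_info reports no copy peering between M1 and M2, the box is added to the host
        // read set as well; the coherence pass below then emits an M1 -> host copy followed by a host -> M2 copy.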
1373
        for(memory_id read_mid = 0; read_mid < concurrent_reads_from_memory.size(); ++read_mid) {
1,301✔
1374
                for(auto& read_region : concurrent_reads_from_memory[read_mid]) {
1,190✔
1375
                        box_vector<3> host_staged_boxes;
184✔
1376
                        for(const auto& [box, location] : buffer.up_to_date_memories.get_region_values(read_region)) {
384✔
1377
                                if(location.any() /* gracefully handle uninitialized read */ && !location.test(read_mid)
456✔
1378
                                    && (m_system.memories[read_mid].copy_peers & location).none()) {
456!
1379
                                        assert(read_mid != host_memory_id);
10✔
1380
                                        required_contiguous_allocations[host_memory_id].push_back(box);
10✔
1381
                                        host_staged_boxes.push_back(box);
10✔
1382
                                }
1383
                        }
184✔
1384
                        if(!host_staged_boxes.empty()) { concurrent_reads_from_memory[host_memory_id].emplace_back(std::move(host_staged_boxes)); }
184✔
1385
                }
184✔
1386
        }
1387

1388
        // Now that we know all required contiguous allocations, issue any required alloc- and resize-copy instructions
1389
        for(memory_id mid = 0; mid < required_contiguous_allocations.size(); ++mid) {
1,301✔
1390
                allocate_contiguously(current_batch, bid, mid, std::move(required_contiguous_allocations[mid]));
1,006✔
1391
        }
1392

1393
        // Receive all remote data (which overlaps with the accessed region) into host memory
1394
        std::vector<region<3>> all_concurrent_reads;
295✔
1395
        for(const auto& reads : concurrent_reads_from_memory) {
1,301✔
1396
                all_concurrent_reads.insert(all_concurrent_reads.end(), reads.begin(), reads.end());
1,006✔
1397
        }
1398
        for(const auto& receive : applied_receives) {
322✔
1399
                commit_pending_region_receive_to_host_memory(current_batch, bid, receive, all_concurrent_reads);
27✔
1400
        }
1401

1402
        // Create the necessary coherence copy instructions to satisfy all remaining requirements locally. The iterations of this loop are independent with the
1403
        // notable exception of host_memory_id in the presence of staging copies: There we rely on the fact that `host_memory_id < device_memory_id` to allow
1404
        // coherence copies to device memory to create device -> host -> device copy chains.
1405
        static_assert(host_memory_id < first_device_memory_id);
1406
        for(memory_id mid = 0; mid < concurrent_reads_from_memory.size(); ++mid) {
1,301✔
1407
                establish_coherence_between_buffer_memories(current_batch, bid, mid, concurrent_reads_from_memory[mid]);
1,006✔
1408
        }
1409
}
622✔
1410

1411
local_reduction generator_impl::prepare_task_local_reduction(
39✔
1412
    batch& command_batch, const reduction_info& rinfo, const execution_command& ecmd, const task& tsk, const size_t num_concurrent_chunks) //
1413
{
1414
        const auto [rid_, bid_, reduction_task_includes_buffer_value] = rinfo;
39✔
1415
        const auto bid = bid_; // allow capturing in lambda
39✔
1416

1417
        auto& buffer = m_buffers.at(bid);
39✔
1418

1419
        local_reduction red;
39✔
1420
        red.include_local_buffer_value = reduction_task_includes_buffer_value && ecmd.is_reduction_initializer();
39!
1421
        red.first_kernel_chunk_offset = red.include_local_buffer_value ? 1 : 0;
39✔
1422
        red.num_input_chunks = red.first_kernel_chunk_offset + num_concurrent_chunks;
39✔
1423
        red.chunk_size_bytes = scalar_reduction_box.get_area() * buffer.elem_size;
39✔
1424

1425
        // If the reduction only has a single local contribution, we simply accept it as the fully-reduced final value without issuing additional instructions.
1426
        // A local_reduction with num_input_chunks == 1 is treated as a no-op by `finish_task_local_reduction`.
1427
        assert(red.num_input_chunks > 0);
39✔
1428
        if(red.num_input_chunks == 1) return red;
39✔
1429

1430
        // Create a gather-allocation into which `finish_task_local_reduction` will copy each new partial result, and if the reduction is not
1431
        // initialize_to_identity, we copy the current buffer value before it is being overwritten by the kernel.
1432
        red.gather_aid = new_allocation_id(host_memory_id);
15✔
1433
        red.gather_alloc_instr = create<alloc_instruction>(
15✔
1434
            command_batch, red.gather_aid, red.num_input_chunks * red.chunk_size_bytes, buffer.elem_align, [&](const auto& record_debug_info) {
15✔
1435
                    record_debug_info(
15✔
1436
                        alloc_instruction_record::alloc_origin::gather, buffer_allocation_record{bid, buffer.debug_name, scalar_reduction_box}, red.num_input_chunks);
30✔
1437
            });
30!
1438
        add_dependency(red.gather_alloc_instr, m_last_epoch, instruction_dependency_origin::last_epoch);
15✔
1439

1440
        /// Normally, there is one _reduction chunk_ per _kernel chunk_, unless the local node is the designated reduction initializer and the reduction is not
1441
        /// `initialize_to_identity`, in which case we add an additional _reduction chunk_ for the current buffer value and insert it in the first position of the
1442
        /// local gather allocation.
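        /// Illustrative layout (hypothetical chunk count): with 2 kernel chunks and include_local_buffer_value == true, the gather
        /// allocation holds 3 slots of chunk_size_bytes each - [current buffer value][partial result of chunk 0][partial result of
        /// chunk 1] - and finish_task_local_reduction later reduces all slots into the host-buffer allocation.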
1443
        if(red.include_local_buffer_value) {
15✔
1444
                // The source host allocation is already provided by satisfy_task_buffer_requirements
1445
                auto& source_allocation = buffer.memories[host_memory_id].get_contiguous_allocation(scalar_reduction_box);
3✔
1446

1447
                // copy to local gather space
1448
                const auto current_value_copy_instr = create<copy_instruction>(command_batch, source_allocation.aid, red.gather_aid, source_allocation.box,
6✔
1449
                    scalar_reduction_box, scalar_reduction_box, buffer.elem_size,
3✔
1450
                    [&](const auto& record_debug_info) { record_debug_info(copy_instruction_record::copy_origin::gather, bid, buffer.debug_name); });
6✔
1451

1452
                add_dependency(current_value_copy_instr, red.gather_alloc_instr, instruction_dependency_origin::allocation_lifetime);
3✔
1453
                perform_concurrent_read_from_allocation(current_value_copy_instr, source_allocation, scalar_reduction_box);
3✔
1454
        }
1455
        return red;
15✔
1456
}
1457

1458
void generator_impl::finish_task_local_reduction(batch& command_batch, const local_reduction& red, const reduction_info& rinfo, const execution_command& ecmd,
39✔
1459
    const task& tsk,
1460
    const std::vector<localized_chunk>& concurrent_chunks) //
1461
{
1462
        // If the reduction only has a single contribution, its write is already the final result and does not need to be reduced.
1463
        if(red.num_input_chunks == 1) return;
39✔
1464

1465
        const auto [rid, bid_, reduction_task_includes_buffer_value] = rinfo;
15✔
1466
        const auto bid = bid_; // allow capturing in lambda
15✔
1467

1468
        auto& buffer = m_buffers.at(bid);
15✔
1469
        auto& host_memory = buffer.memories[host_memory_id];
15✔
1470

1471
        // prepare_task_local_reduction has allocated gather space which preserves the current buffer value when the reduction does not initialize_to_identity
1472
        std::vector<copy_instruction*> gather_copy_instrs;
15✔
1473
        gather_copy_instrs.reserve(concurrent_chunks.size());
15✔
1474
        for(size_t j = 0; j < concurrent_chunks.size(); ++j) {
52✔
1475
                const auto source_mid = concurrent_chunks[j].memory_id;
37✔
1476
                auto& source_allocation = buffer.memories[source_mid].get_contiguous_allocation(scalar_reduction_box);
37✔
1477

1478
                // Copy local partial result to gather space
1479
                const auto copy_instr = create<copy_instruction>(command_batch, source_allocation.aid,
74✔
1480
                    allocation_with_offset(red.gather_aid, (red.first_kernel_chunk_offset + j) * buffer.elem_size), source_allocation.box, scalar_reduction_box,
37✔
1481
                    scalar_reduction_box, buffer.elem_size,
37✔
1482
                    [&](const auto& record_debug_info) { record_debug_info(copy_instruction_record::copy_origin::gather, bid, buffer.debug_name); });
111✔
1483

1484
                add_dependency(copy_instr, red.gather_alloc_instr, instruction_dependency_origin::allocation_lifetime);
37✔
1485
                perform_concurrent_read_from_allocation(copy_instr, source_allocation, scalar_reduction_box);
37✔
1486

1487
                gather_copy_instrs.push_back(copy_instr);
37✔
1488
        }
1489

1490
        // Insert a local reduce_instruction which reads from the gather buffer and writes to the host-buffer allocation for `scalar_reduction_box`.
1491
        auto& dest_allocation = host_memory.get_contiguous_allocation(scalar_reduction_box);
15✔
1492
        const auto reduce_instr =
1493
            create<reduce_instruction>(command_batch, rid, red.gather_aid, red.num_input_chunks, dest_allocation.aid, [&](const auto& record_debug_info) {
15✔
1494
                    record_debug_info(std::nullopt, bid, buffer.debug_name, scalar_reduction_box, reduce_instruction_record::reduction_scope::local);
15✔
1495
            });
30✔
1496

1497
        for(auto& copy_instr : gather_copy_instrs) {
52✔
1498
                add_dependency(reduce_instr, copy_instr, instruction_dependency_origin::read_from_allocation);
37✔
1499
        }
1500
        perform_atomic_write_to_allocation(reduce_instr, dest_allocation, scalar_reduction_box);
15✔
1501
        buffer.track_original_write(scalar_reduction_box, reduce_instr, host_memory_id);
15✔
1502

1503
        // Free the gather allocation created in `prepare_task_local_reduction`.
1504
        const auto gather_free_instr = create<free_instruction>(
30✔
1505
            command_batch, red.gather_aid, [&](const auto& record_debug_info) { record_debug_info(red.num_input_chunks * red.chunk_size_bytes, std::nullopt); });
30✔
1506
        add_dependency(gather_free_instr, reduce_instr, instruction_dependency_origin::allocation_lifetime);
15✔
1507
}
15✔
1508

1509
instruction* generator_impl::launch_task_kernel(batch& command_batch, const execution_command& ecmd, const task& tsk, const localized_chunk& chunk) {
466✔
1510
        const auto& bam = tsk.get_buffer_access_map();
466✔
1511

1512
        buffer_access_allocation_map allocation_map(bam.get_num_accesses());
1,398✔
1513
        buffer_access_allocation_map reduction_map(tsk.get_reductions().size());
1,398✔
1514

1515
        std::vector<buffer_memory_record> buffer_memory_access_map;       // if is_recording()
466✔
1516
        std::vector<buffer_reduction_record> buffer_memory_reduction_map; // if is_recording()
466✔
1517
        if(is_recording()) {
466!
1518
                buffer_memory_access_map.resize(bam.get_num_accesses());
466✔
1519
                buffer_memory_reduction_map.resize(tsk.get_reductions().size());
466✔
1520
        }
1521

1522
        // map buffer accesses (hydration_ids) to allocations in chunk-memory
1523
        for(size_t i = 0; i < bam.get_num_accesses(); ++i) {
847✔
1524
                const auto [bid, mode] = bam.get_nth_access(i);
381✔
1525
                const auto accessed_box = bam.get_requirements_for_nth_access(i, tsk.get_dimensions(), chunk.execution_range.get_subrange(), tsk.get_global_size());
381✔
1526
                const auto& buffer = m_buffers.at(bid);
381✔
1527
                if(!accessed_box.empty()) {
381✔
1528
                        const auto& alloc = buffer.memories[chunk.memory_id].get_contiguous_allocation(accessed_box);
373✔
1529
                        allocation_map[i] = {alloc.aid, alloc.box, accessed_box CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(, bid, buffer.debug_name)};
746✔
1530
                        if(is_recording()) { buffer_memory_access_map[i] = buffer_memory_record{bid, buffer.debug_name}; }
373!
1531
                } else {
1532
                        allocation_map[i] = buffer_access_allocation{null_allocation_id, {}, {} CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(, bid, buffer.debug_name)};
8✔
1533
                        if(is_recording()) { buffer_memory_access_map[i] = buffer_memory_record{bid, buffer.debug_name}; }
8!
1534
                }
1535
        }
1536

1537
        // map reduction outputs to allocations in chunk-memory
1538
        for(size_t i = 0; i < tsk.get_reductions().size(); ++i) {
527✔
1539
                const auto& rinfo = tsk.get_reductions()[i];
61✔
1540
                const auto& buffer = m_buffers.at(rinfo.bid);
61✔
1541
                const auto& alloc = buffer.memories[chunk.memory_id].get_contiguous_allocation(scalar_reduction_box);
61✔
1542
                reduction_map[i] = {alloc.aid, alloc.box, scalar_reduction_box CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(, rinfo.bid, buffer.debug_name)};
122✔
1543
                if(is_recording()) { buffer_memory_reduction_map[i] = buffer_reduction_record{rinfo.bid, buffer.debug_name, rinfo.rid}; }
61!
1544
        }
1545

1546
        if(tsk.get_execution_target() == execution_target::device) {
466✔
1547
                assert(chunk.execution_range.get_area() > 0);
342✔
1548
                assert(chunk.device_id.has_value());
342✔
1549
                return create<device_kernel_instruction>(command_batch, *chunk.device_id, tsk.get_launcher<device_kernel_launcher>(), chunk.execution_range,
342✔
1550
                    std::move(allocation_map),
342✔
1551
                    std::move(reduction_map) //
342✔
1552
                    CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(, tsk.get_type(), tsk.get_id(), tsk.get_debug_name()),
1,026✔
1553
                    [&](const auto& record_debug_info) {
684✔
1554
                            record_debug_info(ecmd.get_tid(), ecmd.get_cid(), tsk.get_debug_name(), buffer_memory_access_map, buffer_memory_reduction_map);
342✔
1555
                    });
1,710✔
1556
        } else {
1557
                assert(tsk.get_execution_target() == execution_target::host);
124✔
1558
                assert(chunk.memory_id == host_memory_id);
124✔
1559
                assert(reduction_map.empty());
124✔
1560
                return create<host_task_instruction>(command_batch, tsk.get_launcher<host_task_launcher>(), chunk.execution_range, tsk.get_global_size(),
248✔
1561
                    std::move(allocation_map),
124✔
1562
                    tsk.get_collective_group_id() //
248✔
1563
                    CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(, tsk.get_type(), tsk.get_id(), tsk.get_debug_name()),
372✔
1564
                    [&](const auto& record_debug_info) { record_debug_info(ecmd.get_tid(), ecmd.get_cid(), tsk.get_debug_name(), buffer_memory_access_map); });
868✔
1565
        }
1566
}
900✔
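// The mapping loops above pair every (possibly empty) accessed box with a previously created
// backing allocation. Below is a minimal, standalone sketch of that pattern using hypothetical
// 1-dimensional stand-ins (example_box, example_allocation) instead of celerity's box<3> /
// allocation_id types; it is an illustration, not the runtime's actual implementation.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

struct example_box {
	size_t min = 0, max = 0; // half-open interval [min, max)
	bool empty() const { return min >= max; }
	bool covers(const example_box& other) const { return min <= other.min && other.max <= max; }
};

struct example_allocation {
	int aid = -1; // -1 stands in for null_allocation_id
	example_box box;
};

struct example_access_entry {
	int aid = -1;
	example_box allocated_box;
	example_box accessed_box;
};

std::vector<example_access_entry> map_accesses_to_allocations_example(
    const std::vector<example_box>& accessed_boxes, const std::vector<example_allocation>& allocations) {
	std::vector<example_access_entry> map(accessed_boxes.size());
	for(size_t i = 0; i < accessed_boxes.size(); ++i) {
		if(accessed_boxes[i].empty()) continue; // keep the null entry, like the else-branch above
		const auto alloc = std::find_if(allocations.begin(), allocations.end(),
		    [&](const example_allocation& a) { return a.box.covers(accessed_boxes[i]); });
		assert(alloc != allocations.end() && "allocations must have been created beforehand");
		map[i] = {alloc->aid, alloc->box, accessed_boxes[i]};
	}
	return map;
}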
1567

1568
void generator_impl::perform_task_buffer_accesses(
317✔
1569
    const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions) //
1570
{
1571
        const auto& bam = tsk.get_buffer_access_map();
317✔
1572
        if(bam.get_num_accesses() == 0 && tsk.get_reductions().empty()) return;
317✔
1573

1574
        // 1. Collect the read-sets and write-sets of all concurrent chunks on all buffers (TODO this is what buffer_access_map should actually return)
1575

1576
        struct read_write_sets {
1577
                region<3> reads;
1578
                region<3> writes;
1579
        };
1580

1581
        std::vector<std::unordered_map<buffer_id, read_write_sets>> concurrent_read_write_sets(concurrent_chunks.size());
834✔
1582

1583
        for(const auto bid : bam.get_accessed_buffers()) {
523✔
1584
                for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
620✔
1585
                        read_write_sets rw;
375✔
1586
                        for(const auto mode : bam.get_access_modes(bid)) {
752✔
1587
                                const auto req =
377✔
1588
                                    bam.get_mode_requirements(bid, mode, tsk.get_dimensions(), concurrent_chunks[i].execution_range.get_subrange(), tsk.get_global_size());
377✔
1589
                                if(access::mode_traits::is_consumer(mode)) { rw.reads = region_union(rw.reads, req); }
377✔
1590
                                if(access::mode_traits::is_producer(mode)) { rw.writes = region_union(rw.writes, req); }
377✔
1591
                        }
752✔
1592
                        concurrent_read_write_sets[i].emplace(bid, std::move(rw));
375✔
1593
                }
375✔
1594
        }
278✔
1595

1596
        for(const auto& rinfo : tsk.get_reductions()) {
317✔
1597
                for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
100✔
1598
                        auto& rw_map = concurrent_read_write_sets[i][rinfo.bid]; // allow default-insert on `bid`
61✔
1599
                        rw_map.writes = region_union(rw_map.writes, scalar_reduction_box);
61✔
1600
                }
1601
        }
1602

1603
        // 2. Insert all true-dependencies for reads and anti-dependencies for writes. We do this en bloc instead of using `perform_concurrent_read_from_allocation`
1604
        // or `perform_atomic_write_to_allocation` to avoid incorrect dependencies between our concurrent chunks by updating tracking structures too early.
1605

1606
        for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
705✔
1607
                for(const auto& [bid, rw] : concurrent_read_write_sets[i]) {
863✔
1608
                        auto& buffer = m_buffers.at(bid);
436✔
1609
                        auto& memory = buffer.memories[concurrent_chunks[i].memory_id];
436✔
1610

1611
                        for(auto& allocation : memory.allocations) {
893✔
1612
                                add_dependencies_on_last_writers(command_instructions[i], allocation, region_intersection(rw.reads, allocation.box));
457✔
1613
                                add_dependencies_on_last_concurrent_accesses(
914✔
1614
                                    command_instructions[i], allocation, region_intersection(rw.writes, allocation.box), instruction_dependency_origin::write_to_allocation);
914✔
1615
                        }
1616
                }
1617
        }
1618

1619
        // 3. Clear tracking structures for all regions that are being written to. We gracefully handle overlapping writes by treating the set of all conflicting
1620
        // writers as last writers of an allocation.
1621

1622
        for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
705✔
1623
                for(const auto& [bid, rw] : concurrent_read_write_sets[i]) {
863✔
1624
                        assert(command_instructions[i] != nullptr);
436✔
1625
                        auto& buffer = m_buffers.at(bid);
436✔
1626
                        for(auto& alloc : buffer.memories[concurrent_chunks[i].memory_id].allocations) {
893✔
1627
                                alloc.begin_concurrent_writes(region_intersection(alloc.box, rw.writes));
457✔
1628
                        }
1629
                }
1630
        }
1631

1632
        // 4. Update data locations and last writers resulting from all concurrent reads and overlapping writes
1633

1634
        for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
705✔
1635
                for(const auto& [bid, rw] : concurrent_read_write_sets[i]) {
863✔
1636
                        assert(command_instructions[i] != nullptr);
436✔
1637
                        auto& buffer = m_buffers.at(bid);
436✔
1638

1639
                        for(auto& alloc : buffer.memories[concurrent_chunks[i].memory_id].allocations) {
893✔
1640
                                alloc.track_concurrent_read(region_intersection(alloc.box, rw.reads), command_instructions[i]);
457✔
1641
                                alloc.track_concurrent_write(region_intersection(alloc.box, rw.writes), command_instructions[i]);
457✔
1642
                        }
1643
                        buffer.track_original_write(rw.writes, command_instructions[i], concurrent_chunks[i].memory_id);
436✔
1644
                }
1645
        }
1646
}
278✔
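// A standalone sketch of step 1 above (collecting per-chunk read- and write-sets): consumer
// modes are unioned into the read-set and producer modes into the write-set, per buffer and per
// chunk. The set-of-indices type stands in for celerity's region<3>; all names here
// (example_access, collect_read_write_sets_example) are hypothetical simplifications.
#include <cstddef>
#include <map>
#include <set>
#include <vector>

enum class example_access_mode { read, write, read_write };

bool is_consumer(const example_access_mode m) { return m != example_access_mode::write; }
bool is_producer(const example_access_mode m) { return m != example_access_mode::read; }

struct example_read_write_sets {
	std::set<size_t> reads;  // stand-in for region<3>
	std::set<size_t> writes; // stand-in for region<3>
};

struct example_access {
	int buffer;
	example_access_mode mode;
	std::set<size_t> elements; // the chunk's mode requirements on that buffer
};

std::vector<std::map<int, example_read_write_sets>> collect_read_write_sets_example(
    const std::vector<std::vector<example_access>>& accesses_per_chunk) {
	std::vector<std::map<int, example_read_write_sets>> result(accesses_per_chunk.size());
	for(size_t chunk = 0; chunk < accesses_per_chunk.size(); ++chunk) {
		for(const auto& acc : accesses_per_chunk[chunk]) {
			auto& rw = result[chunk][acc.buffer]; // default-insert, like the reduction loop above
			if(is_consumer(acc.mode)) { rw.reads.insert(acc.elements.begin(), acc.elements.end()); }
			if(is_producer(acc.mode)) { rw.writes.insert(acc.elements.begin(), acc.elements.end()); }
		}
	}
	return result;
}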
1647

1648
void generator_impl::perform_task_side_effects(
317✔
1649
    const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions) //
1650
{
1651
        if(tsk.get_side_effect_map().empty()) return;
317✔
1652

1653
        assert(concurrent_chunks.size() == 1); // splitting instructions with side effects would race
30✔
1654
        assert(!concurrent_chunks[0].device_id.has_value());
30✔
1655
        assert(concurrent_chunks[0].memory_id == host_memory_id);
30✔
1656

1657
        for(const auto& [hoid, order] : tsk.get_side_effect_map()) {
64✔
1658
                auto& host_object = m_host_objects.at(hoid);
34✔
1659
                if(const auto last_side_effect = host_object.last_side_effect) {
34!
1660
                        add_dependency(command_instructions[0], last_side_effect, instruction_dependency_origin::side_effect);
34✔
1661
                }
1662
                host_object.last_side_effect = command_instructions[0];
34✔
1663
        }
1664
}
1665

1666
void generator_impl::perform_task_collective_operations(
317✔
1667
    const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions) //
1668
{
1669
        if(tsk.get_collective_group_id() == non_collective_group_id) return;
317✔
1670

1671
        assert(concurrent_chunks.size() == 1); //
14✔
1672
        assert(!concurrent_chunks[0].device_id.has_value());
14✔
1673
        assert(concurrent_chunks[0].memory_id == host_memory_id);
14✔
1674

1675
        auto& group = m_collective_groups.at(tsk.get_collective_group_id()); // must be created previously with clone_collective_group_instruction
14✔
1676
        add_dependency(command_instructions[0], group.last_collective_operation, instruction_dependency_origin::collective_group_order);
14✔
1677
        group.last_collective_operation = command_instructions[0];
14✔
1678
}
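// Both functions above use the same serialization pattern: a resource (host object or collective
// group) remembers its last instruction, every new instruction depends on that predecessor and
// then becomes the new "last". A minimal sketch with hypothetical simplified types:
#include <vector>

struct example_instruction {
	int id;
	std::vector<int> dependencies; // ids of predecessor instructions
};

struct example_resource_state {
	example_instruction* last_operation = nullptr;
};

void serialize_on_resource_example(example_resource_state& resource, example_instruction& instr) {
	if(resource.last_operation != nullptr) { instr.dependencies.push_back(resource.last_operation->id); }
	resource.last_operation = &instr; // later operations on the resource are ordered after `instr`
}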
1679

1680
void generator_impl::compile_execution_command(batch& command_batch, const execution_command& ecmd) {
327✔
1681
        const auto& tsk = *m_tm->get_task(ecmd.get_tid());
327✔
1682

1683
        // 1. If this is a collective host task, we might need to insert a `clone_collective_group_instruction` which the task instruction is later serialized on.
1684
        create_task_collective_groups(command_batch, tsk);
327✔
1685

1686
        // 2. Split the task into local chunks and (in case of a device kernel) assign it to devices
1687
        const auto concurrent_chunks = split_task_execution_range(ecmd, tsk);
327✔
1688

1689
        // 3. Detect and report overlapping writes - it is not a fatal error to discover one; we always generate an executable (albeit racy) instruction graph
1690
        if(m_policy.overlapping_write_error != error_policy::ignore) { report_task_overlapping_writes(tsk, concurrent_chunks); }
323✔
1691

1692
        // 4. Perform all necessary receives, allocations, resize- and coherence copies to provide an appropriate set of buffer allocations and data distribution
1693
        // for all kernels and host tasks of this task. This is done simultaneously for all chunks to optimize the graph and avoid inefficient copy-chains.
1694
        auto accessed_bids = tsk.get_buffer_access_map().get_accessed_buffers();
321✔
1695
        for(const auto& rinfo : tsk.get_reductions()) {
361✔
1696
                accessed_bids.insert(rinfo.bid);
40✔
1697
        }
1698
        for(const auto bid : accessed_bids) {
605✔
1699
                satisfy_task_buffer_requirements(command_batch, bid, tsk, ecmd.get_execution_range(), ecmd.is_reduction_initializer(), concurrent_chunks);
288✔
1700
        }
1701

1702
        // 5. If the task contains reductions with more than one local input, create the appropriate gather allocations and (if the local node is the designated
1703
        // reduction initializer) copy the current buffer value into the new gather space.
1704
        std::vector<local_reduction> local_reductions(tsk.get_reductions().size());
951✔
1705
        for(size_t i = 0; i < local_reductions.size(); ++i) {
356✔
1706
                local_reductions[i] = prepare_task_local_reduction(command_batch, tsk.get_reductions()[i], ecmd, tsk, concurrent_chunks.size());
39✔
1707
        }
1708

1709
        // 6. Issue instructions to launch all concurrent kernels / host tasks.
1710
        std::vector<instruction*> command_instructions(concurrent_chunks.size());
951✔
1711
        for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
783✔
1712
                command_instructions[i] = launch_task_kernel(command_batch, ecmd, tsk, concurrent_chunks[i]);
466✔
1713
        }
1714

1715
        // 7. Compute dependencies and update tracking data structures
1716
        perform_task_buffer_accesses(tsk, concurrent_chunks, command_instructions);
317✔
1717
        perform_task_side_effects(tsk, concurrent_chunks, command_instructions);
317✔
1718
        perform_task_collective_operations(tsk, concurrent_chunks, command_instructions);
317✔
1719

1720
        // 8. For any reductions with more than one local input, collect partial results and perform the reduction operation in host memory. This is done eagerly to
1721
        // avoid ever having to persist partial reduction states in our buffer tracking.
1722
        for(size_t i = 0; i < local_reductions.size(); ++i) {
356✔
1723
                finish_task_local_reduction(command_batch, local_reductions[i], tsk.get_reductions()[i], ecmd, tsk, concurrent_chunks);
39✔
1724
        }
1725

1726
        // 9. If any of the instructions have no predecessor, anchor them on the last epoch (this can only happen for chunks without any buffer accesses).
1727
        for(const auto instr : command_instructions) {
783✔
1728
                if(instr->get_dependencies().empty()) { add_dependency(instr, m_last_epoch, instruction_dependency_origin::last_epoch); }
466✔
1729
        }
1730
}
644✔
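// A tiny sketch of step 9 above: instructions that end up without any predecessor are anchored
// on the last epoch so that no instruction floats freely before it. `example_instruction` is a
// hypothetical simplification, not the runtime's instruction type.
#include <vector>

struct example_instruction {
	int id;
	std::vector<int> dependencies;
};

void anchor_on_last_epoch_example(const std::vector<example_instruction*>& command_instructions, const example_instruction& last_epoch) {
	for(const auto instr : command_instructions) {
		if(instr->dependencies.empty()) { instr->dependencies.push_back(last_epoch.id); }
	}
}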
1731

1732
void generator_impl::compile_push_command(batch& command_batch, const push_command& pcmd) {
63✔
1733
        const auto trid = pcmd.get_transfer_id();
63✔
1734
        const auto push_box = box(pcmd.get_range());
63✔
1735

1736
        // If not all nodes contribute partial results to a global reduction, the remaining ones need to notify their peers that they should not expect any data.
1737
        // This is done by announcing an empty box through the pilot message, but not actually performing a send.
1738
        if(push_box.empty()) {
63✔
1739
                assert(trid.rid != no_reduction_id);
4✔
1740
                create_outbound_pilot(command_batch, pcmd.get_target(), trid, box<3>());
20✔
1741
                return;
4✔
1742
        }
1743

1744
        // Prioritize all instructions participating in a "push" to hide the latency of establishing local coherence behind the typically much longer latencies of
1745
        // inter-node communication
1746
        command_batch.base_priority = 10;
59✔
1747

1748
        auto& buffer = m_buffers.at(trid.bid);
59✔
1749
        auto& host_memory = buffer.memories[host_memory_id];
59✔
1750

1751
        // We want to generate as few send instructions as possible without introducing new synchronization points between chunks of the same
1752
        // command that generated the pushed data. This will allow computation-communication overlap, especially in the case of oversubscribed splits.
1753
        std::vector<region<3>> concurrent_send_regions;
59✔
1754
        // Since we now send boxes individually, we do not need to allocate the entire push_box contiguously.
1755
        box_vector<3> required_host_allocation;
59✔
1756
        {
1757
                std::unordered_map<instruction_id, box_vector<3>> individual_send_boxes;
59✔
1758
                for(auto& [box, original_writer] : buffer.original_writers.get_region_values(push_box)) {
125✔
1759
                        individual_send_boxes[original_writer->get_id()].push_back(box);
66✔
1760
                        required_host_allocation.push_back(box);
66✔
1761
                }
59✔
1762
                for(auto& [original_writer, boxes] : individual_send_boxes) {
125✔
1763
                        concurrent_send_regions.push_back(region(std::move(boxes)));
66✔
1764
                }
1765
        }
59✔
1766

1767
        allocate_contiguously(command_batch, trid.bid, host_memory_id, std::move(required_host_allocation));
59✔
1768
        establish_coherence_between_buffer_memories(command_batch, trid.bid, host_memory_id, concurrent_send_regions);
59✔
1769

1770
        for(const auto& send_region : concurrent_send_regions) {
125✔
1771
                for(const auto& full_send_box : send_region.get_boxes()) {
132✔
1772
                        // Splitting must happen on buffer range instead of host allocation range to ensure boxes are also suitable for the receiver, which might have
1773
                        // a differently-shaped backing allocation
1774
                        for(const auto& compatible_send_box : split_into_communicator_compatible_boxes(buffer.range, full_send_box)) {
151✔
1775
                                const message_id msgid = create_outbound_pilot(command_batch, pcmd.get_target(), trid, compatible_send_box);
85✔
1776

1777
                                auto& allocation = host_memory.get_contiguous_allocation(compatible_send_box); // we allocate_contiguously above
85✔
1778

1779
                                const auto offset_in_allocation = compatible_send_box.get_offset() - allocation.box.get_offset();
85✔
1780
                                const auto send_instr = create<send_instruction>(command_batch, pcmd.get_target(), msgid, allocation.aid, allocation.box.get_range(),
170✔
1781
                                    offset_in_allocation, compatible_send_box.get_range(), buffer.elem_size,
170✔
1782
                                    [&](const auto& record_debug_info) { record_debug_info(pcmd.get_cid(), trid, buffer.debug_name, compatible_send_box.get_offset()); });
425✔
1783

1784
                                perform_concurrent_read_from_allocation(send_instr, allocation, compatible_send_box);
85✔
1785
                        }
66✔
1786
                }
1787
        }
1788
}
59✔
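// A self-contained sketch of the grouping step above: the boxes of the push are bucketed by the
// id of the instruction that originally wrote them, yielding one send region per writer so that
// independent producer chunks are not artificially synchronized with each other. The box type
// and integer writer ids are hypothetical stand-ins for celerity's box<3> and instruction ids.
#include <cstddef>
#include <unordered_map>
#include <utility>
#include <vector>

struct example_box {
	size_t min = 0, max = 0;
};

std::vector<std::vector<example_box>> group_boxes_by_writer_example(
    const std::vector<std::pair<example_box, int /* writer id */>>& written_boxes) {
	std::unordered_map<int, std::vector<example_box>> individual_send_boxes;
	for(const auto& [box, writer] : written_boxes) {
		individual_send_boxes[writer].push_back(box);
	}
	std::vector<std::vector<example_box>> concurrent_send_regions;
	concurrent_send_regions.reserve(individual_send_boxes.size());
	for(auto& [writer, boxes] : individual_send_boxes) {
		(void)writer;
		concurrent_send_regions.push_back(std::move(boxes)); // one region (list of boxes) per original writer
	}
	return concurrent_send_regions;
}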
1789

1790
void generator_impl::defer_await_push_command(const await_push_command& apcmd) {
52✔
1791
        // We do not generate instructions for await-push commands immediately upon receiving them; instead, we buffer them and generate
1792
        // recv-instructions as soon as data is to be read by another instruction. This way, we can split the recv instructions and avoid
1793
        // unnecessary synchronization points between chunks that can otherwise profit from a computation-communication overlap.
1794

1795
        const auto& trid = apcmd.get_transfer_id();
52✔
1796
        if(is_recording()) { m_recorder->record_await_push_command_id(trid, apcmd.get_cid()); }
52!
1797

1798
        auto& buffer = m_buffers.at(trid.bid);
52✔
1799

1800
#ifndef NDEBUG
1801
        for(const auto& receive : buffer.pending_receives) {
52!
UNCOV
1802
                assert((trid.rid != no_reduction_id || receive.consumer_tid != trid.consumer_tid)
×
1803
                       && "received multiple await-pushes for the same consumer-task, buffer and reduction id");
1804
                assert(region_intersection(receive.received_region, apcmd.get_region()).empty()
×
1805
                       && "received an await-push command into a previously await-pushed region without an intermediate read");
1806
        }
1807
        for(const auto& gather : buffer.pending_gathers) {
52!
UNCOV
1808
        assert(std::pair(gather.consumer_tid, gather.rid) != std::pair(trid.consumer_tid, trid.rid)
×
1809
                       && "received multiple await-pushes for the same consumer-task, buffer and reduction id");
UNCOV
1810
                assert(region_intersection(gather.gather_box, apcmd.get_region()).empty()
×
1811
                       && "received an await-push command into a previously await-pushed region without an intermediate read");
1812
        }
1813
#endif
1814

1815
        if(trid.rid == no_reduction_id) {
52✔
1816
                buffer.pending_receives.emplace_back(trid.consumer_tid, apcmd.get_region(), connected_subregion_bounding_boxes(apcmd.get_region()));
27✔
1817
        } else {
1818
                assert(apcmd.get_region().get_boxes().size() == 1);
25✔
1819
                buffer.pending_gathers.emplace_back(trid.consumer_tid, trid.rid, apcmd.get_region().get_boxes().front());
25✔
1820
        }
1821
}
52✔
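// A minimal sketch of the deferral documented above: an await-push only records a pending
// receive, and actual receive instructions are materialized later, when a consumer reads the
// data. All names and types here are hypothetical simplifications of the real tracking state.
#include <utility>
#include <vector>

struct example_region {
	int first = 0, last = 0; // stand-in for region<3>
};

struct example_pending_receive {
	int consumer_task;
	example_region region;
};

struct example_buffer_state {
	std::vector<example_pending_receive> pending_receives;
};

// Called when an await-push command arrives: just remember it.
void defer_await_push_example(example_buffer_state& buffer, const int consumer_task, const example_region& region) {
	buffer.pending_receives.push_back({consumer_task, region});
}

// Called when an instruction is about to read the buffer: only now are receives actually issued
// (here simply handed back to the caller and cleared).
std::vector<example_pending_receive> materialize_receives_example(example_buffer_state& buffer) {
	return std::exchange(buffer.pending_receives, {});
}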
1822

1823
void generator_impl::compile_reduction_command(batch& command_batch, const reduction_command& rcmd) {
25✔
1824
        // In a single-node setting, global reductions are no-ops, so no reduction commands should ever be issued
1825
        assert(m_num_nodes > 1 && "received a reduction command in a single-node configuration");
25✔
1826

1827
        const auto [rid_, bid_, init_from_buffer] = rcmd.get_reduction_info();
25✔
1828
        const auto rid = rid_; // allow capturing in lambda
25✔
1829
        const auto bid = bid_; // allow capturing in lambda
25✔
1830

1831
        auto& buffer = m_buffers.at(bid);
25✔
1832

1833
        const auto gather = std::find_if(buffer.pending_gathers.begin(), buffer.pending_gathers.end(), [&](const buffer_state::gather_receive& g) {
25✔
1834
                return g.rid == rid; // assume that g.consumer_tid is correct because there cannot be multiple concurrent reductions for a single task
25✔
1835
        });
1836
        assert(gather != buffer.pending_gathers.end() && "received reduction command that is not preceded by an appropriate await-push");
25✔
1837
        assert(gather->gather_box == scalar_reduction_box);
25✔
1838

1839
        // 1. Create a host-memory allocation to gather the array of partial results
1840

1841
        const auto gather_aid = new_allocation_id(host_memory_id);
25✔
1842
        const auto node_chunk_size = gather->gather_box.get_area() * buffer.elem_size;
25✔
1843
        const auto gather_alloc_instr = create<
1844
            alloc_instruction>(command_batch, gather_aid, m_num_nodes * node_chunk_size, buffer.elem_align, [&](const auto& record_debug_info) {
25✔
1845
                record_debug_info(alloc_instruction_record::alloc_origin::gather, buffer_allocation_record{bid, buffer.debug_name, gather->gather_box}, m_num_nodes);
50✔
1846
        });
75!
1847
        add_dependency(gather_alloc_instr, m_last_epoch, instruction_dependency_origin::last_epoch);
25✔
1848

1849
        // 2. Fill the gather space with the reduction identity, so that the gather_receive_command can simply ignore empty boxes sent by peers that do not
1850
        // contribute to the reduction, and we can skip the gather-copy instruction if we ourselves do not contribute a partial result.
1851

1852
        const auto fill_identity_instr =
1853
            create<fill_identity_instruction>(command_batch, rid, gather_aid, m_num_nodes, [](const auto& record_debug_info) { record_debug_info(); });
50✔
1854
        add_dependency(fill_identity_instr, gather_alloc_instr, instruction_dependency_origin::allocation_lifetime);
25✔
1855

1856
        // 3. If the local node contributes to the reduction, copy the contribution to the appropriate position in the gather space. Testing `up_to_date_memories`
1857
        // locally is not enough to establish whether there is a local contribution, since the local node might not have participated in the task that initiated the
1858
        // reduction. Instead, we are informed about this condition by the command graph.
1859

1860
        copy_instruction* local_gather_copy_instr = nullptr;
25✔
1861
        if(rcmd.has_local_contribution()) {
25✔
1862
                const auto contribution_location = buffer.up_to_date_memories.get_region_values(scalar_reduction_box).front().second;
23✔
1863
                const auto source_mid = next_location(contribution_location, host_memory_id);
23✔
1864
        // if scalar_reduction_box is up to date in that memory, it (the single element) must also be contiguous
1865
                auto& source_allocation = buffer.memories[source_mid].get_contiguous_allocation(scalar_reduction_box);
23✔
1866

1867
                local_gather_copy_instr = create<copy_instruction>(command_batch, source_allocation.aid,
46✔
1868
                    allocation_with_offset(gather_aid, m_local_nid * buffer.elem_size), source_allocation.box, scalar_reduction_box, scalar_reduction_box,
46✔
1869
                    buffer.elem_size, [&](const auto& record_debug_info) { record_debug_info(copy_instruction_record::copy_origin::gather, bid, buffer.debug_name); });
69✔
1870
                add_dependency(local_gather_copy_instr, fill_identity_instr, instruction_dependency_origin::write_to_allocation);
23✔
1871
                perform_concurrent_read_from_allocation(local_gather_copy_instr, source_allocation, scalar_reduction_box);
23✔
1872
        }
1873

1874
        // 4. Gather remote contributions to the partial result array
1875

1876
        const transfer_id trid(gather->consumer_tid, bid, gather->rid);
25✔
1877
        const auto gather_recv_instr = create<gather_receive_instruction>(command_batch, trid, gather_aid, node_chunk_size,
50✔
1878
            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name, gather->gather_box, m_num_nodes); });
50✔
1879
        add_dependency(gather_recv_instr, fill_identity_instr, instruction_dependency_origin::write_to_allocation);
25✔
1880

1881
        // 5. Perform the global reduction on the host by reading the array of inputs from the gather space and writing to the buffer's host allocation that covers
1882
        // `scalar_reduction_box`.
1883

1884
        allocate_contiguously(command_batch, bid, host_memory_id, {scalar_reduction_box});
75✔
1885

1886
        auto& host_memory = buffer.memories[host_memory_id];
25✔
1887
        auto& dest_allocation = host_memory.get_contiguous_allocation(scalar_reduction_box);
25✔
1888

1889
        const auto reduce_instr = create<reduce_instruction>(command_batch, rid, gather_aid, m_num_nodes, dest_allocation.aid, [&](const auto& record_debug_info) {
25✔
1890
                record_debug_info(rcmd.get_cid(), bid, buffer.debug_name, scalar_reduction_box, reduce_instruction_record::reduction_scope::global);
25✔
1891
        });
50✔
1892
        add_dependency(reduce_instr, gather_recv_instr, instruction_dependency_origin::read_from_allocation);
25✔
1893
        if(local_gather_copy_instr != nullptr) { add_dependency(reduce_instr, local_gather_copy_instr, instruction_dependency_origin::read_from_allocation); }
25✔
1894
        perform_atomic_write_to_allocation(reduce_instr, dest_allocation, scalar_reduction_box);
25✔
1895
        buffer.track_original_write(scalar_reduction_box, reduce_instr, host_memory_id);
25✔
1896

1897
        // 6. Free the gather space
1898

1899
        const auto gather_free_instr = create<free_instruction>(
50✔
1900
            command_batch, gather_aid, [&](const auto& record_debug_info) { record_debug_info(m_num_nodes * node_chunk_size, std::nullopt); });
50✔
1901
        add_dependency(gather_free_instr, reduce_instr, instruction_dependency_origin::allocation_lifetime);
25✔
1902

1903
        buffer.pending_gathers.clear();
25✔
1904

1905
        // The associated reducer will be garbage-collected from the executor as we pass the reduction id on via the instruction_garbage member of the next horizon
1906
        // or epoch instruction.
1907
}
25✔
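// A compact sketch of the gather-space scheme implemented above, for a scalar sum reduction:
// the gather buffer holds one slot per node, is pre-filled with the identity so that peers
// without a contribution can be ignored, the local contribution (if any) and the received remote
// contributions fill their slots, and the result is reduced into the destination. Plain doubles
// stand in for celerity's reducer and allocation machinery; this is an illustration only.
#include <cstddef>
#include <numeric>
#include <utility>
#include <vector>

double reduce_gathered_contributions_example(const size_t num_nodes, const size_t local_nid,
    const bool has_local_contribution, const double local_value,
    const std::vector<std::pair<size_t, double>>& remote_contributions) {
	const double identity = 0.0;                     // identity of operator+
	std::vector<double> gather(num_nodes, identity); // fill_identity_instruction
	if(has_local_contribution) { gather[local_nid] = local_value; } // local gather-copy
	for(const auto& [nid, value] : remote_contributions) {
		gather[nid] = value; // gather_receive_instruction
	}
	return std::accumulate(gather.begin(), gather.end(), identity); // reduce_instruction
}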
1908

1909
void generator_impl::compile_fence_command(batch& command_batch, const fence_command& fcmd) {
15✔
1910
        const auto& tsk = *m_tm->get_task(fcmd.get_tid());
15✔
1911

1912
        assert(tsk.get_reductions().empty());
15✔
1913
        assert(tsk.get_collective_group_id() == non_collective_group_id);
15✔
1914

1915
        const auto& bam = tsk.get_buffer_access_map();
15✔
1916
        const auto& sem = tsk.get_side_effect_map();
15✔
1917
        assert(bam.get_num_accesses() + sem.size() == 1);
15✔
1918

1919
        // buffer fences encode their buffer id and subrange through buffer_access_map with a fixed range mapper (which is rather ugly)
1920
        if(bam.get_num_accesses() != 0) {
15✔
1921
                const auto bid = *bam.get_accessed_buffers().begin();
13✔
1922
                const auto fence_region = bam.get_mode_requirements(bid, access_mode::read, 0, {}, zeros);
65✔
1923
                const auto fence_box = !fence_region.empty() ? fence_region.get_boxes().front() : box<3>();
21✔
1924

1925
                const auto user_allocation_id = tsk.get_fence_promise()->get_user_allocation_id();
13✔
1926
                assert(user_allocation_id != null_allocation_id && user_allocation_id.get_memory_id() == user_memory_id);
13✔
1927

1928
                auto& buffer = m_buffers.at(bid);
13✔
1929
                copy_instruction* copy_instr = nullptr;
13✔
1930
                // gracefully handle empty-range buffer fences
1931
                if(!fence_box.empty()) {
13✔
1932
                        // We make the host buffer coherent first in order to apply pending await-pushes.
1933
                        // TODO this enforces a contiguous host-buffer allocation which may cause unnecessary resizes.
1934
                        satisfy_task_buffer_requirements(command_batch, bid, tsk, {}, false /* is_reduction_initializer: irrelevant */,
66✔
1935
                            std::vector{localized_chunk{host_memory_id, std::nullopt, box<3>()}} /* local_chunks: irrelevant */);
44✔
1936

1937
                        auto& host_buffer_allocation = buffer.memories[host_memory_id].get_contiguous_allocation(fence_box);
11✔
1938
                        copy_instr = create<copy_instruction>(command_batch, host_buffer_allocation.aid, user_allocation_id, host_buffer_allocation.box, fence_box,
11✔
1939
                            fence_box, buffer.elem_size,
11✔
1940
                            [&](const auto& record_debug_info) { record_debug_info(copy_instruction_record::copy_origin::fence, bid, buffer.debug_name); });
22✔
1941

1942
                        perform_concurrent_read_from_allocation(copy_instr, host_buffer_allocation, fence_box);
11✔
1943
                }
1944

1945
                const auto fence_instr = create<fence_instruction>(command_batch, tsk.get_fence_promise(),
13✔
1946
                    [&](const auto& record_debug_info) { record_debug_info(tsk.get_id(), fcmd.get_cid(), bid, buffer.debug_name, fence_box.get_subrange()); });
39✔
1947

1948
                if(copy_instr != nullptr) {
13✔
1949
                        add_dependency(fence_instr, copy_instr, instruction_dependency_origin::read_from_allocation);
11✔
1950
                } else {
1951
                        // an empty-range buffer fence has no data dependencies but must still be executed to fulfill its promise - attach it to the current epoch.
1952
                        add_dependency(fence_instr, m_last_epoch, instruction_dependency_origin::last_epoch);
2✔
1953
                }
1954

1955
                // we will just assume that the runtime does not intend to re-use the allocation it has passed
1956
                m_unreferenced_user_allocations.push_back(user_allocation_id);
13✔
1957
        }
13✔
1958

1959
        // host-object fences encode their host-object id in the task side effect map (which is also very ugly)
1960
        if(!sem.empty()) {
15✔
1961
                const auto hoid = sem.begin()->first;
2✔
1962

1963
                auto& obj = m_host_objects.at(hoid);
2✔
1964
                const auto fence_instr = create<fence_instruction>(
4✔
1965
                    command_batch, tsk.get_fence_promise(), [&, hoid = hoid](const auto& record_debug_info) { record_debug_info(tsk.get_id(), fcmd.get_cid(), hoid); });
4✔
1966

1967
                add_dependency(fence_instr, obj.last_side_effect, instruction_dependency_origin::side_effect);
2✔
1968
                obj.last_side_effect = fence_instr;
2✔
1969
        }
1970
}
15✔
1971

1972
void generator_impl::compile_horizon_command(batch& command_batch, const horizon_command& hcmd) {
15✔
1973
        m_idag->begin_epoch(hcmd.get_tid());
15✔
1974
        instruction_garbage garbage{hcmd.get_completed_reductions(), std::move(m_unreferenced_user_allocations)};
15✔
1975
        const auto horizon = create<horizon_instruction>(
15✔
1976
            command_batch, hcmd.get_tid(), std::move(garbage), [&](const auto& record_debug_info) { record_debug_info(hcmd.get_cid()); });
45✔
1977

1978
        collapse_execution_front_to(horizon);
15✔
1979
        if(m_last_horizon != nullptr) { apply_epoch(m_last_horizon); }
15✔
1980
        m_last_horizon = horizon;
15✔
1981
}
30✔
1982

1983
void generator_impl::compile_epoch_command(batch& command_batch, const epoch_command& ecmd) {
154✔
1984
        m_idag->begin_epoch(ecmd.get_tid());
154✔
1985
        instruction_garbage garbage{ecmd.get_completed_reductions(), std::move(m_unreferenced_user_allocations)};
154✔
1986
        const auto epoch = create<epoch_instruction>(
154✔
1987
            command_batch, ecmd.get_tid(), ecmd.get_epoch_action(), std::move(garbage), [&](const auto& record_debug_info) { record_debug_info(ecmd.get_cid()); });
462✔
1988

1989
        collapse_execution_front_to(epoch);
154✔
1990
        apply_epoch(epoch);
154✔
1991
        m_last_horizon = nullptr;
154✔
1992
}
308✔
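// A standalone sketch of what "collapse_execution_front_to" does for both horizons and epochs:
// every instruction that currently has no successor becomes a dependency of the new horizon /
// epoch, so later instructions can depend on a single node instead of the whole front. The
// types are hypothetical simplifications; the real generator additionally applies the previous
// horizon as the effective epoch.
#include <vector>

struct example_instruction {
	int id;
	std::vector<int> dependencies;
};

void collapse_execution_front_to_example(std::vector<example_instruction*>& execution_front, example_instruction& collapse_target) {
	for(const auto instr : execution_front) {
		collapse_target.dependencies.push_back(instr->id);
	}
	execution_front.clear();
	execution_front.push_back(&collapse_target); // the new horizon / epoch is now the entire front
}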
1993

1994
void generator_impl::flush_batch(batch&& batch) { // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved) we do move the members of `batch`
970✔
1995
        // sanity check: every instruction except the initial epoch must be temporally anchored through at least one dependency
1996
        assert(std::all_of(batch.generated_instructions.begin(), batch.generated_instructions.end(),
3,089✔
1997
            [](const auto instr) { return instr->get_id() == 0 || !instr->get_dependencies().empty(); }));
1998
        assert(is_topologically_sorted(batch.generated_instructions.begin(), batch.generated_instructions.end()));
970✔
1999

2000
        // instructions must be recorded manually after each create<instr>() call; verify that we never flush an unrecorded instruction
2001
        assert(m_recorder == nullptr || std::all_of(batch.generated_instructions.begin(), batch.generated_instructions.end(), [this](const auto instr) {
24,716✔
2002
                return std::find_if(m_recorder->get_instructions().begin(), m_recorder->get_instructions().end(), [=](const auto& rec) {
2003
                        return rec->id == instr->get_id();
2004
                }) != m_recorder->get_instructions().end();
2005
        }));
2006

2007
        if(m_delegate != nullptr && (!batch.generated_instructions.empty() || !batch.generated_pilots.empty())) {
970!
UNCOV
2008
                m_delegate->flush(std::move(batch.generated_instructions), std::move(batch.generated_pilots));
×
2009
        }
2010

2011
#ifndef NDEBUG // ~batch() checks if it has been flushed, which we want to acknowledge even if m_delegate == nullptr
2012
        batch.generated_instructions = {};
970✔
2013
        batch.generated_pilots = {};
970✔
2014
#endif
2015
}
970✔
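// A small sketch of the kind of invariant the `is_topologically_sorted` assertion above checks:
// walking the flushed batch in order, every dependency of an instruction must either lie outside
// the batch or have appeared earlier within it. Types are hypothetical simplifications.
#include <unordered_set>
#include <vector>

struct example_instruction {
	int id;
	std::vector<int> dependencies;
};

bool is_topologically_sorted_example(const std::vector<example_instruction>& batch) {
	std::unordered_set<int> in_batch;
	for(const auto& instr : batch) { in_batch.insert(instr.id); }
	std::unordered_set<int> seen;
	for(const auto& instr : batch) {
		for(const int dep : instr.dependencies) {
			if(in_batch.count(dep) != 0 && seen.count(dep) == 0) return false; // dependency flushed later
		}
		seen.insert(instr.id);
	}
	return true;
}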
2016

2017
void generator_impl::compile(const abstract_command& cmd) {
651✔
2018
        batch command_batch;
651✔
2019
        matchbox::match(
651✔
2020
            cmd,                                                                                    //
2021
            [&](const execution_command& ecmd) { compile_execution_command(command_batch, ecmd); }, //
978✔
2022
            [&](const push_command& pcmd) { compile_push_command(command_batch, pcmd); },           //
1,365✔
2023
            [&](const await_push_command& apcmd) { defer_await_push_command(apcmd); },              //
1,354✔
2024
            [&](const horizon_command& hcmd) { compile_horizon_command(command_batch, hcmd); },     //
1,317✔
2025
            [&](const epoch_command& ecmd) { compile_epoch_command(command_batch, ecmd); },         //
1,456✔
2026
            [&](const reduction_command& rcmd) { compile_reduction_command(command_batch, rcmd); }, //
1,327✔
2027
            [&](const fence_command& fcmd) { compile_fence_command(command_batch, fcmd); }          //
1,317✔
2028
        );
2029
        flush_batch(std::move(command_batch));
641✔
2030
}
1,292✔
2031

2032
std::string generator_impl::print_buffer_debug_label(const buffer_id bid) const { return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name); }
6✔
2033

2034
} // namespace celerity::detail::instruction_graph_generator_detail
2035

2036
namespace celerity::detail {
2037

2038
instruction_graph_generator::instruction_graph_generator(const task_manager& tm, const size_t num_nodes, const node_id local_nid, const system_info& system,
163✔
2039
    instruction_graph& idag, delegate* dlg, instruction_recorder* const recorder, const policy_set& policy)
163✔
2040
    : m_impl(new instruction_graph_generator_detail::generator_impl(tm, num_nodes, local_nid, system, idag, dlg, recorder, policy)) {}
163!
2041

2042
instruction_graph_generator::~instruction_graph_generator() = default;
163✔
2043

2044
void instruction_graph_generator::notify_buffer_created(
161✔
2045
    const buffer_id bid, const range<3>& range, const size_t elem_size, const size_t elem_align, const allocation_id user_allocation_id) {
2046
        m_impl->notify_buffer_created(bid, range, elem_size, elem_align, user_allocation_id);
161✔
2047
}
161✔
2048

UNCOV
2049
void instruction_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& name) {
×
UNCOV
2050
        m_impl->notify_buffer_debug_name_changed(bid, name);
×
UNCOV
2051
}
×
2052

2053
void instruction_graph_generator::notify_buffer_destroyed(const buffer_id bid) { m_impl->notify_buffer_destroyed(bid); }
151✔
2054

2055
void instruction_graph_generator::notify_host_object_created(const host_object_id hoid, const bool owns_instance) {
20✔
2056
        m_impl->notify_host_object_created(hoid, owns_instance);
20✔
2057
}
20✔
2058

2059
void instruction_graph_generator::notify_host_object_destroyed(const host_object_id hoid) { m_impl->notify_host_object_destroyed(hoid); }
16✔
2060

2061
void instruction_graph_generator::compile(const abstract_command& cmd) { m_impl->compile(cmd); }
651✔
2062

2063
} // namespace celerity::detail