celerity / celerity-runtime / build 12047884020 (push, github)

27 Nov 2024 09:58AM UTC coverage: 94.956% (-0.02%) from 94.972%

fknorr
Do not disable CGF diagnostics in test_utils::add_*_task

This eliminates dead code from an earlier incomplete refactoring.

The CGF teardown / reinit was only required by a single test, which was
coincidentally also broken and didn't test the feature advertised. This
commit splits up the test between runtime_ and runtime_deprecation_tests
and also moves runtime-independent sibling tests to task_graph_tests.

3202 of 3633 branches covered (88.14%)

Branch coverage included in aggregate %.

7151 of 7270 relevant lines covered (98.36%)

1224689.38 hits per line

Source File

/src/instruction_graph_generator.cc (96.76% covered)
1
#include "instruction_graph_generator.h"
2

3
#include "access_modes.h"
4
#include "cgf.h"
5
#include "command_graph.h"
6
#include "dense_map.h"
7
#include "grid.h"
8
#include "hint.h"
9
#include "instruction_graph.h"
10
#include "log.h"
11
#include "nd_memory.h"
12
#include "pilot.h"
13
#include "print_utils_internal.h"
14
#include "ranges.h"
15
#include "recorders.h"
16
#include "reduction.h"
17
#include "region_map.h"
18
#include "split.h"
19
#include "system_info.h"
20
#include "task.h"
21
#include "tracy.h"
22
#include "types.h"
23
#include "utils.h"
24
#include "workaround.h"
25

26
#include <algorithm>
27
#include <cassert>
28
#include <cstddef>
29
#include <exception>
30
#include <iterator>
31
#include <limits>
32
#include <numeric>
33
#include <optional>
34
#include <tuple>
35
#include <unordered_map>
36
#include <unordered_set>
37
#include <utility>
38
#include <variant>
39
#include <vector>
40

41
#include <fmt/format.h>
42
#include <gch/small_vector.hpp>
43
#include <matchbox.hh>
44

45

46
namespace celerity::detail::instruction_graph_generator_detail {
47

48
/// Helper for split_into_communicator_compatible_boxes().
49
void split_into_communicator_compatible_boxes_recurse(
4,876✔
50
    box_vector<3>& compatible_boxes, const box<3>& send_box, id<3> min, id<3> max, const int compatible_starting_from_dim, const int dim) {
51
        assert(dim <= compatible_starting_from_dim);
4,876✔
52
        const auto& full_box_min = send_box.get_min();
4,876✔
53
        const auto& full_box_max = send_box.get_max();
4,876✔
54

55
        if(dim == compatible_starting_from_dim) {
4,876✔
56
                // There are no incompatible strides in faster dimensions, so we simply split along this dimension into communicator_max_coordinate-sized chunks
57
                for(min[dim] = full_box_min[dim]; min[dim] < full_box_max[dim]; min[dim] += communicator_max_coordinate) {
9,756✔
58
                        max[dim] = std::min(full_box_max[dim], min[dim] + communicator_max_coordinate);
4,886✔
59
                        compatible_boxes.emplace_back(min, max);
4,886✔
60
                }
61
        } else {
62
                // A fast dimension (> 0) has incompatible strides - we can't do better than iterating over the slow dimension
63
                for(min[dim] = full_box_min[dim]; min[dim] < full_box_max[dim]; ++min[dim]) {
4,124✔
64
                        max[dim] = min[dim] + 1;
4,118✔
65
                        split_into_communicator_compatible_boxes_recurse(compatible_boxes, send_box, min, max, compatible_starting_from_dim, dim + 1);
4,118✔
66
                }
67
        }
68
}
4,876✔
69

70
/// Computes a decomposition of `send_box` into boxes that are compatible with every communicator. Note that this takes `buffer_range` rather than
71
/// `allocation_range` as a parameter, because a box that is compatible with the sender allocation might not be compatible with the allocation on the receiver
72
/// side (which we don't know anything about), but splitting for `buffer_range` will guarantee both.
73
///
74
/// On the MPI side, we implement buffer transfers as peer-to-peer operations with sub-array datatypes. These are implemented with 32-bit signed integer
75
/// strides, so transfers on buffers with a range > 2^31 in any dimension might have to be split to work around the coordinate limit by adjusting the base
76
/// pointer rather than using the stride as an offset. For 1D buffers this will have no practical performance consequences because of the implied transfer sizes, but in
77
/// degenerate higher-dimensional cases we might end up transferring individual buffer elements per send instruction.
78
///
79
/// TODO The proper solution to this is to apply the host-staging mechanism (which is currently in place for copies between devices that don't support peer
80
/// access) to sends / receives as well.
81
box_vector<3> split_into_communicator_compatible_boxes(const range<3>& buffer_range, const box<3>& send_box) {
758✔
82
        assert(box(subrange<3>(zeros, buffer_range)).covers(send_box));
758✔
83

84
        int compatible_starting_from_dim = 0;
758✔
85
        for(int d = 1; d < 3; ++d) {
2,274✔
86
                if(buffer_range[d] > communicator_max_coordinate) { compatible_starting_from_dim = d; }
1,516✔
87
        }
88

89
        // There are pathological patterns like single-element columns in a 2D buffer with y-extent > communicator_max_coordinate that generate a huge number of
90
        // individual transfers with a small payload each. This might take very long and / or derail the instruction graph generator, so we log a warning before
91
        // computing the actual set.
92
        size_t max_compatible_box_area = 1;
758✔
93
        size_t min_num_compatible_boxes = 1;
758✔
94
        for(int d = 0; d < 3; ++d) {
3,032✔
95
                (d < compatible_starting_from_dim ? min_num_compatible_boxes : max_compatible_box_area) *= send_box.get_range()[d];
2,274✔
96
        }
97
        if(min_num_compatible_boxes > 256 && max_compatible_box_area < 65536) {
758!
98
                CELERITY_WARN("Celerity is generating an excessive amount of small transfers to keep strides representable as 32-bit integers for MPI compatibility. "
1✔
99
                              "This might be very slow and / or exhaust system memory. Consider transposing your buffer layout to remedy this.");
100
        }
101

102
        box_vector<3> compatible_boxes;
758✔
103
        split_into_communicator_compatible_boxes_recurse(compatible_boxes, send_box, send_box.get_min(), send_box.get_max(), compatible_starting_from_dim, 0);
758✔
104
        return compatible_boxes;
758✔
105
}
×
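// Worked example: with the 32-bit coordinate limit described above, a send_box spanning
// 5'000'000'000 elements in a single dimension is split into chunks of at most
// communicator_max_coordinate elements (three of them if the limit is 2^31 - 1). A single-element-wide
// column in a 2d buffer whose fast dimension exceeds the limit instead degenerates into one box per
// row, which is exactly the pathological pattern that the CELERITY_WARN above guards against.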
106

107
/// Determines whether two boxes are either overlapping or touching on edges (not corners). This means 2-connectivity for 1d boxes, 4-connectivity for 2d
108
/// boxes and 6-connectivity for 3d boxes. For 0-dimensional boxes, always returns true.
109
template <int Dims>
110
bool boxes_edge_connected(const box<Dims>& box1, const box<Dims>& box2) {
180✔
111
        if(box1.empty() || box2.empty()) return false;
180✔
112

113
        // boxes can be 2/4/6 connected by either fully overlapping, or by overlapping in Dims-1 dimensions and touching (a.max[d] == b.min[d]) in the remaining one
114
        bool disconnected = false;
178✔
115
        int n_dims_touching = 0;
178✔
116
        for(int d = 0; d < Dims; ++d) {
647✔
117
                // compute the intersection but without normalizing the box to distinguish the "disconnected" from the "touching" case
118
                const auto min = std::max(box1.get_min()[d], box2.get_min()[d]);
469✔
119
                const auto max = std::min(box1.get_max()[d], box2.get_max()[d]);
469✔
120
                if(min < max) {
469✔
121
                        // boxes overlap in this dimension
122
                } else if(min == max) {
204✔
123
                        n_dims_touching += 1;
80✔
124
                } else /* min > max */ {
125
                        disconnected = true;
124✔
126
                }
127
        }
128
        return !disconnected && n_dims_touching <= 1;
178✔
129
}
130

131
// explicit instantiations for tests
132
template bool boxes_edge_connected(const box<0>& box1, const box<0>& box2);
133
template bool boxes_edge_connected(const box<1>& box1, const box<1>& box2);
134
template bool boxes_edge_connected(const box<2>& box1, const box<2>& box2);
135
template bool boxes_edge_connected(const box<3>& box1, const box<3>& box2);
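// Illustration: in 2d, the boxes [0,2)x[0,4) and [2,4)x[0,4) share the full edge x == 2 (one
// dimension touches, the other overlaps), so they are edge-connected. The boxes [0,2)x[0,2) and
// [2,4)x[2,4) meet only in the corner point (2,2) (two dimensions merely touch), so they are
// reported as disconnected.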
136

137
/// Subdivide a region into connected partitions (where connectivity is established by `boxes_edge_connected`) and return the bounding box of each partition.
138
/// Note that the returned boxes are not necessarily disjoint even though the partitions always are.
139
///
140
/// This logic is employed to find connected subregions in a pending-receive that might be satisfied by a peer through a single send operation and thus require
141
/// a contiguous backing allocation.
142
template <int Dims>
143
box_vector<Dims> connected_subregion_bounding_boxes(const region<Dims>& region) {
348✔
144
        auto boxes = region.get_boxes();
348✔
145
        auto begin = boxes.begin();
348✔
146
        auto end = boxes.end();
348✔
147
        box_vector<Dims> bounding_boxes;
348✔
148
        while(begin != end) {
789✔
149
                auto connected_end = std::next(begin);
441✔
150
                auto connected_bounding_box = *begin; // optimization: skip connectivity checks if bounding box is disconnected
441✔
151
                for(; connected_end != end; ++connected_end) {
445✔
152
                        const auto next_connected = std::find_if(connected_end, end, [&](const auto& candidate) {
203✔
153
                                return boxes_edge_connected(connected_bounding_box, candidate)
105✔
154
                                       && std::any_of(begin, connected_end, [&](const auto& box) { return boxes_edge_connected(candidate, box); });
122!
155
                        });
156
                        if(next_connected == end) break;
98✔
157
                        connected_bounding_box = bounding_box(connected_bounding_box, *next_connected);
4✔
158
                        std::swap(*next_connected, *connected_end);
4✔
159
                }
160
                bounding_boxes.push_back(connected_bounding_box);
441✔
161
                begin = connected_end;
441✔
162
        }
163
        return bounding_boxes;
696✔
164
}
348✔
165

166
// explicit instantiations for tests
167
template box_vector<0> connected_subregion_bounding_boxes(const region<0>& region);
168
template box_vector<1> connected_subregion_bounding_boxes(const region<1>& region);
169
template box_vector<2> connected_subregion_bounding_boxes(const region<2>& region);
170
template box_vector<3> connected_subregion_bounding_boxes(const region<3>& region);
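// Illustration: for the 1d region {[0,2), [2,4), [10,12)}, the first two boxes touch at x == 2 and
// form one connected partition with bounding box [0,4), while [10,12) forms a partition of its own,
// so the result is {[0,4), [10,12)}.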
171

172
/// Iteratively replaces each pair of overlapping boxes by their bounding box such that in the modified set of boxes, no two boxes overlap, but all original
173
/// input boxes are covered.
174
///
175
/// When a kernel or host task has multiple accessors into a single allocation, each must be backed by a contiguous allocation. This makes it necessary to
176
/// contiguously allocate the bounding box of all overlapping accesses.
177
template <int Dims>
178
void merge_overlapping_bounding_boxes(box_vector<Dims>& boxes) {
2,255✔
179
restart:
3,185✔
180
        for(auto first = boxes.begin(); first != boxes.end(); ++first) {
6,421✔
181
                const auto last = std::remove_if(std::next(first), boxes.end(), [&](const auto& box) {
8,332✔
182
                        const auto overlapping = !box_intersection(*first, box).empty();
4,696✔
183
                        if(overlapping) { *first = bounding_box(*first, box); }
4,696✔
184
                        return overlapping;
4,696✔
185
                });
186
                if(last != boxes.end()) {
4,166✔
187
                        boxes.erase(last, boxes.end());
930✔
188
                        goto restart; // NOLINT(cppcoreguidelines-avoid-goto)
930✔
189
                }
190
        }
191
}
2,255✔
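// Illustration: for the 1d input {[0,4), [2,6), [8,10)}, the overlapping pair [0,4) and [2,6) is
// replaced by its bounding box [0,6), which does not overlap [8,10), so the result is
// {[0,6), [8,10)}. Because merging two boxes can create new overlaps with boxes that were examined
// earlier, the loop restarts after every merge.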
192

193
/// In a set of potentially overlapping regions, removes the overlap between any two regions {A, B} by replacing them with {A ∖ B, A ∩ B, B ∖ A}.
194
///
195
/// This is used when generating copy and receive-instructions such that every data item is only copied or received once, but the following device kernels /
196
/// host tasks have their dependencies satisfied as soon as their subset of input data is available.
197
template <int Dims>
198
void symmetrically_split_overlapping_regions(std::vector<region<Dims>>& regions) {
2,280✔
199
        for(size_t i = 0; i < regions.size(); ++i) {
5,485✔
200
                for(size_t j = i + 1; j < regions.size(); ++j) {
8,957✔
201
                        auto intersection = region_intersection(regions[i], regions[j]);
2,876✔
202
                        if(!intersection.empty()) {
2,876✔
203
                                // merely shrinking regions will not introduce new intersections downstream, so we do not need to restart the loop
204
                                regions[i] = region_difference(regions[i], intersection);
239✔
205
                                regions[j] = region_difference(regions[j], intersection);
239✔
206
                                regions.push_back(std::move(intersection));
239✔
207
                        }
208
                }
209
        }
210
        // if any of the intersections above are actually subsets, we will end up with empty regions
211
        regions.erase(std::remove_if(regions.begin(), regions.end(), std::mem_fn(&region<Dims>::empty)), regions.end());
2,280✔
212
}
2,280✔
213

214
// explicit instantiations for tests
215
template void symmetrically_split_overlapping_regions(std::vector<region<0>>& regions);
216
template void symmetrically_split_overlapping_regions(std::vector<region<1>>& regions);
217
template void symmetrically_split_overlapping_regions(std::vector<region<2>>& regions);
218
template void symmetrically_split_overlapping_regions(std::vector<region<3>>& regions);
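// Illustration: for the 1d regions A = [0,4) and B = [2,6), the pair is replaced by
// {A ∖ B, A ∩ B, B ∖ A} = {[0,2), [2,4), [4,6)}: the overlap [2,4) is copied or received only once,
// while the consumers of A and B can each proceed as soon as their own pieces are available. If one
// region is a subset of the other, its difference is empty and is dropped by the final erase.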
219

220
/// Returns whether an iterator range of instruction pointers is topologically sorted, i.e. sequential execution would satisfy all internal dependencies.
221
template <typename Iterator>
222
bool is_topologically_sorted(Iterator begin, Iterator end) {
5,770✔
223
        for(auto check = begin; check != end; ++check) {
18,908✔
224
                for(const auto dep : (*check)->get_dependencies()) {
37,874✔
225
                        if(std::find_if(std::next(check), end, [dep](const auto& node) { return node->get_id() == dep; }) != end) return false;
147,342!
226
                }
227
        }
228
        return true;
5,770✔
229
}
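// Illustration: if instruction I2 depends on I1, then the order [I1, I2] is accepted, while
// [I2, I1] is rejected because I2's dependency appears later in the range than I2 itself.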
230

231
/// Starting from `first` (inclusive), selects the next memory_id which is set in `location`.
232
memory_id next_location(const memory_mask& location, memory_id first) {
41✔
233
        for(size_t i = 0; i < max_num_memories; ++i) {
48!
234
                const memory_id mem = (first + i) % max_num_memories;
48✔
235
                if(location[mem]) { return mem; }
89✔
236
        }
237
        utils::panic("data is requested to be read, but not located in any memory");
×
238
}
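// Illustration: if `location` has memories 1 and 3 set, then next_location(location, 2) returns 3
// and next_location(location, 0) returns 1. The search wraps around modulo max_num_memories, so a
// `first` past the highest set bit still finds a memory; it only panics when the mask is empty.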
239

240
/// Maintains a set of concurrent instructions that are accessing a subrange of a buffer allocation.
241
/// Instruction pointers are ordered by id to allow equality comparison on the internal vector structure.
242
class access_front {
243
  public:
244
        enum mode { none, allocate, read, write };
245

246
        access_front() = default;
154✔
247
        explicit access_front(const mode mode) : m_mode(mode) {}
11,393✔
248
        explicit access_front(instruction* const instr, const mode mode) : m_instructions{instr}, m_mode(mode) {}
50,874✔
249

250
        void add_instruction(instruction* const instr) {
6,288✔
251
                // we insert instructions as soon as they are generated, so inserting at the end keeps the vector sorted
252
                m_instructions.push_back(instr);
6,288✔
253
                assert(std::is_sorted(m_instructions.begin(), m_instructions.end(), instruction_id_less()));
6,288✔
254
        }
6,288✔
255

256
        [[nodiscard]] access_front apply_epoch(instruction* const epoch) const {
6,964✔
257
                const auto first_retained = std::upper_bound(m_instructions.begin(), m_instructions.end(), epoch, instruction_id_less());
6,964✔
258
                const auto last_retained = m_instructions.end();
6,964✔
259

260
                // only include the new epoch in the access front if it in fact subsumes another instruction
261
                if(first_retained == m_instructions.begin()) return *this;
6,964✔
262

263
                access_front pruned(m_mode);
2,304✔
264
                pruned.m_instructions.resize(1 + static_cast<size_t>(std::distance(first_retained, last_retained)));
2,304✔
265
                pruned.m_instructions.front() = epoch;
2,304✔
266
                std::copy(first_retained, last_retained, std::next(pruned.m_instructions.begin()));
4,608✔
267
                assert(std::is_sorted(pruned.m_instructions.begin(), pruned.m_instructions.end(), instruction_id_less()));
2,304✔
268
                return pruned;
2,304✔
269
        };
2,304✔
270

271
        const gch::small_vector<instruction*>& get_instructions() const { return m_instructions; }
20,211✔
272
        mode get_mode() const { return m_mode; }
33,201✔
273

274
        friend bool operator==(const access_front& lhs, const access_front& rhs) { return lhs.m_instructions == rhs.m_instructions && lhs.m_mode == rhs.m_mode; }
85,968✔
275
        friend bool operator!=(const access_front& lhs, const access_front& rhs) { return !(lhs == rhs); }
34,592✔
276

277
  private:
278
        gch::small_vector<instruction*> m_instructions; // ordered by id to allow equality comparison
279
        mode m_mode = none;
280
};
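// Illustration of access_front::apply_epoch: if the front holds instructions with ids {3, 7, 9} and
// an epoch with id 5 is applied, instruction 3 is subsumed and the pruned front becomes
// {epoch, 7, 9}; if no tracked instruction is older than the epoch, the front is returned unchanged.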
281

282
/// Per-allocation state for a single buffer. This is where we track last-writer instructions and access fronts.
283
struct buffer_allocation_state {
284
        allocation_id aid;
285
        detail::box<3> box;                                ///< in buffer coordinates
286
        region_map<access_front> last_writers;             ///< in buffer coordinates
287
        region_map<access_front> last_concurrent_accesses; ///< in buffer coordinates
288

289
        explicit buffer_allocation_state(const allocation_id aid, alloc_instruction* const ainstr /* optional: null for user allocations */,
1,422✔
290
            const detail::box<3>& allocated_box, const range<3>& buffer_range)
291
            : aid(aid), box(allocated_box), //
1,422✔
292
              last_writers(allocated_box, ainstr != nullptr ? access_front(ainstr, access_front::allocate) : access_front()),
1,422✔
293
              last_concurrent_accesses(allocated_box, ainstr != nullptr ? access_front(ainstr, access_front::allocate) : access_front()) {}
1,422✔
294

295
        /// Add `instr` to the active set of concurrent reads, or replace the current access front if the last access was not a read.
296
        void track_concurrent_read(const region<3>& region, instruction* const instr) {
8,133✔
297
                if(region.empty()) return;
8,133✔
298
                for(auto& [box, front] : last_concurrent_accesses.get_region_values(region)) {
16,900✔
299
                        if(front.get_mode() == access_front::read) {
10,569✔
300
                                front.add_instruction(instr);
2,860✔
301
                        } else {
302
                                front = access_front(instr, access_front::read);
7,709✔
303
                        }
304
                        last_concurrent_accesses.update_box(box, front);
10,569✔
305
                }
6,331✔
306
        }
307

308
        /// Replace the current access front with a write. The write is treated as "atomic" in the sense that there is never a second, concurrent write operation
309
        /// happening simultaneously. This is true for all writes except those from device kernels and host tasks, which might specify overlapping write-accessors.
310
        void track_atomic_write(const region<3>& region, instruction* const instr) {
3,227✔
311
                if(region.empty()) return;
3,227!
312
                last_writers.update_region(region, access_front(instr, access_front::write));
3,227✔
313
                last_concurrent_accesses.update_region(region, access_front(instr, access_front::write));
3,227✔
314
        }
315

316
        /// Replace the current access front with an empty write-front. This is done in preparation of writes from device kernels and host tasks.
317
        void begin_concurrent_writes(const region<3>& region) {
4,602✔
318
                if(region.empty()) return;
4,602✔
319
                last_writers.update_region(region, access_front(access_front::write));
2,871✔
320
                last_concurrent_accesses.update_region(region, access_front(access_front::write));
2,871✔
321
        }
322

323
        /// Add an instruction to the current set of concurrent writes. This is used to track writes from device kernels and host tasks and requires
324
        /// begin_concurrent_writes to be called beforehand. Multiple concurrent writes will only occur when a task declares overlapping writes and
325
        /// overlapping-write detection is disabled via the error policy. In order to still produce an executable (albeit racy) instruction graph in that case, we
326
        /// track multiple last-writers for the same buffer element.
327
        void track_concurrent_write(const region<3>& region, instruction* const instr) {
4,602✔
328
                if(region.empty()) return;
4,602✔
329
                for(auto& [box, front] : last_writers.get_region_values(region)) {
5,751✔
330
                        assert(front.get_mode() == access_front::write && "must call begin_concurrent_writes first");
2,880✔
331
                        front.add_instruction(instr);
2,880✔
332
                        last_writers.update_box(box, front);
2,880✔
333
                        last_concurrent_accesses.update_box(box, front);
2,880✔
334
                }
2,871✔
335
        }
336

337
        /// Replace all tracked instructions that are older than `epoch` with `epoch`.
338
        void apply_epoch(instruction* const epoch) {
1,859✔
339
                last_writers.apply_to_values([epoch](const access_front& front) { return front.apply_epoch(epoch); });
4,996✔
340
                last_concurrent_accesses.apply_to_values([epoch](const access_front& front) { return front.apply_epoch(epoch); });
5,373✔
341
        }
1,859✔
342
};
343

344
/// Per-memory state for a single buffer. Dependencies and last writers are tracked on the contained allocations.
345
struct buffer_memory_state {
346
        // TODO bound the number of allocations per buffer in order to avoid runaway tracking overhead (similar to horizons)
347
        // TODO evaluate if it ever makes sense to use a region_map here, or if we're better off expecting very few allocations and sticking to a vector here
348
        std::vector<buffer_allocation_state> allocations; // disjoint
349

350
        // Aggregates boxes that are required to be contiguous but have not yet been allocated in anticipate(). When performing the first (re)allocation on this
351
        // memory, allocate_contiguously() will extend its allocations to cover all anticipated_contiguous_boxes, then clear the vector.
352
        box_vector<3> anticipated_contiguous_boxes;
353

354
        // Track the total number of allocations made during the buffer's lifetime to heuristically detect and warn about frequent re-sizes and split allocations
355
        // that could have been merged with proper scheduler lookahead.
356
        size_t total_allocations_performed = 0;
357
        bool frequent_allocations_warning_emitted = false;
358

359
        const buffer_allocation_state& get_allocation(const allocation_id aid) const {
4,988✔
360
                const auto it = std::find_if(allocations.begin(), allocations.end(), [=](const buffer_allocation_state& a) { return a.aid == aid; });
11,034✔
361
                assert(it != allocations.end());
4,988✔
362
                return *it;
9,976✔
363
        }
364

365
        buffer_allocation_state& get_allocation(const allocation_id aid) { return const_cast<buffer_allocation_state&>(std::as_const(*this).get_allocation(aid)); }
4,988✔
366

367
        /// Returns the (unique) allocation covering `box` if such an allocation exists, otherwise nullptr.
368
        const buffer_allocation_state* find_contiguous_allocation(const box<3>& box) const {
19,623✔
369
                const auto it = std::find_if(allocations.begin(), allocations.end(), [&](const buffer_allocation_state& a) { return a.box.covers(box); });
39,528✔
370
                return it != allocations.end() ? &*it : nullptr;
39,246✔
371
        }
372

373
        buffer_allocation_state* find_contiguous_allocation(const box<3>& box) {
374
                return const_cast<buffer_allocation_state*>(std::as_const(*this).find_contiguous_allocation(box));
375
        }
376

377
        /// Returns the (unique) allocation covering `box`.
378
        const buffer_allocation_state& get_contiguous_allocation(const box<3>& box) const {
10,709✔
379
                const auto alloc = find_contiguous_allocation(box);
10,709✔
380
                assert(alloc != nullptr);
10,709✔
381
                return *alloc;
10,709✔
382
        }
383

384
        buffer_allocation_state& get_contiguous_allocation(const box<3>& box) {
6,494✔
385
                return const_cast<buffer_allocation_state&>(std::as_const(*this).get_contiguous_allocation(box));
6,494✔
386
        }
387

388
        bool is_allocated_contiguously(const box<3>& box) const { return find_contiguous_allocation(box) != nullptr; }
8,914✔
389

390
        /// Replace all tracked instructions that are older than `epoch` with `epoch`.
391
        void apply_epoch(instruction* const epoch) {
4,364✔
392
                for(auto& alloc : allocations) {
6,223✔
393
                        alloc.apply_epoch(epoch);
1,859✔
394
                }
395
        }
4,364✔
396
};
397

398
/// State for a single buffer.
399
struct buffer_state {
400
        /// Tracks a pending non-reduction await-push that will be compiled into receive_instructions as soon as a command reads from its region.
401
        struct region_receive {
402
                task_id consumer_tid;
403
                region<3> received_region;
404
                box_vector<3> required_contiguous_allocations;
405

406
                region_receive(const task_id consumer_tid, region<3> received_region, box_vector<3> required_contiguous_allocations)
343✔
407
                    : consumer_tid(consumer_tid), received_region(std::move(received_region)),
343✔
408
                      required_contiguous_allocations(std::move(required_contiguous_allocations)) {}
343✔
409
        };
410

411
        /// Tracks a pending reduction await-push, which will be compiled into gather_receive_instructions and reduce_instructions when read from.
412
        struct gather_receive {
413
                task_id consumer_tid;
414
                reduction_id rid;
415
                box<3> gather_box;
416

417
                gather_receive(const task_id consumer_tid, const reduction_id rid, const box<3> gather_box)
43✔
418
                    : consumer_tid(consumer_tid), rid(rid), gather_box(gather_box) {}
43✔
419
        };
420

421
        std::string debug_name;
422
        celerity::range<3> range;
423
        size_t elem_size;  ///< in bytes
424
        size_t elem_align; ///< in bytes
425

426
        /// Per-memory and per-allocation state of this buffer.
427
        dense_map<memory_id, buffer_memory_state> memories;
428

429
        /// Contains a mask for every memory_id that either was written to by the original-producer instruction or that has already been made coherent previously.
430
        region_map<memory_mask> up_to_date_memories;
431

432
        /// Tracks the instruction that initially produced each buffer element on the local node - this can be a kernel, host task, region-receive or
433
        /// reduce-instruction, or - in case of a host-initialized buffer - an epoch. It is different from `buffer_allocation_state::last_writers` in that it never
434
        /// contains copy instructions. Copy- and send source regions are split on their original producer instructions to facilitate computation-communication
435
        /// overlap when different producers finish at different times.
436
        region_map<instruction*> original_writers;
437

438
        /// Tracks the memory to which the original_writer instruction wrote each buffer element. `original_write_memories[box]` is meaningless when
439
        /// `up_to_date_memories[box]` is empty (i.e. the buffer is not up-to-date on the local node due to being uninitialized or await-pushed without being
440
        /// consumed).
441
        region_map<memory_id> original_write_memories;
442

443
        // We store pending receives (await push regions) in a vector instead of a region map since we must process their entire regions en-bloc rather than on
444
        // a per-element basis.
445
        std::vector<region_receive> pending_receives;
446
        std::vector<gather_receive> pending_gathers;
447

448
        explicit buffer_state(const celerity::range<3>& range, const size_t elem_size, const size_t elem_align, const size_t n_memories)
591✔
449
            : range(range), elem_size(elem_size), elem_align(elem_align), memories(n_memories), up_to_date_memories(range), original_writers(range),
591✔
450
              original_write_memories(range) {}
1,182✔
451

452
        void track_original_write(const region<3>& region, instruction* const instr, const memory_id mid) {
4,352✔
453
                original_writers.update_region(region, instr);
4,352✔
454
                original_write_memories.update_region(region, mid);
4,352✔
455
                up_to_date_memories.update_region(region, memory_mask().set(mid));
4,352✔
456
        }
4,352✔
457

458
        /// Replace all tracked instructions that are older than `epoch` with `epoch`.
459
        void apply_epoch(instruction* const epoch) {
802✔
460
                for(auto& memory : memories) {
5,166✔
461
                        memory.apply_epoch(epoch);
4,364✔
462
                }
463
                original_writers.apply_to_values([epoch](instruction* const instr) { return instr != nullptr && instr->get_id() < epoch->get_id() ? epoch : instr; });
2,459✔
464

465
                // This is an opportune point to verify that all await-pushes are fully consumed by the commands that require them.
466
                // TODO Assumes that the command graph generator issues await-pushes immediately before the commands that need them.
467
                assert(pending_receives.empty());
802✔
468
                assert(pending_gathers.empty());
802✔
469
        }
802✔
470
};
471

472
struct host_object_state {
473
        bool owns_instance; ///< If true, `destroy_host_object_instruction` will be emitted when `destroy_host_object` is called (false for `host_object<T&/void>`)
474
        instruction* last_side_effect; ///< Host tasks with side effects will be serialized wrt/ the host object.
475

476
        explicit host_object_state(const bool owns_instance, instruction* const last_epoch) : owns_instance(owns_instance), last_side_effect(last_epoch) {}
101✔
477

478
        /// If the last side-effect instruction was older than `epoch`, replaces it with `epoch`.
479
        void apply_epoch(instruction* const epoch) {
66✔
480
                if(last_side_effect != nullptr && last_side_effect->get_id() < epoch->get_id()) { last_side_effect = epoch; }
66!
481
        }
66✔
482
};
483

484
struct collective_group_state {
485
        /// Collective host tasks will be serialized wrt/ the collective group to ensure that the user can freely submit MPI collectives on their communicator.
486
        instruction* last_collective_operation;
487

488
        explicit collective_group_state(instruction* const last_host_task) : last_collective_operation(last_host_task) {}
463✔
489

490
        /// If the last host-task instruction was older than `epoch`, replaces it with `epoch`.
491
        void apply_epoch(instruction* const epoch) {
1,427✔
492
                if(last_collective_operation != nullptr && last_collective_operation->get_id() < epoch->get_id()) { last_collective_operation = epoch; }
1,427!
493
        }
1,427✔
494
};
495

496
/// Staging allocations are kept around in a pool, so size and last-use information needs to be maintained for them.
497
struct staging_allocation {
498
        allocation_id aid;
499
        size_t size_bytes;
500
        size_t align_bytes;
501
        access_front last_accesses; // once the access front has been collapsed to an effective epoch, we consider the allocation for re-use
502

503
        staging_allocation(const allocation_id aid, const size_t size_bytes, const size_t align_bytes, alloc_instruction* const alloc_instr)
105✔
504
            : aid(aid), size_bytes(size_bytes), align_bytes(align_bytes), last_accesses(alloc_instr, access_front::allocate) {}
105✔
505

506
        void apply_epoch(instruction* const epoch) { last_accesses = last_accesses.apply_epoch(epoch); }
313✔
507
};
508

509
/// `allocation_id`s are "namespaced" to their memory ID, so we maintain the next `raw_allocation_id` for each memory separately.
510
struct memory_state {
511
        raw_allocation_id next_raw_aid = 1; // 0 is reserved for null_allocation_id
512

513
        // On host memory, we maintain a pool of staging allocations for host-staged device-to-device copies.
514
        std::vector<staging_allocation> staging_allocation_pool;
515

516
        void apply_epoch(instruction* const epoch) {
8,216✔
517
                for(auto& alloc : staging_allocation_pool) {
8,529✔
518
                        alloc.apply_epoch(epoch);
313✔
519
                }
520
        }
8,216✔
521
};
522

523
/// We submit the set of instructions and pilots generated within a call to compile() en-bloc to relieve contention on the executor queue lock. To collect all
524
/// instructions that are generated in the call stack without polluting internal state, we pass a `batch&` output parameter to any function that transitively
525
/// generates instructions or pilots.
526
struct batch { // NOLINT(cppcoreguidelines-special-member-functions) (do not complain about the asserting destructor)
527
        std::vector<const instruction*> generated_instructions;
528
        std::vector<outbound_pilot> generated_pilots;
529

530
        /// The base priority of a batch adds to the priority per instruction type to transitively prioritize dependencies of important instructions.
531
        int base_priority = 0;
532

533
#ifndef NDEBUG
534
        ~batch() {
5,780✔
535
                if(std::uncaught_exceptions() == 0) { assert(generated_instructions.empty() && generated_pilots.empty() && "unflushed batch detected"); }
5,780!
536
        }
5,780✔
537
#endif
538
};
539

540
// We assign instruction priorities heuristically by deciding on a batch::base_priority at the beginning of a compile_* function and offsetting it by the
541
// instruction-type specific values defined here. This aims to perform low-latency submits of long-running instructions first to maximize overlap.
542
// clang-format off
543
template <typename Instruction> constexpr int instruction_type_priority = 0; // higher means more urgent
544
template <> constexpr int instruction_type_priority<free_instruction> = -1; // only free when forced to - nothing except an epoch or horizon will depend on this
545
template <> constexpr int instruction_type_priority<alloc_instruction> = 1; // allocations are synchronous and slow, so we postpone them as much as possible
546
template <> constexpr int instruction_type_priority<await_receive_instruction> = 2;
547
template <> constexpr int instruction_type_priority<split_receive_instruction> = 2;
548
template <> constexpr int instruction_type_priority<receive_instruction> = 2;
549
template <> constexpr int instruction_type_priority<send_instruction> = 2;
550
template <> constexpr int instruction_type_priority<fence_instruction> = 3;
551
template <> constexpr int instruction_type_priority<host_task_instruction> = 4; // we expect kernel launches to have low latency but comparatively long run time
552
template <> constexpr int instruction_type_priority<device_kernel_instruction> = 4;
553
template <> constexpr int instruction_type_priority<epoch_instruction> = 5; // epochs and horizons are low-latency and stop the task buffers from reaching capacity
554
template <> constexpr int instruction_type_priority<horizon_instruction> = 5;
555
template <> constexpr int instruction_type_priority<copy_instruction> = 6; // stalled device-to-device copies can block kernel execution on peer devices
556
// clang-format on
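// Example of how these offsets combine with batch::base_priority (see create_internal below): in a
// batch with base priority 0, a copy_instruction is created with priority 6 and a free_instruction
// with priority -1; a hypothetical batch with base priority 10 would shift every instruction it
// generates up by the same amount, so dependencies of an urgent command are prioritized as a group.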
557

558
/// A chunk of a task's execution space that will be assigned to a device (or the host) and thus knows which memory its instructions will operate on.
559
struct localized_chunk {
560
        detail::memory_id memory_id = host_memory_id;
561
        std::optional<detail::device_id> device_id;
562
        box<3> execution_range;
563
};
564

565
/// Transient state for a node-local eager reduction that is emitted around kernels that explicitly include a reduction. Tracks the gather-allocation that is
566
/// created early to rescue the current buffer value across the kernel invocation in case the reduction is not `initialize_to_identity` and the command graph
567
/// designates the local node to be the reduction initializer.
568
struct local_reduction {
569
        bool include_local_buffer_value = false; ///< If true, the local node is the one to include its current version of the existing buffer in the reduction.
570
        size_t first_kernel_chunk_offset = 0;    ///< If the local reduction includes the current buffer value, we add an additional reduction-chunk at the front.
571
        size_t num_input_chunks = 1;             ///< One per participating local chunk, plus one if the local node includes the current buffer value.
572
        size_t chunk_size_bytes = 0;
573
        allocation_id gather_aid = null_allocation_id;
574
        alloc_instruction* gather_alloc_instr = nullptr;
575
};
576

577
/// Maps instruction DAG types to their record type.
578
template <typename Instruction>
579
using record_type_for_t = utils::type_switch_t<Instruction, clone_collective_group_instruction(clone_collective_group_instruction_record),
580
    alloc_instruction(alloc_instruction_record), free_instruction(free_instruction_record), copy_instruction(copy_instruction_record),
581
    device_kernel_instruction(device_kernel_instruction_record), host_task_instruction(host_task_instruction_record), send_instruction(send_instruction_record),
582
    receive_instruction(receive_instruction_record), split_receive_instruction(split_receive_instruction_record),
583
    await_receive_instruction(await_receive_instruction_record), gather_receive_instruction(gather_receive_instruction_record),
584
    fill_identity_instruction(fill_identity_instruction_record), reduce_instruction(reduce_instruction_record), fence_instruction(fence_instruction_record),
585
    destroy_host_object_instruction(destroy_host_object_instruction_record), horizon_instruction(horizon_instruction_record),
586
    epoch_instruction(epoch_instruction_record)>;
587

588
class generator_impl {
589
  public:
590
        generator_impl(size_t num_nodes, node_id local_nid, const system_info& system, instruction_graph& idag, instruction_graph_generator::delegate* const dlg,
591
            instruction_recorder* recorder, const instruction_graph_generator::policy_set& policy);
592

593
        void notify_buffer_created(buffer_id bid, const range<3>& range, size_t elem_size, size_t elem_align, allocation_id user_aid = null_allocation_id);
594
        void notify_buffer_debug_name_changed(buffer_id bid, const std::string& name);
595
        void notify_buffer_destroyed(buffer_id bid);
596
        void notify_host_object_created(host_object_id hoid, bool owns_instance);
597
        void notify_host_object_destroyed(host_object_id hoid);
598
        instruction_graph_generator::scheduling_hint anticipate(const command& cmd);
599
        void compile(const command& cmd);
600

601
  private:
602
        inline static const box<3> scalar_reduction_box{zeros, ones};
603

604
        // construction parameters (immutable)
605
        instruction_graph* m_idag;
606
        size_t m_num_nodes;
607
        node_id m_local_nid;
608
        system_info m_system;
609
        instruction_graph_generator::delegate* m_delegate;
610
        instruction_recorder* m_recorder;
611
        instruction_graph_generator::policy_set m_policy;
612

613
        instruction_id m_next_instruction_id = 0;
614
        message_id m_next_message_id = 0;
615

616
        instruction* m_last_horizon = nullptr;
617
        instruction* m_last_epoch = nullptr; // set once the initial epoch instruction is generated in the constructor
618

619
        /// The set of all instructions that are not yet depended upon by other instructions. These are collected by collapse_execution_front_to() as part of
620
        /// horizon / epoch generation.
621
        std::unordered_set<instruction_id> m_execution_front;
622

623
        dense_map<memory_id, memory_state> m_memories;
624
        std::unordered_map<buffer_id, buffer_state> m_buffers;
625
        std::unordered_map<host_object_id, host_object_state> m_host_objects;
626
        std::unordered_map<collective_group_id, collective_group_state> m_collective_groups;
627

628
        /// The instruction executor maintains a mapping of allocation_id -> USM pointer. For IDAG-managed memory, these entries are deleted after executing a
629
        /// `free_instruction`, but since user allocations are not deallocated by us, we notify the executor on each horizon or epoch via the `instruction_garbage`
630
        /// struct about entries that will no longer be used and can therefore be collected. We include user allocations for buffer fences immediately after
631
        /// emitting the fence, and buffer host-initialization user allocations after the buffer has been destroyed.
632
        std::vector<allocation_id> m_unreferenced_user_allocations;
633

634
        /// True if a recorder is present and create() will call the `record_with` lambda passed as its last parameter.
635
        bool is_recording() const { return m_recorder != nullptr; }
51,991✔
636

637
        allocation_id new_allocation_id(const memory_id mid);
638

639
        template <typename Instruction, typename... CtorParamsAndRecordWithFn, size_t... CtorParamIndices, size_t RecordWithFnIndex>
640
        Instruction* create_internal(batch& batch, const std::tuple<CtorParamsAndRecordWithFn...>& ctor_args_and_record_with,
641
            std::index_sequence<CtorParamIndices...> /* ctor_param_indices*/, std::index_sequence<RecordWithFnIndex> /* record_with_fn_index */);
642

643
        /// Create an instruction, insert it into the IDAG and the current execution front, and record it if a recorder is present.
644
        ///
645
        /// Invoke as
646
        /// ```
647
        /// create<instruction-type>(instruction-ctor-params...,
648
        ///         [&](const auto record_debug_info) { return record_debug_info(instruction-record-additional-ctor-params)})
649
        /// ```
650
        template <typename Instruction, typename... CtorParamsAndRecordWithFn>
651
        Instruction* create(batch& batch, CtorParamsAndRecordWithFn&&... ctor_args_and_record_with);
652

653
        message_id create_outbound_pilot(batch& batch, node_id target, const transfer_id& trid, const box<3>& box);
654

655
        /// Inserts a graph dependency and removes `to` from the execution front (if present). The `record_origin` is debug information.
656
        void add_dependency(instruction* const from, instruction* const to, const instruction_dependency_origin record_origin);
657

658
        void add_dependencies_on_access_front(
659
            instruction* const accessing_instruction, const access_front& front, const instruction_dependency_origin origin_for_read_write_front);
660

661
        void add_dependencies_on_last_writers(instruction* const accessing_instruction, buffer_allocation_state& allocation, const region<3>& region);
662

663
        /// Add dependencies to the last writer of a region, and track the instruction as the new last (concurrent) reader.
664
        void perform_concurrent_read_from_allocation(instruction* const reading_instruction, buffer_allocation_state& allocation, const region<3>& region);
665

666
        void add_dependencies_on_last_concurrent_accesses(instruction* const accessing_instruction, buffer_allocation_state& allocation, const region<3>& region,
667
            const instruction_dependency_origin origin_for_read_write_front);
668

669
        /// Add dependencies to the last concurrent accesses of a region, and track the instruction as the new last (unique) writer.
670
        void perform_atomic_write_to_allocation(instruction* const writing_instruction, buffer_allocation_state& allocation, const region<3>& region);
671

672
        /// Replace all tracked instructions that are older than `epoch` with `epoch`.
673
        void apply_epoch(instruction* const epoch);
674

675
        /// Add dependencies from `horizon_or_epoch` to all instructions in `m_execution_front` and clear the set.
676
        void collapse_execution_front_to(instruction* const horizon_or_epoch);
677

678
        /// Create a new host allocation for copy staging, or re-use a cached staging allocation whose last access is older than the current epoch.
679
        staging_allocation& acquire_staging_allocation(batch& current_batch, memory_id mid, size_t size_bytes, size_t align_bytes);
680

681
        /// Free all cached staging allocations allocated so far.
682
        void free_all_staging_allocations(batch& current_batch);
683

684
        /// Ensure that all boxes in `required_contiguous_boxes` have a contiguous allocation on `mid`.
685
        /// Re-allocation of one buffer on one memory never interacts with other buffers or other memories backing the same buffer, so this function can be called
686
        /// in any order of allocation requirements without generating additional dependencies.
687
        void allocate_contiguously(batch& batch, buffer_id bid, memory_id mid, box_vector<3>&& required_contiguous_boxes);
688

689
        /// Insert one or more receive instructions in order to fulfil a pending receive, making the received data available in host_memory_id. This may entail
690
        /// receiving a region that is larger than the union of all regions read.
691
        void commit_pending_region_receive_to_host_memory(
692
            batch& batch, buffer_id bid, const buffer_state::region_receive& receives, const std::vector<region<3>>& concurrent_reads);
693

694
        /// Insert coherence copy instructions where necessary to make all specified regions coherent on their respective memories. Requires the necessary
695
        /// allocations in `dest_mid` to already be present. We deliberately allow overlapping read-regions to avoid aggregated copies introducing synchronization
696
        /// points between otherwise independent instructions.
697
        void establish_coherence_between_buffer_memories(
698
            batch& current_batch, const buffer_id bid, dense_map<memory_id, std::vector<region<3>>>& concurrent_reads_from_memory);
699

700
        /// Issue instructions to create any collective group required by a task.
701
        void create_task_collective_groups(batch& command_batch, const task& tsk);
702

703
        /// Split a task's local execution range (given by execution_command) into chunks according to device configuration and a possible oversubscription hint.
704
        std::vector<localized_chunk> split_task_execution_range(const execution_command& ecmd, const task& tsk);
705

706
        /// Detect overlapping writes between local chunks of a task and report them according to m_policy.
707
        void report_task_overlapping_writes(const task& tsk, const std::vector<localized_chunk>& concurrent_chunks) const;
708

709
        /// Allocate memory, apply any pending receives, and issue resize- and coherence copies to prepare all buffer memories for a task's execution.
710
        void satisfy_task_buffer_requirements(batch& batch, buffer_id bid, const task& tsk, const subrange<3>& local_execution_range, bool is_reduction_initializer,
711
            const std::vector<localized_chunk>& concurrent_chunks_after_split);
712

713
        /// Create a gather allocation and optionally save the current buffer value before creating partial reduction results in any kernel.
714
        local_reduction prepare_task_local_reduction(
715
            batch& command_batch, const reduction_info& rinfo, const execution_command& ecmd, const task& tsk, const size_t num_concurrent_chunks);
716

717
        /// Combine any partial reduction results computed by local chunks and write the result to buffer host memory.
718
        void finish_task_local_reduction(batch& command_batch, const local_reduction& red, const reduction_info& rinfo, const execution_command& ecmd,
719
            const task& tsk, const std::vector<localized_chunk>& concurrent_chunks);
720

721
        /// Launch a device kernel for each local chunk of a task, passing the relevant buffer allocations in place of accessors and reduction descriptors.
722
        instruction* launch_task_kernel(batch& command_batch, const execution_command& ecmd, const task& tsk, const localized_chunk& chunk);
723

724
        /// Add dependencies for all buffer accesses and reductions of a task, then update tracking structures accordingly.
725
        void perform_task_buffer_accesses(
726
            const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions);
727

728
        /// If a task has side effects, serialize it with respect to the last task that shares a host object.
729
        void perform_task_side_effects(
730
            const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions);
731

732
        /// If a task is part of a collective group, serialize it with respect to the last host task in that group.
733
        void perform_task_collective_operations(
734
            const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions);
735

736
        void compile_execution_command(batch& batch, const execution_command& ecmd);
737
        void compile_push_command(batch& batch, const push_command& pcmd);
738
        void defer_await_push_command(const await_push_command& apcmd);
739
        void compile_reduction_command(batch& batch, const reduction_command& rcmd);
740
        void compile_fence_command(batch& batch, const fence_command& fcmd);
741
        void compile_horizon_command(batch& batch, const horizon_command& hcmd);
742
        void compile_epoch_command(batch& batch, const epoch_command& ecmd);
743

744
        /// Passes all instructions and outbound pilots that have been accumulated in `batch` to the delegate (if any). Called after compiling a command, creating
745
        /// or destroying a buffer or host object, and also in our constructor for the creation of the initial epoch.
746
        void flush_batch(batch&& batch);
747

748
        std::string print_buffer_debug_label(buffer_id bid) const;
749
};
750

751
generator_impl::generator_impl(const size_t num_nodes, const node_id local_nid, const system_info& system, instruction_graph& idag,
424✔
752
    instruction_graph_generator::delegate* const dlg, instruction_recorder* const recorder, const instruction_graph_generator::policy_set& policy)
424✔
753
    : m_idag(&idag), m_num_nodes(num_nodes), m_local_nid(local_nid), m_system(system), m_delegate(dlg), m_recorder(recorder), m_policy(policy),
424✔
754
      m_memories(m_system.memories.size()) //
424✔
755
{
756
#ifndef NDEBUG
757
        assert(m_system.memories.size() <= max_num_memories);
424✔
758
        assert(std::all_of(
1,255✔
759
            m_system.devices.begin(), m_system.devices.end(), [&](const device_info& device) { return device.native_memory < m_system.memories.size(); }));
760
        for(memory_id mid_a = 0; mid_a < m_system.memories.size(); ++mid_a) {
2,103✔
761
                assert(m_system.memories[mid_a].copy_peers[mid_a]);
1,679✔
762
                for(memory_id mid_b = mid_a + 1; mid_b < m_system.memories.size(); ++mid_b) {
4,484✔
763
                        assert(m_system.memories[mid_a].copy_peers[mid_b] == m_system.memories[mid_b].copy_peers[mid_a]
2,805✔
764
                               && "system_info::memories::copy_peers must be reflexive");
765
                }
766
        }
767
#endif
768
}
424✔
769

770
void generator_impl::notify_buffer_created(
591✔
771
    const buffer_id bid, const range<3>& range, const size_t elem_size, const size_t elem_align, allocation_id user_aid) //
772
{
773
        const auto [iter, inserted] =
1,182✔
774
            m_buffers.emplace(std::piecewise_construct, std::tuple(bid), std::tuple(range, elem_size, elem_align, m_system.memories.size()));
591✔
775
        assert(inserted);
591✔
776

777
        if(user_aid != null_allocation_id) {
591✔
778
                // The buffer was host-initialized through a user-specified pointer, which we consider a fully coherent allocation in user_memory_id.
779
                assert(user_aid.get_memory_id() == user_memory_id);
77✔
780
                const box entire_buffer = subrange({}, range);
231✔
781

782
                auto& buffer = iter->second;
77✔
783
                auto& memory = buffer.memories[user_memory_id];
77✔
784
                auto& allocation = memory.allocations.emplace_back(user_aid, nullptr /* alloc_instruction */, entire_buffer, buffer.range);
77✔
785

786
                allocation.track_atomic_write(entire_buffer, m_last_epoch);
77✔
787
                buffer.track_original_write(entire_buffer, m_last_epoch, user_memory_id);
77✔
788
        }
789
}
591✔
790

791
void generator_impl::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& name) { m_buffers.at(bid).debug_name = name; }
23✔
792

793
void generator_impl::notify_buffer_destroyed(const buffer_id bid) {
581✔
794
        const auto iter = m_buffers.find(bid);
581✔
795
        assert(iter != m_buffers.end());
581✔
796
        auto& buffer = iter->second;
581✔
797

798
        batch free_batch;
581✔
799
        for(memory_id mid = 0; mid < buffer.memories.size(); ++mid) {
2,859✔
800
                auto& memory = buffer.memories[mid];
2,278✔
801
                if(mid == user_memory_id) {
2,278✔
802
                        // When the buffer is gone, we can also drop the user allocation from executor tracking (there currently are either 0 or 1 of them).
803
                        for(const auto& user_alloc : memory.allocations) {
658✔
804
                                m_unreferenced_user_allocations.push_back(user_alloc.aid);
77✔
805
                        }
806
                } else {
807
                        for(auto& allocation : memory.allocations) {
2,965✔
808
                                const auto free_instr = create<free_instruction>(free_batch, allocation.aid, [&](const auto& record_debug_info) {
1,268✔
809
                                        record_debug_info(allocation.box.get_area() * buffer.elem_size, buffer_allocation_record{bid, buffer.debug_name, allocation.box});
954✔
810
                                });
2,222!
811
                                add_dependencies_on_last_concurrent_accesses(free_instr, allocation, allocation.box, instruction_dependency_origin::allocation_lifetime);
1,268✔
812
                                // no need to modify the access front - we're removing the buffer altogether!
813
                        }
814
                }
815
        }
816
        flush_batch(std::move(free_batch));
581✔
817

818
        m_buffers.erase(iter);
581✔
819
}
1,162✔
820

821
void generator_impl::notify_host_object_created(const host_object_id hoid, const bool owns_instance) {
101✔
822
        assert(m_host_objects.count(hoid) == 0);
101✔
823
        m_host_objects.emplace(std::piecewise_construct, std::tuple(hoid), std::tuple(owns_instance, m_last_epoch));
101✔
824
        // The host object is created in "userspace" and no instructions need to be emitted.
825
}
101✔
826

827
void generator_impl::notify_host_object_destroyed(const host_object_id hoid) {
97✔
828
        const auto iter = m_host_objects.find(hoid);
97✔
829
        assert(iter != m_host_objects.end());
97✔
830
        auto& obj = iter->second;
97✔
831

832
        if(obj.owns_instance) { // this is false for host_object<T&> and host_object<void>
97✔
833
                batch destroy_batch;
93✔
834
                const auto destroy_instr = create<destroy_host_object_instruction>(destroy_batch, hoid, [](const auto& record_debug_info) { record_debug_info(); });
156✔
835
                add_dependency(destroy_instr, obj.last_side_effect, instruction_dependency_origin::side_effect);
93✔
836
                flush_batch(std::move(destroy_batch));
93✔
837
        }
93✔
838

839
        m_host_objects.erase(iter);
97✔
840
}
97✔
841

842
allocation_id generator_impl::new_allocation_id(const memory_id mid) {
1,537✔
843
        assert(mid < m_memories.size());
1,537✔
844
        assert(mid != user_memory_id && "user allocation ids are not managed by the instruction graph generator");
1,537✔
845
        return allocation_id(mid, m_memories[mid].next_raw_aid++);
1,537✔
846
}
847

848
template <typename Instruction, typename... CtorParamsAndRecordWithFn, size_t... CtorParamIndices, size_t RecordWithFnIndex>
849
Instruction* generator_impl::create_internal(batch& batch, const std::tuple<CtorParamsAndRecordWithFn...>& ctor_args_and_record_with,
13,139✔
850
    std::index_sequence<CtorParamIndices...> /* ctor_param_indices*/, std::index_sequence<RecordWithFnIndex> /* record_with_fn_index */) {
851
        const auto iid = m_next_instruction_id++;
13,139✔
852
        const auto priority = batch.base_priority + instruction_type_priority<Instruction>;
13,139✔
853
        auto unique_instr = std::make_unique<Instruction>(iid, priority, std::get<CtorParamIndices>(ctor_args_and_record_with)...);
13,139✔
854
        const auto instr = m_idag->retain_in_current_epoch(std::move(unique_instr));
13,139✔
855
        m_execution_front.insert(iid);
13,139✔
856
        batch.generated_instructions.push_back(instr);
13,139✔
857

858
        if(is_recording()) {
13,139✔
859
                const auto& record_with = std::get<RecordWithFnIndex>(ctor_args_and_record_with);
4,316✔
860
#ifndef NDEBUG
861
                bool recorded = false;
4,316✔
862
#endif
863
                record_with([&](auto&&... debug_info) {
12,948✔
864
                        m_recorder->record_instruction(
8,632✔
865
                            std::make_unique<record_type_for_t<Instruction>>(std::as_const(*instr), std::forward<decltype(debug_info)>(debug_info)...));
4,316✔
866
#ifndef NDEBUG
867
                        recorded = true;
4,316✔
868
#endif
869
                });
870
                assert(recorded && "record_debug_info() not called within recording functor");
4,316✔
871
        }
872

873
        return instr;
13,139✔
874
}
13,139✔
875

876
template <typename Instruction, typename... CtorParamsAndRecordWithFn>
877
Instruction* generator_impl::create(batch& batch, CtorParamsAndRecordWithFn&&... ctor_args_and_record_with) {
13,139✔
878
        constexpr auto n_args = sizeof...(CtorParamsAndRecordWithFn);
13,139✔
879
        static_assert(n_args > 0);
880
        return create_internal<Instruction>(batch, std::forward_as_tuple(std::forward<CtorParamsAndRecordWithFn>(ctor_args_and_record_with)...),
13,139✔
881
            std::make_index_sequence<n_args - 1>(), std::index_sequence<n_args - 1>());
26,278✔
882
}
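
// Illustration (editorial, not part of the build): every call site in this file follows the same
// shape, passing the instruction's constructor arguments first and the recording functor last, e.g.
//
//   const auto instr = create<free_instruction>(current_batch, aid,
//       [&](const auto& record_debug_info) { record_debug_info(size_bytes, std::nullopt); });
//
// create() splits the trailing functor from the constructor arguments via the two index sequences
// before forwarding to create_internal().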
883

884
message_id generator_impl::create_outbound_pilot(batch& current_batch, const node_id target, const transfer_id& trid, const box<3>& box) {
775✔
885
        // message ids (IDAG equivalent of MPI send/receive tags) tie send / receive instructions to their respective pilots.
886
        const message_id msgid = m_next_message_id++;
775✔
887
        const outbound_pilot pilot{target, pilot_message{msgid, trid, box}};
775✔
888
        current_batch.generated_pilots.push_back(pilot);
775✔
889
        if(is_recording()) { m_recorder->record_outbound_pilot(pilot); }
775✔
890
        return msgid;
1,550✔
891
}
892

893
void generator_impl::add_dependency(instruction* const from, instruction* const to, const instruction_dependency_origin record_origin) {
26,274✔
894
        from->add_dependency(to->get_id());
26,274✔
895
        if(is_recording()) { m_recorder->record_dependency(instruction_dependency_record(to->get_id(), from->get_id(), record_origin)); }
26,274✔
896
        m_execution_front.erase(to->get_id());
26,274✔
897
}
26,274✔
898

899
void generator_impl::add_dependencies_on_access_front(
19,752✔
900
    instruction* const accessing_instruction, const access_front& front, const instruction_dependency_origin origin_for_read_write_front) //
901
{
902
        const auto record_origin = front.get_mode() == access_front::allocate ? instruction_dependency_origin::allocation_lifetime : origin_for_read_write_front;
19,752✔
903
        for(const auto writer : front.get_instructions()) {
42,895✔
904
                add_dependency(accessing_instruction, writer, record_origin);
23,143✔
905
        }
906
}
19,752✔
907

908
void generator_impl::add_dependencies_on_last_writers(instruction* const accessing_instruction, buffer_allocation_state& allocation, const region<3>& region) {
8,133✔
909
        for(const auto& [box, front] : allocation.last_writers.get_region_values(region)) {
17,242✔
910
                add_dependencies_on_access_front(accessing_instruction, front, instruction_dependency_origin::read_from_allocation);
9,109✔
911
        }
8,133✔
912
}
8,133✔
913

914
void generator_impl::perform_concurrent_read_from_allocation(
3,531✔
915
    instruction* const reading_instruction, buffer_allocation_state& allocation, const region<3>& region) //
916
{
917
        add_dependencies_on_last_writers(reading_instruction, allocation, region);
3,531✔
918
        allocation.track_concurrent_read(region, reading_instruction);
3,531✔
919
}
3,531✔
920

921
void generator_impl::add_dependencies_on_last_concurrent_accesses(instruction* const accessing_instruction, buffer_allocation_state& allocation,
9,042✔
922
    const region<3>& region, const instruction_dependency_origin origin_for_read_write_front) //
923
{
924
        for(const auto& [box, front] : allocation.last_concurrent_accesses.get_region_values(region)) {
19,082✔
925
                add_dependencies_on_access_front(accessing_instruction, front, origin_for_read_write_front);
10,040✔
926
        }
9,042✔
927
}
9,042✔
928

929
void generator_impl::perform_atomic_write_to_allocation(instruction* const writing_instruction, buffer_allocation_state& allocation, const region<3>& region) {
3,067✔
930
        add_dependencies_on_last_concurrent_accesses(writing_instruction, allocation, region, instruction_dependency_origin::write_to_allocation);
3,067✔
931
        allocation.track_atomic_write(region, writing_instruction);
3,067✔
932
}
3,067✔
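
// Editorial sketch (not part of the build): a coherence copy between two buffer allocations uses
// the two helpers above in their canonical order,
//
//   perform_concurrent_read_from_allocation(copy_instr, source_alloc, copy_region);
//   perform_atomic_write_to_allocation(copy_instr, dest_alloc, copy_region);
//
// i.e. it depends on the source's last writers and on all prior accesses to the destination, and
// then replaces the destination's access front for that region with itself.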
933

934
void generator_impl::apply_epoch(instruction* const epoch) {
1,769✔
935
        for(auto& memory : m_memories) {
9,985✔
936
                memory.apply_epoch(epoch);
8,216✔
937
        }
938
        for(auto& [_, buffer] : m_buffers) {
2,571✔
939
                buffer.apply_epoch(epoch);
802✔
940
        }
941
        for(auto& [_, host_object] : m_host_objects) {
1,835✔
942
                host_object.apply_epoch(epoch);
66✔
943
        }
944
        for(auto& [_, collective_group] : m_collective_groups) {
3,196✔
945
                collective_group.apply_epoch(epoch);
1,427✔
946
        }
947
        m_last_epoch = epoch;
1,769✔
948
}
1,769✔
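
// Editorial note (sketch): once apply_epoch() has run, tracking structures that previously pointed
// at pre-epoch instructions point at the epoch instead, so a single
//
//   add_dependency(instr, m_last_epoch, instruction_dependency_origin::last_epoch);
//
// transitively orders a new instruction after everything that completed before the epoch, without
// keeping those older instructions alive in the region maps.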
949

950
void generator_impl::collapse_execution_front_to(instruction* const horizon_or_epoch) {
1,821✔
951
        for(const auto iid : m_execution_front) {
7,547✔
952
                if(iid == horizon_or_epoch->get_id()) continue;
5,726✔
953
                // we can't use instruction_graph_generator::add_dependency since it modifies m_execution_front, which we're iterating over here
954
                horizon_or_epoch->add_dependency(iid);
3,905✔
955
                if(is_recording()) {
3,905✔
956
                        m_recorder->record_dependency(instruction_dependency_record(iid, horizon_or_epoch->get_id(), instruction_dependency_origin::execution_front));
1,147✔
957
                }
958
        }
959
        m_execution_front.clear();
1,821✔
960
        m_execution_front.insert(horizon_or_epoch->get_id());
1,821✔
961
}
1,821✔
962

963
staging_allocation& generator_impl::acquire_staging_allocation(batch& current_batch, const memory_id mid, const size_t size_bytes, const size_t align_bytes) {
179✔
964
        assert(size_bytes % align_bytes == 0);
179✔
965
        auto& memory = m_memories[mid];
179✔
966

967
        for(auto& alloc : memory.staging_allocation_pool) {
499✔
968
                // We attempt to re-use allocations made with exactly the same parameters. Staging allocations are implicitly treated as free when their last use
969
                // disappears behind a new epoch. This guarantees that staging allocation re-use never introduces new dependencies in the graph.
970
                if(alloc.size_bytes == size_bytes && alloc.align_bytes == align_bytes && alloc.last_accesses.get_instructions().size() == 1
381!
971
                    && alloc.last_accesses.get_instructions().front() == m_last_epoch) {
775✔
972
                        return alloc; // last use of this allocation is already epoch-serialized with the current batch, so no new unintended serialization will occur
74✔
973
                }
974
        }
975

976
        const auto aid = new_allocation_id(mid);
105✔
977
        const auto alloc_instr = create<alloc_instruction>(current_batch, aid, size_bytes, align_bytes, //
210✔
978
            [&](const auto& record_debug_info) { record_debug_info(alloc_instruction_record::alloc_origin::staging, std::nullopt, std::nullopt); });
240✔
979
        add_dependency(alloc_instr, m_last_epoch, instruction_dependency_origin::last_epoch);
105✔
980
        memory.staging_allocation_pool.emplace_back(aid, size_bytes, align_bytes, alloc_instr);
105✔
981
        return memory.staging_allocation_pool.back();
105✔
982
}
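
// Editorial sketch (not part of the build): the reuse condition above means a pooled staging
// allocation is only handed out again once its previous readers are hidden behind an epoch, e.g.
// (hypothetical timeline)
//
//   alloc A -> copy #1 reads A -> epoch E -> apply_epoch(E) -> copy #2 may reuse A
//
// because A's access front then consists solely of E, which the new batch is ordered after anyway.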
983

984
void generator_impl::free_all_staging_allocations(batch& current_batch) {
414✔
985
        for(auto& memory : m_memories) {
2,057✔
986
                for(const auto& alloc : memory.staging_allocation_pool) {
1,748✔
987
                        const auto free_instr = create<free_instruction>(current_batch, alloc.aid, //
210✔
988
                            [&](const auto& record_debug_info) { record_debug_info(alloc.size_bytes, std::nullopt); });
135✔
989
                        add_dependencies_on_access_front(free_instr, alloc.last_accesses, instruction_dependency_origin::allocation_lifetime);
105✔
990
                }
991
                memory.staging_allocation_pool.clear();
1,643✔
992
        }
993
}
414✔
994

995
void generator_impl::allocate_contiguously(batch& current_batch, const buffer_id bid, const memory_id mid, box_vector<3>&& required_contiguous_boxes) {
13,478✔
996
        CELERITY_DETAIL_TRACY_ZONE_SCOPED("iggen::allocate", Teal);
997

998
        if(required_contiguous_boxes.empty()) return;
13,478✔
999

1000
        auto& buffer = m_buffers.at(bid);
4,632✔
1001
        auto& memory = buffer.memories[mid];
4,632✔
1002

1003
        assert(std::all_of(required_contiguous_boxes.begin(), required_contiguous_boxes.end(),
15,728✔
1004
            [&](const box<3>& box) { return !box.empty() && detail::box<3>::full_range(buffer.range).covers(box); }));
1005

1006
        if(std::all_of(required_contiguous_boxes.begin(), required_contiguous_boxes.end(), //
4,632✔
1007
               [&](const box<3>& box) { return memory.is_allocated_contiguously(box); })) {
5,225✔
1008
                return;
3,306✔
1009
        }
1010

1011
        // As soon as any allocation must be realized, allocate the full set of contiguous boxes anticipate()d up until this point
1012
        required_contiguous_boxes.insert(required_contiguous_boxes.end(), memory.anticipated_contiguous_boxes.begin(), memory.anticipated_contiguous_boxes.end());
1,326✔
1013
        memory.anticipated_contiguous_boxes.clear();
1,326✔
1014

1015
        // We currently only ever *grow* the allocation of buffers on each memory, which means that we must merge (re-size) existing allocations that overlap with
1016
        // but do not fully contain one of the required contiguous boxes. *Overlapping* here strictly means having a non-empty intersection; two allocations whose
1017
        // boxes merely touch can continue to co-exist
1018
        auto&& contiguous_boxes_after_realloc = std::move(required_contiguous_boxes);
1,326✔
1019
        for(auto& alloc : memory.allocations) {
1,990✔
1020
                contiguous_boxes_after_realloc.push_back(alloc.box);
664✔
1021
        }
1022
        merge_overlapping_bounding_boxes(contiguous_boxes_after_realloc);
1,326✔
1023

1024
        // Allocations that are now fully contained in (but not equal to) one of the newly contiguous bounding boxes will be freed at the end of the reallocation
1025
        // step, because we currently disallow overlapping allocations for simplicity. These will function as sources for resize-copies below.
1026
        const auto resize_from_begin = std::partition(memory.allocations.begin(), memory.allocations.end(), [&](const buffer_allocation_state& allocation) {
1,326✔
1027
                return std::find(contiguous_boxes_after_realloc.begin(), contiguous_boxes_after_realloc.end(), allocation.box) != contiguous_boxes_after_realloc.end();
664✔
1028
        });
1029
        const auto resize_from_end = memory.allocations.end();
1,326✔
1030

1031
        // Derive the set of new boxes to allocate by removing all existing boxes from the set of contiguous boxes.
1032
        auto&& new_alloc_boxes = std::move(contiguous_boxes_after_realloc);
1,326✔
1033
        const auto last_new_allocation = std::remove_if(new_alloc_boxes.begin(), new_alloc_boxes.end(),
1,326✔
1034
            [&](auto& box) { return std::any_of(memory.allocations.begin(), memory.allocations.end(), [&](auto& alloc) { return alloc.box == box; }); });
5,759✔
1035
        new_alloc_boxes.erase(last_new_allocation, new_alloc_boxes.end());
1,326✔
1036
        assert(!new_alloc_boxes.empty()); // otherwise we would have returned early
1,326✔
1037

1038
        // Opportunistically merge connected boxes to keep the number of allocations and the tracking overhead low. This will not introduce artificial
1039
        // synchronization points because resize-copies are still rooted on the original last-writers.
1040
        merge_connected_boxes(new_alloc_boxes);
1,326✔
1041

1042
        // We collect new allocations in a vector *separate* from memory.allocations so as not to invalidate iterators (and to avoid resize-copying from them).
1043
        std::vector<buffer_allocation_state> new_allocations;
1,326✔
1044
        new_allocations.reserve(new_alloc_boxes.size());
1,326✔
1045

1046
        // Create new allocations and initialize them via resize-copies if necessary.
1047
        for(const auto& new_box : new_alloc_boxes) {
2,671✔
1048
                const auto aid = new_allocation_id(mid);
1,345✔
1049
                const auto alloc_instr =
1,345✔
1050
                    create<alloc_instruction>(current_batch, aid, new_box.get_area() * buffer.elem_size, buffer.elem_align, [&](const auto& record_debug_info) {
2,690✔
1051
                            record_debug_info(alloc_instruction_record::alloc_origin::buffer, buffer_allocation_record{bid, buffer.debug_name, new_box}, std::nullopt);
1,028✔
1052
                    });
3,718!
1053
                add_dependency(alloc_instr, m_last_epoch, instruction_dependency_origin::last_epoch);
1,345✔
1054

1055
                auto& new_alloc = new_allocations.emplace_back(aid, alloc_instr, new_box, buffer.range);
1,345✔
1056

1057
                // Since allocations don't overlap, we copy from those that are about to be freed
1058
                for(auto source_it = resize_from_begin; source_it != resize_from_end; ++source_it) {
1,420✔
1059
                        auto& resize_source_alloc = *source_it;
75✔
1060

1061
                        // Only copy those boxes to the new allocation that are still up-to-date in the old allocation. The caller of allocate_contiguously should remove
1062
                        // any region from up_to_date_memories that they intend to discard / overwrite immediately to avoid dead resize copies.
1063
                        // TODO investigate a garbage-collection heuristic that omits these copies if there are other up-to-date memories and we do not expect the region to
1064
                        // be read again on this memory.
1065
                        const auto full_copy_box = box_intersection(new_alloc.box, resize_source_alloc.box);
75✔
1066
                        if(full_copy_box.empty()) continue; // not every previous allocation necessarily intersects with every new allocation
75!
1067

1068
                        region_builder<3> live_copy_boxes;
75✔
1069
                        for(const auto& [copy_box, location] : buffer.up_to_date_memories.get_region_values(full_copy_box)) {
166✔
1070
                                if(location.test(mid)) { live_copy_boxes.add(copy_box); }
91✔
1071
                        }
75✔
1072
                        // even if allocations intersect, the entire intersection might be overwritten by the task that requested reallocation - in which case the caller
1073
                        // would have reset up_to_date_memories for the corresponding elements
1074
                        if(live_copy_boxes.empty()) continue;
75✔
1075

1076
                        const auto live_copy_region = std::move(live_copy_boxes).into_region();
73✔
1077
                        const auto copy_instr = create<copy_instruction>(current_batch, resize_source_alloc.aid, new_alloc.aid, strided_layout(resize_source_alloc.box),
73✔
1078
                            strided_layout(new_alloc.box), live_copy_region, buffer.elem_size,
146✔
1079
                            [&](const auto& record_debug_info) { record_debug_info(copy_instruction_record::copy_origin::resize, bid, buffer.debug_name); });
252✔
1080

1081
                        perform_concurrent_read_from_allocation(copy_instr, resize_source_alloc, live_copy_region);
73✔
1082
                        perform_atomic_write_to_allocation(copy_instr, new_alloc, live_copy_region);
73✔
1083
                }
75✔
1084
        }
1085

1086
        // Free old allocations now that all required resize-copies have been issued.
1087
        // TODO consider keeping old allocations around until their box is written to (or at least until the end of the current instruction batch) in order to
1088
        // resolve "buffer-locking" anti-dependencies
1089
        for(auto it = resize_from_begin; it != resize_from_end; ++it) {
1,401✔
1090
                auto& old_alloc = *it;
75✔
1091

1092
                const auto free_instr = create<free_instruction>(current_batch, old_alloc.aid, [&](const auto& record_debug_info) {
75✔
1093
                        record_debug_info(old_alloc.box.get_area() * buffer.elem_size, buffer_allocation_record{bid, buffer.debug_name, old_alloc.box});
70✔
1094
                });
145!
1095
                add_dependencies_on_last_concurrent_accesses(free_instr, old_alloc, old_alloc.box, instruction_dependency_origin::allocation_lifetime);
75✔
1096
        }
1097

1098
        // TODO garbage-collect allocations that are not up-to-date and not written to in this task
1099

1100
        memory.total_allocations_performed += new_allocations.size();
1,326✔
1101
        memory.allocations.erase(resize_from_begin, memory.allocations.end());
1,326✔
1102
        memory.allocations.insert(memory.allocations.end(), std::make_move_iterator(new_allocations.begin()), std::make_move_iterator(new_allocations.end()));
1,326✔
1103

1104
        // Heuristically detect excessive resizes, which should all be elided by scheduler lookahead (and iggen::anticipate()) in a well-behaved program.
1105
        if(!memory.frequent_allocations_warning_emitted && memory.total_allocations_performed >= 10) {
1,326✔
1106
                // This warning also covers all cases of the buffer_memory_state::allocations vector growing out of control, which quickly degrades scheduler
1107
                // performance in the current quadratic / cubic implementation of merge_overlapping_bounding_boxes / merge_connected_boxes.
1108
                CELERITY_WARN("Your program triggers frequent allocations or resizes for buffer {}, which may degrade performance. If possible, avoid "
2✔
1109
                              "celerity::queue::fence(), celerity::queue::wait() and celerity::experimental::flush() between command groups of growing access "
1110
                              "patterns, or try increasing scheduler lookahead via celerity::experimental::set_lookahead().",
1111
                    print_buffer_debug_label(bid));
1112
                memory.frequent_allocations_warning_emitted = true;
2✔
1113
        }
1114
}
1,326✔
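
// Worked example (editorial, not part of the build): because buffer allocations only ever grow, a
// request such as (1D boxes, numbers made up)
//
//   existing allocation box:  [0,64)
//   required contiguous box:  [32,128)
//
// is satisfied by allocating the merged bounding box [0,128), resize-copying whatever part of
// [0,64) is still up to date into the new allocation, and freeing the old one - exactly the
// merge / partition / copy / free sequence implemented above.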
1115

1116
void generator_impl::commit_pending_region_receive_to_host_memory(
343✔
1117
    batch& current_batch, const buffer_id bid, const buffer_state::region_receive& receive, const std::vector<region<3>>& concurrent_reads) //
1118
{
1119
        const auto trid = transfer_id(receive.consumer_tid, bid, no_reduction_id);
343✔
1120

1121
        // For simplicity of the initial IDAG implementation, we choose to receive directly into host-buffer allocations. This saves us from juggling
1122
        // staging-buffers, but comes at a price in performance since the communicator needs to linearize and de-linearize transfers from and to regions that have
1123
        // non-zero strides within their host allocation.
1124
        //
1125
        // TODO 1) maintain staging allocations and move (de-)linearization to the device in order to profit from higher memory bandwidths
1126
        //      2) explicitly support communicators that can send and receive directly to and from device memory (NVIDIA GPUDirect RDMA)
1127

1128
        auto& buffer = m_buffers.at(bid);
343✔
1129
        auto& host_memory = buffer.memories[host_memory_id];
343✔
1130

1131
        std::vector<buffer_allocation_state*> allocations;
343✔
1132
        for(const auto& min_contiguous_box : receive.required_contiguous_allocations) {
773✔
1133
                // The caller (aka satisfy_task_buffer_requirements) must ensure that all received boxes are allocated contiguously
1134
                auto& alloc = host_memory.get_contiguous_allocation(min_contiguous_box);
430✔
1135
                if(std::find(allocations.begin(), allocations.end(), &alloc) == allocations.end()) { allocations.push_back(&alloc); }
430✔
1136
        }
1137

1138
        for(const auto alloc : allocations) {
761✔
1139
                const auto region_received_into_alloc = region_intersection(alloc->box, receive.received_region);
418✔
1140
                std::vector<region<3>> independent_await_regions;
418✔
1141
                for(const auto& read_region : concurrent_reads) {
948✔
1142
                        const auto await_region = region_intersection(read_region, region_received_into_alloc);
530✔
1143
                        if(!await_region.empty()) { independent_await_regions.push_back(await_region); }
530✔
1144
                }
530✔
1145
                assert(!independent_await_regions.empty());
418✔
1146

1147
                // Ensure that receive-instructions inserted for concurrent readers are themselves concurrent.
1148
                symmetrically_split_overlapping_regions(independent_await_regions);
418✔
1149

1150
                if(independent_await_regions.size() > 1) {
418✔
1151
                        // If there are multiple concurrent readers requiring different parts of the received region, we emit independent await_receive_instructions so as
1152
                        // not to introduce artificial synchronization points (and facilitate computation-communication overlap). Since the (remote) sender might still
1153
                        // choose to perform the entire transfer en-bloc, we must inform the receive_arbiter of the target allocation and the full transfer region via a
1154
                        // split_receive_instruction.
1155
                        const auto split_recv_instr = create<split_receive_instruction>(current_batch, trid, region_received_into_alloc, alloc->aid, alloc->box,
60✔
1156
                            buffer.elem_size, [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
40✔
1157

1158
                        // We add dependencies to the split_receive_instruction as if it were a writer, but update the last_writers only at the await_receive_instruction.
1159
                        // The actual write happens somewhere in-between these instructions as orchestrated by the receive_arbiter, so no other access must depend on
1160
                        // split_receive_instruction directly.
1161
                        add_dependencies_on_last_concurrent_accesses(
30✔
1162
                            split_recv_instr, *alloc, region_received_into_alloc, instruction_dependency_origin::write_to_allocation);
1163

1164
                        for(const auto& await_region : independent_await_regions) {
113✔
1165
                                const auto await_instr = create<await_receive_instruction>(
166✔
1166
                                    current_batch, trid, await_region, [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
118✔
1167

1168
                                add_dependency(await_instr, split_recv_instr, instruction_dependency_origin::split_receive);
83✔
1169

1170
                                alloc->track_atomic_write(await_region, await_instr);
83✔
1171
                                buffer.original_writers.update_region(await_region, await_instr);
83✔
1172
                        }
1173
                } else {
1174
                        // A receive_instruction is equivalent to a split_receive_instruction followed by a single await_receive_instruction, but (as the common case) has
1175
                        // less tracking overhead in the instruction graph.
1176
                        const auto recv_instr = create<receive_instruction>(current_batch, trid, region_received_into_alloc, alloc->aid, alloc->box, buffer.elem_size,
776✔
1177
                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
406✔
1178

1179
                        perform_atomic_write_to_allocation(recv_instr, *alloc, region_received_into_alloc);
388✔
1180
                        buffer.original_writers.update_region(region_received_into_alloc, recv_instr);
388✔
1181
                }
1182
        }
418✔
1183

1184
        buffer.original_write_memories.update_region(receive.received_region, host_memory_id);
343✔
1185
        buffer.up_to_date_memories.update_region(receive.received_region, memory_mask().set(host_memory_id));
343✔
1186
}
686✔
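
// Editorial sketch (not part of the build): if two local chunks read the left and right halves of
// a received region, the code above emits
//
//   split_receive_instruction (whole region)
//     -> await_receive_instruction (left half)   -> consumer of chunk 0
//     -> await_receive_instruction (right half)  -> consumer of chunk 1
//
// so each reader can start as soon as "its" half has arrived, while a sender that transfers the
// region en bloc is still handled correctly by the receive_arbiter.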
1187

1188
/// Multi-dimensional host <-> device copies become a severe performance bottleneck on Nvidia devices when the contiguous chunk size is small and the number of
1189
/// chunks is large (on the order of 1-10 MiB/s copy throughput vs. the available 25 GB/s host memory bandwidth). This function heuristically decides whether it is
1190
/// beneficial to replace the nD host <-> device copy with an nD <-> 1D device (de)linearization step that enables a follow-up fast 1D host <-> device copy.
1191
bool should_linearize_copy_region(const memory_id alloc_mid, const box<3>& alloc_box, const region<3>& copy_region, const size_t elem_size) {
1,044✔
1192
        constexpr size_t max_linearized_region_bytes = 64 << 20; // 64 MiB - limit the device memory consumed for staging
1,044✔
1193
        constexpr size_t max_chunk_size_to_linearize = 64;
1,044✔
1194

1195
        if(alloc_mid < first_device_memory_id) return false;
1,044✔
1196
        if(copy_region.get_area() * elem_size > max_linearized_region_bytes) return false;
960✔
1197

1198
        size_t min_discontinuous_chunk_size_bytes = std::numeric_limits<size_t>::max();
940✔
1199
        for(const auto& copy_box : copy_region.get_boxes()) {
1,886✔
1200
                const auto linearization =
946✔
1201
                    layout_nd_copy(alloc_box.get_range(), copy_box.get_range(), copy_box.get_offset() - alloc_box.get_offset(), zeros, copy_box.get_range(), elem_size);
946✔
1202
                if(linearization.num_complex_strides > 0) {
946✔
1203
                        min_discontinuous_chunk_size_bytes = std::min(min_discontinuous_chunk_size_bytes, linearization.contiguous_size);
102✔
1204
                }
1205
        }
1206
        return min_discontinuous_chunk_size_bytes < max_chunk_size_to_linearize;
940✔
1207
}
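
// Worked example (editorial, not part of the build): for a float column of a 1024 x 1024 device
// allocation, each contiguous chunk is 4 bytes and there are 1024 of them, so
//
//   copy size  = 1024 * 4 B = 4 KiB   (well below the 64 MiB staging cap)
//   chunk size = 4 B                  (below max_chunk_size_to_linearize = 64 B)
//
// and the heuristic opts to (de)linearize on the device before the host <-> device transfer. A
// contiguous row of the same allocation (a single 4 KiB chunk) would be copied directly instead.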
1208

1209
void generator_impl::establish_coherence_between_buffer_memories(
3,347✔
1210
    batch& current_batch, const buffer_id bid, dense_map<memory_id, std::vector<region<3>>>& concurrent_reads_from_memory) //
1211
{
1212
        CELERITY_DETAIL_TRACY_ZONE_SCOPED("iggen::coherence", Red2);
1213

1214
        auto& buffer = m_buffers.at(bid);
3,347✔
1215

1216
        // (1) Examine what regions need to be copied between memories locally to satisfy all reads. Regions within `concurrent_reads_from_memory` are already split
1217
        // on consumer instructions, meaning that we will not introduce artificial dependencies between multiple parallel producer-consumer chains as long as we do
1218
        // not attempt to merge between those separate regions. We further split unsatisfied regions by original writer as well as writer and reader allocation
1219
        // boxes in order to produce copy regions that can be serviced by individual `copy_instruction`s.
1220

1221
        // In the fast path, regions can be copied directly from producer to consumer memory.
1222
        std::unordered_map<std::pair<memory_id, memory_id>, std::vector<region<3>>, utils::pair_hash> concurrent_direct_copies;
3,347✔
1223

1224
        // Some hardware setups require staging device-to-device copies through (pinned) host memory. Instead of keying by source and destination memories like for
1225
        // direct copies, we re-examine both in (3) to make sure we only copy to host once in case of a 1:n device-to-device broadcast.
1226
        std::unordered_map<memory_id, std::vector<region<3>>> concurrently_host_staged_copies;
3,347✔
1227

1228
        // Instead of planning / creating instructions directly, collect region vectors so (2) can remove overlaps with symmetrically_split_overlapping_regions.
1229
        for(memory_id dest_mid = 0; dest_mid < concurrent_reads_from_memory.size(); ++dest_mid) {
17,418✔
1230
                for(auto& dest_region : concurrent_reads_from_memory[dest_mid]) {
17,696✔
1231
                        // up_to_date_memories is a memory_mask, so regions that are up-to-date on one memory can still end up being enumerated as disjoint boxes.
1232
                        // We therefore merge them using a map keyed by original writers and their memories before constructing the final copy regions.
1233
                        std::unordered_map<std::pair<memory_id, instruction_id>, region_builder<3>, utils::pair_hash> source_boxes_by_writer;
3,625✔
1234
                        for(const auto& [box, up_to_date_mids] : buffer.up_to_date_memories.get_region_values(dest_region)) {
9,481✔
1235
                                if(up_to_date_mids.any() /* gracefully handle uninitialized read */ && !up_to_date_mids.test(dest_mid)) {
5,856✔
1236
                                        for(const auto& [source_memory_box, source_mid] : buffer.original_write_memories.get_region_values(box)) {
4,684✔
1237
                                                for(const auto& [source_memory_writer_box, original_writer] : buffer.original_writers.get_region_values(source_memory_box)) {
5,023✔
1238
                                                        source_boxes_by_writer[{source_mid, original_writer->get_id()}].add(source_memory_writer_box);
2,649✔
1239
                                                }
2,374✔
1240
                                        }
2,310✔
1241
                                }
1242
                        }
3,625✔
1243
                        for(auto& [source, source_builder] : source_boxes_by_writer) {
6,105✔
1244
                                const auto& [source_mid, _] = source;
2,480✔
1245
                                const auto source_region = std::move(source_builder).into_region();
2,480✔
1246

1247
                                for(auto& source_alloc : buffer.memories[source_mid].allocations) {
5,973✔
1248
                                        const auto source_alloc_region = region_intersection(source_region, source_alloc.box);
3,493✔
1249
                                        if(source_alloc_region.empty()) continue;
3,493✔
1250

1251
                                        for(auto& dest_alloc : buffer.memories[dest_mid].allocations) {
5,599✔
1252
                                                auto copy_region = region_intersection(source_alloc_region, dest_alloc.box);
3,119✔
1253
                                                if(copy_region.empty()) continue;
3,119✔
1254

1255
                                                if(m_system.memories[source_mid].copy_peers.test(dest_mid)) {
2,480✔
1256
                                                        concurrent_direct_copies[{source_mid, dest_mid}].push_back(std::move(copy_region));
1,974✔
1257
                                                } else {
1258
                                                        // TODO consider device-side linearization also for device-to-peer copies and device-to-host-buffer coherence copies
1259
                                                        concurrently_host_staged_copies[source_mid].push_back(std::move(copy_region));
506✔
1260
                                                }
1261
                                        }
3,119✔
1262
                                }
3,493✔
1263
                        }
2,480✔
1264
                }
3,625✔
1265
        }
1266

1267
        // (2) Plan an abstract tree of copy operations necessary to establish full coherence. Staged or source-linearized copies will manifest as proper
1268
        // instruction trees rather than chains in case of broadcast-like producer-consumer patterns. The explicit planning structure avoids the introduction of
1269
        // temporary region maps to track dependencies across staging allocations by exploiting the fact that (1) results in a full producer-consumer split, meaning
1270
        // that every read from a staging allocation is guaranteed to depend on exactly one writer.
1271

1272
        /// Abstract plan for satisfying all copies from a single buffer allocation and region.
1273
        struct copy_plan {
1274
                /// Data is strided in a persistent buffer (box) allocation.
1275
                struct in_buffer {
1276
                        allocation_id aid;
1277
                        explicit in_buffer(const allocation_id aid) : aid(aid) {}
4,988✔
1278
                };
1279

1280
                /// Data is linearized in a temporary staging allocation.
1281
                struct staged {
1282
                        memory_id mid = 0;
1283
                        size_t offset_bytes = 0;
1284
                        explicit staged(const memory_id mid, const size_t offset_bytes) : mid(mid), offset_bytes(offset_bytes) {}
498✔
1285
                };
1286

1287
                using location = std::variant<in_buffer, staged>;
1288

1289
                /// A node in the copy tree.
1290
                struct hop {
1291
                        copy_plan::location location;
1292
                        std::vector<hop> next;
1293

1294
                        hop(const copy_plan::location loc) : location(loc) {}
5,486✔
1295
                        hop& chain(const copy_plan::location& loc) & { return next.emplace_back(loc); }
3,017✔
1296
                };
1297

1298
                detail::region<3> region;
1299
                hop source;
1300

1301
                copy_plan(const detail::region<3>& region, const location& source) : region(region), source(source) {}
2,469✔
1302
                hop& chain(const copy_plan::location& loc) & { return source.chain(loc); }
1,983✔
1303
        };
1304

1305
        std::vector<copy_plan> planned_copies;
3,347✔
1306

1307
        // (2a) Plan all direct, non-staged copies. All such copies are concurrent.
1308

1309
        for(auto& [source_dest_mid, concurrent_copies] : concurrent_direct_copies) {
4,835✔
1310
                const auto [source_mid, dest_mid] = source_dest_mid;
1,488✔
1311
                assert(source_mid != dest_mid);
1,488✔
1312

1313
                symmetrically_split_overlapping_regions(concurrent_copies); // ensure copy regions are disjoint
1,488✔
1314

1315
                for(const auto& copy_region : concurrent_copies) {
3,471✔
1316
                        // We split by source / dest allocations above, so source / dest allocations are unique
1317
                        auto& source_alloc = buffer.memories[source_mid].get_contiguous_allocation(bounding_box(copy_region));
1,983✔
1318
                        auto& dest_alloc = buffer.memories[dest_mid].get_contiguous_allocation(bounding_box(copy_region));
1,983✔
1319

1320
                        planned_copies.emplace_back(copy_region, copy_plan::in_buffer(source_alloc.aid)).chain(copy_plan::in_buffer(dest_alloc.aid));
1,983✔
1321
                }
1322
        }
1323

1324
        // (2b) Plan host-staged copy instruction chains where necessary, and heuristically decide whether a strided source or destination should be (de)linearized
1325
        // on the device. Staged copies are all fully concurrent with direct copies from (2a).
1326

1327
        dense_map<memory_id, staging_allocation*> staging_allocs; // empty when concurrently_host_staged_copies.empty()
3,347✔
1328
        if(!concurrently_host_staged_copies.empty()) {
3,347✔
1329
                dense_map<memory_id, size_t> staging_allocation_sizes_bytes(m_memories.size());
171✔
1330
                const auto stage_alignment_bytes = std::lcm(buffer.elem_align, hardware_destructive_interference_size);
171✔
1331
                const auto get_region_size_bytes = [&](const region<3>& region) { return utils::ceil(region.get_area() * buffer.elem_size, stage_alignment_bytes); };
669✔
1332

1333
                for(auto& [source_mid, concurrent_regions] : concurrently_host_staged_copies) {
540✔
1334
                        symmetrically_split_overlapping_regions(concurrent_regions); // ensure copy regions are disjoint
369✔
1335

1336
                        for(const auto& region : concurrent_regions) { // iterations are independent
855✔
1337
                                // We split by source / dest allocations above, so source / dest allocations are unique
1338
                                auto& source_alloc = buffer.memories[source_mid].get_contiguous_allocation(bounding_box(region));
486✔
1339

1340
                                // Begin at the strided source buffer allocation
1341
                                auto stage_source_hop = &planned_copies.emplace_back(region, copy_plan::in_buffer(source_alloc.aid)).source;
486✔
1342

1343
                                if(should_linearize_copy_region(source_mid, source_alloc.box, region, buffer.elem_size)) {
486✔
1344
                                        // Add a linearized hop in source (device) memory
1345
                                        stage_source_hop = &stage_source_hop->chain(copy_plan::staged(source_mid, staging_allocation_sizes_bytes[source_mid]));
4✔
1346
                                        staging_allocation_sizes_bytes[source_mid] += get_region_size_bytes(region);
4✔
1347
                                }
1348

1349
                                // Add the linearized staging hop in host memory
1350
                                const auto host_stage_hop = &stage_source_hop->chain(copy_plan::staged(host_memory_id, staging_allocation_sizes_bytes[host_memory_id]));
486✔
1351
                                staging_allocation_sizes_bytes[host_memory_id] += get_region_size_bytes(region);
486✔
1352

1353
                                // There can be multiple destinations in case of a broadcast pattern
1354
                                for(memory_id dest_mid = first_device_memory_id; dest_mid < concurrent_reads_from_memory.size(); ++dest_mid) {
1,954✔
1355
                                        if(dest_mid == source_mid) continue;
1,468✔
1356

1357
                                        // The region will appear in concurrently_host_staged_copies if it is outdated on any destination in (1), so we need to query
1358
                                        // up_to_date_memories a second time to make sure we don't create unnecessary copies - a previous kernel might already have made some
1359
                                        // destination memories coherent, but not all.
1360
                                        const auto boxes_up_to_date = buffer.up_to_date_memories.get_region_values(region);
1,062✔
1361
                                        // Because up_to_date_memories maps to a memory_mask, we can end up with multiple boxes, but they must all agree on dest_mid.
1362
                                        assert(!boxes_up_to_date.empty() && std::all_of(boxes_up_to_date.begin(), boxes_up_to_date.end(), [&](const auto& box_and_mids) {
2,124✔
1363
                                                return box_and_mids.second.test(dest_mid) == boxes_up_to_date.front().second.test(dest_mid);
1364
                                        }));
1365
                                        if(boxes_up_to_date.front().second.test(dest_mid)) continue;
1,062✔
1366

1367
                                        for(const auto& dest_memory_region : concurrent_reads_from_memory[dest_mid]) {
2,111✔
1368
                                                if(region_intersection(dest_memory_region, region).empty()) continue;
1,050✔
1369
                                                assert(region_difference(region, dest_memory_region).empty()); // we split by dest_mid above
536✔
1370

1371
                                                auto& dest_alloc = buffer.memories[dest_mid].get_contiguous_allocation(bounding_box(region));
536✔
1372

1373
                                                auto unstage_source_hop = host_stage_hop;
536✔
1374
                                                if(should_linearize_copy_region(dest_mid, dest_alloc.box, region, buffer.elem_size)) {
536✔
1375
                                                        // Add a linearized hop in dest (device) memory
1376
                                                        unstage_source_hop = &host_stage_hop->chain(copy_plan::staged(dest_mid, staging_allocation_sizes_bytes[dest_mid]));
8✔
1377
                                                        staging_allocation_sizes_bytes[dest_mid] += get_region_size_bytes(region);
8✔
1378
                                                }
1379

1380
                                                // Finish the chain in the final strided dest buffer allocation
1381
                                                unstage_source_hop->chain(copy_plan::in_buffer{dest_alloc.aid});
536✔
1382
                                        }
1383
                                }
1,062✔
1384
                        }
1385
                }
1386

1387
                // Staging allocation sizes are now final, allocate
1388
                staging_allocs.resize(m_memories.size());
171✔
1389
                for(memory_id mid = 0; mid < m_memories.size(); ++mid) {
947✔
1390
                        if(staging_allocation_sizes_bytes[mid] == 0) continue;
776✔
1391
                        staging_allocs[mid] = &acquire_staging_allocation(current_batch, mid, staging_allocation_sizes_bytes[mid], stage_alignment_bytes);
179✔
1392
                }
1393
        }
171✔
1394

1395
        // (3) Recursively traverse each copy_plan to generate all copy instructions and their dependencies.
1396

1397
        using copy_location_metadata = std::tuple<allocation_id, buffer_allocation_state* /* optional */, region_layout>;
1398

1399
        // Looks up metadata from now-allocated buffers and staging space for use in execute_copy_plan_recursive
1400
        const auto get_copy_location_metadata = [&](const copy_plan::location& location) {
3,347✔
1401
                return matchbox::match<copy_location_metadata>(
1402
                    location,
1403
                    [&](const copy_plan::in_buffer& in_buffer) {
6,034✔
1404
                            auto& alloc = buffer.memories[in_buffer.aid.get_memory_id()].get_allocation(in_buffer.aid);
4,988✔
1405
                            return std::tuple(in_buffer.aid, &alloc, strided_layout(alloc.box));
4,988✔
1406
                    },
1407
                    [&](const copy_plan::staged& staged) {
6,034✔
1408
                            assert(staging_allocs[staged.mid] != nullptr);
1,046✔
1409
                            return std::tuple(staging_allocs[staged.mid]->aid, nullptr, linearized_layout(staged.offset_bytes));
1,046✔
1410
                    });
12,068✔
1411
        };
3,347✔
1412

1413
        // Tracks the full read-front for each used staging allocation while staging_allocation::last_accesses still points to the last instruction
1414
        // (= alloc_instruction or the last effective epoch) to avoid incorrectly chaining copies that read from the same allocation.
1415
        dense_map<memory_id, access_front> reads_from_staging_allocs(staging_allocs.size(), access_front(access_front::read));
3,347✔
1416

1417
        // Inserts one copy between two hops, and then recurses to complete the subtree of its destination.
1418
        // The lambda is passed into itself as the last generic parameter to permit recursion.
1419
        const auto execute_copy_plan_recursive = [&](const region<3>& region, const copy_plan::hop& source_hop, const copy_plan::hop& dest_hop,
3,347✔
1420
                                                     instruction* const source_copy_instr, const auto& execute_copy_plan_recursive) -> void {
1421
                const auto [dest_aid, dest_buffer_alloc, dest_layout] = get_copy_location_metadata(dest_hop.location);
3,017✔
1422
                const auto [source_aid, source_buffer_alloc, source_layout] = get_copy_location_metadata(source_hop.location);
3,017✔
1423

1424
                const auto copy_instr = create<copy_instruction>(current_batch, source_aid, dest_aid, source_layout, dest_layout, region, buffer.elem_size,
3,017✔
1425
                    [&, &source_layout = source_layout, source_aid = source_aid, dest_aid = dest_aid](const auto& record_debug_info) {
3,954✔
1426
                            using copy_origin = copy_instruction_record::copy_origin;
1427
                            const auto origin = source_aid.get_memory_id() == dest_aid.get_memory_id()
3,748✔
1428
                                                    ? (std::holds_alternative<strided_layout>(source_layout) ? copy_origin::linearizing : copy_origin::delinearizing)
3,736✔
1429
                                                    : (std::holds_alternative<copy_plan::staged>(dest_hop.location) ? copy_origin::staging : copy_origin::coherence);
925✔
1430
                            record_debug_info(origin, bid, buffer.debug_name);
937✔
1431
                    });
1432

1433
                if(source_buffer_alloc != nullptr) {
3,017✔
1434
                        perform_concurrent_read_from_allocation(copy_instr, *source_buffer_alloc, region);
2,469✔
1435
                } else /* source is staged */ {
1436
                        reads_from_staging_allocs[source_aid.get_memory_id()].add_instruction(copy_instr);
548✔
1437
                }
1438

1439
                if(source_copy_instr != nullptr) { add_dependency(copy_instr, source_copy_instr, instruction_dependency_origin::read_from_allocation); }
3,017✔
1440

1441
                if(dest_buffer_alloc != nullptr) {
3,017✔
1442
                        perform_atomic_write_to_allocation(copy_instr, *dest_buffer_alloc, region);
2,519✔
1443
                } else /* dest is staged */ {
1444
                        auto& stage = *staging_allocs[dest_aid.get_memory_id()];
498✔
1445
                        // ensure that copy_instr transitively depends on the alloc_instruction of the staging allocation
1446
                        add_dependencies_on_access_front(copy_instr, stage.last_accesses, instruction_dependency_origin::write_to_allocation);
498✔
1447
                }
1448

1449
                for(const auto& next_dest_hop : dest_hop.next) {
3,565✔
1450
                        execute_copy_plan_recursive(region, dest_hop, next_dest_hop, copy_instr, execute_copy_plan_recursive);
548✔
1451
                }
1452
        };
3,017✔
1453

1454
        // Create instructions for all planned copies
1455
        for(const auto& plan : planned_copies) {
5,816✔
1456
                for(const auto& dest_hop : plan.source.next) {
4,938✔
1457
                        execute_copy_plan_recursive(plan.region, plan.source, dest_hop, nullptr, execute_copy_plan_recursive);
2,469✔
1458
                }
1459
        }
1460

1461
        // Now that all copies have been created, update all staging allocation access fronts accordingly.
1462
        for(memory_id mid = 0; mid < staging_allocs.size(); ++mid) {
4,123✔
1463
                if(staging_allocs[mid] == nullptr) continue;
776✔
1464
                staging_allocs[mid]->last_accesses = std::move(reads_from_staging_allocs[mid]);
179✔
1465
        }
1466

1467
        // (4) Update buffer.up_to_date_memories en-bloc, regardless of which copy instructions were actually emitted.
1468

1469
        if(!concurrent_direct_copies.empty() || !concurrently_host_staged_copies.empty()) {
3,347✔
1470
                for(memory_id mid = 0; mid < concurrent_reads_from_memory.size(); ++mid) {
4,400✔
1471
                        for(const auto& region : concurrent_reads_from_memory[mid]) {
5,141✔
1472
                                for(auto& [box, location] : buffer.up_to_date_memories.get_region_values(region)) {
5,892✔
1473
                                        buffer.up_to_date_memories.update_box(box, memory_mask(location).set(mid));
4,088✔
1474
                                }
1,804✔
1475
                        }
1476
                }
1477
        }
1478
}
6,694✔
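
// Editorial sketch (not part of the build): for a broadcast from device memory M1 to devices M2
// and M3 without peer access, a single copy_plan expands into the hop tree
//
//   in_buffer(M1 allocation)
//     -> staged(host)
//          -> in_buffer(M2 allocation)
//          -> in_buffer(M3 allocation)
//
// so the device-to-host staging copy is issued only once and both unstaging copies depend on it;
// execute_copy_plan_recursive materializes this tree as chained copy_instructions above.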
1479

1480
void generator_impl::create_task_collective_groups(batch& command_batch, const task& tsk) {
2,335✔
1481
        const auto cgid = tsk.get_collective_group_id();
2,335✔
1482
        if(cgid == non_collective_group_id) return;
2,335✔
1483
        if(m_collective_groups.count(cgid) != 0) return;
48✔
1484

1485
        // New collective groups are created by cloning the root collective group (aka MPI_COMM_WORLD).
1486
        auto& root_cg = m_collective_groups.at(root_collective_group_id);
39✔
1487
        const auto clone_cg_instr = create<clone_collective_group_instruction>(
78✔
1488
            command_batch, root_collective_group_id, tsk.get_collective_group_id(), [](const auto& record_debug_info) { record_debug_info(); });
87✔
1489

1490
        m_collective_groups.emplace(cgid, clone_cg_instr);
39✔
1491

1492
        // Cloning itself is a collective operation and must be serialized as such.
1493
        add_dependency(clone_cg_instr, root_cg.last_collective_operation, instruction_dependency_origin::collective_group_order);
39✔
1494
        root_cg.last_collective_operation = clone_cg_instr;
39✔
1495
}
1496

1497
std::vector<localized_chunk> generator_impl::split_task_execution_range(const execution_command& ecmd, const task& tsk) {
4,250✔
1498
        CELERITY_DETAIL_TRACY_ZONE_SCOPED("iggen::split_task", Maroon);
1499

1500
        if(tsk.get_execution_target() == execution_target::device && m_system.devices.empty()) { utils::panic("no device on which to execute device kernel"); }
4,250!
1501

1502
        const bool is_splittable_locally =
1503
            tsk.has_variable_split() && tsk.get_side_effect_map().empty() && tsk.get_collective_group_id() == non_collective_group_id;
4,250!
1504
        const auto split = tsk.get_hint<experimental::hints::split_2d>() != nullptr ? split_2d : split_1d;
4,250✔
1505

1506
        const auto command_sr = ecmd.get_execution_range();
4,250✔
1507
        const auto command_chunk = chunk<3>(command_sr.offset, command_sr.range, tsk.get_global_size());
4,250✔
1508

1509
        // As a heuristic to keep inter-device communication to a minimum, we split the execution range twice when oversubscription is active: Once to obtain
1510
        // contiguous chunks per device, and once more (below) to subdivide the ranges on each device (which can help with computation-communication overlap).
1511
        std::vector<chunk<3>> coarse_chunks;
4,250✔
1512
        if(is_splittable_locally && tsk.get_execution_target() == execution_target::device) {
4,250✔
1513
                coarse_chunks = split(command_chunk, tsk.get_granularity(), m_system.devices.size());
1,771✔
1514
        } else {
1515
                coarse_chunks = {command_chunk};
2,479✔
1516
        }
1517

1518
        size_t oversubscribe_factor = 1;
4,250✔
1519
        if(const auto oversubscribe = tsk.get_hint<experimental::hints::oversubscribe>(); oversubscribe != nullptr) {
4,250✔
1520
                // Our local reduction setup uses the normal per-device backing buffer allocation as the reduction output of each device. Since we can't track
1521
                // overlapping allocations at the moment, we have no way of oversubscribing reduction kernels without introducing a data race between multiple "fine
1522
                // chunks" on the final write. This could be solved by creating separate reduction-output allocations for each device chunk and not touching the
1523
                // actual buffer allocation. This is left as *future work* for a general overhaul of reductions.
1524
                if(is_splittable_locally && tsk.get_reductions().empty()) {
155✔
1525
                        oversubscribe_factor = oversubscribe->get_factor();
147✔
1526
                } else if(m_policy.unsafe_oversubscription_error != error_policy::ignore) {
8✔
1527
                        utils::report_error(m_policy.unsafe_oversubscription_error, "Refusing to oversubscribe {}{}.", print_task_debug_label(tsk),
12✔
1528
                            !tsk.get_reductions().empty()                              ? " because it performs a reduction"
11✔
1529
                            : !tsk.get_side_effect_map().empty()                       ? " because it has side effects"
5✔
1530
                            : tsk.get_collective_group_id() != non_collective_group_id ? " because it participates in a collective group"
7!
1531
                            : !tsk.has_variable_split()                                ? " because its iteration space cannot be split"
1!
1532
                                                                                       : "");
1533
                }
1534
        }
1535

1536
        // Split a second time (if oversubscribed) and assign native memory and devices (if the task is a device kernel).
1537
        std::vector<localized_chunk> concurrent_chunks;
4,246✔
1538
        for(size_t coarse_idx = 0; coarse_idx < coarse_chunks.size(); ++coarse_idx) {
9,579✔
1539
                for(const auto& fine_chunk : split(coarse_chunks[coarse_idx], tsk.get_granularity(), oversubscribe_factor)) {
11,278✔
1540
                        auto& localized_chunk = concurrent_chunks.emplace_back();
5,945✔
1541
                        localized_chunk.execution_range = box(subrange(fine_chunk.offset, fine_chunk.range));
5,945✔
1542
                        if(tsk.get_execution_target() == execution_target::device) {
5,945✔
1543
                                assert(coarse_idx < m_system.devices.size());
3,451✔
1544
                                localized_chunk.memory_id = m_system.devices[coarse_idx].native_memory;
3,451✔
1545
                                localized_chunk.device_id = device_id(coarse_idx);
3,451✔
1546
                        } else {
1547
                                localized_chunk.memory_id = host_memory_id;
2,494✔
1548
                        }
1549
                }
5,333✔
1550
        }
1551
        return concurrent_chunks;
8,492✔
1552
}
4,250✔
1553

1554
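// Reports, according to m_policy.overlapping_write_error, any buffer region that more than one concurrent chunk of the same task writes to, since such overlapping writes race between the generated instructions.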
void generator_impl::report_task_overlapping_writes(const task& tsk, const std::vector<localized_chunk>& concurrent_chunks) const {
2,327✔
1555
        box_vector<3> concurrent_execution_ranges(concurrent_chunks.size(), box<3>());
16,289✔
1556
        std::transform(concurrent_chunks.begin(), concurrent_chunks.end(), concurrent_execution_ranges.begin(),
2,327✔
1557
            [](const localized_chunk& chunk) { return chunk.execution_range; });
3,282✔
1558

1559
        if(const auto overlapping_writes = detect_overlapping_writes(tsk, concurrent_execution_ranges); !overlapping_writes.empty()) {
2,327✔
1560
                auto error = fmt::format("{} has overlapping writes on N{} in", print_task_debug_label(tsk, true /* title case */), m_local_nid);
3✔
1561
                for(const auto& [bid, overlap] : overlapping_writes) {
6✔
1562
                        fmt::format_to(std::back_inserter(error), " {} {}", print_buffer_debug_label(bid), overlap);
3✔
1563
                }
1564
                error += ". Choose a non-overlapping range mapper for this write access or constrain the split via experimental::constrain_split to make the access "
1565
                         "non-overlapping.";
3✔
1566
                utils::report_error(m_policy.overlapping_write_error, "{}", error);
3✔
1567
        }
2,330✔
1568
}
4,652✔
1569

1570
void generator_impl::satisfy_task_buffer_requirements(batch& current_batch, const buffer_id bid, const task& tsk, const subrange<3>& local_execution_range,
2,715✔
1571
    const bool local_node_is_reduction_initializer, const std::vector<localized_chunk>& concurrent_chunks_after_split) //
1572
{
1573
        CELERITY_DETAIL_TRACY_ZONE_SCOPED("iggen::satisfy_buffer_requirements", ForestGreen);
1574

1575
        assert(!concurrent_chunks_after_split.empty());
2,715✔
1576

1577
        auto& buffer = m_buffers.at(bid);
2,715✔
1578

1579
        dense_map<memory_id, box_vector<3>> required_contiguous_allocations(m_memories.size());
2,715✔
1580

1581
        region_builder<3> accessed_boxes; // which elements are accessed (to figure out which pending receives to apply)
2,715✔
1582
        region_builder<3> consumed_boxes; // which elements are accessed with a consuming access (these need to be preserved across resizes)
2,715✔
1583

1584
        const auto& bam = tsk.get_buffer_access_map();
2,715✔
1585
        accessed_boxes.add(bam.compute_produced_region(bid, box<3>(local_execution_range)));
2,715✔
1586
        const auto reads = bam.compute_consumed_region(bid, box<3>(local_execution_range));
2,715✔
1587
        accessed_boxes.add(reads);
2,715✔
1588
        consumed_boxes.add(reads);
2,715✔
1589

1590
        // reductions can introduce buffer reads if they do not initialize_to_identity (but they cannot be split), so we evaluate them first
1591
        assert(std::count_if(tsk.get_reductions().begin(), tsk.get_reductions().end(), [=](const reduction_info& r) { return r.bid == bid; }) <= 1
2,877✔
1592
               && "task defines multiple reductions on the same buffer");
1593
        const auto reduction = std::find_if(tsk.get_reductions().begin(), tsk.get_reductions().end(), [=](const reduction_info& r) { return r.bid == bid; });
2,864✔
1594
        if(reduction != tsk.get_reductions().end()) {
2,715✔
1595
                for(const auto& chunk : concurrent_chunks_after_split) {
272✔
1596
                        required_contiguous_allocations[chunk.memory_id].push_back(scalar_reduction_box);
166✔
1597
                }
1598
                const auto include_current_value = local_node_is_reduction_initializer && reduction->init_from_buffer;
106!
1599
                if(concurrent_chunks_after_split.size() > 1 || include_current_value) {
106✔
1600
                        // We insert a host-side reduce-instruction in the multi-chunk scenario; its result will end up in the host buffer allocation.
1601
                        // If the user did not specify `initialize_to_identity`, we treat the existing buffer contents as an additional reduction chunk, so we can always
1602
                        // perform SYCL reductions with `initialize_to_identity` semantics.
1603
                        required_contiguous_allocations[host_memory_id].push_back(scalar_reduction_box);
45✔
1604
                }
1605
                accessed_boxes.add(scalar_reduction_box);
106✔
1606
                if(include_current_value) {
106✔
1607
                        // scalar_reduction_box will be copied into the local-reduction gather buffer ahead of the kernel instruction
1608
                        consumed_boxes.add(scalar_reduction_box);
11✔
1609
                }
1610
        }
1611

1612
        const auto accessed_region = std::move(accessed_boxes).into_region();
2,715✔
1613
        const auto consumed_region = std::move(consumed_boxes).into_region();
2,715✔
1614

1615
        // Boxes that are accessed but not consumed do not need to be preserved across resizes. This set operation is not equivalent to accumulating all
1616
        // non-consumer mode accesses above, since a kernel can have both a read_only and a discard_write access for the same buffer element, and Celerity must
1617
        // treat the overlap as if it were a read_write access according to the SYCL spec.
1618
        // We maintain a box_vector here because we also add all received boxes, as these are overwritten by a recv_instruction before being read by the kernel.
1619
        region_builder<3> discarded_boxes;
2,715✔
1620
        discarded_boxes.add(region_difference(accessed_region, consumed_region));
2,715✔
1621

1622
        // Collect all pending receives (await-push commands) that we must apply before executing this task.
1623
        std::vector<buffer_state::region_receive> applied_receives;
2,715✔
1624
        {
1625
                const auto first_applied_receive = std::partition(buffer.pending_receives.begin(), buffer.pending_receives.end(),
2,715✔
1626
                    [&](const buffer_state::region_receive& r) { return region_intersection(consumed_region, r.received_region).empty(); });
343✔
1627
                const auto last_applied_receive = buffer.pending_receives.end();
2,715✔
1628
                for(auto it = first_applied_receive; it != last_applied_receive; ++it) {
3,058✔
1629
                        // we (re) allocate before receiving, but there's no need to preserve previous data at the receive location
1630
                        discarded_boxes.add(it->received_region);
343✔
1631
                        // split_receive_instruction needs contiguous allocations for the bounding boxes of potentially received fragments
1632
                        required_contiguous_allocations[host_memory_id].insert(
1,715✔
1633
                            required_contiguous_allocations[host_memory_id].end(), it->required_contiguous_allocations.begin(), it->required_contiguous_allocations.end());
1,372✔
1634
                }
1635

1636
                if(first_applied_receive != last_applied_receive) {
2,715✔
1637
                        applied_receives.assign(first_applied_receive, last_applied_receive);
343✔
1638
                        buffer.pending_receives.erase(first_applied_receive, last_applied_receive);
343✔
1639
                }
1640
        }
1641

1642
        if(reduction != tsk.get_reductions().end()) {
2,715✔
1643
                assert(std::all_of(buffer.pending_receives.begin(), buffer.pending_receives.end(), [&](const buffer_state::region_receive& r) {
106✔
1644
                        return region_intersection(r.received_region, scalar_reduction_box).empty();
1645
                }) && std::all_of(buffer.pending_gathers.begin(), buffer.pending_gathers.end(), [&](const buffer_state::gather_receive& r) {
1646
                        return box_intersection(r.gather_box, scalar_reduction_box).empty();
1647
                }) && "buffer has an unprocessed await-push into a region that is going to be used as a reduction output");
1648
        }
1649

1650
        const auto discarded_region = std::move(discarded_boxes).into_region();
2,715✔
1651

1652
        // Detect and report uninitialized reads
1653
        if(m_policy.uninitialized_read_error != error_policy::ignore) {
2,715✔
1654
                region_builder<3> uninitialized_reads;
561✔
1655
                const auto locally_required_region = region_difference(consumed_region, discarded_region);
561✔
1656
                for(const auto& [box, location] : buffer.up_to_date_memories.get_region_values(locally_required_region)) {
939✔
1657
                        if(!location.any()) { uninitialized_reads.add(box); }
378✔
1658
                }
561✔
1659
                if(!uninitialized_reads.empty()) {
561✔
1660
                        // Observing an uninitialized read that is not visible in the TDAG means we have a bug.
1661
                        utils::report_error(m_policy.uninitialized_read_error,
16✔
1662
                            "Instructions for {} are trying to read {} {}, which is neither found locally nor has been await-pushed before.", print_task_debug_label(tsk),
12✔
1663
                            print_buffer_debug_label(bid), std::move(uninitialized_reads).into_region());
24✔
1664
                }
1665
        }
565✔
1666

1667
        // Do not preserve any received or overwritten region across receives or buffer resizes later on: allocate_contiguously will insert resize-copy instructions
1668
        // for all up_to_date regions of allocations that it replaces with larger ones.
1669
        buffer.up_to_date_memories.update_region(discarded_region, memory_mask());
2,711✔
1670

1671
        // Collect chunk-reads by memory to establish local coherence later
1672
        dense_map<memory_id, std::vector<region<3>>> concurrent_reads_from_memory(m_memories.size());
2,711✔
1673
        for(const auto& chunk : concurrent_chunks_after_split) {
6,963✔
1674
                required_contiguous_allocations[chunk.memory_id].append(bam.compute_required_contiguous_boxes(bid, chunk.execution_range.get_subrange()));
4,252✔
1675

1676
                auto chunk_reads = bam.compute_consumed_region(bid, chunk.execution_range.get_subrange());
4,252✔
1677
                if(!chunk_reads.empty()) { concurrent_reads_from_memory[chunk.memory_id].emplace_back(std::move(chunk_reads)); }
4,252✔
1678
        }
4,252✔
1679
        if(local_node_is_reduction_initializer && reduction != tsk.get_reductions().end() && reduction->init_from_buffer) {
2,711!
1680
                concurrent_reads_from_memory[host_memory_id].emplace_back(scalar_reduction_box);
10✔
1681
        }
1682

1683
        // Now that we know all required contiguous allocations, issue any required alloc- and resize-copy instructions
1684
        for(memory_id mid = 0; mid < required_contiguous_allocations.size(); ++mid) {
15,510✔
1685
                allocate_contiguously(current_batch, bid, mid, std::move(required_contiguous_allocations[mid]));
12,799✔
1686
        }
1687

1688
        // Receive all remote data (which overlaps with the accessed region) into host memory
1689
        std::vector<region<3>> all_concurrent_reads;
2,711✔
1690
        for(const auto& reads : concurrent_reads_from_memory) {
15,510✔
1691
                all_concurrent_reads.insert(all_concurrent_reads.end(), reads.begin(), reads.end());
12,799✔
1692
        }
1693
        for(const auto& receive : applied_receives) {
3,054✔
1694
                commit_pending_region_receive_to_host_memory(current_batch, bid, receive, all_concurrent_reads);
343✔
1695
        }
1696

1697
        // Create the necessary coherence copy instructions to satisfy all remaining requirements locally.
1698
        establish_coherence_between_buffer_memories(current_batch, bid, concurrent_reads_from_memory);
2,711✔
1699
}
5,458✔
1700

1701
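// Sets up the local reduction for one reduction_info: allocates a host-side gather space for the per-chunk partial results and, if required, copies the current buffer value into its first slot.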
local_reduction generator_impl::prepare_task_local_reduction(
105✔
1702
    batch& command_batch, const reduction_info& rinfo, const execution_command& ecmd, const task& tsk, const size_t num_concurrent_chunks) //
1703
{
1704
        const auto [rid_, bid_, reduction_task_includes_buffer_value] = rinfo;
105✔
1705
        const auto bid = bid_; // allow capturing in lambda
105✔
1706

1707
        auto& buffer = m_buffers.at(bid);
105✔
1708

1709
        local_reduction red;
105✔
1710
        red.include_local_buffer_value = reduction_task_includes_buffer_value && ecmd.is_reduction_initializer();
105✔
1711
        red.first_kernel_chunk_offset = red.include_local_buffer_value ? 1 : 0;
105✔
1712
        red.num_input_chunks = red.first_kernel_chunk_offset + num_concurrent_chunks;
105✔
1713
        red.chunk_size_bytes = scalar_reduction_box.get_area() * buffer.elem_size;
105✔
1714

1715
        // If the reduction only has a single local contribution, we simply accept it as the fully-reduced final value without issuing additional instructions.
1716
        // A local_reduction with num_input_chunks == 1 is treated as a no-op by `finish_task_local_reduction`.
1717
        assert(red.num_input_chunks > 0);
105✔
1718
        if(red.num_input_chunks == 1) return red;
105✔
1719

1720
        // Create a gather-allocation into which `finish_task_local_reduction` will copy each new partial result, and if the reduction is not
1721
        // initialize_to_identity, we copy the current buffer value before it is overwritten by the kernel.
1722
        red.gather_aid = new_allocation_id(host_memory_id);
44✔
1723
        red.gather_alloc_instr = create<alloc_instruction>(
44✔
1724
            command_batch, red.gather_aid, red.num_input_chunks * red.chunk_size_bytes, buffer.elem_align, [&](const auto& record_debug_info) {
44✔
1725
                    record_debug_info(
18✔
1726
                        alloc_instruction_record::alloc_origin::gather, buffer_allocation_record{bid, buffer.debug_name, scalar_reduction_box}, red.num_input_chunks);
36✔
1727
            });
36!
1728
        add_dependency(red.gather_alloc_instr, m_last_epoch, instruction_dependency_origin::last_epoch);
44✔
1729

1730
        // Normally, there is one _reduction chunk_ per _kernel chunk_, unless the local node is the designated reduction initializer and the reduction is not
1731
        // `initialize_to_identity`, in which case we add an additional _reduction chunk_ for the current buffer value and insert it in the first position of the
1732
        // local gather allocation.
1733
        if(red.include_local_buffer_value) {
44✔
1734
                // The source host allocation is already provided by satisfy_task_buffer_requirements
1735
                auto& source_allocation = buffer.memories[host_memory_id].get_contiguous_allocation(scalar_reduction_box);
10✔
1736
                const size_t dest_offset_bytes = 0; // initial value is the first entry
10✔
1737

1738
                // copy to local gather space
1739
                const auto current_value_copy_instr = create<copy_instruction>(command_batch, source_allocation.aid, red.gather_aid,
10✔
1740
                    strided_layout(source_allocation.box), linearized_layout(dest_offset_bytes), scalar_reduction_box, buffer.elem_size,
30✔
1741
                    [&](const auto& record_debug_info) { record_debug_info(copy_instruction_record::copy_origin::gather, bid, buffer.debug_name); });
33✔
1742

1743
                add_dependency(current_value_copy_instr, red.gather_alloc_instr, instruction_dependency_origin::allocation_lifetime);
10✔
1744
                perform_concurrent_read_from_allocation(current_value_copy_instr, source_allocation, scalar_reduction_box);
10✔
1745
        }
1746
        return red;
44✔
1747
}
1748

1749
void generator_impl::finish_task_local_reduction(batch& command_batch, const local_reduction& red, const reduction_info& rinfo, const execution_command& ecmd,
105✔
1750
    const task& tsk,
1751
    const std::vector<localized_chunk>& concurrent_chunks) //
1752
{
1753
        // If the reduction only has a single contribution, its write is already the final result and does not need to be reduced.
1754
        if(red.num_input_chunks == 1) return;
105✔
1755

1756
        const auto [rid, bid_, reduction_task_includes_buffer_value] = rinfo;
44✔
1757
        const auto bid = bid_; // allow capturing in lambda
44✔
1758

1759
        auto& buffer = m_buffers.at(bid);
44✔
1760
        auto& host_memory = buffer.memories[host_memory_id];
44✔
1761

1762
        // prepare_task_local_reduction has allocated gather space which preserves the current buffer value when the reduction does not initialize_to_identity
1763
        std::vector<copy_instruction*> gather_copy_instrs;
44✔
1764
        gather_copy_instrs.reserve(concurrent_chunks.size());
44✔
1765
        for(size_t j = 0; j < concurrent_chunks.size(); ++j) {
147✔
1766
                const auto source_mid = concurrent_chunks[j].memory_id;
103✔
1767
                auto& source_allocation = buffer.memories[source_mid].get_contiguous_allocation(scalar_reduction_box);
103✔
1768

1769
                // Copy local partial result to gather space
1770
                const auto copy_instr = create<copy_instruction>(command_batch, source_allocation.aid, red.gather_aid, strided_layout(source_allocation.box),
206✔
1771
                    linearized_layout((red.first_kernel_chunk_offset + j) * buffer.elem_size /* offset */), scalar_reduction_box, buffer.elem_size,
206✔
1772
                    [&](const auto& record_debug_info) { record_debug_info(copy_instruction_record::copy_origin::gather, bid, buffer.debug_name); });
455✔
1773

1774
                add_dependency(copy_instr, red.gather_alloc_instr, instruction_dependency_origin::allocation_lifetime);
103✔
1775
                perform_concurrent_read_from_allocation(copy_instr, source_allocation, scalar_reduction_box);
103✔
1776

1777
                gather_copy_instrs.push_back(copy_instr);
103✔
1778
        }
1779

1780
        // Insert a local reduce_instruction which reads from the gather buffer and writes to the host-buffer allocation for `scalar_reduction_box`.
1781
        auto& dest_allocation = host_memory.get_contiguous_allocation(scalar_reduction_box);
44✔
1782
        const auto reduce_instr =
1783
            create<reduce_instruction>(command_batch, rid, red.gather_aid, red.num_input_chunks, dest_allocation.aid, [&](const auto& record_debug_info) {
44✔
1784
                    record_debug_info(std::nullopt, bid, buffer.debug_name, scalar_reduction_box, reduce_instruction_record::reduction_scope::local);
18✔
1785
            });
62✔
1786

1787
        for(auto& copy_instr : gather_copy_instrs) {
147✔
1788
                add_dependency(reduce_instr, copy_instr, instruction_dependency_origin::read_from_allocation);
103✔
1789
        }
1790
        perform_atomic_write_to_allocation(reduce_instr, dest_allocation, scalar_reduction_box);
44✔
1791
        buffer.track_original_write(scalar_reduction_box, reduce_instr, host_memory_id);
44✔
1792

1793
        // Free the gather allocation created in `prepare_task_local_reduction`.
1794
        const auto gather_free_instr = create<free_instruction>(
88✔
1795
            command_batch, red.gather_aid, [&](const auto& record_debug_info) { record_debug_info(red.num_input_chunks * red.chunk_size_bytes, std::nullopt); });
62✔
1796
        add_dependency(gather_free_instr, reduce_instr, instruction_dependency_origin::allocation_lifetime);
44✔
1797
}
44✔
1798

1799
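// Creates the device-kernel or host-task instruction for a single localized chunk, mapping every buffer access and reduction output to its backing allocation in the chunk's memory.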
instruction* generator_impl::launch_task_kernel(batch& command_batch, const execution_command& ecmd, const task& tsk, const localized_chunk& chunk) {
3,276✔
1800
        CELERITY_DETAIL_TRACY_ZONE_SCOPED("iggen::launch_kernel", Blue2);
1801

1802
        const auto& bam = tsk.get_buffer_access_map();
3,276✔
1803

1804
        buffer_access_allocation_map allocation_map(bam.get_num_accesses());
9,828✔
1805
        buffer_access_allocation_map reduction_map(tsk.get_reductions().size());
9,828✔
1806
        size_t global_memory_access_estimate_bytes = 0;
3,276✔
1807

1808
        std::vector<buffer_memory_record> buffer_memory_access_map;       // if is_recording()
3,276✔
1809
        std::vector<buffer_reduction_record> buffer_memory_reduction_map; // if is_recording()
3,276✔
1810
        if(is_recording()) {
3,276✔
1811
                buffer_memory_access_map.resize(bam.get_num_accesses());
1,239✔
1812
                buffer_memory_reduction_map.resize(tsk.get_reductions().size());
1,239✔
1813
        }
1814

1815
        // map buffer accesses (hydration_ids) to allocations in chunk-memory
1816
        for(size_t i = 0; i < bam.get_num_accesses(); ++i) {
7,348✔
1817
                const auto [bid, mode] = bam.get_nth_access(i);
4,072✔
1818
                const auto& buffer = m_buffers.at(bid);
4,072✔
1819
                const auto accessed_region = bam.get_requirements_for_nth_access(i, chunk.execution_range.get_subrange());
4,072✔
1820
                if(!accessed_region.empty()) {
4,072✔
1821
                        const auto accessed_bounding_box = bounding_box(accessed_region);
4,051✔
1822
                        const auto& alloc = buffer.memories[chunk.memory_id].get_contiguous_allocation(accessed_bounding_box);
4,051✔
1823
                        allocation_map[i] = {alloc.aid, alloc.box, accessed_bounding_box CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(, bid, buffer.debug_name)};
8,102✔
1824
                } else {
1825
                        allocation_map[i] = buffer_access_allocation{null_allocation_id, {}, {} CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(, bid, buffer.debug_name)};
21✔
1826
                }
1827
                global_memory_access_estimate_bytes +=
4,072✔
1828
                    (static_cast<size_t>(access::mode_traits::is_producer(mode)) + static_cast<size_t>(access::mode_traits::is_consumer(mode)))
4,072✔
1829
                    * accessed_region.get_area() * buffer.elem_size;
4,072✔
1830
                if(is_recording()) { buffer_memory_access_map[i] = buffer_memory_record{bid, buffer.debug_name, accessed_region}; }
4,072!
1831
        }
4,072✔
1832

1833
        // map reduction outputs to allocations in chunk-memory
1834
        for(size_t i = 0; i < tsk.get_reductions().size(); ++i) {
3,440✔
1835
                const auto& rinfo = tsk.get_reductions()[i];
164✔
1836
                const auto& buffer = m_buffers.at(rinfo.bid);
164✔
1837
                const auto& alloc = buffer.memories[chunk.memory_id].get_contiguous_allocation(scalar_reduction_box);
164✔
1838
                reduction_map[i] = {alloc.aid, alloc.box, scalar_reduction_box CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(, rinfo.bid, buffer.debug_name)};
328✔
1839
                global_memory_access_estimate_bytes += chunk.execution_range.get_area() * buffer.elem_size;
164✔
1840
                if(is_recording()) { buffer_memory_reduction_map[i] = buffer_reduction_record{rinfo.bid, buffer.debug_name, rinfo.rid}; }
164✔
1841
        }
1842

1843
        if(tsk.get_execution_target() == execution_target::device) {
3,276✔
1844
                assert(chunk.execution_range.get_area() > 0);
1,966✔
1845
                assert(chunk.device_id.has_value());
1,966✔
1846
                return create<device_kernel_instruction>(command_batch, *chunk.device_id, tsk.get_launcher<device_kernel_launcher>(), chunk.execution_range,
1,966✔
1847
                    std::move(allocation_map), std::move(reduction_map),
1,966✔
1848
                    global_memory_access_estimate_bytes //
1849
                        CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(, tsk.get_type(), tsk.get_id(), tsk.get_debug_name()),
5,898✔
1850
                    [&](const auto& record_debug_info) {
3,932✔
1851
                            record_debug_info(ecmd.get_task()->get_id(), ecmd.get_id(), tsk.get_debug_name(), buffer_memory_access_map, buffer_memory_reduction_map);
1,107✔
1852
                    });
8,971✔
1853
        } else {
1854
                assert(tsk.get_execution_target() == execution_target::host);
1,310✔
1855
                assert(chunk.memory_id == host_memory_id);
1,310✔
1856
                assert(reduction_map.empty());
1,310✔
1857
                // We ignore global_memory_access_estimate_bytes for host tasks because they are typically limited by I/O instead
1858
                return create<host_task_instruction>(command_batch, tsk.get_launcher<host_task_launcher>(), chunk.execution_range, tsk.get_global_size(),
2,620✔
1859
                    std::move(allocation_map),
1,310✔
1860
                    tsk.get_collective_group_id() //
2,620✔
1861
                    CELERITY_DETAIL_IF_ACCESSOR_BOUNDARY_CHECK(, tsk.get_type(), tsk.get_id(), tsk.get_debug_name()),
3,930✔
1862
                    [&](const auto& record_debug_info) {
2,620✔
1863
                            record_debug_info(ecmd.get_task()->get_id(), ecmd.get_id(), tsk.get_debug_name(), buffer_memory_access_map);
132✔
1864
                    });
7,992✔
1865
        }
1866
}
7,491✔
1867

1868
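// Adds data dependencies between the newly created task instructions and the existing last writers / readers of each affected allocation, then updates the per-allocation and per-buffer tracking structures.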
void generator_impl::perform_task_buffer_accesses(
2,325✔
1869
    const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions) //
1870
{
1871
        CELERITY_DETAIL_TRACY_ZONE_SCOPED("iggen::perform_buffer_access", Red3);
1872

1873
        const auto& bam = tsk.get_buffer_access_map();
2,325✔
1874
        if(bam.get_num_accesses() == 0 && tsk.get_reductions().empty()) return;
2,325✔
1875

1876
        // 1. Collect the read-sets and write-sets of all concurrent chunks on all buffers (TODO this is what buffer_access_map should actually return)
1877

1878
        struct read_write_sets {
1879
                region<3> reads;
1880
                region<3> writes;
1881
        };
1882

1883
        std::vector<std::unordered_map<buffer_id, read_write_sets>> concurrent_read_write_sets(concurrent_chunks.size());
6,612✔
1884

1885
        for(const auto bid : bam.get_accessed_buffers()) {
4,746✔
1886
                for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
6,566✔
1887
                        const auto sr = concurrent_chunks[i].execution_range.get_subrange();
4,024✔
1888
                        read_write_sets rw{bam.compute_consumed_region(bid, sr), bam.compute_produced_region(bid, sr)};
4,024!
1889
                        concurrent_read_write_sets[i].emplace(bid, std::move(rw));
4,024✔
1890
                }
4,024✔
1891
        }
1892

1893
        for(const auto& rinfo : tsk.get_reductions()) {
2,309✔
1894
                for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
269✔
1895
                        auto& rw_map = concurrent_read_write_sets[i][rinfo.bid]; // allow default-insert on `bid`
164✔
1896
                        rw_map.writes = region_union(rw_map.writes, scalar_reduction_box);
164✔
1897
                }
1898
        }
1899

1900
        // 2. Insert all true-dependencies for reads and anti-dependencies for writes. We do this en bloc instead of using `perform_concurrent_read_from_allocation`
1901
        // or `perform_atomic_write_to_allocation`, which would update the tracking structures too early and introduce incorrect dependencies between our concurrent chunks.
1902

1903
        for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
5,351✔
1904
                for(const auto& [bid, rw] : concurrent_read_write_sets[i]) {
7,335✔
1905
                        auto& buffer = m_buffers.at(bid);
4,188✔
1906
                        auto& memory = buffer.memories[concurrent_chunks[i].memory_id];
4,188✔
1907

1908
                        for(auto& allocation : memory.allocations) {
8,790✔
1909
                                add_dependencies_on_last_writers(command_instructions[i], allocation, region_intersection(rw.reads, allocation.box));
4,602✔
1910
                                add_dependencies_on_last_concurrent_accesses(
9,204✔
1911
                                    command_instructions[i], allocation, region_intersection(rw.writes, allocation.box), instruction_dependency_origin::write_to_allocation);
9,204✔
1912
                        }
1913
                }
1914
        }
1915

1916
        // 3. Clear tracking structures for all regions that are being written to. We gracefully handle overlapping writes by treating the set of all conflicting
1917
        // writers as last writers of an allocation.
1918

1919
        for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
5,351✔
1920
                for(const auto& [bid, rw] : concurrent_read_write_sets[i]) {
7,335✔
1921
                        assert(command_instructions[i] != nullptr);
4,188✔
1922
                        auto& buffer = m_buffers.at(bid);
4,188✔
1923
                        for(auto& alloc : buffer.memories[concurrent_chunks[i].memory_id].allocations) {
8,790✔
1924
                                alloc.begin_concurrent_writes(region_intersection(alloc.box, rw.writes));
4,602✔
1925
                        }
1926
                }
1927
        }
1928

1929
        // 4. Update data locations and last writers resulting from all concurrent reads and overlapping writes
1930

1931
        for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
5,351✔
1932
                for(const auto& [bid, rw] : concurrent_read_write_sets[i]) {
7,335✔
1933
                        assert(command_instructions[i] != nullptr);
4,188✔
1934
                        auto& buffer = m_buffers.at(bid);
4,188✔
1935

1936
                        for(auto& alloc : buffer.memories[concurrent_chunks[i].memory_id].allocations) {
8,790✔
1937
                                alloc.track_concurrent_read(region_intersection(alloc.box, rw.reads), command_instructions[i]);
4,602✔
1938
                                alloc.track_concurrent_write(region_intersection(alloc.box, rw.writes), command_instructions[i]);
4,602✔
1939
                        }
1940
                        buffer.track_original_write(rw.writes, command_instructions[i], concurrent_chunks[i].memory_id);
4,188✔
1941
                }
1942
        }
1943
}
2,204✔
1944

1945
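// Serializes the task instruction with respect to earlier side effects on the same host objects by chaining it through each host object's last_side_effect.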
void generator_impl::perform_task_side_effects(
2,325✔
1946
    const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions) //
1947
{
1948
        if(tsk.get_side_effect_map().empty()) return;
2,325✔
1949

1950
        assert(concurrent_chunks.size() == 1); // splitting instructions with side effects would race
106✔
1951
        assert(!concurrent_chunks[0].device_id.has_value());
106✔
1952
        assert(concurrent_chunks[0].memory_id == host_memory_id);
106✔
1953

1954
        for(const auto& [hoid, order] : tsk.get_side_effect_map()) {
221✔
1955
                auto& host_object = m_host_objects.at(hoid);
115✔
1956
                if(const auto last_side_effect = host_object.last_side_effect) {
115!
1957
                        add_dependency(command_instructions[0], last_side_effect, instruction_dependency_origin::side_effect);
115✔
1958
                }
1959
                host_object.last_side_effect = command_instructions[0];
115✔
1960
        }
1961
}
1962

1963
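// Orders collective host tasks within their collective group by depending on (and then replacing) the group's last collective operation.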
void generator_impl::perform_task_collective_operations(
2,325✔
1964
    const task& tsk, const std::vector<localized_chunk>& concurrent_chunks, const std::vector<instruction*>& command_instructions) //
1965
{
1966
        if(tsk.get_collective_group_id() == non_collective_group_id) return;
2,325✔
1967

1968
        assert(concurrent_chunks.size() == 1); //
47✔
1969
        assert(!concurrent_chunks[0].device_id.has_value());
47✔
1970
        assert(concurrent_chunks[0].memory_id == host_memory_id);
47✔
1971

1972
        auto& group = m_collective_groups.at(tsk.get_collective_group_id()); // must be created previously with clone_collective_group_instruction
47✔
1973
        add_dependency(command_instructions[0], group.last_collective_operation, instruction_dependency_origin::collective_group_order);
47✔
1974
        group.last_collective_operation = command_instructions[0];
47✔
1975
}
1976

1977
void generator_impl::compile_execution_command(batch& command_batch, const execution_command& ecmd) {
2,335✔
1978
        const auto& tsk = *ecmd.get_task();
2,335✔
1979

1980
        // 1. If this is a collective host task, we might need to insert a `clone_collective_group_instruction` on which the task instruction is later serialized.
1981
        create_task_collective_groups(command_batch, tsk);
2,335✔
1982

1983
        // 2. Split the task into local chunks and (in the case of a device kernel) assign them to devices
1984
        const auto concurrent_chunks = split_task_execution_range(ecmd, tsk);
2,335✔
1985

1986
        // 3. Detect and report overlapping writes - it is not a fatal error to discover one; we always generate an executable (albeit racy) instruction graph
1987
        if(m_policy.overlapping_write_error != error_policy::ignore) { report_task_overlapping_writes(tsk, concurrent_chunks); }
2,331✔
1988

1989
        // 4. Perform all necessary receives, allocations, resize- and coherence copies to provide an appropriate set of buffer allocations and data distribution
1990
        // for all kernels and host tasks of this task. This is done simultaneously for all chunks to optimize the graph and avoid inefficient copy-chains.
1991
        auto accessed_bids = tsk.get_buffer_access_map().get_accessed_buffers();
2,329✔
1992
        for(const auto& rinfo : tsk.get_reductions()) {
2,435✔
1993
                accessed_bids.insert(rinfo.bid);
106✔
1994
        }
1995
        for(const auto bid : accessed_bids) {
4,976✔
1996
                satisfy_task_buffer_requirements(command_batch, bid, tsk, ecmd.get_execution_range(), ecmd.is_reduction_initializer(), concurrent_chunks);
2,651✔
1997
        }
1998

1999
        // 5. If the task contains reductions with more than one local input, create the appropriate gather allocations and (if the local node is the designated
2000
        // reduction initializer) copy the current buffer value into the new gather space.
2001
        std::vector<local_reduction> local_reductions(tsk.get_reductions().size());
6,975✔
2002
        for(size_t i = 0; i < local_reductions.size(); ++i) {
2,430✔
2003
                local_reductions[i] = prepare_task_local_reduction(command_batch, tsk.get_reductions()[i], ecmd, tsk, concurrent_chunks.size());
105✔
2004
        }
2005

2006
        // 6. Issue instructions to launch all concurrent kernels / host tasks.
2007
        std::vector<instruction*> command_instructions(concurrent_chunks.size());
6,975✔
2008
        for(size_t i = 0; i < concurrent_chunks.size(); ++i) {
5,601✔
2009
                command_instructions[i] = launch_task_kernel(command_batch, ecmd, tsk, concurrent_chunks[i]);
3,276✔
2010
        }
2011

2012
        // 7. Compute dependencies and update tracking data structures
2013
        perform_task_buffer_accesses(tsk, concurrent_chunks, command_instructions);
2,325✔
2014
        perform_task_side_effects(tsk, concurrent_chunks, command_instructions);
2,325✔
2015
        perform_task_collective_operations(tsk, concurrent_chunks, command_instructions);
2,325✔
2016

2017
        // 8. For any reductions with more than one local input, collect partial results and perform the reduction operation in host memory. This is done eagerly to
2018
        // avoid ever having to persist partial reduction states in our buffer tracking.
2019
        for(size_t i = 0; i < local_reductions.size(); ++i) {
2,430✔
2020
                finish_task_local_reduction(command_batch, local_reductions[i], tsk.get_reductions()[i], ecmd, tsk, concurrent_chunks);
105✔
2021
        }
2022

2023
        // 9. If any of the instructions have no predecessor, anchor them on the last epoch (this can only happen for chunks without any buffer accesses).
2024
        for(const auto instr : command_instructions) {
5,601✔
2025
                if(instr->get_dependencies().empty()) { add_dependency(instr, m_last_epoch, instruction_dependency_origin::last_epoch); }
3,276✔
2026
        }
2027
}
4,660✔
2028

2029
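// Generates outbound pilots and send instructions for every target region of a push command. Sends are split along original-writer boundaries so that no new synchronization between producer chunks is introduced.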
void generator_impl::compile_push_command(batch& command_batch, const push_command& pcmd) {
436✔
2030
        const auto trid = pcmd.get_transfer_id();
436✔
2031
        // TODO: Since we now have a single fat push command, we can be smarter about how we allocate host memory and issue coherence copies.
2032
        for(const auto& [target, reg] : pcmd.get_target_regions()) {
1,076✔
2033
                // If not all nodes contribute partial results to a global reduction, the remaining ones need to notify their peers that they should not expect any
2034
                // data. This is done by announcing an empty box through the pilot message, but not actually performing a send.
2035
                if(reg.empty()) {
640✔
2036
                        assert(trid.rid != no_reduction_id);
4✔
2037
                        create_outbound_pilot(command_batch, target, trid, box<3>());
20✔
2038
                        continue;
4✔
2039
                }
2040

2041
                // Prioritize all instructions participating in a "push" to hide the latency of establishing local coherence behind the typically much longer latencies
2042
                // of inter-node communication
2043
                command_batch.base_priority = 10;
636✔
2044

2045
                auto& buffer = m_buffers.at(trid.bid);
636✔
2046
                auto& host_memory = buffer.memories[host_memory_id];
636✔
2047

2048
                // We want to generate as few send instructions as possible without introducing new synchronization points between chunks of the same
2049
                // command that generated the pushed data. This will allow computation-communication overlap, especially in the case of oversubscribed splits.
2050
                dense_map<memory_id, std::vector<region<3>>> concurrent_send_source_regions(host_memory_id + 1); // establish_coherence() takes a dense_map
636✔
2051
                auto& concurrent_send_regions = concurrent_send_source_regions[host_memory_id];
636✔
2052

2053
                // Since we now send boxes individually, we do not need to allocate the entire push_box contiguously.
2054
                box_vector<3> required_host_allocation;
636✔
2055
                {
2056
                        std::unordered_map<instruction_id, region_builder<3>> individual_send_boxes;
636✔
2057
                        for(auto& [box, original_writer] : buffer.original_writers.get_region_values(reg)) {
1,388✔
2058
                                individual_send_boxes[original_writer->get_id()].add(box);
752✔
2059
                                required_host_allocation.push_back(box);
752✔
2060
                        }
636✔
2061
                        for(auto& [original_writer, boxes] : individual_send_boxes) {
1,388✔
2062
                                concurrent_send_regions.push_back(std::move(boxes).into_region());
752✔
2063
                        }
2064
                }
636✔
2065

2066
                allocate_contiguously(command_batch, trid.bid, host_memory_id, std::move(required_host_allocation));
636✔
2067
                establish_coherence_between_buffer_memories(command_batch, trid.bid, concurrent_send_source_regions);
636✔
2068

2069
                for(const auto& send_region : concurrent_send_regions) {
1,388✔
2070
                        for(const auto& full_send_box : send_region.get_boxes()) {
1,504✔
2071
                                // Splitting must happen on buffer range instead of host allocation range to ensure boxes are also suitable for the receiver, which might have
2072
                                // a differently-shaped backing allocation
2073
                                for(const auto& compatible_send_box : split_into_communicator_compatible_boxes(buffer.range, full_send_box)) {
1,523✔
2074
                                        const message_id msgid = create_outbound_pilot(command_batch, target, trid, compatible_send_box);
771✔
2075

2076
                                        auto& allocation = host_memory.get_contiguous_allocation(compatible_send_box); // we allocate_contiguously above
771✔
2077

2078
                                        const auto offset_in_allocation = compatible_send_box.get_offset() - allocation.box.get_offset();
771✔
2079
                                        const auto send_instr = create<send_instruction>(command_batch, target, msgid, allocation.aid, allocation.box.get_range(),
771✔
2080
                                            offset_in_allocation, compatible_send_box.get_range(), buffer.elem_size,
1,542✔
2081
                                            [&](const auto& record_debug_info) { record_debug_info(pcmd.get_id(), trid, buffer.debug_name, compatible_send_box.get_offset()); });
2,402✔
2082

2083
                                        perform_concurrent_read_from_allocation(send_instr, allocation, compatible_send_box);
771✔
2084
                                }
752✔
2085
                        }
2086
                }
2087
        }
636✔
2088
}
436✔
2089

2090
void generator_impl::defer_await_push_command(const await_push_command& apcmd) {
386✔
2091
        // We do not generate instructions for await-push commands immediately upon receiving them; instead, we buffer them and generate
2092
        // recv-instructions as soon as data is to be read by another instruction. This way, we can split the recv instructions and avoid
2093
        // unnecessary synchronization points between chunks that can otherwise profit from a computation-communication overlap.
2094

2095
        const auto& trid = apcmd.get_transfer_id();
386✔
2096
        if(is_recording()) { m_recorder->record_await_push_command_id(trid, apcmd.get_id()); }
386✔
2097

2098
        auto& buffer = m_buffers.at(trid.bid);
386✔
2099

2100
#ifndef NDEBUG
2101
        for(const auto& receive : buffer.pending_receives) {
386!
2102
                assert((trid.rid != no_reduction_id || receive.consumer_tid != trid.consumer_tid)
×
2103
                       && "received multiple await-pushes for the same consumer-task, buffer and reduction id");
2104
                assert(region_intersection(receive.received_region, apcmd.get_region()).empty()
×
2105
                       && "received an await-push command into a previously await-pushed region without an intermediate read");
2106
        }
2107
        for(const auto& gather : buffer.pending_gathers) {
386!
2108
                assert(std::pair(gather.consumer_tid, gather.rid) != std::pair(trid.consumer_tid, trid.rid)
×
2109
                       && "received multiple await-pushes for the same consumer-task, buffer and reduction id");
2110
                assert(region_intersection(gather.gather_box, apcmd.get_region()).empty()
×
2111
                       && "received an await-push command into a previously await-pushed region without an intermediate read");
2112
        }
2113
#endif
2114

2115
        if(trid.rid == no_reduction_id) {
386✔
2116
                buffer.pending_receives.emplace_back(trid.consumer_tid, apcmd.get_region(), connected_subregion_bounding_boxes(apcmd.get_region()));
343✔
2117
        } else {
2118
                assert(apcmd.get_region().get_boxes().size() == 1);
43✔
2119
                buffer.pending_gathers.emplace_back(trid.consumer_tid, trid.rid, apcmd.get_region().get_boxes().front());
43✔
2120
        }
2121
}
386✔
2122

2123
void generator_impl::compile_reduction_command(batch& command_batch, const reduction_command& rcmd) {
43✔
2124
        // In a single-node setting, global reductions are no-ops, so no reduction commands should ever be issued
2125
        assert(m_num_nodes > 1 && "received a reduction command in a single-node configuration");
43✔
2126

2127
        const auto [rid_, bid_, init_from_buffer] = rcmd.get_reduction_info();
43✔
2128
        const auto rid = rid_; // allow capturing in lambda
43✔
2129
        const auto bid = bid_; // allow capturing in lambda
43✔
2130

2131
        auto& buffer = m_buffers.at(bid);
43✔
2132

2133
        const auto gather = std::find_if(buffer.pending_gathers.begin(), buffer.pending_gathers.end(), [&](const buffer_state::gather_receive& g) {
43✔
2134
                return g.rid == rid; // assume that g.consumer_tid is correct because there cannot be multiple concurrent reductions for a single task
43✔
2135
        });
2136
        assert(gather != buffer.pending_gathers.end() && "received reduction command that is not preceded by an appropriate await-push");
43✔
2137
        assert(gather->gather_box == scalar_reduction_box);
43✔
2138

2139
        // 1. Create a host-memory allocation to gather the array of partial results
2140

2141
        const auto gather_aid = new_allocation_id(host_memory_id);
43✔
2142
        const auto node_chunk_size = gather->gather_box.get_area() * buffer.elem_size;
43✔
2143
        const auto gather_alloc_instr =
2144
            create<alloc_instruction>(command_batch, gather_aid, m_num_nodes * node_chunk_size, buffer.elem_align, [&](const auto& record_debug_info) {
43✔
2145
                    record_debug_info(
27✔
2146
                        alloc_instruction_record::alloc_origin::gather, buffer_allocation_record{bid, buffer.debug_name, gather->gather_box}, m_num_nodes);
54✔
2147
            });
97!
2148
        add_dependency(gather_alloc_instr, m_last_epoch, instruction_dependency_origin::last_epoch);
43✔
2149

2150
        // 2. Fill the gather space with the reduction identity, so that the gather_receive_command can simply ignore empty boxes sent by peers that do not
2151
        // contribute to the reduction, and we can skip the gather-copy instruction if we ourselves do not contribute a partial result.
2152

2153
        const auto fill_identity_instr =
2154
            create<fill_identity_instruction>(command_batch, rid, gather_aid, m_num_nodes, [](const auto& record_debug_info) { record_debug_info(); });
70✔
2155
        add_dependency(fill_identity_instr, gather_alloc_instr, instruction_dependency_origin::allocation_lifetime);
43✔
2156

2157
        // 3. If the local node contributes to the reduction, copy the contribution to the appropriate position in the gather space. Testing `up_to_date_memories`
2158
        // locally is not enough to establish whether there is a local contribution, since the local node might not have participated in the task that initiated the
2159
        // reduction. Instead, we are informed about this condition by the command graph.
2160

2161
        copy_instruction* local_gather_copy_instr = nullptr;
43✔
2162
        if(rcmd.has_local_contribution()) {
43✔
2163
                const auto contribution_location = buffer.up_to_date_memories.get_region_values(scalar_reduction_box).front().second;
41✔
2164
                const auto source_mid = next_location(contribution_location, host_memory_id);
41✔
2165
                // if scalar_box is up to date in that memory, it (the single element) must also be contiguous
2166
                auto& source_allocation = buffer.memories[source_mid].get_contiguous_allocation(scalar_reduction_box);
41✔
2167

2168
                local_gather_copy_instr = create<copy_instruction>(command_batch, source_allocation.aid, gather_aid, strided_layout(source_allocation.box),
82✔
2169
                    linearized_layout(m_local_nid * buffer.elem_size /* offset */), scalar_reduction_box, buffer.elem_size,
82✔
2170
                    [&](const auto& record_debug_info) { record_debug_info(copy_instruction_record::copy_origin::gather, bid, buffer.debug_name); });
107✔
2171
                add_dependency(local_gather_copy_instr, fill_identity_instr, instruction_dependency_origin::write_to_allocation);
41✔
2172
                perform_concurrent_read_from_allocation(local_gather_copy_instr, source_allocation, scalar_reduction_box);
41✔
2173
        }
2174

2175
        // 4. Gather remote contributions to the partial result array
2176

2177
        const transfer_id trid(gather->consumer_tid, bid, gather->rid);
43✔
2178
        const auto gather_recv_instr = create<gather_receive_instruction>(command_batch, trid, gather_aid, node_chunk_size,
86✔
2179
            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name, gather->gather_box, m_num_nodes); });
70✔
2180
        add_dependency(gather_recv_instr, fill_identity_instr, instruction_dependency_origin::write_to_allocation);
43✔
2181

2182
        // 5. Perform the global reduction on the host by reading the array of inputs from the gather space and writing to the buffer's host allocation that covers
2183
        // `scalar_reduction_box`.
2184

2185
        allocate_contiguously(command_batch, bid, host_memory_id, {scalar_reduction_box});
129✔
2186

2187
        auto& host_memory = buffer.memories[host_memory_id];
43✔
2188
        auto& dest_allocation = host_memory.get_contiguous_allocation(scalar_reduction_box);
43✔
2189

2190
        const auto reduce_instr = create<reduce_instruction>(command_batch, rid, gather_aid, m_num_nodes, dest_allocation.aid, [&](const auto& record_debug_info) {
43✔
2191
                record_debug_info(rcmd.get_id(), bid, buffer.debug_name, scalar_reduction_box, reduce_instruction_record::reduction_scope::global);
27✔
2192
        });
70✔
2193
        add_dependency(reduce_instr, gather_recv_instr, instruction_dependency_origin::read_from_allocation);
43✔
2194
        if(local_gather_copy_instr != nullptr) { add_dependency(reduce_instr, local_gather_copy_instr, instruction_dependency_origin::read_from_allocation); }
43✔
2195
        perform_atomic_write_to_allocation(reduce_instr, dest_allocation, scalar_reduction_box);
43✔
2196
        buffer.track_original_write(scalar_reduction_box, reduce_instr, host_memory_id);
43✔
2197

2198
        // 6. Free the gather space
2199

2200
        const auto gather_free_instr = create<free_instruction>(
86✔
2201
            command_batch, gather_aid, [&](const auto& record_debug_info) { record_debug_info(m_num_nodes * node_chunk_size, std::nullopt); });
70✔
2202
        add_dependency(gather_free_instr, reduce_instr, instruction_dependency_origin::allocation_lifetime);
43✔
2203

2204
        buffer.pending_gathers.clear();
43✔
2205

2206
        // The associated reducer will be garbage-collected from the executor as we pass the reduction id on via the instruction_garbage member of the next horizon
2207
        // or epoch instruction.
2208
}
43✔
2209

2210
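// Fences export data to a user-provided allocation (buffer fences) or synchronize on a host object (host-object fences) and then fulfill the task promise via a fence_instruction.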
void generator_impl::compile_fence_command(batch& command_batch, const fence_command& fcmd) {
85✔
2211
        const auto& tsk = *fcmd.get_task();
85✔
2212

2213
        assert(tsk.get_reductions().empty());
85✔
2214
        assert(tsk.get_collective_group_id() == non_collective_group_id);
85✔
2215

2216
        const auto& bam = tsk.get_buffer_access_map();
85✔
2217
        const auto& sem = tsk.get_side_effect_map();
85✔
2218
        assert(bam.get_num_accesses() + sem.size() == 1);
85✔
2219

2220
        // buffer fences encode their buffer id and subrange through buffer_access_map with a fixed range mapper (which is rather ugly)
2221
        if(bam.get_num_accesses() != 0) {
85✔
2222
                const auto bid = *bam.get_accessed_buffers().begin();
66✔
2223
                const auto fence_region = bam.compute_consumed_region(bid, {});
330✔
2224
                const auto fence_box = !fence_region.empty() ? fence_region.get_boxes().front() : box<3>();
74✔
2225

2226
                const auto user_allocation_id = tsk.get_task_promise()->get_user_allocation_id();
66✔
2227
                assert(user_allocation_id != null_allocation_id && user_allocation_id.get_memory_id() == user_memory_id);
66✔
2228

2229
                auto& buffer = m_buffers.at(bid);
66✔
2230
                copy_instruction* copy_instr = nullptr;
66✔
2231
                // gracefully handle empty-range buffer fences
2232
                if(!fence_box.empty()) {
66✔
2233
                        // We make the host buffer coherent first in order to apply pending await-pushes.
2234
                        // TODO this enforces a contiguous host-buffer allocation which may cause unnecessary resizes.
2235
                        satisfy_task_buffer_requirements(command_batch, bid, tsk, {}, false /* is_reduction_initializer: irrelevant */,
384✔
2236
                            std::vector{localized_chunk{host_memory_id, std::nullopt, box<3>()}} /* local_chunks: irrelevant */);
256✔
2237

2238
                        auto& host_buffer_allocation = buffer.memories[host_memory_id].get_contiguous_allocation(fence_box);
64✔
2239
                        copy_instr = create<copy_instruction>(command_batch, host_buffer_allocation.aid, user_allocation_id, strided_layout(host_buffer_allocation.box),
128✔
2240
                            strided_layout(fence_box), fence_box, buffer.elem_size,
128✔
2241
                            [&](const auto& record_debug_info) { record_debug_info(copy_instruction_record::copy_origin::fence, bid, buffer.debug_name); });
141✔
2242

2243
                        perform_concurrent_read_from_allocation(copy_instr, host_buffer_allocation, fence_box);
64✔
2244
                }
2245

2246
                const auto fence_instr = create<fence_instruction>(command_batch, tsk.get_task_promise(),
66✔
2247
                    [&](const auto& record_debug_info) { record_debug_info(tsk.get_id(), fcmd.get_id(), bid, buffer.debug_name, fence_box.get_subrange()); });
147✔
2248

2249
                if(copy_instr != nullptr) {
66✔
2250
                        add_dependency(fence_instr, copy_instr, instruction_dependency_origin::read_from_allocation);
64✔
2251
                } else {
2252
                        // an empty-range buffer fence has no data dependencies but must still be executed to fulfill its promise - attach it to the current epoch.
2253
                        add_dependency(fence_instr, m_last_epoch, instruction_dependency_origin::last_epoch);
2✔
2254
                }
2255

2256
                // we will just assume that the runtime does not intend to re-use the allocation it has passed
2257
                m_unreferenced_user_allocations.push_back(user_allocation_id);
66✔
2258
        }
66✔
2259

2260
        // host-object fences encode their host-object id in the task side effect map (which is also very ugly)
2261
        if(!sem.empty()) {
85✔
2262
                const auto hoid = sem.begin()->first;
19✔
2263

2264
                auto& obj = m_host_objects.at(hoid);
19✔
2265
                const auto fence_instr = create<fence_instruction>(
38✔
2266
                    command_batch, tsk.get_task_promise(), [&, hoid = hoid](const auto& record_debug_info) { record_debug_info(tsk.get_id(), fcmd.get_id(), hoid); });
21✔
2267

2268
                add_dependency(fence_instr, obj.last_side_effect, instruction_dependency_origin::side_effect);
19✔
2269
                obj.last_side_effect = fence_instr;
19✔
2270
        }
2271
}
85✔
2272

2273
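// Horizons collapse the current execution front into a single instruction; the previously generated horizon is then applied as the effective epoch for dependency tracking.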
void generator_impl::compile_horizon_command(batch& command_batch, const horizon_command& hcmd) {
638✔
2274
        m_idag->begin_epoch(hcmd.get_task()->get_id());
638✔
2275
        instruction_garbage garbage{hcmd.get_completed_reductions(), std::move(m_unreferenced_user_allocations)};
638✔
2276
        const auto horizon = create<horizon_instruction>(
638✔
2277
            command_batch, hcmd.get_task()->get_id(), std::move(garbage), [&](const auto& record_debug_info) { record_debug_info(hcmd.get_id()); });
1,344✔
2278

2279
        collapse_execution_front_to(horizon);
638✔
2280
        if(m_last_horizon != nullptr) { apply_epoch(m_last_horizon); }
638✔
2281
        m_last_horizon = horizon;
638✔
2282
}
1,276✔
2283

2284
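// Epochs are applied immediately after collapsing the execution front; a shutdown epoch additionally frees all staging allocations, and an init epoch registers the root collective group.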
void generator_impl::compile_epoch_command(batch& command_batch, const epoch_command& ecmd) {
1,183✔
2285
        if(ecmd.get_epoch_action() == epoch_action::shutdown) { free_all_staging_allocations(command_batch); }
1,183✔
2286

2287
        m_idag->begin_epoch(ecmd.get_task()->get_id());
1,183✔
2288
        instruction_garbage garbage{ecmd.get_completed_reductions(), std::move(m_unreferenced_user_allocations)};
1,183✔
2289
        const auto epoch = create<epoch_instruction>(command_batch, ecmd.get_task()->get_id(), ecmd.get_epoch_action(), ecmd.get_task()->get_task_promise(),
5,915✔
2290
            std::move(garbage), [&](const auto& record_debug_info) { record_debug_info(ecmd.get_id()); });
6,354✔
2291

2292
        collapse_execution_front_to(epoch);
1,183✔
2293
        apply_epoch(epoch);
1,183✔
2294
        m_last_horizon = nullptr;
1,183✔
2295

2296
        if(ecmd.get_epoch_action() == epoch_action::init) {
1,183✔
2297
                // The root collective group already exists in the runtime, but we must still equip it with a meaningful last_host_task.
2298
                m_collective_groups.emplace(root_collective_group_id, epoch);
424✔
2299
        }
2300
}
2,366✔
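
// Note (an interpretation of the two functions above, not original source documentation): horizons and epochs
// both collapse the execution front, but an epoch is applied to the generator's tracking state immediately
// (apply_epoch(epoch)), whereas a horizon is only applied once the next horizon has been generated
// (apply_epoch(m_last_horizon)), presumably so that one horizon's worth of history remains available for
// dependency generation.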

void generator_impl::flush_batch(batch&& batch) { // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved) we do move the members of `batch`
5,770✔
        // sanity check: every instruction except the initial epoch must be temporally anchored through at least one dependency
        assert(std::all_of(batch.generated_instructions.begin(), batch.generated_instructions.end(),
18,908✔
            [](const auto instr) { return instr->get_id() == 0 || !instr->get_dependencies().empty(); }));
        assert(is_topologically_sorted(batch.generated_instructions.begin(), batch.generated_instructions.end()));
5,770✔

        // instructions must be recorded manually after each create<instr>() call; verify that we never flush an unrecorded instruction
        assert(m_recorder == nullptr || std::all_of(batch.generated_instructions.begin(), batch.generated_instructions.end(), [this](const auto instr) {
235,089✔
                return std::any_of(
                    m_recorder->get_graph_nodes().begin(), m_recorder->get_graph_nodes().end(), [=](const auto& rec) { return rec->id == instr->get_id(); });
        }));

        if(m_delegate != nullptr && (!batch.generated_instructions.empty() || !batch.generated_pilots.empty())) {
5,770!
                m_delegate->flush(std::move(batch.generated_instructions), std::move(batch.generated_pilots));
3,962✔
        }

#ifndef NDEBUG // ~batch() checks if it has been flushed, which we want to acknowledge even if m_delegate == nullptr
        batch.generated_instructions = {};
5,770✔
        batch.generated_pilots = {};
5,770✔
#endif
}
5,770✔

instruction_graph_generator::scheduling_hint generator_impl::anticipate(const command& cmd) {
4,171✔
        // Although fence, push and await-push commands currently trigger host buffer allocations, these will be migrated to staging allocations eventually.
        if(!utils::isa<execution_command>(&cmd)) return instruction_graph_generator::scheduling_hint::is_self_contained;
4,171✔

        CELERITY_DETAIL_TRACY_ZONE_SCOPED("iggen::anticipate", Teal);

        const auto& ecmd = *utils::as<execution_command>(&cmd);
1,915✔
        const auto& tsk = *ecmd.get_task();
1,915✔
        const auto concurrent_chunks = split_task_execution_range(ecmd, tsk);
1,915✔

        std::unordered_map<std::pair<buffer_id, memory_id>, box_vector<3>, utils::pair_hash> required_contiguous_boxes;
1,915✔
        const auto require_contiguous = [&](const buffer_id bid, const memory_id mid, const box_vector<3>& boxes) {
1,915✔
                auto& new_boxes = required_contiguous_boxes[{bid, mid}]; // allow default-insert
3,662✔
                new_boxes.insert(new_boxes.end(), boxes.begin(), boxes.end());
3,662✔
        };
3,662✔

        // Aggregate required boxes for kernel buffer accesses
        const auto& bam = tsk.get_buffer_access_map();
1,915✔
        for(const auto bid : bam.get_accessed_buffers()) {
4,131✔
                for(const auto& chunk : concurrent_chunks) {
5,712✔
                        require_contiguous(bid, chunk.memory_id, bam.compute_required_contiguous_boxes(bid, chunk.execution_range));
3,496✔
                }
        }

        // Aggregate required boxes for reduction in- and outputs
        for(const auto& rinfo : tsk.get_reductions()) {
1,980✔
                require_contiguous(rinfo.bid, host_memory_id, {scalar_reduction_box});
195✔
                for(const auto& chunk : concurrent_chunks) {
166✔
                        require_contiguous(rinfo.bid, chunk.memory_id, {scalar_reduction_box});
303✔
                }
        }

        // If the set of allocated boxes grows for any buffer since the last call to anticipate(), queueing this command might allow merging those allocations with
        // future ones to avoid resizes
        bool would_allocate = false;
1,915✔
        for(const auto& [bid_mid, required_boxes] : required_contiguous_boxes) {
5,073✔
                const auto [bid, mid] = bid_mid;
3,158✔
                auto& memory = m_buffers.at(bid).memories[mid];
3,158✔

                bool would_allocate_in_buffer_memory = false;
3,158✔
                auto& anticipated_boxes = memory.anticipated_contiguous_boxes;
3,158✔
                for(const auto& box : required_boxes) {
6,847✔
                        if(memory.is_allocated_contiguously(box)) continue;
3,689✔
                        if(std::any_of(anticipated_boxes.begin(), anticipated_boxes.end(), [&](const detail::box<3>& a) { return a.covers(box); })) continue;
3,071✔
                        memory.anticipated_contiguous_boxes.push_back(box);
1,045✔
                        would_allocate_in_buffer_memory = true;
1,045✔
                }

                if(would_allocate_in_buffer_memory) {
3,158✔
                        merge_overlapping_bounding_boxes(memory.anticipated_contiguous_boxes);
929✔
                        merge_connected_boxes(memory.anticipated_contiguous_boxes);
929✔
                        would_allocate = true;
929✔
                }
        }
        return would_allocate ? instruction_graph_generator::scheduling_hint::could_merge_with_future_commands
1,915✔
                              : instruction_graph_generator::scheduling_hint::is_self_contained;
1,915✔
}
1,915✔
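
// Illustrative sketch (the caller-side names `iggen`, `cmd` and `deferred_commands` are assumptions; only
// anticipate(), compile() and the scheduling_hint values come from this file): a scheduler could use the hint
// returned above to decide whether deferring a command is worthwhile, e.g.
//
//   if(iggen.anticipate(*cmd) == instruction_graph_generator::scheduling_hint::could_merge_with_future_commands) {
//       deferred_commands.push_back(cmd); // compiling later may allow merging buffer allocations
//   } else {
//       iggen.compile(*cmd); // self-contained: compiling now cannot cause avoidable resizes
//   }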

void generator_impl::compile(const command& cmd) {
5,106✔
        batch command_batch;
5,106✔
        matchbox::match(
5,106✔
            cmd,                                                                                    //
            [&](const execution_command& ecmd) { compile_execution_command(command_batch, ecmd); }, //
7,441✔
            [&](const push_command& pcmd) { compile_push_command(command_batch, pcmd); },           //
10,648✔
            [&](const await_push_command& apcmd) { defer_await_push_command(apcmd); },              //
10,598✔
            [&](const horizon_command& hcmd) { compile_horizon_command(command_batch, hcmd); },     //
10,850✔
            [&](const epoch_command& ecmd) { compile_epoch_command(command_batch, ecmd); },         //
11,395✔
            [&](const reduction_command& rcmd) { compile_reduction_command(command_batch, rcmd); }, //
10,255✔
            [&](const fence_command& fcmd) { compile_fence_command(command_batch, fcmd); }          //
10,297✔
        );
        flush_batch(std::move(command_batch));
5,096✔
}
10,202✔
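
// For illustration (a sketch, not part of the original source): matchbox::match above dispatches on the dynamic
// type of `cmd` and invokes exactly one of the lambdas; conceptually it is equivalent to a chain of
// utils::isa / utils::as checks such as
//
//   if(utils::isa<execution_command>(&cmd)) {
//       compile_execution_command(command_batch, *utils::as<execution_command>(&cmd));
//   } else if(utils::isa<push_command>(&cmd)) {
//       compile_push_command(command_batch, *utils::as<push_command>(&cmd));
//   } // ... and so on for the remaining command types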

std::string generator_impl::print_buffer_debug_label(const buffer_id bid) const { return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name); }
9✔

} // namespace celerity::detail::instruction_graph_generator_detail

namespace celerity::detail {

instruction_graph_generator::instruction_graph_generator(const size_t num_nodes, const node_id local_nid, const system_info& system, instruction_graph& idag,
424✔
    instruction_graph_generator::delegate* const dlg, instruction_recorder* const recorder, const policy_set& policy)
424✔
    : m_impl(new instruction_graph_generator_detail::generator_impl(num_nodes, local_nid, system, idag, dlg, recorder, policy)) {}
424!

instruction_graph_generator::~instruction_graph_generator() = default;
424✔

void instruction_graph_generator::notify_buffer_created(
591✔
    const buffer_id bid, const range<3>& range, const size_t elem_size, const size_t elem_align, const allocation_id user_allocation_id) {
        m_impl->notify_buffer_created(bid, range, elem_size, elem_align, user_allocation_id);
591✔
}
591✔

void instruction_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& name) {
23✔
        m_impl->notify_buffer_debug_name_changed(bid, name);
23✔
}
23✔

void instruction_graph_generator::notify_buffer_destroyed(const buffer_id bid) { m_impl->notify_buffer_destroyed(bid); }
581✔

void instruction_graph_generator::notify_host_object_created(const host_object_id hoid, const bool owns_instance) {
101✔
        m_impl->notify_host_object_created(hoid, owns_instance);
101✔
}
101✔

void instruction_graph_generator::notify_host_object_destroyed(const host_object_id hoid) { m_impl->notify_host_object_destroyed(hoid); }
97✔

instruction_graph_generator::scheduling_hint instruction_graph_generator::anticipate(const command& cmd) { return m_impl->anticipate(cmd); }
4,171✔

void instruction_graph_generator::compile(const command& cmd) { m_impl->compile(cmd); }
5,106✔

} // namespace celerity::detail
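
// Illustrative usage sketch (the setup variables below are assumptions; the member functions are the ones
// defined above): a typical lifecycle of the public wrapper follows this order of notifications and
// compilations.
//
//   instruction_graph_generator iggen(num_nodes, local_nid, system, idag, &delegate, &recorder, policy);
//   iggen.notify_buffer_created(bid, range, elem_size, elem_align, user_allocation_id);
//   for(const auto* cmd : commands) {
//       (void)iggen.anticipate(*cmd); // optional: lets a scheduler reorder or defer commands
//       iggen.compile(*cmd);          // emits instructions and pilots through the delegate
//   }
//   iggen.notify_buffer_destroyed(bid);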