celerity / celerity-runtime · build 11609936308
31 Oct 2024 10:08AM UTC · coverage: 95.249% (+0.05%) from 95.198%
Push build via GitHub · fknorr: "Update benchmark results for new TDAG structure"

3034 of 3420 branches covered (88.71%); branch coverage is included in the aggregate percentage.
6730 of 6831 relevant lines covered (98.52%) · 1524046.16 hits per line

Source file: /src/command_graph_generator.cc (97.24% covered)

#include "command_graph_generator.h"

#include "access_modes.h"
#include "command.h"
#include "command_graph.h"
#include "print_utils.h"
#include "recorders.h"
#include "split.h"
#include "task.h"
#include "types.h"

#include <cstddef>


namespace celerity::detail {

command_graph_generator::command_graph_generator(
    const size_t num_nodes, const node_id local_nid, command_graph& cdag, detail::command_recorder* const recorder, const policy_set& policy)
    : m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(cdag), m_recorder(recorder) {
        if(m_num_nodes > max_num_nodes) {
                throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes));
        }
}

void command_graph_generator::notify_buffer_created(const buffer_id bid, const range<3>& range, bool host_initialized) {
        m_buffers.emplace(std::piecewise_construct, std::tuple{bid}, std::tuple{range, range});
        if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { m_buffers.at(bid).initialized_region = box(subrange({}, range)); }
        // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands).
        // This is required when tasks access host-initialized or uninitialized buffers.
        m_buffers.at(bid).local_last_writer.update_region(subrange<3>({}, range), m_epoch_for_new_commands);
        m_buffers.at(bid).replicated_regions.update_region(subrange<3>({}, range), node_bitset{}.set());
}

void command_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& debug_name) {
        m_buffers.at(bid).debug_name = debug_name;
}

void command_graph_generator::notify_buffer_destroyed(const buffer_id bid) {
        assert(m_buffers.count(bid) != 0);
        m_buffers.erase(bid);
}

void command_graph_generator::notify_host_object_created(const host_object_id hoid) {
        assert(m_host_objects.count(hoid) == 0);
        m_host_objects.emplace(hoid, host_object_state{m_epoch_for_new_commands});
}

void command_graph_generator::notify_host_object_destroyed(const host_object_id hoid) {
        assert(m_host_objects.count(hoid) != 0);
        m_host_objects.erase(hoid);
}

/// Returns whether an iterator range of commands is topologically sorted, i.e. sequential execution would satisfy all internal dependencies.
template <typename Iterator>
bool is_topologically_sorted(Iterator begin, Iterator end) {
        for(auto check = begin; check != end; ++check) {
                for(const auto dep : (*check)->get_dependencies()) {
                        if(std::find_if(std::next(check), end, [dep](const auto& node) { return node == dep.node; }) != end) return false;
                }
        }
        return true;
}
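// Note (editorial): is_topologically_sorted is quadratic in the number of commands, but it is only invoked from
// within assert() in build_task below, so the cost is presumably limited to debug builds.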

std::vector<abstract_command*> command_graph_generator::build_task(const task& tsk) {
        assert(m_current_cmd_batch.empty());
        [[maybe_unused]] const auto cmd_count_before = m_cdag.command_count();

        const auto epoch_to_prune_before = m_epoch_for_new_commands;

        switch(tsk.get_type()) {
        case task_type::epoch: generate_epoch_command(tsk); break;
        case task_type::horizon: generate_horizon_command(tsk); break;
        case task_type::device_compute:
        case task_type::host_compute:
        case task_type::master_node:
        case task_type::collective:
        case task_type::fence: generate_distributed_commands(tsk); break;
        default: throw std::runtime_error("Task type NYI");
        }

        // It is currently undefined to split reduction-producer tasks into multiple chunks on the same node:
        //   - Per-node reduction intermediate results are stored with fixed access to a single backing buffer,
        //     so multiple chunks on the same node will race on this buffer access
        //   - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish
        //     more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative
        //     or non-commutative operations
        if(!tsk.get_reductions().empty()) { assert(m_cdag.task_command_count(tsk.get_id()) <= 1); }

        // Commands without any other true-dependency must depend on the active epoch command to ensure they cannot be re-ordered before the epoch.
        // Need to check count b/c for some tasks we may not have generated any commands locally.
        if(m_cdag.task_command_count(tsk.get_id()) > 0) {
                for(auto* const cmd : m_cdag.task_commands(tsk.get_id())) {
                        generate_epoch_dependencies(cmd);
                }
        }

        // Check that all commands have been created through create_command
        assert(m_cdag.command_count() - cmd_count_before == m_current_cmd_batch.size());

        // If a new epoch was completed in the CDAG before the current task, prune all predecessor commands of that epoch.
        prune_commands_before(epoch_to_prune_before);

        // Check that all commands have been recorded
        if(is_recording()) {
                assert(std::all_of(m_current_cmd_batch.begin(), m_current_cmd_batch.end(), [this](const abstract_command* cmd) {
                        return std::any_of(m_recorder->get_graph_nodes().begin(), m_recorder->get_graph_nodes().end(),
                            [cmd](const std::unique_ptr<command_record>& rec) { return rec->id == cmd->get_cid(); });
                }));
        }

        assert(is_topologically_sorted(m_current_cmd_batch.begin(), m_current_cmd_batch.end()));
        return std::move(m_current_cmd_batch);
}

void command_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const {
        const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};

        // Since this check is run distributed on every node, we avoid quadratic behavior by only checking for conflicts between all local chunks and the
        // region-union of remote chunks. This way, every conflict will be reported by at least one node.
        const box<3> global_chunk(subrange(full_chunk.offset, full_chunk.range));
        auto remote_chunks = region_difference(global_chunk, region(box_vector<3>(local_chunks))).into_boxes();

        // detect_overlapping_writes takes a single box_vector, so we concatenate local and remote chunks (the order does not matter)
        auto distributed_chunks = std::move(remote_chunks);
        distributed_chunks.insert(distributed_chunks.end(), local_chunks.begin(), local_chunks.end());

        if(const auto overlapping_writes = detect_overlapping_writes(tsk, distributed_chunks); !overlapping_writes.empty()) {
                auto error = fmt::format("{} has overlapping writes between multiple nodes in", print_task_debug_label(tsk, true /* title case */));
                for(const auto& [bid, overlap] : overlapping_writes) {
                        fmt::format_to(std::back_inserter(error), " {} {}", print_buffer_debug_label(bid), overlap);
                }
                error += ". Choose a non-overlapping range mapper for this write access or constrain the split via experimental::constrain_split to make the access "
                         "non-overlapping.";
                utils::report_error(m_policy.overlapping_write_error, "{}", error);
        }
}

std::vector<command_graph_generator::assigned_chunk> command_graph_generator::split_task_and_assign_chunks(const task& tsk) const {
        const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};
        const size_t num_chunks = m_num_nodes * m_test_chunk_multiplier;
        const auto chunks = ([&] {
                if(tsk.get_type() == task_type::collective || tsk.get_type() == task_type::fence) {
                        std::vector<chunk<3>> chunks;
                        for(size_t nid = 0; nid < m_num_nodes; ++nid) {
                                chunks.push_back(chunk_cast<3>(chunk<1>{id<1>{tsk.get_type() == task_type::collective ? nid : 0}, ones, {m_num_nodes}}));
                        }
                        return chunks;
                }
                if(tsk.has_variable_split()) {
                        if(tsk.get_hint<experimental::hints::split_1d>() != nullptr) {
                                // no-op, keeping this for documentation purposes
                        }
                        if(tsk.get_hint<experimental::hints::split_2d>() != nullptr) { return split_2d(full_chunk, tsk.get_granularity(), num_chunks); }
                        return split_1d(full_chunk, tsk.get_granularity(), num_chunks);
                }
                return std::vector<chunk<3>>{full_chunk};
        })();
        assert(chunks.size() <= num_chunks); // We may have created fewer than requested
        assert(!chunks.empty());

        // Assign each chunk to a node
        // We assign chunks next to each other to the same worker (if there are more chunks than workers), as this is likely to produce fewer
        // transfers between tasks than a round-robin assignment (for typical stencil codes).
        // FIXME: This only works if the number of chunks is an integer multiple of the number of workers, e.g. 3 chunks for 2 workers degrades to RR.
        const auto chunks_per_node = std::max<size_t>(1, chunks.size() / m_num_nodes);

        std::vector<assigned_chunk> assigned_chunks;
        for(size_t i = 0; i < chunks.size(); ++i) {
                const node_id nid = (i / chunks_per_node) % m_num_nodes;
                assigned_chunks.push_back({nid, chunks[i]});
        }
        return assigned_chunks;
}
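// Illustrative example (hypothetical numbers): with m_num_nodes = 2 and a 1D split into 4 chunks,
// chunks_per_node = 4 / 2 = 2, so nid = (i / 2) % 2 assigns chunks 0 and 1 to node 0 and chunks 2 and 3 to node 1.
// With 3 chunks on 2 nodes, chunks_per_node = 1 and the assignment degrades to round-robin (nodes 0, 1, 0),
// as noted in the FIXME above.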

command_graph_generator::buffer_requirements_list command_graph_generator::get_buffer_requirements_for_mapped_access(
    const task& tsk, const subrange<3>& sr, const range<3> global_size) const {
        buffer_requirements_list result;
        const auto& access_map = tsk.get_buffer_access_map();
        for(const buffer_id bid : access_map.get_accessed_buffers()) {
                buffer_requirements reqs{bid, {}, {}};
                for(const auto m : access_map.get_access_modes(bid)) {
                        if(detail::access::mode_traits::is_consumer(m)) {
                                reqs.consumed = region_union(reqs.consumed, access_map.get_mode_requirements(bid, m, tsk.get_dimensions(), sr, global_size));
                        }
                        // Not else: `access_mode::write` is both a consumer and a producer
                        if(detail::access::mode_traits::is_producer(m)) {
                                reqs.produced = region_union(reqs.produced, access_map.get_mode_requirements(bid, m, tsk.get_dimensions(), sr, global_size));
                        }
                }
                result.emplace_back(std::move(reqs));
        }
        return result;
}

const box<3> empty_reduction_box({0, 0, 0}, {0, 0, 0});
const box<3> scalar_reduction_box({0, 0, 0}, {1, 1, 1});
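// Note (editorial): scalar_reduction_box covers the single element at the buffer origin that holds a (partial or
// final) reduction value, while empty_reduction_box is used for notification-only pushes that carry no data
// (see resolve_pending_reductions below).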

command_graph_generator::assigned_chunks_with_requirements command_graph_generator::compute_per_chunk_requirements(
    const task& tsk, const std::vector<assigned_chunk>& assigned_chunks) const {
        assigned_chunks_with_requirements result;

        for(const auto& a_chunk : assigned_chunks) {
                const node_id nid = a_chunk.executed_on;
                auto requirements = get_buffer_requirements_for_mapped_access(tsk, a_chunk.chnk, tsk.get_global_size());

                // Add read/write requirements for reductions performed in this task.
                for(const auto& reduction : tsk.get_reductions()) {
                        // task_manager verifies that there are no reduction <-> write-access conflicts
                        assert(std::none_of(
                            requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid && !br.produced.empty(); }));
                        auto it = std::find_if(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid; });
                        if(it == requirements.end()) { it = requirements.insert(requirements.end(), buffer_requirements{reduction.bid, {}, {}}); }
                        it->produced = scalar_reduction_box;
                        if(nid == reduction_initializer_nid && reduction.init_from_buffer) { it->consumed = scalar_reduction_box; }
                }

                if(nid == m_local_nid) {
                        result.local_chunks.emplace_back(a_chunk, std::move(requirements));
                } else {
                        result.remote_chunks.emplace_back(a_chunk, std::move(requirements));
                }
        }

        return result;
}
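// Illustrative example (hypothetical scenario): for a task that reduces into buffer R, every chunk's requirement
// list gains produced = scalar_reduction_box for R; only the chunk assigned to reduction_initializer_nid additionally
// consumes the scalar box when reduction.init_from_buffer is set, i.e. when the previous buffer value participates
// in the reduction instead of being replaced by the identity.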

void command_graph_generator::resolve_pending_reductions(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        auto accessed_buffers = tsk.get_buffer_access_map().get_accessed_buffers();
        // Handle chained reductions (i.e., reductions that combine into a buffer that currently is in a pending reduction state)
        for(const auto& reduction : tsk.get_reductions()) {
                accessed_buffers.insert(reduction.bid);
        }

        for(const auto bid : accessed_buffers) {
                auto& buffer = m_buffers.at(bid);
                if(!buffer.pending_reduction.has_value()) { continue; }
                const auto& reduction = *buffer.pending_reduction;

                const auto local_last_writer = buffer.local_last_writer.get_region_values(scalar_reduction_box);
                assert(local_last_writer.size() == 1);
                // Prepare the buffer state for after the reduction has been performed:
                // Keep the current last writer, but mark it as stale, so that if we don't generate a reduction command locally,
                // we'll know to get the data from elsewhere. If we generate a reduction command, this will be overwritten by its command id.
                write_command_state wcs{static_cast<command_id>(local_last_writer[0].second)};
                wcs.mark_as_stale();
                buffer_state post_reduction_state{region_map<write_command_state>{ones, wcs}, region_map<node_bitset>{ones, node_bitset{}}};
                if(m_policy.uninitialized_read_error != error_policy::ignore) { post_reduction_state.initialized_region = scalar_reduction_box; }

                node_bitset participating_nodes;

                // Since the local reduction command overwrites the buffer contents that need to be pushed to other nodes, we need to process remote chunks first.
                for(const auto& [a_chunk, requirements] : chunks_with_requirements.remote_chunks) {
                        if(std::none_of(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == bid && !br.consumed.empty(); })) {
                                // This chunk doesn't read from the buffer
                                continue;
                        }
                        participating_nodes.set(a_chunk.executed_on);
                }

                // Generate push command to all participating nodes
                if(participating_nodes.any()) {
                        // Push an empty range if we don't have any fresh data on this node. This will then generate an empty pilot that tells the
                        // other node's receive_arbiter to not expect a send.
                        const bool notification_only = !local_last_writer[0].second.is_fresh();
                        const auto push_box = notification_only ? empty_reduction_box : scalar_reduction_box;
                        assert(participating_nodes.count() == m_num_nodes - 1 || participating_nodes.count() == 1);
                        std::vector<std::pair<node_id, region<3>>> regions;
                        for(node_id nid = 0; nid < m_num_nodes; ++nid) {
                                if(!participating_nodes.test(nid)) continue;
                                regions.push_back({nid, push_box});
                        }
                        auto* const cmd = create_command<push_command>(transfer_id(tsk.get_id(), bid, reduction.rid), std::move(regions),
                            [&, bid = bid](const auto& record_debug_info) { record_debug_info(m_buffers.at(bid).debug_name); });
                        if(notification_only) {
                                generate_epoch_dependencies(cmd);
                        } else {
                                m_command_buffer_reads[cmd->get_cid()][bid] = region_union(m_command_buffer_reads[cmd->get_cid()][bid], scalar_reduction_box);
                                add_dependency(cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                        }

                        // Mark the reduction result as replicated so we don't generate data transfers to any of the participating nodes
                        post_reduction_state.replicated_regions.update_box(scalar_reduction_box, participating_nodes);
                }

                // We currently don't support multiple chunks on a single node for reductions (there is also -- for now -- no way to create multiple chunks,
                // as oversubscription is handled by the instruction graph).
                // NOTE: The participating_nodes.count() check below relies on this being true
                assert(chunks_with_requirements.local_chunks.size() <= 1);
                for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                        if(std::none_of(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == bid && !br.consumed.empty(); })) {
                                // This chunk doesn't read from the buffer
                                continue;
                        }

                        auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, reduction.rid), scalar_reduction_box.get_subrange(),
                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                        generate_epoch_dependencies(ap_cmd);

                        auto* const reduce_cmd = create_command<reduction_command>(reduction, local_last_writer[0].second.is_fresh() /* has_local_contribution */,
                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });

                        // Only generate a true dependency on the last writer if this node participated in the intermediate result computation.
                        if(local_last_writer[0].second.is_fresh()) {
                                add_dependency(reduce_cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                        }
                        add_dependency(reduce_cmd, ap_cmd, dependency_kind::true_dep, dependency_origin::dataflow);
                        generate_anti_dependencies(tsk, bid, buffer.local_last_writer, scalar_reduction_box, reduce_cmd);

                        post_reduction_state.local_last_writer.update_box(scalar_reduction_box, reduce_cmd->get_cid());
                        participating_nodes.set(m_local_nid); // We are participating
                }

                // We currently do not support generating reduction commands on only a subset of nodes, except for the special case of a single command.
                // This is because it is unclear who owns the final result in this case (normally all nodes "own" the result).
                //   => I.e., reducing and using the result on the participating nodes is actually not the problem (this works as intended); the issue only arises
                //      if the result is subsequently required in other tasks. Since we don't have a good way of detecting this condition however, we currently
                //      disallow partial reductions altogether.
                // NOTE: This check relies on the fact that we currently only generate a single chunk per node for reductions (see assertion above).
                if(participating_nodes.count() > 1 && participating_nodes.count() != m_num_nodes) {
                        utils::report_error(error_policy::panic,
                            "{} requires a reduction on {} that is not performed on all nodes. This is currently not supported. Either "
                            "ensure that all nodes receive a chunk that reads from the buffer, or reduce the data on a single node.",
                            print_task_debug_label(tsk, true /* title case */), print_buffer_debug_label(bid));
                }

                // For buffers that were in a pending reduction state and a reduction was generated
                // (i.e., the result was not discarded), set their new state.
                if(participating_nodes.count() > 0) {
                        m_completed_reductions.push_back(reduction.rid);
                        buffer = std::move(post_reduction_state);
                }
        }
}
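// Illustrative example (hypothetical scenario): suppose a previous task left buffer B in a pending reduction state
// with partial results on nodes 0 and 1, and the current task reads B on both nodes. On node 0, the remote chunk on
// node 1 triggers a push_command carrying node 0's partial result, while the local chunk triggers an
// await_push_command plus a reduction_command that combines the local contribution with the received one;
// node 1 generates the mirror image of these commands.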

void command_graph_generator::generate_pushes(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        struct push_scratch {
                std::unordered_map<node_id, region<3>> target_regions;
                std::unordered_set<command_id> depends_on;
        };
        std::unordered_map<buffer_id, push_scratch> per_buffer_pushes;

        for(auto& [a_chunk, requirements] : chunks_with_requirements.remote_chunks) {
                const node_id nid = a_chunk.executed_on;

                for(const auto& [bid, consumed, _] : requirements) {
                        if(consumed.empty()) continue;
                        auto& buffer = m_buffers.at(bid);

                        const auto local_sources = buffer.local_last_writer.get_region_values(consumed);
                        for(const auto& [local_box, wcs] : local_sources) {
                                if(!wcs.is_fresh() || wcs.is_replicated()) { continue; }

                                // Make sure we don't push anything we've already pushed to this node before
                                box_vector<3> non_replicated_boxes;
                                for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(local_box)) {
                                        if(nodes.test(nid)) continue;
                                        non_replicated_boxes.push_back(replicated_box);
                                }

                                if(!non_replicated_boxes.empty()) {
                                        assert(!utils::isa<await_push_command>(m_cdag.get(wcs)) && "Attempting to push non-owned data?!");
                                        auto push_region = region<3>(std::move(non_replicated_boxes));
                                        // Remember that we've replicated this region
                                        for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(push_region)) {
                                                buffer.replicated_regions.update_box(replicated_box, node_bitset{nodes}.set(nid));
                                        }
                                        auto& scratch = per_buffer_pushes[bid];
                                        if(auto it = scratch.target_regions.find(nid); it != scratch.target_regions.end()) {
                                                it->second = region_union(it->second, push_region);
                                        } else {
                                                scratch.target_regions.emplace(nid, std::move(push_region));
                                        }
                                        scratch.depends_on.insert(wcs);
                                }
                        }
                }
        }

        // Generate push command for each buffer
        for(auto& [bid, scratch] : per_buffer_pushes) {
                region<3> combined_region;
                std::vector<std::pair<node_id, region<3>>> target_regions;
                for(auto& [nid, region] : scratch.target_regions) {
                        combined_region = region_union(combined_region, region);
                        target_regions.push_back({nid, std::move(region)});
                }

                auto* cmd = create_command<push_command>(transfer_id(tsk.get_id(), bid, no_reduction_id), std::move(target_regions),
                    [&, bid = bid](const auto& record_debug_info) { record_debug_info(m_buffers.at(bid).debug_name); });
                for(auto dep : scratch.depends_on) {
                        add_dependency(cmd, m_cdag.get(dep), dependency_kind::true_dep, dependency_origin::dataflow);
                }

                // Store the read access for determining anti-dependencies
                m_command_buffer_reads[cmd->get_cid()].emplace(bid, std::move(combined_region));
        }
}
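// Illustrative example (hypothetical scenario): if node 1's chunk consumes rows [0,8) and node 2's chunk consumes
// rows [8,16) of a buffer whose fresh data lives on this node, a single push_command is generated for that buffer
// whose target list maps node 1 to [0,8) and node 2 to [8,16). The command depends on the last writer(s) of both
// regions, and the combined region is recorded in m_command_buffer_reads for later anti-dependency generation.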

// TODO: We currently generate an await push command for each local chunk, whereas we only generate a single push command for all remote chunks
void command_graph_generator::generate_await_pushes(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        for(auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                for(auto& [bid, consumed, _] : requirements) {
                        if(consumed.empty()) continue;
                        auto& buffer = m_buffers.at(bid);

                        const auto local_sources = buffer.local_last_writer.get_region_values(consumed);
                        box_vector<3> missing_part_boxes;
                        for(const auto& [box, wcs] : local_sources) {
                                // Note that we initialize all buffers as fresh, so this doesn't trigger for uninitialized reads
                                if(!box.empty() && !wcs.is_fresh()) { missing_part_boxes.push_back(box); }
                        }

                        // There is data we don't yet have locally. Generate an await push command for it.
                        if(!missing_part_boxes.empty()) {
                                const region missing_parts(std::move(missing_part_boxes));
                                assert(m_num_nodes > 1);
                                auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, no_reduction_id), missing_parts,
                                    [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                                generate_anti_dependencies(tsk, bid, buffer.local_last_writer, missing_parts, ap_cmd);
                                generate_epoch_dependencies(ap_cmd);
                                // Remember that we have this data now
                                buffer.local_last_writer.update_region(missing_parts, {ap_cmd->get_cid(), true /* is_replicated */});
                        }
                }
        }
}

void command_graph_generator::update_local_buffer_fresh_regions(const task& tsk, const std::unordered_map<buffer_id, region<3>>& per_buffer_local_writes) {
        auto requirements = get_buffer_requirements_for_mapped_access(tsk, subrange<3>(tsk.get_global_offset(), tsk.get_global_size()), tsk.get_global_size());
        // Add requirements for reductions
        for(const auto& reduction : tsk.get_reductions()) {
                auto it = std::find_if(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid; });
                if(it == requirements.end()) { it = requirements.insert(requirements.end(), buffer_requirements{reduction.bid, {}, {}}); }
                it->produced = scalar_reduction_box;
        }
        for(auto& [bid, _, produced] : requirements) {
                region global_writes = produced;
                auto& buffer = m_buffers.at(bid);
                if(m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = region_union(buffer.initialized_region, global_writes); }

                const auto remote_writes = ([&, bid = bid] {
                        if(auto it = per_buffer_local_writes.find(bid); it != per_buffer_local_writes.end()) {
                                const auto& local_writes = it->second;
                                assert(region_difference(local_writes, global_writes).empty()); // Local writes have to be a subset of global writes
                                return region_difference(global_writes, local_writes);
                        }
                        return std::move(global_writes);
                })(); // IIFE

                // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
                auto boxes_and_cids = buffer.local_last_writer.get_region_values(remote_writes);
                for(auto& [box, wcs] : boxes_and_cids) {
                        if(wcs.is_fresh()) {
                                wcs.mark_as_stale();
                                buffer.local_last_writer.update_region(box, wcs);
                        }
                }
        }
}
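// Illustrative example (hypothetical scenario): if a task writes a whole buffer but the chunk executed on this node
// only produced the left half, remote_writes covers the right half; marking that half's last writer as stale ensures
// that a later local read of it generates an await_push (see generate_await_pushes above) instead of reusing
// outdated local data.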

void command_graph_generator::generate_distributed_commands(const task& tsk) {
        const auto chunks = split_task_and_assign_chunks(tsk);
        const auto chunks_with_requirements = compute_per_chunk_requirements(tsk, chunks);

        // Check for and report overlapping writes between local chunks, and between local and remote chunks.
        if(m_policy.overlapping_write_error != error_policy::ignore) {
                box_vector<3> local_chunks;
                for(const auto& [a_chunk, _] : chunks_with_requirements.local_chunks) {
                        local_chunks.push_back(box<3>{a_chunk.chnk});
                }
                report_overlapping_writes(tsk, local_chunks);
        }

        resolve_pending_reductions(tsk, chunks_with_requirements);
        generate_pushes(tsk, chunks_with_requirements);
        generate_await_pushes(tsk, chunks_with_requirements);

        // Union of all per-buffer writes on this node, used to determine which parts of a buffer are fresh/stale later on.
        std::unordered_map<buffer_id, region<3>> per_buffer_local_writes;

        // Create command for each local chunk and resolve local data dependencies.
        for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                abstract_command* cmd = nullptr;
                if(tsk.get_type() == task_type::fence) {
                        cmd = create_command<fence_command>(
                            &tsk, [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });
                } else {
                        cmd = create_command<execution_command>(&tsk, subrange{a_chunk.chnk},
                            [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });

                        // Go over all reductions that are to be performed *during* the execution of this chunk,
                        // not to be confused with any pending reductions that need to be finalized *before* the
                        // execution of this chunk (those have already been handled by resolve_pending_reductions).
                        // If a reduction reads the previous value of the buffer (i.e. w/o property::reduction::initialize_to_identity),
                        // we have to include it in exactly one of the per-node intermediate reductions.
                        for(const auto& reduction : tsk.get_reductions()) {
                                if(m_local_nid == reduction_initializer_nid && reduction.init_from_buffer) {
                                        utils::as<execution_command>(cmd)->set_is_reduction_initializer(true);
                                        break;
                                }
                        }
                }

                if(tsk.get_type() == task_type::collective) {
                        // Collective host tasks have an implicit dependency on the previous task in the same collective group,
                        // which is required in order to guarantee they are executed in the same order on every node.
                        auto cgid = tsk.get_collective_group_id();
                        if(auto prev = m_last_collective_commands.find(cgid); prev != m_last_collective_commands.end()) {
                                add_dependency(cmd, m_cdag.get(prev->second), dependency_kind::true_dep, dependency_origin::collective_group_serialization);
                                m_last_collective_commands.erase(prev);
                        }
                        m_last_collective_commands.emplace(cgid, cmd->get_cid());
                }

                for(const auto& [bid, consumed, produced] : requirements) {
                        auto& buffer = m_buffers.at(bid);

                        // Process consuming accesses first, so we don't add dependencies onto our own writes
                        if(!consumed.empty()) {
                                for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(consumed)) {
                                        if(box.empty()) continue;
                                        assert(wcs.is_fresh() && "Unresolved remote data dependency");
                                        add_dependency(cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);
                                }

                                // Store the read access for determining anti-dependencies later on
                                m_command_buffer_reads[cmd->get_cid()].emplace(bid, consumed);
                        }

                        if(!produced.empty()) {
                                generate_anti_dependencies(tsk, bid, buffer.local_last_writer, produced, cmd);

                                // Update last writer
                                buffer.local_last_writer.update_region(produced, cmd->get_cid());
                                buffer.replicated_regions.update_region(produced, node_bitset{});

                                // In case this buffer was in a pending reduction state we discarded the result and need to remove the pending reduction.
                                if(buffer.pending_reduction.has_value()) {
                                        m_completed_reductions.push_back(buffer.pending_reduction->rid);
                                        buffer.pending_reduction = std::nullopt;
                                }

                                per_buffer_local_writes.emplace(bid, produced);
                        }

                        if(m_policy.uninitialized_read_error != error_policy::ignore
                            && !bounding_box(buffer.initialized_region).covers(bounding_box(consumed.get_boxes()))) {
                                utils::report_error(m_policy.uninitialized_read_error,
                                    "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_cid(), m_local_nid,
                                    box(subrange(a_chunk.chnk.offset, a_chunk.chnk.range)), print_task_debug_label(tsk), print_buffer_debug_label(bid),
                                    region_difference(consumed, buffer.initialized_region));
                        }
                }
        }

        // Mark any buffers that now are in a pending reduction state as such.
        // If there is only one chunk/command, it already implicitly generates the final reduced value
        // and the buffer does not need to be flagged as a pending reduction.
        for(const auto& reduction : tsk.get_reductions()) {
                if(chunks.size() > 1) {
                        m_buffers.at(reduction.bid).pending_reduction = reduction;
                } else {
                        m_completed_reductions.push_back(reduction.rid);
                }
        }

        update_local_buffer_fresh_regions(tsk, per_buffer_local_writes);
        process_task_side_effect_requirements(tsk);
}

void command_graph_generator::generate_anti_dependencies(const task& tsk, const buffer_id bid, const region_map<write_command_state>& last_writers_map,
    const region<3>& write_req, abstract_command* const write_cmd) //
{
        const auto last_writers = last_writers_map.get_region_values(write_req);
        for(const auto& [box, wcs] : last_writers) {
                auto* const last_writer_cmd = m_cdag.get(static_cast<command_id>(wcs));
                assert(!utils::isa<task_command>(last_writer_cmd) || utils::as<task_command>(last_writer_cmd)->get_task() != &tsk);

                // Add anti-dependencies onto all successors of the writer
                bool has_successors = false;
                for(auto d : last_writer_cmd->get_dependents()) {
                        // Only consider true dependencies
                        if(d.kind != dependency_kind::true_dep) continue;

                        auto* const cmd = d.node;

                        // We might have already generated new commands within the same task that also depend on this; in that case, skip it
                        if(utils::isa<task_command>(cmd) && utils::as<task_command>(cmd)->get_task() == &tsk) continue;

                        // So far we don't know whether the dependent actually intersects with the subrange we're writing
                        if(const auto command_reads_it = m_command_buffer_reads.find(cmd->get_cid()); command_reads_it != m_command_buffer_reads.end()) {
                                const auto& command_reads = command_reads_it->second;
                                // The task might be a dependent because of another buffer
                                if(const auto buffer_reads_it = command_reads.find(bid); buffer_reads_it != command_reads.end()) {
                                        if(!region_intersection(write_req, buffer_reads_it->second).empty()) {
                                                has_successors = true;
                                                add_dependency(write_cmd, cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
                                        }
                                }
                        }
                }

                // In some cases (horizons, master node host task, weird discard_* constructs...)
                // the last writer might not have any successors. Just add the anti-dependency onto the writer itself then.
                if(!has_successors) { add_dependency(write_cmd, last_writer_cmd, dependency_kind::anti_dep, dependency_origin::dataflow); }
        }
}
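// Illustrative example (hypothetical scenario): if command A last wrote region R of a buffer and commands B and C
// subsequently read overlapping parts of R, a new writer W of R receives anti-dependencies on B and C rather than on
// A; only when no recorded reader intersects the written region does W get an anti-dependency on the writer A itself.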

void command_graph_generator::process_task_side_effect_requirements(const task& tsk) {
        const task_id tid = tsk.get_id();
        if(tsk.get_side_effect_map().empty()) return; // skip the loop in the common case
        if(m_cdag.task_command_count(tid) == 0) return;

        for(auto* const cmd : m_cdag.task_commands(tid)) {
                for(const auto& side_effect : tsk.get_side_effect_map()) {
                        const auto [hoid, order] = side_effect;
                        auto& host_object = m_host_objects.at(hoid);

                        if(host_object.last_side_effect.has_value()) {
                                // TODO once we have different side_effect_orders, their interaction will determine the dependency kind
                                add_dependency(cmd, m_cdag.get(*host_object.last_side_effect), dependency_kind::true_dep, dependency_origin::dataflow);
                        }

                        // Simplification: If there are multiple chunks per node, we generate true-dependencies between them in an arbitrary order, when all we really
                        // need is mutual exclusion (i.e. a bi-directional pseudo-dependency).
                        host_object.last_side_effect = cmd->get_cid();
                }
        }
}

void command_graph_generator::set_epoch_for_new_commands(const abstract_command* const epoch_or_horizon) {
        // both an explicit epoch command and an applied horizon can be effective epochs
        assert(utils::isa<epoch_command>(epoch_or_horizon) || utils::isa<horizon_command>(epoch_or_horizon));

        for(auto& [bid, bs] : m_buffers) {
                bs.local_last_writer.apply_to_values([epoch_or_horizon](const write_command_state& wcs) {
                        auto new_wcs = write_command_state(std::max(epoch_or_horizon->get_cid(), static_cast<command_id>(wcs)), wcs.is_replicated());
                        if(!wcs.is_fresh()) new_wcs.mark_as_stale();
                        return new_wcs;
                });
        }
        for(auto& [cgid, cid] : m_last_collective_commands) {
                cid = std::max(epoch_or_horizon->get_cid(), cid);
        }
        for(auto& [_, host_object] : m_host_objects) {
                if(host_object.last_side_effect.has_value()) { host_object.last_side_effect = std::max(epoch_or_horizon->get_cid(), *host_object.last_side_effect); }
        }

        m_epoch_for_new_commands = epoch_or_horizon->get_cid();
}

void command_graph_generator::reduce_execution_front_to(abstract_command* const new_front) {
        const auto previous_execution_front = m_cdag.get_execution_front();
        for(auto* const front_cmd : previous_execution_front) {
                if(front_cmd != new_front) { add_dependency(new_front, front_cmd, dependency_kind::true_dep, dependency_origin::execution_front); }
        }
        assert(m_cdag.get_execution_front().size() == 1 && *m_cdag.get_execution_front().begin() == new_front);
}

void command_graph_generator::generate_epoch_command(const task& tsk) {
        assert(tsk.get_type() == task_type::epoch);
        auto* const epoch = create_command<epoch_command>(
            &tsk, tsk.get_epoch_action(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });
        set_epoch_for_new_commands(epoch);
        m_current_horizon = no_command;
        // Make the epoch depend on the previous execution front
        reduce_execution_front_to(epoch);
}

void command_graph_generator::generate_horizon_command(const task& tsk) {
        assert(tsk.get_type() == task_type::horizon);
        auto* const horizon =
            create_command<horizon_command>(&tsk, std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });

        if(m_current_horizon != static_cast<command_id>(no_command)) {
                // Apply the previous horizon
                set_epoch_for_new_commands(m_cdag.get(m_current_horizon));
        }
        m_current_horizon = horizon->get_cid();

        // Make the horizon depend on the previous execution front
        reduce_execution_front_to(horizon);
}

void command_graph_generator::generate_epoch_dependencies(abstract_command* cmd) {
        // No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs.
        // To guarantee that each node has a transitive true dependency (= temporal dependency) on the epoch, it is enough to add an epoch -> command dependency
        // to any command that has no other true dependencies itself; no graph traversal is necessary. This can be verified by a simple induction proof.

        // As long as the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only
        // commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true
        // predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes
        // in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the
        // dependency chain from the executor perspective.

        if(const auto deps = cmd->get_dependencies();
            std::none_of(deps.begin(), deps.end(), [](const abstract_command::dependency d) { return d.kind == dependency_kind::true_dep; })) {
                if(!utils::isa<epoch_command>(cmd) || utils::as<epoch_command>(cmd)->get_epoch_action() != epoch_action::init) {
                        assert(cmd->get_cid() != m_epoch_for_new_commands);
                        add_dependency(cmd, m_cdag.get(m_epoch_for_new_commands), dependency_kind::true_dep, dependency_origin::last_epoch);
                }
        }
}

void command_graph_generator::prune_commands_before(const command_id epoch) {
        if(epoch > m_epoch_last_pruned_before) {
                m_cdag.erase_if([&](abstract_command* cmd) {
                        if(cmd->get_cid() < epoch) {
                                m_command_buffer_reads.erase(cmd->get_cid());
                                return true;
                        }
                        return false;
                });
                m_epoch_last_pruned_before = epoch;
        }
}

std::string command_graph_generator::print_buffer_debug_label(const buffer_id bid) const {
        return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name);
}

} // namespace celerity::detail