celerity / celerity-runtime / 11342347897

15 Oct 2024 08:36AM UTC coverage: 95.308% (+0.3%) from 95.051%
Push (github) by psalz: "Update benchmark results for command_graph_generator refactor"

2963 of 3344 branches covered (88.61%). Branch coverage is included in the aggregate %.
6646 of 6738 relevant lines covered (98.63%)
1517300.68 hits per line

Source File
/src/command_graph_generator.cc (96.96% covered)

#include "command_graph_generator.h"

#include "access_modes.h"
#include "command.h"
#include "command_graph.h"
#include "recorders.h"
#include "split.h"
#include "task.h"
#include "task_manager.h"

namespace celerity::detail {

command_graph_generator::command_graph_generator(
    const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder, const policy_set& policy)
    : m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(cdag), m_task_mngr(tm), m_recorder(recorder) {
        if(m_num_nodes > max_num_nodes) {
                throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes));
        }

        // Build initial epoch command (this is required to properly handle anti-dependencies on host-initialized buffers).
        // We manually generate the first command so it doesn't get added to the current batch; it will be replaced by applied horizons
        // or explicit epochs down the line (see set_epoch_for_new_commands).
        auto* const epoch_cmd = cdag.create<epoch_command>(task_manager::initial_epoch_task, epoch_action::none, std::vector<reduction_id>{});
        if(is_recording()) { m_recorder->record_command(std::make_unique<epoch_command_record>(*epoch_cmd, *tm.get_task(task_manager::initial_epoch_task))); }
        m_epoch_for_new_commands = epoch_cmd->get_cid();
}

void command_graph_generator::notify_buffer_created(const buffer_id bid, const range<3>& range, bool host_initialized) {
        m_buffers.emplace(std::piecewise_construct, std::tuple{bid}, std::tuple{range, range});
        if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { m_buffers.at(bid).initialized_region = box(subrange({}, range)); }
        // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands).
        // This is required when tasks access host-initialized or uninitialized buffers.
        m_buffers.at(bid).local_last_writer.update_region(subrange<3>({}, range), m_epoch_for_new_commands);
        m_buffers.at(bid).replicated_regions.update_region(subrange<3>({}, range), node_bitset{}.set());
}

void command_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& debug_name) {
        m_buffers.at(bid).debug_name = debug_name;
}

void command_graph_generator::notify_buffer_destroyed(const buffer_id bid) {
        assert(m_buffers.count(bid) != 0);
        m_buffers.erase(bid);
}

void command_graph_generator::notify_host_object_created(const host_object_id hoid) {
        assert(m_host_objects.count(hoid) == 0);
        m_host_objects.emplace(hoid, host_object_state{m_epoch_for_new_commands});
}

void command_graph_generator::notify_host_object_destroyed(const host_object_id hoid) {
        assert(m_host_objects.count(hoid) != 0);
        m_host_objects.erase(hoid);
}

/// Returns whether an iterator range of commands is topologically sorted, i.e. sequential execution would satisfy all internal dependencies.
template <typename Iterator>
bool is_topologically_sorted(Iterator begin, Iterator end) {
        for(auto check = begin; check != end; ++check) {
                for(const auto dep : (*check)->get_dependencies()) {
                        if(std::find_if(std::next(check), end, [dep](const auto& node) { return node == dep.node; }) != end) return false;
                }
        }
        return true;
}

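/// Generates the commands for a single task, wires up their epoch dependencies, prunes commands that precede a completed epoch, and returns the generated batch.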
std::vector<abstract_command*> command_graph_generator::build_task(const task& tsk) {
        assert(m_current_cmd_batch.empty());
        [[maybe_unused]] const auto cmd_count_before = m_cdag.command_count();

        const auto epoch_to_prune_before = m_epoch_for_new_commands;

        switch(tsk.get_type()) {
        case task_type::epoch: generate_epoch_command(tsk); break;
        case task_type::horizon: generate_horizon_command(tsk); break;
        case task_type::device_compute:
        case task_type::host_compute:
        case task_type::master_node:
        case task_type::collective:
        case task_type::fence: generate_distributed_commands(tsk); break;
        default: throw std::runtime_error("Task type NYI");
        }

        // It is currently undefined to split reduction-producer tasks into multiple chunks on the same node:
        //   - Per-node reduction intermediate results are stored with fixed access to a single backing buffer,
        //     so multiple chunks on the same node will race on this buffer access
        //   - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish
        //     more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative
        //     or non-commutative operations
        if(!tsk.get_reductions().empty()) { assert(m_cdag.task_command_count(tsk.get_id()) <= 1); }

        // Commands without any other true-dependency must depend on the active epoch command to ensure they cannot be re-ordered before the epoch.
        // Need to check count b/c for some tasks we may not have generated any commands locally.
        if(m_cdag.task_command_count(tsk.get_id()) > 0) {
                for(auto* const cmd : m_cdag.task_commands(tsk.get_id())) {
                        generate_epoch_dependencies(cmd);
                }
        }

        // Check that all commands have been created through create_command
        assert(m_cdag.command_count() - cmd_count_before == m_current_cmd_batch.size());

        // If a new epoch was completed in the CDAG before the current task, prune all predecessor commands of that epoch.
        prune_commands_before(epoch_to_prune_before);

        // Check that all commands have been recorded
        if(is_recording()) {
                assert(std::all_of(m_current_cmd_batch.begin(), m_current_cmd_batch.end(), [this](const abstract_command* cmd) {
                        return std::any_of(m_recorder->get_graph_nodes().begin(), m_recorder->get_graph_nodes().end(),
                            [cmd](const std::unique_ptr<command_record>& rec) { return rec->id == cmd->get_cid(); });
                }));
        }

        assert(is_topologically_sorted(m_current_cmd_batch.begin(), m_current_cmd_batch.end()));
        return std::move(m_current_cmd_batch);
}

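/// Checks the given local chunks against the region-union of all remote chunks and reports any overlapping writes according to the configured error policy.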
void command_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const {
        const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};

        // Since this check is run distributed on every node, we avoid quadratic behavior by only checking for conflicts between all local chunks and the
        // region-union of remote chunks. This way, every conflict will be reported by at least one node.
        const box<3> global_chunk(subrange(full_chunk.offset, full_chunk.range));
        auto remote_chunks = region_difference(global_chunk, region(box_vector<3>(local_chunks))).into_boxes();

        // detect_overlapping_writes takes a single box_vector, so we concatenate local and global chunks (the order does not matter)
        auto distributed_chunks = std::move(remote_chunks);
        distributed_chunks.insert(distributed_chunks.end(), local_chunks.begin(), local_chunks.end());

        if(const auto overlapping_writes = detect_overlapping_writes(tsk, distributed_chunks); !overlapping_writes.empty()) {
                auto error = fmt::format("{} has overlapping writes between multiple nodes in", print_task_debug_label(tsk, true /* title case */));
                for(const auto& [bid, overlap] : overlapping_writes) {
                        fmt::format_to(std::back_inserter(error), " {} {}", print_buffer_debug_label(bid), overlap);
                }
                error += ". Choose a non-overlapping range mapper for this write access or constrain the split via experimental::constrain_split to make the access "
                         "non-overlapping.";
                utils::report_error(m_policy.overlapping_write_error, "{}", error);
        }
}

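/// Splits the task into chunks (one per node for collective and fence tasks, otherwise according to the split hints) and assigns each chunk to a node.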
std::vector<command_graph_generator::assigned_chunk> command_graph_generator::split_task_and_assign_chunks(const task& tsk) const {
        const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};
        const size_t num_chunks = m_num_nodes * 1; // TODO Make configurable
        const auto chunks = ([&] {
                if(tsk.get_type() == task_type::collective || tsk.get_type() == task_type::fence) {
                        std::vector<chunk<3>> chunks;
                        for(size_t nid = 0; nid < m_num_nodes; ++nid) {
                                chunks.push_back(chunk_cast<3>(chunk<1>{id<1>{tsk.get_type() == task_type::collective ? nid : 0}, ones, {m_num_nodes}}));
                        }
                        return chunks;
                }
                if(tsk.has_variable_split()) {
                        if(tsk.get_hint<experimental::hints::split_1d>() != nullptr) {
                                // no-op, keeping this for documentation purposes
                        }
                        if(tsk.get_hint<experimental::hints::split_2d>() != nullptr) { return split_2d(full_chunk, tsk.get_granularity(), num_chunks); }
                        return split_1d(full_chunk, tsk.get_granularity(), num_chunks);
                }
                return std::vector<chunk<3>>{full_chunk};
        })();
        assert(chunks.size() <= num_chunks); // We may have created fewer than requested
        assert(!chunks.empty());

        // Assign each chunk to a node
        // We assign chunks next to each other to the same worker (if there are more chunks than workers), as this is likely to produce fewer
        // transfers between tasks than a round-robin assignment (for typical stencil codes).
        // FIXME: This only works if the number of chunks is an integer multiple of the number of workers, e.g. 3 chunks for 2 workers degrades to RR.
        const auto chunks_per_node = std::max<size_t>(1, chunks.size() / m_num_nodes);

        std::vector<assigned_chunk> assigned_chunks;
        for(size_t i = 0; i < chunks.size(); ++i) {
                const node_id nid = (i / chunks_per_node) % m_num_nodes;
                assigned_chunks.push_back({nid, chunks[i]});
        }
        return assigned_chunks;
}

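/// Computes the consumed and produced buffer regions for executing the given subrange of a task, based on its buffer access map.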
command_graph_generator::buffer_requirements_list command_graph_generator::get_buffer_requirements_for_mapped_access(
    const task& tsk, const subrange<3>& sr, const range<3> global_size) const {
        buffer_requirements_list result;
        const auto& access_map = tsk.get_buffer_access_map();
        for(const buffer_id bid : access_map.get_accessed_buffers()) {
                buffer_requirements reqs{bid, {}, {}};
                for(const auto m : access_map.get_access_modes(bid)) {
                        if(detail::access::mode_traits::is_consumer(m)) {
                                reqs.consumed = region_union(reqs.consumed, access_map.get_mode_requirements(bid, m, tsk.get_dimensions(), sr, global_size));
                        }
                        // Not else: `access_mode::write` is both a consumer and a producer
                        if(detail::access::mode_traits::is_producer(m)) {
                                reqs.produced = region_union(reqs.produced, access_map.get_mode_requirements(bid, m, tsk.get_dimensions(), sr, global_size));
                        }
                }
                result.emplace_back(std::move(reqs));
        }
        return result;
}

const box<3> empty_reduction_box({0, 0, 0}, {0, 0, 0});
const box<3> scalar_reduction_box({0, 0, 0}, {1, 1, 1});

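/// Computes the buffer requirements (including reduction inputs and outputs) for each assigned chunk and partitions the chunks into local and remote ones.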
command_graph_generator::assigned_chunks_with_requirements command_graph_generator::compute_per_chunk_requirements(
    const task& tsk, const std::vector<assigned_chunk>& assigned_chunks) const {
        assigned_chunks_with_requirements result;

        for(const auto& a_chunk : assigned_chunks) {
                const node_id nid = a_chunk.executed_on;
                auto requirements = get_buffer_requirements_for_mapped_access(tsk, a_chunk.chnk, tsk.get_global_size());

                // Add read/write requirements for reductions performed in this task.
                for(const auto& reduction : tsk.get_reductions()) {
                        // task_manager verifies that there are no reduction <-> write-access conflicts
                        assert(std::none_of(
                            requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid && !br.produced.empty(); }));
                        auto it = std::find_if(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid; });
                        if(it == requirements.end()) { it = requirements.insert(requirements.end(), buffer_requirements{reduction.bid, {}, {}}); }
                        it->produced = scalar_reduction_box;
                        if(nid == reduction_initializer_nid && reduction.init_from_buffer) { it->consumed = scalar_reduction_box; }
                }

                if(nid == m_local_nid) {
                        result.local_chunks.emplace_back(a_chunk, std::move(requirements));
                } else {
                        result.remote_chunks.emplace_back(a_chunk, std::move(requirements));
                }
        }

        return result;
}

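/// Generates the push, await push and reduction commands required to finalize any pending reductions on buffers accessed by the given task.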
void command_graph_generator::resolve_pending_reductions(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        auto accessed_buffers = tsk.get_buffer_access_map().get_accessed_buffers();
        // Handle chained reductions (i.e., reductions that combine into a buffer that currently is in a pending reduction state)
        for(const auto& reduction : tsk.get_reductions()) {
                accessed_buffers.insert(reduction.bid);
        }

        for(const auto bid : accessed_buffers) {
                auto& buffer = m_buffers.at(bid);
                if(!buffer.pending_reduction.has_value()) { continue; }
                const auto& reduction = *buffer.pending_reduction;

                const auto local_last_writer = buffer.local_last_writer.get_region_values(scalar_reduction_box);
                assert(local_last_writer.size() == 1);
                // Prepare the buffer state for after the reduction has been performed:
                // Keep the current last writer, but mark it as stale, so that if we don't generate a reduction command locally,
                // we'll know to get the data from elsewhere. If we generate a reduction command, this will be overwritten by its command id.
                write_command_state wcs{static_cast<command_id>(local_last_writer[0].second)};
                wcs.mark_as_stale();
                buffer_state post_reduction_state{region_map<write_command_state>{ones, wcs}, region_map<node_bitset>{ones, node_bitset{}}};
                if(m_policy.uninitialized_read_error != error_policy::ignore) { post_reduction_state.initialized_region = scalar_reduction_box; }

                size_t number_of_participating_nodes = 0;

                // Since the local reduction command overwrites the buffer contents that need to be pushed to other nodes, we need to process remote chunks first.
                for(const auto& [a_chunk, requirements] : chunks_with_requirements.remote_chunks) {
                        if(std::none_of(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == bid && !br.consumed.empty(); })) {
                                // This chunk doesn't read from the buffer
                                continue;
                        }
                        const node_id nid = a_chunk.executed_on;
                        const bool notification_only = !local_last_writer[0].second.is_fresh();

                        // Push an empty range if we don't have any fresh data on this node. This will then generate an empty pilot that tells the
                        // other node's receive_arbiter to not expect a send.
                        const auto push_box = notification_only ? empty_reduction_box : scalar_reduction_box;

                        auto* const push_cmd = create_command<push_command>(nid, transfer_id(tsk.get_id(), bid, reduction.rid), push_box.get_subrange(),
                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });

                        if(notification_only) {
                                generate_epoch_dependencies(push_cmd);
                        } else {
                                m_command_buffer_reads[push_cmd->get_cid()][bid] = region_union(m_command_buffer_reads[push_cmd->get_cid()][bid], scalar_reduction_box);
                                add_dependency(push_cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                        }

                        // Mark the reduction result as replicated so we don't generate data transfers to this node
                        // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
                        const auto replicated_box = post_reduction_state.replicated_regions.get_region_values(scalar_reduction_box);
                        assert(replicated_box.size() == 1);
                        for(const auto& [_, nodes] : replicated_box) {
                                post_reduction_state.replicated_regions.update_box(scalar_reduction_box, node_bitset{nodes}.set(nid));
                        }
                        number_of_participating_nodes++; // This node is participating
                }

                // We currently don't support multiple chunks on a single node for reductions (there is also -- for now -- no way to create multiple chunks,
                // as oversubscription is handled by the instruction graph).
                // NOTE: The number_of_participating_nodes check below relies on this being true
                assert(chunks_with_requirements.local_chunks.size() <= 1);
                for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                        if(std::none_of(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == bid && !br.consumed.empty(); })) {
                                // This chunk doesn't read from the buffer
                                continue;
                        }

                        auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, reduction.rid), scalar_reduction_box.get_subrange(),
                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                        generate_epoch_dependencies(ap_cmd);

                        auto* const reduce_cmd = create_command<reduction_command>(reduction, local_last_writer[0].second.is_fresh() /* has_local_contribution */,
                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });

                        // Only generate a true dependency on the last writer if this node participated in the intermediate result computation.
                        if(local_last_writer[0].second.is_fresh()) {
                                add_dependency(reduce_cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                        }
                        add_dependency(reduce_cmd, ap_cmd, dependency_kind::true_dep, dependency_origin::dataflow);
                        generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, scalar_reduction_box, reduce_cmd);

                        post_reduction_state.local_last_writer.update_box(scalar_reduction_box, reduce_cmd->get_cid());
                        number_of_participating_nodes++; // We are participating
                }

                // We currently do not support generating reduction commands on only a subset of nodes, except for the special case of a single command.
                // This is because it is unclear who owns the final result in this case (normally all nodes "own" the result).
                //   => I.e., reducing and using the result on the participating nodes is actually not the problem (this works as intended); the issue only arises
                //      if the result is subsequently required in other tasks. Since we don't have a good way of detecting this condition however, we currently
                //      disallow partial reductions altogether.
                // NOTE: This check relies on the fact that we currently only generate a single chunk per node for reductions (see assertion above).
                if(number_of_participating_nodes > 1 && number_of_participating_nodes != m_num_nodes) {
                        utils::report_error(error_policy::panic,
                            "{} requires a reduction on {} that is not performed on all nodes. This is currently not supported. Either "
                            "ensure that all nodes receive a chunk that reads from the buffer, or reduce the data on a single node.",
                            print_task_debug_label(tsk, true /* title case */), print_buffer_debug_label(bid));
                }

                // For buffers that were in a pending reduction state and a reduction was generated
                // (i.e., the result was not discarded), set their new state.
                if(number_of_participating_nodes > 0) {
                        m_completed_reductions.push_back(reduction.rid);
                        buffer = std::move(post_reduction_state);
                }
        }
}

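/// Generates push commands for locally fresh data that remote chunks consume and that has not yet been replicated to the respective node.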
void command_graph_generator::generate_pushes(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        for(auto& [a_chunk, requirements] : chunks_with_requirements.remote_chunks) {
                const node_id nid = a_chunk.executed_on;

                for(const auto& [bid, consumed, _] : requirements) {
                        if(consumed.empty()) continue;
                        auto& buffer = m_buffers.at(bid);

                        // We generate a separate push command for each last writer command for now, possibly even multiple for partially already-replicated data.
                        // TODO: Can and/or should we consolidate?
                        const auto local_sources = buffer.local_last_writer.get_region_values(consumed);
                        for(const auto& [local_box, wcs] : local_sources) {
                                if(!wcs.is_fresh() || wcs.is_replicated()) { continue; }

                                // Make sure we don't push anything we've already pushed to this node before
                                box_vector<3> non_replicated_boxes;
                                for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(local_box)) {
                                        if(nodes.test(nid)) continue;
                                        non_replicated_boxes.push_back(replicated_box);
                                }

                                // Merge all connected boxes to determine final set of pushes
                                const auto push_region = region<3>(std::move(non_replicated_boxes));
                                for(const auto& push_box : push_region.get_boxes()) {
                                        auto* const push_cmd = create_command<push_command>(nid, transfer_id(tsk.get_id(), bid, no_reduction_id), push_box.get_subrange(),
                                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                                        assert(!utils::isa<await_push_command>(m_cdag.get(wcs)) && "Attempting to push non-owned data?!");
                                        add_dependency(push_cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);

                                        // Store the read access for determining anti-dependencies later on
                                        m_command_buffer_reads[push_cmd->get_cid()][bid] = push_box;
                                }

                                // Remember that we've replicated this region
                                for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(push_region)) {
                                        buffer.replicated_regions.update_box(replicated_box, node_bitset{nodes}.set(nid));
                                }
                        }
                }
        }
}

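/// Generates await push commands for data that local chunks consume but that is not fresh on this node.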
void command_graph_generator::generate_await_pushes(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        for(auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                for(auto& [bid, consumed, _] : requirements) {
                        if(consumed.empty()) continue;
                        auto& buffer = m_buffers.at(bid);

                        const auto local_sources = buffer.local_last_writer.get_region_values(consumed);
                        box_vector<3> missing_part_boxes;
                        for(const auto& [box, wcs] : local_sources) {
                                // Note that we initialize all buffers as fresh, so this doesn't trigger for uninitialized reads
                                if(!box.empty() && !wcs.is_fresh()) { missing_part_boxes.push_back(box); }
                        }

                        // There is data we don't yet have locally. Generate an await push command for it.
                        if(!missing_part_boxes.empty()) {
                                const region missing_parts(std::move(missing_part_boxes));
                                assert(m_num_nodes > 1);
                                auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, no_reduction_id), missing_parts,
                                    [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                                generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, missing_parts, ap_cmd);
                                generate_epoch_dependencies(ap_cmd);
                                // Remember that we have this data now
                                buffer.local_last_writer.update_region(missing_parts, {ap_cmd->get_cid(), true /* is_replicated */});
                        }
                }
        }
}

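/// Marks buffer regions that were written by remote chunks as stale, so that subsequent local reads will generate await pushes.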
void command_graph_generator::update_local_buffer_fresh_regions(const task& tsk, const std::unordered_map<buffer_id, region<3>>& per_buffer_local_writes) {
        auto requirements = get_buffer_requirements_for_mapped_access(tsk, subrange<3>(tsk.get_global_offset(), tsk.get_global_size()), tsk.get_global_size());
        // Add requirements for reductions
        for(const auto& reduction : tsk.get_reductions()) {
                auto it = std::find_if(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid; });
                if(it == requirements.end()) { it = requirements.insert(requirements.end(), buffer_requirements{reduction.bid, {}, {}}); }
                it->produced = scalar_reduction_box;
        }
        for(auto& [bid, _, produced] : requirements) {
                region global_writes = produced;
                auto& buffer = m_buffers.at(bid);
                if(m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = region_union(buffer.initialized_region, global_writes); }

                const auto remote_writes = ([&, bid = bid] {
                        if(auto it = per_buffer_local_writes.find(bid); it != per_buffer_local_writes.end()) {
                                const auto& local_writes = it->second;
                                assert(region_difference(local_writes, global_writes).empty()); // Local writes have to be a subset of global writes
                                return region_difference(global_writes, local_writes);
                        }
                        return std::move(global_writes);
                })(); // IIFE

                // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
                auto boxes_and_cids = buffer.local_last_writer.get_region_values(remote_writes);
                for(auto& [box, wcs] : boxes_and_cids) {
                        if(wcs.is_fresh()) {
                                wcs.mark_as_stale();
                                buffer.local_last_writer.update_region(box, wcs);
                        }
                }
        }
}

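/// Generates all commands for a compute, host, collective or fence task: splits it into chunks, generates the required data transfers,
/// and creates the local execution or fence commands together with their data dependencies.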
void command_graph_generator::generate_distributed_commands(const task& tsk) {
        const auto chunks = split_task_and_assign_chunks(tsk);
        const auto chunks_with_requirements = compute_per_chunk_requirements(tsk, chunks);

        // Check for and report overlapping writes between local chunks, and between local and remote chunks.
        if(m_policy.overlapping_write_error != error_policy::ignore) {
                box_vector<3> local_chunks;
                for(const auto& [a_chunk, _] : chunks_with_requirements.local_chunks) {
                        local_chunks.push_back(box<3>{a_chunk.chnk});
                }
                report_overlapping_writes(tsk, local_chunks);
        }

        resolve_pending_reductions(tsk, chunks_with_requirements);
        generate_pushes(tsk, chunks_with_requirements);
        generate_await_pushes(tsk, chunks_with_requirements);

        // Union of all per-buffer writes on this node, used to determine which parts of a buffer are fresh/stale later on.
        std::unordered_map<buffer_id, region<3>> per_buffer_local_writes;

        // Create command for each local chunk and resolve local data dependencies.
        for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                abstract_command* cmd = nullptr;
                if(tsk.get_type() == task_type::fence) {
                        cmd = create_command<fence_command>(tsk.get_id(),
                            [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });
                } else {
                        cmd = create_command<execution_command>(tsk.get_id(), subrange{a_chunk.chnk},
                            [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });

                        // Go over all reductions that are to be performed *during* the execution of this chunk,
                        // not to be confused with any pending reductions that need to be finalized *before* the
                        // execution of this chunk (those have already been handled by resolve_pending_reductions).
                        // If a reduction reads the previous value of the buffer (i.e. w/o property::reduction::initialize_to_identity),
                        // we have to include it in exactly one of the per-node intermediate reductions.
                        for(const auto& reduction : tsk.get_reductions()) {
                                if(m_local_nid == reduction_initializer_nid && reduction.init_from_buffer) {
                                        utils::as<execution_command>(cmd)->set_is_reduction_initializer(true);
                                        break;
                                }
                        }
                }

                if(tsk.get_type() == task_type::collective) {
                        // Collective host tasks have an implicit dependency on the previous task in the same collective group,
                        // which is required in order to guarantee they are executed in the same order on every node.
                        auto cgid = tsk.get_collective_group_id();
                        if(auto prev = m_last_collective_commands.find(cgid); prev != m_last_collective_commands.end()) {
                                add_dependency(cmd, m_cdag.get(prev->second), dependency_kind::true_dep, dependency_origin::collective_group_serialization);
                                m_last_collective_commands.erase(prev);
                        }
                        m_last_collective_commands.emplace(cgid, cmd->get_cid());
                }

                for(const auto& [bid, consumed, produced] : requirements) {
                        auto& buffer = m_buffers.at(bid);

                        // Process consuming accesses first, so we don't add dependencies onto our own writes
                        if(!consumed.empty()) {
                                for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(consumed)) {
                                        if(box.empty()) continue;
                                        assert(wcs.is_fresh() && "Unresolved remote data dependency");
                                        add_dependency(cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);
                                }

                                // Store the read access for determining anti-dependencies later on
                                m_command_buffer_reads[cmd->get_cid()].emplace(bid, consumed);
                        }

                        if(!produced.empty()) {
                                generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, produced, cmd);

                                // Update last writer
                                buffer.local_last_writer.update_region(produced, cmd->get_cid());
                                buffer.replicated_regions.update_region(produced, node_bitset{});

                                // In case this buffer was in a pending reduction state we discarded the result and need to remove the pending reduction.
                                if(buffer.pending_reduction.has_value()) {
                                        m_completed_reductions.push_back(buffer.pending_reduction->rid);
                                        buffer.pending_reduction = std::nullopt;
                                }

                                per_buffer_local_writes.emplace(bid, produced);
                        }

                        if(m_policy.uninitialized_read_error != error_policy::ignore
                            && !bounding_box(buffer.initialized_region).covers(bounding_box(consumed.get_boxes()))) {
                                utils::report_error(m_policy.uninitialized_read_error,
                                    "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_cid(), m_local_nid,
                                    box(subrange(a_chunk.chnk.offset, a_chunk.chnk.range)), print_task_debug_label(tsk), print_buffer_debug_label(bid),
                                    region_difference(consumed, buffer.initialized_region));
                        }
                }
        }

        // Mark any buffers that now are in a pending reduction state as such.
        // If there is only one chunk/command, it already implicitly generates the final reduced value
        // and the buffer does not need to be flagged as a pending reduction.
        for(const auto& reduction : tsk.get_reductions()) {
                if(chunks.size() > 1) {
                        m_buffers.at(reduction.bid).pending_reduction = reduction;
                } else {
                        m_completed_reductions.push_back(reduction.rid);
                }
        }

        update_local_buffer_fresh_regions(tsk, per_buffer_local_writes);
        process_task_side_effect_requirements(tsk);
}

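/// Adds anti-dependencies from the given writing command onto all commands that read the regions it overwrites (or onto the last writer itself if it has no readers).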
void command_graph_generator::generate_anti_dependencies(
    task_id tid, buffer_id bid, const region_map<write_command_state>& last_writers_map, const region<3>& write_req, abstract_command* write_cmd) {
        const auto last_writers = last_writers_map.get_region_values(write_req);
        for(const auto& [box, wcs] : last_writers) {
                auto* const last_writer_cmd = m_cdag.get(static_cast<command_id>(wcs));
                assert(!utils::isa<task_command>(last_writer_cmd) || utils::as<task_command>(last_writer_cmd)->get_tid() != tid);

                // Add anti-dependencies onto all successors of the writer
                bool has_successors = false;
                for(auto d : last_writer_cmd->get_dependents()) {
                        // Only consider true dependencies
                        if(d.kind != dependency_kind::true_dep) continue;

                        auto* const cmd = d.node;

                        // We might have already generated new commands within the same task that also depend on this; in that case, skip it
                        if(utils::isa<task_command>(cmd) && utils::as<task_command>(cmd)->get_tid() == tid) continue;

                        // So far we don't know whether the dependent actually intersects with the subrange we're writing
                        if(const auto command_reads_it = m_command_buffer_reads.find(cmd->get_cid()); command_reads_it != m_command_buffer_reads.end()) {
                                const auto& command_reads = command_reads_it->second;
                                // The task might be a dependent because of another buffer
                                if(const auto buffer_reads_it = command_reads.find(bid); buffer_reads_it != command_reads.end()) {
                                        if(!region_intersection(write_req, buffer_reads_it->second).empty()) {
                                                has_successors = true;
                                                add_dependency(write_cmd, cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
                                        }
                                }
                        }
                }

                // In some cases (horizons, master node host task, weird discard_* constructs...)
                // the last writer might not have any successors. Just add the anti-dependency onto the writer itself then.
                if(!has_successors) { add_dependency(write_cmd, last_writer_cmd, dependency_kind::anti_dep, dependency_origin::dataflow); }
        }
}

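/// Serializes commands that affect the same host object by adding true dependencies onto the previous side-effect command.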
void command_graph_generator::process_task_side_effect_requirements(const task& tsk) {
        const task_id tid = tsk.get_id();
        if(tsk.get_side_effect_map().empty()) return; // skip the loop in the common case
        if(m_cdag.task_command_count(tid) == 0) return;

        for(auto* const cmd : m_cdag.task_commands(tid)) {
                for(const auto& side_effect : tsk.get_side_effect_map()) {
                        const auto [hoid, order] = side_effect;
                        auto& host_object = m_host_objects.at(hoid);

                        if(host_object.last_side_effect.has_value()) {
                                // TODO once we have different side_effect_orders, their interaction will determine the dependency kind
                                add_dependency(cmd, m_cdag.get(*host_object.last_side_effect), dependency_kind::true_dep, dependency_origin::dataflow);
                        }

                        // Simplification: If there are multiple chunks per node, we generate true-dependencies between them in an arbitrary order, when all we really
                        // need is mutual exclusion (i.e. a bi-directional pseudo-dependency).
                        host_object.last_side_effect = cmd->get_cid();
                }
        }
}

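/// Makes the given epoch or applied horizon the effective epoch for new commands by bumping all tracked last-writer, collective and side-effect command ids to at least its id.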
void command_graph_generator::set_epoch_for_new_commands(const abstract_command* const epoch_or_horizon) {
        // both an explicit epoch command and an applied horizon can be effective epochs
        assert(utils::isa<epoch_command>(epoch_or_horizon) || utils::isa<horizon_command>(epoch_or_horizon));

        for(auto& [bid, bs] : m_buffers) {
                bs.local_last_writer.apply_to_values([epoch_or_horizon](const write_command_state& wcs) {
                        auto new_wcs = write_command_state(std::max(epoch_or_horizon->get_cid(), static_cast<command_id>(wcs)), wcs.is_replicated());
                        if(!wcs.is_fresh()) new_wcs.mark_as_stale();
                        return new_wcs;
                });
        }
        for(auto& [cgid, cid] : m_last_collective_commands) {
                cid = std::max(epoch_or_horizon->get_cid(), cid);
        }
        for(auto& [_, host_object] : m_host_objects) {
                if(host_object.last_side_effect.has_value()) { host_object.last_side_effect = std::max(epoch_or_horizon->get_cid(), *host_object.last_side_effect); }
        }

        m_epoch_for_new_commands = epoch_or_horizon->get_cid();
}

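/// Makes the given command depend on the entire current execution front, so that it becomes the sole front command.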
void command_graph_generator::reduce_execution_front_to(abstract_command* const new_front) {
        const auto previous_execution_front = m_cdag.get_execution_front();
        for(auto* const front_cmd : previous_execution_front) {
                if(front_cmd != new_front) { add_dependency(new_front, front_cmd, dependency_kind::true_dep, dependency_origin::execution_front); }
        }
        assert(m_cdag.get_execution_front().size() == 1 && *m_cdag.get_execution_front().begin() == new_front);
}

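/// Generates the epoch command for an epoch task and makes it the new effective epoch for subsequent commands.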
void command_graph_generator::generate_epoch_command(const task& tsk) {
        assert(tsk.get_type() == task_type::epoch);
        auto* const epoch = create_command<epoch_command>(
            tsk.get_id(), tsk.get_epoch_action(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });
        set_epoch_for_new_commands(epoch);
        m_current_horizon = no_command;
        // Make the epoch depend on the previous execution front
        reduce_execution_front_to(epoch);
}

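/// Generates a horizon command and applies the previously generated horizon (if any) as the new effective epoch.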
void command_graph_generator::generate_horizon_command(const task& tsk) {
        assert(tsk.get_type() == task_type::horizon);
        auto* const horizon =
            create_command<horizon_command>(tsk.get_id(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });

        if(m_current_horizon != static_cast<command_id>(no_command)) {
                // Apply the previous horizon
                set_epoch_for_new_commands(m_cdag.get(m_current_horizon));
        }
        m_current_horizon = horizon->get_cid();

        // Make the horizon depend on the previous execution front
        reduce_execution_front_to(horizon);
}

void command_graph_generator::generate_epoch_dependencies(abstract_command* cmd) {
        // No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs.
        // To guarantee that each node has a transitive true dependency (=temporal dependency) on the epoch, it is enough to add an epoch -> command dependency
        // to any command that has no other true dependencies itself; no graph traversal is necessary. This can be verified by a simple induction proof.

        // As long as the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only
        // commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true
        // predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes
        // in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the
        // dependency chain from the executor perspective.

        if(const auto deps = cmd->get_dependencies();
            std::none_of(deps.begin(), deps.end(), [](const abstract_command::dependency d) { return d.kind == dependency_kind::true_dep; })) {
                assert(cmd->get_cid() != m_epoch_for_new_commands);
                add_dependency(cmd, m_cdag.get(m_epoch_for_new_commands), dependency_kind::true_dep, dependency_origin::last_epoch);
        }
}

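/// Erases all commands with ids lower than the given epoch from the CDAG, together with their recorded buffer reads.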
void command_graph_generator::prune_commands_before(const command_id epoch) {
        if(epoch > m_epoch_last_pruned_before) {
                m_cdag.erase_if([&](abstract_command* cmd) {
                        if(cmd->get_cid() < epoch) {
                                m_command_buffer_reads.erase(cmd->get_cid());
                                return true;
                        }
                        return false;
                });
                m_epoch_last_pruned_before = epoch;
        }
}

std::string command_graph_generator::print_buffer_debug_label(const buffer_id bid) const {
        return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name);
}

} // namespace celerity::detail