celerity / celerity-runtime / 11253295570 (github)

09 Oct 2024 10:34AM UTC coverage: 95.362% (+0.3%) from 95.051%

Pull Request #289: Refactor command graph generation, bring testing infrastructure up to speed with IDAG
psalz: Update benchmark results for command_graph_generator refactor

2957 of 3332 branches covered (88.75%)

Branch coverage included in aggregate %.

365 of 367 new or added lines in 5 files covered (99.46%).

12 existing lines in 6 files now uncovered.

6625 of 6716 relevant lines covered (98.65%)

1489224.13 hits per line
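
Since branch coverage is included in the aggregate %, the headline figure above appears to be the combined line-and-branch ratio; the numbers reported here are consistent with that reading:

    (6625 + 2957) / (6716 + 3332) = 9582 / 10048 ≈ 95.362%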

Source File
/src/command_graph_generator.cc (97.17%)

#include "command_graph_generator.h"

#include "access_modes.h"
#include "command.h"
#include "command_graph.h"
#include "recorders.h"
#include "split.h"
#include "task.h"
#include "task_manager.h"

namespace celerity::detail {

command_graph_generator::command_graph_generator(
    const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder, const policy_set& policy)
    : m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(cdag), m_task_mngr(tm), m_recorder(recorder) {
        if(m_num_nodes > max_num_nodes) {
                throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes));
        }

        // Build initial epoch command (this is required to properly handle anti-dependencies on host-initialized buffers).
        // We manually generate the first command so it doesn't get added to the current batch; it will be replaced by applied horizons
        // or explicit epochs down the line (see set_epoch_for_new_commands).
        auto* const epoch_cmd = cdag.create<epoch_command>(task_manager::initial_epoch_task, epoch_action::none, std::vector<reduction_id>{});
        if(is_recording()) { m_recorder->record_command(std::make_unique<epoch_command_record>(*epoch_cmd, *tm.get_task(task_manager::initial_epoch_task))); }
        m_epoch_for_new_commands = epoch_cmd->get_cid();
}

void command_graph_generator::notify_buffer_created(const buffer_id bid, const range<3>& range, bool host_initialized) {
        m_buffers.emplace(std::piecewise_construct, std::tuple{bid}, std::tuple{range, range});
        if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { m_buffers.at(bid).initialized_region = box(subrange({}, range)); }
        // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands).
        // This is required when tasks access host-initialized or uninitialized buffers.
        m_buffers.at(bid).local_last_writer.update_region(subrange<3>({}, range), m_epoch_for_new_commands);
        m_buffers.at(bid).replicated_regions.update_region(subrange<3>({}, range), node_bitset{}.set());
}

void command_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& debug_name) {
        m_buffers.at(bid).debug_name = debug_name;
}

void command_graph_generator::notify_buffer_destroyed(const buffer_id bid) {
        assert(m_buffers.count(bid) != 0);
        m_buffers.erase(bid);
}

void command_graph_generator::notify_host_object_created(const host_object_id hoid) {
        assert(m_host_objects.count(hoid) == 0);
        m_host_objects.emplace(hoid, host_object_state{m_epoch_for_new_commands});
}

void command_graph_generator::notify_host_object_destroyed(const host_object_id hoid) {
        assert(m_host_objects.count(hoid) != 0);
        m_host_objects.erase(hoid);
}

// According to Wikipedia https://en.wikipedia.org/wiki/Topological_sorting#Depth-first_search
// TODO: This may no longer be necessary since different command types are now generated in pre-determined order - revisit
std::vector<abstract_command*> sort_topologically(command_set unmarked) {
        command_set temporary_marked;
        command_set permanent_marked;
        std::vector<abstract_command*> sorted(unmarked.size());
        auto sorted_front = sorted.rbegin();

        const auto visit = [&](abstract_command* const cmd, auto& visit /* to allow recursion in lambda */) {
                if(permanent_marked.count(cmd) != 0) return;
                assert(temporary_marked.count(cmd) == 0 && "cyclic command graph");
                unmarked.erase(cmd);
                temporary_marked.insert(cmd);
                for(const auto dep : cmd->get_dependents()) {
                        visit(dep.node, visit);
                }
                temporary_marked.erase(cmd);
                permanent_marked.insert(cmd);
                *sorted_front++ = cmd;
        };

        while(!unmarked.empty()) {
                visit(*unmarked.begin(), visit);
        }

        return sorted;
}

command_set command_graph_generator::build_task(const task& tsk) {
        assert(m_current_cmd_batch.empty());
        [[maybe_unused]] const auto cmd_count_before = m_cdag.command_count();

        const auto epoch_to_prune_before = m_epoch_for_new_commands;

        switch(tsk.get_type()) {
        case task_type::epoch: generate_epoch_command(tsk); break;
        case task_type::horizon: generate_horizon_command(tsk); break;
        case task_type::device_compute:
        case task_type::host_compute:
        case task_type::master_node:
        case task_type::collective:
        case task_type::fence: generate_distributed_commands(tsk); break;
        default: throw std::runtime_error("Task type NYI");
        }

        // It is currently undefined to split reduction-producer tasks into multiple chunks on the same node:
        //   - Per-node reduction intermediate results are stored with fixed access to a single backing buffer,
        //     so multiple chunks on the same node will race on this buffer access
        //   - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish
        //     more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative
        //     or non-commutative operations
        if(!tsk.get_reductions().empty()) { assert(m_cdag.task_command_count(tsk.get_id()) <= 1); }

        // Commands without any other true-dependency must depend on the active epoch command to ensure they cannot be re-ordered before the epoch.
        // Need to check count b/c for some tasks we may not have generated any commands locally.
        if(m_cdag.task_command_count(tsk.get_id()) > 0) {
                for(auto* const cmd : m_cdag.task_commands(tsk.get_id())) {
                        generate_epoch_dependencies(cmd);
                }
        }

        // Check that all commands have been created through create_command
        assert(m_cdag.command_count() - cmd_count_before == m_current_cmd_batch.size());

        // If a new epoch was completed in the CDAG before the current task, prune all predecessor commands of that epoch.
        prune_commands_before(epoch_to_prune_before);

        // Check that all commands have been recorded
        if(is_recording()) {
                assert(std::all_of(m_current_cmd_batch.begin(), m_current_cmd_batch.end(), [this](const abstract_command* cmd) {
                        return std::any_of(m_recorder->get_all().begin(), m_recorder->get_all().end(),
                            [cmd](const std::unique_ptr<command_record>& rec) { return rec->id == cmd->get_cid(); });
                }));
        }

        return std::move(m_current_cmd_batch);
}

void command_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const {
        const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};

        // Since this check is run distributed on every node, we avoid quadratic behavior by only checking for conflicts between all local chunks and the
        // region-union of remote chunks. This way, every conflict will be reported by at least one node.
        const box<3> global_chunk(subrange(full_chunk.offset, full_chunk.range));
        auto remote_chunks = region_difference(global_chunk, region(box_vector<3>(local_chunks))).into_boxes();

        // detect_overlapping_writes takes a single box_vector, so we concatenate local and global chunks (the order does not matter)
        auto distributed_chunks = std::move(remote_chunks);
        distributed_chunks.insert(distributed_chunks.end(), local_chunks.begin(), local_chunks.end());

        if(const auto overlapping_writes = detect_overlapping_writes(tsk, distributed_chunks); !overlapping_writes.empty()) {
                auto error = fmt::format("{} has overlapping writes between multiple nodes in", print_task_debug_label(tsk, true /* title case */));
                for(const auto& [bid, overlap] : overlapping_writes) {
                        fmt::format_to(std::back_inserter(error), " {} {}", print_buffer_debug_label(bid), overlap);
                }
                error += ". Choose a non-overlapping range mapper for this write access or constrain the split via experimental::constrain_split to make the access "
                         "non-overlapping.";
                utils::report_error(m_policy.overlapping_write_error, "{}", error);
        }
}

std::vector<command_graph_generator::assigned_chunk> command_graph_generator::split_task_and_assign_chunks(const task& tsk) const {
        const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};
        const size_t num_chunks = m_num_nodes * 1; // TODO Make configurable
        const auto chunks = ([&] {
                if(tsk.get_type() == task_type::collective || tsk.get_type() == task_type::fence) {
                        std::vector<chunk<3>> chunks;
                        for(size_t nid = 0; nid < m_num_nodes; ++nid) {
                                chunks.push_back(chunk_cast<3>(chunk<1>{id<1>{tsk.get_type() == task_type::collective ? nid : 0}, ones, {m_num_nodes}}));
                        }
                        return chunks;
                }
                if(tsk.has_variable_split()) {
                        if(tsk.get_hint<experimental::hints::split_1d>() != nullptr) {
                                // no-op, keeping this for documentation purposes
                        }
                        if(tsk.get_hint<experimental::hints::split_2d>() != nullptr) { return split_2d(full_chunk, tsk.get_granularity(), num_chunks); }
                        return split_1d(full_chunk, tsk.get_granularity(), num_chunks);
                }
                return std::vector<chunk<3>>{full_chunk};
        })();
        assert(chunks.size() <= num_chunks); // We may have created fewer than requested
        assert(!chunks.empty());

        // Assign each chunk to a node
        // We assign chunks next to each other to the same worker (if there are more chunks than workers), as this is likely to produce fewer
        // transfers between tasks than a round-robin assignment (for typical stencil codes).
        // FIXME: This only works if the number of chunks is an integer multiple of the number of workers, e.g. 3 chunks for 2 workers degrades to RR.
        const auto chunks_per_node = std::max<size_t>(1, chunks.size() / m_num_nodes);

        std::vector<assigned_chunk> assigned_chunks;
        for(size_t i = 0; i < chunks.size(); ++i) {
                const node_id nid = (i / chunks_per_node) % m_num_nodes;
                assigned_chunks.push_back({nid, chunks[i]});
        }
        return assigned_chunks;
}

command_graph_generator::buffer_requirements_list command_graph_generator::get_buffer_requirements_for_mapped_access(
    const task& tsk, const subrange<3>& sr, const range<3> global_size) const {
        buffer_requirements_list result;
        const auto& access_map = tsk.get_buffer_access_map();
        for(const buffer_id bid : access_map.get_accessed_buffers()) {
                buffer_requirements reqs{bid, {}, {}};
                for(const auto m : access_map.get_access_modes(bid)) {
                        if(detail::access::mode_traits::is_consumer(m)) {
                                reqs.consumed = region_union(reqs.consumed, access_map.get_mode_requirements(bid, m, tsk.get_dimensions(), sr, global_size));
                        }
                        // Not else: `access_mode::write` is both a consumer and a producer
                        if(detail::access::mode_traits::is_producer(m)) {
                                reqs.produced = region_union(reqs.produced, access_map.get_mode_requirements(bid, m, tsk.get_dimensions(), sr, global_size));
                        }
                }
                result.emplace_back(std::move(reqs));
        }
        return result;
}

const box<3> empty_reduction_box({0, 0, 0}, {0, 0, 0});
const box<3> scalar_reduction_box({0, 0, 0}, {1, 1, 1});

command_graph_generator::assigned_chunks_with_requirements command_graph_generator::compute_per_chunk_requirements(
    const task& tsk, const std::vector<assigned_chunk>& assigned_chunks) const {
        assigned_chunks_with_requirements result;

        for(const auto& a_chunk : assigned_chunks) {
                const node_id nid = a_chunk.executed_on;
                auto requirements = get_buffer_requirements_for_mapped_access(tsk, a_chunk.chnk, tsk.get_global_size());

                // Add read/write requirements for reductions performed in this task.
                for(const auto& reduction : tsk.get_reductions()) {
                        // task_manager verifies that there are no reduction <-> write-access conflicts
                        assert(std::none_of(
                            requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid && !br.produced.empty(); }));
                        auto it = std::find_if(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid; });
                        if(it == requirements.end()) { it = requirements.insert(requirements.end(), buffer_requirements{reduction.bid, {}, {}}); }
                        it->produced = scalar_reduction_box;
                        if(nid == reduction_initializer_nid && reduction.init_from_buffer) { it->consumed = scalar_reduction_box; }
                }

                if(nid == m_local_nid) {
                        result.local_chunks.emplace_back(a_chunk, std::move(requirements));
                } else {
                        result.remote_chunks.emplace_back(a_chunk, std::move(requirements));
                }
        }

        return result;
}

void command_graph_generator::resolve_pending_reductions(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        auto accessed_buffers = tsk.get_buffer_access_map().get_accessed_buffers();
        // Handle chained reductions (i.e., reductions that combine into a buffer that currently is in a pending reduction state)
        for(const auto& reduction : tsk.get_reductions()) {
                accessed_buffers.insert(reduction.bid);
        }

        for(const auto bid : accessed_buffers) {
                auto& buffer = m_buffers.at(bid);
                if(!buffer.pending_reduction.has_value()) { continue; }
                const auto& reduction = *buffer.pending_reduction;

                const auto local_last_writer = buffer.local_last_writer.get_region_values(scalar_reduction_box);
                assert(local_last_writer.size() == 1);
                // Prepare the buffer state for after the reduction has been performed:
                // Keep the current last writer, but mark it as stale, so that if we don't generate a reduction command locally,
                // we'll know to get the data from elsewhere. If we generate a reduction command, this will be overwritten by its command id.
                write_command_state wcs{static_cast<command_id>(local_last_writer[0].second)};
                wcs.mark_as_stale();
                buffer_state post_reduction_state{region_map<write_command_state>{ones, wcs}, region_map<node_bitset>{ones, node_bitset{}}};
                if(m_policy.uninitialized_read_error != error_policy::ignore) { post_reduction_state.initialized_region = scalar_reduction_box; }

                size_t number_of_participating_nodes = 0;

                // Since the local reduction command overwrites the buffer contents that need to be pushed to other nodes, we need to process remote chunks first.
                for(const auto& [a_chunk, requirements] : chunks_with_requirements.remote_chunks) {
                        if(std::none_of(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == bid && !br.consumed.empty(); })) {
                                // This chunk doesn't read from the buffer
                                continue;
                        }
                        const node_id nid = a_chunk.executed_on;
                        const bool notification_only = !local_last_writer[0].second.is_fresh();

                        // Push an empty range if we don't have any fresh data on this node. This will then generate an empty pilot that tells the
                        // other node's receive_arbiter to not expect a send.
                        const auto push_box = notification_only ? empty_reduction_box : scalar_reduction_box;

                        auto* const push_cmd = create_command<push_command>(nid, transfer_id(tsk.get_id(), bid, reduction.rid), push_box.get_subrange(),
                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });

                        if(notification_only) {
                                generate_epoch_dependencies(push_cmd);
                        } else {
                                m_command_buffer_reads[push_cmd->get_cid()][bid] = region_union(m_command_buffer_reads[push_cmd->get_cid()][bid], scalar_reduction_box);
                                add_dependency(push_cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                        }

                        // Mark the reduction result as replicated so we don't generate data transfers to this node
                        // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
                        const auto replicated_box = post_reduction_state.replicated_regions.get_region_values(scalar_reduction_box);
                        assert(replicated_box.size() == 1);
                        for(const auto& [_, nodes] : replicated_box) {
                                post_reduction_state.replicated_regions.update_box(scalar_reduction_box, node_bitset{nodes}.set(nid));
                        }
                        number_of_participating_nodes++; // This node is participating
                }

                // We currently don't support multiple chunks on a single node for reductions (there is also -- for now -- no way to create multiple chunks,
                // as oversubscription is handled by the instruction graph).
                // NOTE: The number_of_participating_nodes check below relies on this being true
                assert(chunks_with_requirements.local_chunks.size() <= 1);
                for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                        if(std::none_of(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == bid && !br.consumed.empty(); })) {
                                // This chunk doesn't read from the buffer
                                continue;
                        }

                        auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, reduction.rid), scalar_reduction_box.get_subrange(),
                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                        generate_epoch_dependencies(ap_cmd);

                        auto* const reduce_cmd = create_command<reduction_command>(reduction, local_last_writer[0].second.is_fresh() /* has_local_contribution */,
                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });

                        // Only generate a true dependency on the last writer if this node participated in the intermediate result computation.
                        if(local_last_writer[0].second.is_fresh()) {
                                add_dependency(reduce_cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                        }
                        add_dependency(reduce_cmd, ap_cmd, dependency_kind::true_dep, dependency_origin::dataflow);
                        generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, scalar_reduction_box, reduce_cmd);

                        post_reduction_state.local_last_writer.update_box(scalar_reduction_box, reduce_cmd->get_cid());
                        number_of_participating_nodes++; // We are participating
                }

                // We currently do not support generating reduction commands on only a subset of nodes, except for the special case of a single command.
                // This is because it is unclear who owns the final result in this case (normally all nodes "own" the result).
                //   => I.e., reducing and using the result on the participating nodes is actually not the problem (this works as intended); the issue only arises
                //      if the result is subsequently required in other tasks. Since we don't have a good way of detecting this condition however, we currently
                //      disallow partial reductions altogether.
                // NOTE: This check relies on the fact that we currently only generate a single chunk per node for reductions (see assertion above).
                if(number_of_participating_nodes > 1 && number_of_participating_nodes != m_num_nodes) {
                        utils::report_error(error_policy::panic,
                            "{} requires a reduction on {} that is not performed on all nodes. This is currently not supported. Either "
                            "ensure that all nodes receive a chunk that reads from the buffer, or reduce the data on a single node.",
                            print_task_debug_label(tsk, true /* title case */), print_buffer_debug_label(bid));
                }

                // For buffers that were in a pending reduction state and a reduction was generated
                // (i.e., the result was not discarded), set their new state.
                if(number_of_participating_nodes > 0) {
                        m_completed_reductions.push_back(reduction.rid);
                        buffer = std::move(post_reduction_state);
                }
        }
}

void command_graph_generator::generate_pushes(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        for(auto& [a_chunk, requirements] : chunks_with_requirements.remote_chunks) {
                const node_id nid = a_chunk.executed_on;

                for(const auto& [bid, consumed, _] : requirements) {
                        if(consumed.empty()) continue;
                        auto& buffer = m_buffers.at(bid);

                        // We generate a separate push command for each last writer command for now, possibly even multiple for partially already-replicated data.
                        // TODO: Can and/or should we consolidate?
                        const auto local_sources = buffer.local_last_writer.get_region_values(consumed);
                        for(const auto& [local_box, wcs] : local_sources) {
                                if(!wcs.is_fresh() || wcs.is_replicated()) { continue; }

                                // Make sure we don't push anything we've already pushed to this node before
                                box_vector<3> non_replicated_boxes;
                                for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(local_box)) {
                                        if(nodes.test(nid)) continue;
                                        non_replicated_boxes.push_back(replicated_box);
                                }

                                // Merge all connected boxes to determine final set of pushes
                                const auto push_region = region<3>(std::move(non_replicated_boxes));
                                for(const auto& push_box : push_region.get_boxes()) {
                                        auto* const push_cmd = create_command<push_command>(nid, transfer_id(tsk.get_id(), bid, no_reduction_id), push_box.get_subrange(),
                                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                                        assert(!utils::isa<await_push_command>(m_cdag.get(wcs)) && "Attempting to push non-owned data?!");
                                        add_dependency(push_cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);

                                        // Store the read access for determining anti-dependencies later on
                                        m_command_buffer_reads[push_cmd->get_cid()][bid] = push_box;
                                }

                                // Remember that we've replicated this region
                                for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(push_region)) {
                                        buffer.replicated_regions.update_box(replicated_box, node_bitset{nodes}.set(nid));
                                }
                        }
                }
        }
}

void command_graph_generator::generate_await_pushes(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        for(auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                for(auto& [bid, consumed, _] : requirements) {
                        if(consumed.empty()) continue;
                        auto& buffer = m_buffers.at(bid);

                        const auto local_sources = buffer.local_last_writer.get_region_values(consumed);
                        box_vector<3> missing_part_boxes;
                        for(const auto& [box, wcs] : local_sources) {
                                // Note that we initialize all buffers as fresh, so this doesn't trigger for uninitialized reads
                                if(!box.empty() && !wcs.is_fresh()) { missing_part_boxes.push_back(box); }
                        }

                        // There is data we don't yet have locally. Generate an await push command for it.
                        if(!missing_part_boxes.empty()) {
                                const region missing_parts(std::move(missing_part_boxes));
                                assert(m_num_nodes > 1);
                                auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, no_reduction_id), missing_parts,
                                    [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                                generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, missing_parts, ap_cmd);
                                generate_epoch_dependencies(ap_cmd);
                                // Remember that we have this data now
                                buffer.local_last_writer.update_region(missing_parts, {ap_cmd->get_cid(), true /* is_replicated */});
                        }
                }
        }
}

void command_graph_generator::update_local_buffer_fresh_regions(const task& tsk, const std::unordered_map<buffer_id, region<3>>& per_buffer_local_writes) {
        auto requirements = get_buffer_requirements_for_mapped_access(tsk, subrange<3>(tsk.get_global_offset(), tsk.get_global_size()), tsk.get_global_size());
        // Add requirements for reductions
        for(const auto& reduction : tsk.get_reductions()) {
                auto it = std::find_if(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid; });
                if(it == requirements.end()) { it = requirements.insert(requirements.end(), buffer_requirements{reduction.bid, {}, {}}); }
                it->produced = scalar_reduction_box;
        }
        for(auto& [bid, _, produced] : requirements) {
                region global_writes = produced;
                auto& buffer = m_buffers.at(bid);
                if(m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = region_union(buffer.initialized_region, global_writes); }

                const auto remote_writes = ([&, bid = bid] {
                        if(auto it = per_buffer_local_writes.find(bid); it != per_buffer_local_writes.end()) {
                                const auto& local_writes = it->second;
                                assert(region_difference(local_writes, global_writes).empty()); // Local writes have to be a subset of global writes
                                return region_difference(global_writes, local_writes);
                        }
                        return std::move(global_writes);
                })(); // IIFE

                // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
                auto boxes_and_cids = buffer.local_last_writer.get_region_values(remote_writes);
                for(auto& [box, wcs] : boxes_and_cids) {
                        if(wcs.is_fresh()) {
                                wcs.mark_as_stale();
                                buffer.local_last_writer.update_region(box, wcs);
                        }
                }
        }
}

void command_graph_generator::generate_distributed_commands(const task& tsk) {
        const auto chunks = split_task_and_assign_chunks(tsk);
        const auto chunks_with_requirements = compute_per_chunk_requirements(tsk, chunks);

        // Check for and report overlapping writes between local chunks, and between local and remote chunks.
        if(m_policy.overlapping_write_error != error_policy::ignore) {
                box_vector<3> local_chunks;
                for(const auto& [a_chunk, _] : chunks_with_requirements.local_chunks) {
                        local_chunks.push_back(box<3>{a_chunk.chnk});
                }
                report_overlapping_writes(tsk, local_chunks);
        }

        resolve_pending_reductions(tsk, chunks_with_requirements);
        generate_pushes(tsk, chunks_with_requirements);
        generate_await_pushes(tsk, chunks_with_requirements);

        // Union of all per-buffer writes on this node, used to determine which parts of a buffer are fresh/stale later on.
        std::unordered_map<buffer_id, region<3>> per_buffer_local_writes;

        // Create command for each local chunk and resolve local data dependencies.
        for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                abstract_command* cmd = nullptr;
                if(tsk.get_type() == task_type::fence) {
                        cmd = create_command<fence_command>(tsk.get_id(),
                            [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });
                } else {
                        cmd = create_command<execution_command>(tsk.get_id(), subrange{a_chunk.chnk},
                            [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });

                        // Go over all reductions that are to be performed *during* the execution of this chunk,
                        // not to be confused with any pending reductions that need to be finalized *before* the
                        // execution of this chunk (those have already been handled by resolve_pending_reductions).
                        // If a reduction reads the previous value of the buffer (i.e. w/o property::reduction::initialize_to_identity),
                        // we have to include it in exactly one of the per-node intermediate reductions.
                        for(const auto& reduction : tsk.get_reductions()) {
                                if(m_local_nid == reduction_initializer_nid && reduction.init_from_buffer) {
                                        utils::as<execution_command>(cmd)->set_is_reduction_initializer(true);
                                        break;
                                }
                        }
                }

                if(tsk.get_type() == task_type::collective) {
                        // Collective host tasks have an implicit dependency on the previous task in the same collective group,
                        // which is required in order to guarantee they are executed in the same order on every node.
                        auto cgid = tsk.get_collective_group_id();
                        if(auto prev = m_last_collective_commands.find(cgid); prev != m_last_collective_commands.end()) {
                                add_dependency(cmd, m_cdag.get(prev->second), dependency_kind::true_dep, dependency_origin::collective_group_serialization);
                                m_last_collective_commands.erase(prev);
                        }
                        m_last_collective_commands.emplace(cgid, cmd->get_cid());
                }

                for(const auto& [bid, consumed, produced] : requirements) {
                        auto& buffer = m_buffers.at(bid);

                        // Process consuming accesses first, so we don't add dependencies onto our own writes
                        if(!consumed.empty()) {
                                for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(consumed)) {
                                        if(box.empty()) continue;
                                        assert(wcs.is_fresh() && "Unresolved remote data dependency");
                                        add_dependency(cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);
                                }

                                // Store the read access for determining anti-dependencies later on
                                m_command_buffer_reads[cmd->get_cid()].emplace(bid, consumed);
                        }

                        if(!produced.empty()) {
                                generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, produced, cmd);

                                // Update last writer
                                buffer.local_last_writer.update_region(produced, cmd->get_cid());
                                buffer.replicated_regions.update_region(produced, node_bitset{});

                                // In case this buffer was in a pending reduction state we discarded the result and need to remove the pending reduction.
                                if(buffer.pending_reduction.has_value()) {
                                        m_completed_reductions.push_back(buffer.pending_reduction->rid);
                                        buffer.pending_reduction = std::nullopt;
                                }

                                per_buffer_local_writes.emplace(bid, produced);
                        }

                        if(m_policy.uninitialized_read_error != error_policy::ignore
                            && !bounding_box(buffer.initialized_region).covers(bounding_box(consumed.get_boxes()))) {
                                utils::report_error(m_policy.uninitialized_read_error,
                                    "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_cid(), m_local_nid,
                                    box(subrange(a_chunk.chnk.offset, a_chunk.chnk.range)), print_task_debug_label(tsk), print_buffer_debug_label(bid),
                                    region_difference(consumed, buffer.initialized_region));
                        }
                }
        }

        // Mark any buffers that now are in a pending reduction state as such.
        // If there is only one chunk/command, it already implicitly generates the final reduced value
        // and the buffer does not need to be flagged as a pending reduction.
        for(const auto& reduction : tsk.get_reductions()) {
                if(chunks.size() > 1) {
                        m_buffers.at(reduction.bid).pending_reduction = reduction;
                } else {
                        m_completed_reductions.push_back(reduction.rid);
                }
        }

        update_local_buffer_fresh_regions(tsk, per_buffer_local_writes);
        process_task_side_effect_requirements(tsk);
}

void command_graph_generator::generate_anti_dependencies(
    task_id tid, buffer_id bid, const region_map<write_command_state>& last_writers_map, const region<3>& write_req, abstract_command* write_cmd) {
        const auto last_writers = last_writers_map.get_region_values(write_req);
        for(const auto& [box, wcs] : last_writers) {
                auto* const last_writer_cmd = m_cdag.get(static_cast<command_id>(wcs));
                assert(!utils::isa<task_command>(last_writer_cmd) || utils::as<task_command>(last_writer_cmd)->get_tid() != tid);

                // Add anti-dependencies onto all successors of the writer
                bool has_successors = false;
                for(auto d : last_writer_cmd->get_dependents()) {
                        // Only consider true dependencies
                        if(d.kind != dependency_kind::true_dep) continue;

                        auto* const cmd = d.node;

                        // We might have already generated new commands within the same task that also depend on this; in that case, skip it
                        if(utils::isa<task_command>(cmd) && utils::as<task_command>(cmd)->get_tid() == tid) continue;

                        // So far we don't know whether the dependent actually intersects with the subrange we're writing
                        if(const auto command_reads_it = m_command_buffer_reads.find(cmd->get_cid()); command_reads_it != m_command_buffer_reads.end()) {
                                const auto& command_reads = command_reads_it->second;
                                // The task might be a dependent because of another buffer
                                if(const auto buffer_reads_it = command_reads.find(bid); buffer_reads_it != command_reads.end()) {
                                        if(!region_intersection(write_req, buffer_reads_it->second).empty()) {
                                                has_successors = true;
                                                add_dependency(write_cmd, cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
                                        }
                                }
                        }
                }

                // In some cases (horizons, master node host task, weird discard_* constructs...)
                // the last writer might not have any successors. Just add the anti-dependency onto the writer itself then.
                if(!has_successors) { add_dependency(write_cmd, last_writer_cmd, dependency_kind::anti_dep, dependency_origin::dataflow); }
        }
}

void command_graph_generator::process_task_side_effect_requirements(const task& tsk) {
        const task_id tid = tsk.get_id();
        if(tsk.get_side_effect_map().empty()) return; // skip the loop in the common case
        if(m_cdag.task_command_count(tid) == 0) return;

        for(auto* const cmd : m_cdag.task_commands(tid)) {
                for(const auto& side_effect : tsk.get_side_effect_map()) {
                        const auto [hoid, order] = side_effect;
                        auto& host_object = m_host_objects.at(hoid);

                        if(host_object.last_side_effect.has_value()) {
                                // TODO once we have different side_effect_orders, their interaction will determine the dependency kind
                                add_dependency(cmd, m_cdag.get(*host_object.last_side_effect), dependency_kind::true_dep, dependency_origin::dataflow);
                        }

                        // Simplification: If there are multiple chunks per node, we generate true-dependencies between them in an arbitrary order, when all we really
                        // need is mutual exclusion (i.e. a bi-directional pseudo-dependency).
                        host_object.last_side_effect = cmd->get_cid();
                }
        }
}

void command_graph_generator::set_epoch_for_new_commands(const abstract_command* const epoch_or_horizon) {
        // both an explicit epoch command and an applied horizon can be effective epochs
        assert(utils::isa<epoch_command>(epoch_or_horizon) || utils::isa<horizon_command>(epoch_or_horizon));

        for(auto& [bid, bs] : m_buffers) {
                bs.local_last_writer.apply_to_values([epoch_or_horizon](const write_command_state& wcs) {
                        auto new_wcs = write_command_state(std::max(epoch_or_horizon->get_cid(), static_cast<command_id>(wcs)), wcs.is_replicated());
                        if(!wcs.is_fresh()) new_wcs.mark_as_stale();
                        return new_wcs;
                });
        }
        for(auto& [cgid, cid] : m_last_collective_commands) {
                cid = std::max(epoch_or_horizon->get_cid(), cid);
        }
        for(auto& [_, host_object] : m_host_objects) {
                if(host_object.last_side_effect.has_value()) { host_object.last_side_effect = std::max(epoch_or_horizon->get_cid(), *host_object.last_side_effect); }
        }

        m_epoch_for_new_commands = epoch_or_horizon->get_cid();
}

void command_graph_generator::reduce_execution_front_to(abstract_command* const new_front) {
        const auto previous_execution_front = m_cdag.get_execution_front();
        for(auto* const front_cmd : previous_execution_front) {
                if(front_cmd != new_front) { add_dependency(new_front, front_cmd, dependency_kind::true_dep, dependency_origin::execution_front); }
        }
        assert(m_cdag.get_execution_front().size() == 1 && *m_cdag.get_execution_front().begin() == new_front);
}

void command_graph_generator::generate_epoch_command(const task& tsk) {
        assert(tsk.get_type() == task_type::epoch);
        auto* const epoch = create_command<epoch_command>(
            tsk.get_id(), tsk.get_epoch_action(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });
        set_epoch_for_new_commands(epoch);
        m_current_horizon = no_command;
        // Make the epoch depend on the previous execution front
        reduce_execution_front_to(epoch);
}

void command_graph_generator::generate_horizon_command(const task& tsk) {
        assert(tsk.get_type() == task_type::horizon);
        auto* const horizon =
            create_command<horizon_command>(tsk.get_id(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });

        if(m_current_horizon != static_cast<command_id>(no_command)) {
                // Apply the previous horizon
                set_epoch_for_new_commands(m_cdag.get(m_current_horizon));
        }
        m_current_horizon = horizon->get_cid();

        // Make the horizon depend on the previous execution front
        reduce_execution_front_to(horizon);
}

void command_graph_generator::generate_epoch_dependencies(abstract_command* cmd) {
        // No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs.
        // To guarantee that each node has a transitive true dependency (=temporal dependency) on the epoch, it is enough to add an epoch -> command dependency
        // to any command that has no other true dependencies itself and no graph traversal is necessary. This can be verified by a simple induction proof.

        // As long as the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only
        // commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true
        // predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes
        // in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the
        // dependency chain from the executor perspective.

        if(const auto deps = cmd->get_dependencies();
            std::none_of(deps.begin(), deps.end(), [](const abstract_command::dependency d) { return d.kind == dependency_kind::true_dep; })) {
                assert(cmd->get_cid() != m_epoch_for_new_commands);
                add_dependency(cmd, m_cdag.get(m_epoch_for_new_commands), dependency_kind::true_dep, dependency_origin::last_epoch);
        }
}

void command_graph_generator::prune_commands_before(const command_id epoch) {
        if(epoch > m_epoch_last_pruned_before) {
                m_cdag.erase_if([&](abstract_command* cmd) {
                        if(cmd->get_cid() < epoch) {
                                m_command_buffer_reads.erase(cmd->get_cid());
                                return true;
                        }
                        return false;
                });
                m_epoch_last_pruned_before = epoch;
        }
}

std::string command_graph_generator::print_buffer_debug_label(const buffer_id bid) const {
        return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name);
}

} // namespace celerity::detail
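
As a reading aid for the split logic above: the node assignment in split_task_and_assign_chunks boils down to a contiguous block distribution, nid = (i / chunks_per_node) % m_num_nodes. The following standalone sketch is not part of the Celerity sources; it uses hypothetical values (two nodes, four chunks) to show where each chunk lands:

#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
        // Hypothetical values for illustration; the generator above currently defaults to one chunk per node.
        const std::size_t num_nodes = 2;
        const std::size_t num_chunks = 4;
        // Same block-assignment formula as in split_task_and_assign_chunks.
        const std::size_t chunks_per_node = std::max<std::size_t>(1, num_chunks / num_nodes);
        for(std::size_t i = 0; i < num_chunks; ++i) {
                const std::size_t nid = (i / chunks_per_node) % num_nodes;
                std::printf("chunk %zu -> node %zu\n", i, nid); // chunks 0,1 -> node 0; chunks 2,3 -> node 1
        }
        return 0;
}

Assigning neighboring chunks to the same node is intentional: as the comment in split_task_and_assign_chunks notes, a contiguous assignment tends to produce fewer inter-node transfers than a round-robin assignment for typical stencil codes.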