celerity / celerity-runtime / build 11219046259

07 Oct 2024 03:38PM UTC coverage: 95.304% (up 0.2% from 95.087%)

Pull Request #289 (github / web-flow): Refactor command graph generation, bring testing infrastructure up to speed with IDAG
Merge 1841ca955 into b1cb3bbf5

2965 of 3342 branches covered (88.72%); branch coverage is included in the aggregate percentage.
326 of 329 new or added lines in 5 files covered (99.09%).
6 existing lines in 2 files are now uncovered.
6635 of 6731 relevant lines covered (98.57%).
1477351.5 hits per line.

Source file: /src/command_graph_generator.cc (98.13% covered)

#include "command_graph_generator.h"

#include "access_modes.h"
#include "command.h"
#include "command_graph.h"
#include "recorders.h"
#include "split.h"
#include "task.h"
#include "task_manager.h"

namespace celerity::detail {

command_graph_generator::command_graph_generator(
    const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder, const policy_set& policy)
    : m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(cdag), m_task_mngr(tm), m_recorder(recorder) {
        if(m_num_nodes > max_num_nodes) {
                throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes));
        }

        // Build initial epoch command (this is required to properly handle anti-dependencies on host-initialized buffers).
        // We manually generate the first command; it will be replaced by applied horizons or explicit epochs down the line (see
        // set_epoch_for_new_commands).
        auto* const epoch_cmd = cdag.create<epoch_command>(task_manager::initial_epoch_task, epoch_action::none, std::vector<reduction_id>{});
        if(is_recording()) { m_recorder->record_command(std::make_unique<epoch_command_record>(*epoch_cmd, *tm.get_task(task_manager::initial_epoch_task))); }
        m_epoch_for_new_commands = epoch_cmd->get_cid();
}

void command_graph_generator::notify_buffer_created(const buffer_id bid, const range<3>& range, bool host_initialized) {
        m_buffers.emplace(std::piecewise_construct, std::tuple{bid}, std::tuple{range, range});
        if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { m_buffers.at(bid).initialized_region = box(subrange({}, range)); }
        // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands).
        // This is required when tasks access host-initialized or uninitialized buffers.
        m_buffers.at(bid).local_last_writer.update_region(subrange<3>({}, range), m_epoch_for_new_commands);
        m_buffers.at(bid).replicated_regions.update_region(subrange<3>({}, range), node_bitset{}.set());
}

void command_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& debug_name) {
        m_buffers.at(bid).debug_name = debug_name;
}

void command_graph_generator::notify_buffer_destroyed(const buffer_id bid) {
        assert(m_buffers.count(bid) != 0);
        m_buffers.erase(bid);
}

void command_graph_generator::notify_host_object_created(const host_object_id hoid) {
        assert(m_host_objects.count(hoid) == 0);
        m_host_objects.emplace(hoid, host_object_state{m_epoch_for_new_commands});
}

void command_graph_generator::notify_host_object_destroyed(const host_object_id hoid) {
        assert(m_host_objects.count(hoid) != 0);
        m_host_objects.erase(hoid);
}

using buffer_requirements_map = std::unordered_map<buffer_id, std::unordered_map<access_mode, region<3>>>;

static buffer_requirements_map get_buffer_requirements_for_mapped_access(const task& tsk, subrange<3> sr, const range<3> global_size) {
        buffer_requirements_map result;
        const auto& access_map = tsk.get_buffer_access_map();
        const auto buffers = access_map.get_accessed_buffers();
        for(const buffer_id bid : buffers) {
                const auto modes = access_map.get_access_modes(bid);
                for(auto m : modes) {
                        result[bid][m] = access_map.get_mode_requirements(bid, m, tsk.get_dimensions(), sr, global_size);
                }
        }
        return result;
}

// According to Wikipedia https://en.wikipedia.org/wiki/Topological_sorting#Depth-first_search
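// Note: `sorted` is filled back-to-front as the DFS unwinds (via the reverse iterator below), so in the
// returned front-to-back order every command appears before all of its dependents.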
std::vector<abstract_command*> sort_topologically(command_set unmarked) {
        command_set temporary_marked;
        command_set permanent_marked;
        std::vector<abstract_command*> sorted(unmarked.size());
        auto sorted_front = sorted.rbegin();

        const auto visit = [&](abstract_command* const cmd, auto& visit /* to allow recursion in lambda */) {
                if(permanent_marked.count(cmd) != 0) return;
                assert(temporary_marked.count(cmd) == 0 && "cyclic command graph");
                unmarked.erase(cmd);
                temporary_marked.insert(cmd);
                for(const auto dep : cmd->get_dependents()) {
                        visit(dep.node, visit);
                }
                temporary_marked.erase(cmd);
                permanent_marked.insert(cmd);
                *sorted_front++ = cmd;
        };

        while(!unmarked.empty()) {
                visit(*unmarked.begin(), visit);
        }

        return sorted;
}

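// Entry point for per-task command generation: dispatches to the epoch / horizon / distributed command
// generators, adds epoch dependencies for commands without other true dependencies, prunes commands that
// precede the previously completed epoch, and returns the batch of newly created commands.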
command_set command_graph_generator::build_task(const task& tsk) {
        assert(m_current_cmd_batch.empty());
        [[maybe_unused]] const auto cmd_count_before = m_cdag.command_count();

        const auto epoch_to_prune_before = m_epoch_for_new_commands;

        switch(tsk.get_type()) {
        case task_type::epoch: generate_epoch_command(tsk); break;
        case task_type::horizon: generate_horizon_command(tsk); break;
        case task_type::device_compute:
        case task_type::host_compute:
        case task_type::master_node:
        case task_type::collective:
        case task_type::fence: generate_distributed_commands(tsk); break;
        default: throw std::runtime_error("Task type NYI");
        }

        // It is currently undefined to split reduction-producer tasks into multiple chunks on the same node:
        //   - Per-node reduction intermediate results are stored with fixed access to a single backing buffer,
        //     so multiple chunks on the same node will race on this buffer access
        //   - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish
        //     more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative
        //     or non-commutative operations
        if(!tsk.get_reductions().empty()) { assert(m_cdag.task_command_count(tsk.get_id()) <= 1); }

        // Commands without any other true-dependency must depend on the active epoch command to ensure they cannot be re-ordered before the epoch.
        // We need to check the count because for some tasks we may not have generated any commands locally.
        if(m_cdag.task_command_count(tsk.get_id()) > 0) {
                for(auto* const cmd : m_cdag.task_commands(tsk.get_id())) {
                        generate_epoch_dependencies(cmd);
                }
        }

        // Check that all commands have been created through create_command
        assert(m_cdag.command_count() - cmd_count_before == m_current_cmd_batch.size());

        // If a new epoch was completed in the CDAG before the current task, prune all predecessor commands of that epoch.
        prune_commands_before(epoch_to_prune_before);

        // Check that all commands have been recorded
        if(is_recording()) {
                assert(std::all_of(m_current_cmd_batch.begin(), m_current_cmd_batch.end(), [this](const abstract_command* cmd) {
                        return m_recorder->get_all().end()
                               != std::find_if(m_recorder->get_all().begin(), m_recorder->get_all().end(),
                                   [cmd](const std::unique_ptr<command_record>& rec) { return rec->id == cmd->get_cid(); });
                }));
        }

        return std::move(m_current_cmd_batch);
}

void command_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const {
        const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};

        // Since this check is run distributed on every node, we avoid quadratic behavior by only checking for conflicts between all local chunks and the
        // region-union of remote chunks. This way, every conflict will be reported by at least one node.
        const box<3> global_chunk(subrange(full_chunk.offset, full_chunk.range));
        auto remote_chunks = region_difference(global_chunk, region(box_vector<3>(local_chunks))).into_boxes();

        // detect_overlapping_writes takes a single box_vector, so we concatenate local and remote chunks (the order does not matter)
        auto distributed_chunks = std::move(remote_chunks);
        distributed_chunks.insert(distributed_chunks.end(), local_chunks.begin(), local_chunks.end());

        if(const auto overlapping_writes = detect_overlapping_writes(tsk, distributed_chunks); !overlapping_writes.empty()) {
                auto error = fmt::format("{} has overlapping writes between multiple nodes in", print_task_debug_label(tsk, true /* title case */));
                for(const auto& [bid, overlap] : overlapping_writes) {
                        fmt::format_to(std::back_inserter(error), " {} {}", print_buffer_debug_label(bid), overlap);
                }
                error += ". Choose a non-overlapping range mapper for this write access or constrain the split via experimental::constrain_split to make the access "
                         "non-overlapping.";
                utils::report_error(m_policy.overlapping_write_error, "{}", error);
        }
}

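// Splits the task's global range into chunks and assigns each chunk to a node. Collective tasks receive one
// unit-sized chunk per node (offset = nid), fence tasks receive the same unit-sized chunk on every node, and
// all other tasks are split according to their granularity and any split hints.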
std::vector<command_graph_generator::assigned_chunk> command_graph_generator::split_task_and_assign_chunks(const task& tsk) const {
        const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};
        const size_t num_chunks = m_num_nodes * 1; // TODO Make configurable
        const auto chunks = ([&] {
                if(tsk.get_type() == task_type::collective || tsk.get_type() == task_type::fence) {
                        std::vector<chunk<3>> chunks;
                        for(size_t nid = 0; nid < m_num_nodes; ++nid) {
                                chunks.push_back(chunk_cast<3>(chunk<1>{id<1>{tsk.get_type() == task_type::collective ? nid : 0}, ones, {m_num_nodes}}));
                        }
                        return chunks;
                }
                if(tsk.has_variable_split()) {
                        if(tsk.get_hint<experimental::hints::split_1d>() != nullptr) {
                                // no-op, keeping this for documentation purposes
                        }
                        if(tsk.get_hint<experimental::hints::split_2d>() != nullptr) { return split_2d(full_chunk, tsk.get_granularity(), num_chunks); }
                        return split_1d(full_chunk, tsk.get_granularity(), num_chunks);
                }
                return std::vector<chunk<3>>{full_chunk};
        })();
        assert(chunks.size() <= num_chunks); // We may have created fewer than requested
        assert(!chunks.empty());

        // Assign each chunk to a node.
        // We assign adjacent chunks to the same worker (if there are more chunks than workers), as this is likely to produce fewer
        // transfers between tasks than a round-robin assignment (for typical stencil codes).
        // FIXME: This only works if the number of chunks is an integer multiple of the number of workers, e.g. 3 chunks for 2 workers degrades to RR.
        const auto chunks_per_node = std::max<size_t>(1, chunks.size() / m_num_nodes);

        std::vector<assigned_chunk> assigned_chunks;
        for(size_t i = 0; i < chunks.size(); ++i) {
                const node_id nid = (i / chunks_per_node) % m_num_nodes;
                assigned_chunks.push_back({nid, chunks[i]});
        }
        return assigned_chunks;
}

209
const box<3> empty_reduction_box({0, 0, 0}, {0, 0, 0});
210
const box<3> scalar_reduction_box({0, 0, 0}, {1, 1, 1});
211

212
command_graph_generator::assigned_chunks_with_requirements command_graph_generator::compute_per_chunk_requirements(
5,023✔
213
    const task& tsk, const std::vector<assigned_chunk>& assigned_chunks) const {
214
        assigned_chunks_with_requirements result;
5,023✔
215

216
        for(auto& a_chunk : assigned_chunks) {
11,808✔
217
                const node_id nid = a_chunk.executed_on;
6,785✔
218
                auto requirements = get_buffer_requirements_for_mapped_access(tsk, a_chunk.chnk, tsk.get_global_size());
6,785✔
219

220
                // Add read/write requirements for reductions performed in this task.
221
                for(const auto& reduction : tsk.get_reductions()) {
7,201✔
222
                        auto rmode = access_mode::discard_write;
416✔
223
                        if(nid == reduction_initializer_nid && reduction.init_from_buffer) { rmode = access_mode::read_write; }
416✔
224
#ifndef NDEBUG
225
                        for(auto pmode : access::producer_modes) {
2,496✔
226
                                assert(requirements[reduction.bid].count(pmode) == 0); // task_manager verifies that there are no reduction <-> write-access conflicts
2,080✔
227
                        }
228
#endif
229
                        requirements[reduction.bid][rmode] = scalar_reduction_box;
416✔
230
                }
231

232
                if(nid == m_local_nid) {
6,785✔
233
                        result.local_chunks.push_back({a_chunk, requirements});
4,821✔
234
                } else {
235
                        result.remote_chunks.push_back({a_chunk, requirements});
1,964✔
236
                }
237
        }
6,785✔
238

239
        return result;
5,023✔
NEW
240
}
×
241

242
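// Finalizes pending reductions for all buffers that are consumed by this task: on the local node this
// generates a reduction command plus a matching await-push for the remote partial results, while for remote
// consumers it generates push commands (or empty "notification-only" pushes if there is no fresh local
// contribution). Afterwards the affected buffers' tracking state is replaced with the post-reduction state.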
void command_graph_generator::resolve_pending_reductions(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        // Buffers that currently are in a pending reduction state will receive a new buffer state after a reduction has been generated.
        std::unordered_map<buffer_id, buffer_state> post_reduction_buffers;

        std::unordered_map<buffer_id, size_t> number_of_participating_nodes;
        const auto process_chunks = [&](auto& chunks) {
                for(auto& [a_chunk, requirements] : chunks) {
                        const node_id nid = a_chunk.executed_on;
                        const bool is_local_chunk = nid == m_local_nid;

                        for(auto& [bid, reqs_by_mode] : requirements) {
                                auto& buffer = m_buffers.at(bid);
                                if(!buffer.pending_reduction.has_value()) continue;
                                bool has_consumer = false;
                                for(const auto mode : detail::access::consumer_modes) {
                                        if(auto req_it = reqs_by_mode.find(mode); req_it != reqs_by_mode.end()) {
                                                // While uncommon, we do support chunks that don't require access to a particular buffer at all.
                                                if(!req_it->second.empty()) {
                                                        has_consumer = true;
                                                        break;
                                                }
                                        }
                                }

                                if(!has_consumer) {
                                        // TODO the per-node reduction result is discarded - warn user about dead store
                                        continue;
                                }

                                const auto& reduction = *buffer.pending_reduction;

                                const auto local_last_writer = buffer.local_last_writer.get_region_values(scalar_reduction_box);
                                assert(local_last_writer.size() == 1);

                                // Prepare the buffer state for after the reduction has been performed:
                                // Keep the current last writer, but mark it as stale, so that if we don't generate a reduction command locally,
                                // we'll know to get the data from elsewhere. If we generate a reduction command, this will be overwritten by its command id.
                                write_command_state wcs{static_cast<command_id>(local_last_writer[0].second)};
                                wcs.mark_as_stale();

                                // We just treat this buffer as 1-dimensional, regardless of its actual dimensionality (as it must be unit-sized anyway)
                                auto [it, _] = post_reduction_buffers.emplace(std::piecewise_construct, std::tuple{bid},
                                    std::tuple{region_map<write_command_state>{ones, wcs}, region_map<node_bitset>{ones, node_bitset{}}});
                                auto& post_reduction_buffer = it->second;

                                if(m_policy.uninitialized_read_error != error_policy::ignore) { post_reduction_buffers.at(bid).initialized_region = scalar_reduction_box; }

                                if(is_local_chunk) {
                                        // We currently don't support multiple chunks on a single node for reductions (there is also -- for now -- no way to create multiple chunks,
                                        // as oversubscription is handled by the instruction graph).
                                        // NOTE: The number_of_participating_nodes check below relies on this being true
                                        assert(chunks_with_requirements.local_chunks.size() == 1);

                                        auto* const reduce_cmd = create_command<reduction_command>(reduction, local_last_writer[0].second.is_fresh() /* has_local_contribution */,
                                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });

                                        // Only generate a true dependency on the last writer if this node participated in the intermediate result computation.
                                        if(local_last_writer[0].second.is_fresh()) {
                                                add_dependency(reduce_cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                                        }

                                        auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, reduction.rid), scalar_reduction_box.get_subrange(),
                                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                                        add_dependency(reduce_cmd, ap_cmd, dependency_kind::true_dep, dependency_origin::dataflow);
                                        generate_epoch_dependencies(ap_cmd);

                                        generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, scalar_reduction_box, reduce_cmd);

                                        post_reduction_buffer.local_last_writer.update_box(scalar_reduction_box, reduce_cmd->get_cid());
                                        number_of_participating_nodes[bid]++; // We are participating
                                } else {
                                        const bool notification_only = !local_last_writer[0].second.is_fresh();

                                        // Push an empty range if we don't have any fresh data on this node
                                        const auto push_box = notification_only ? empty_reduction_box : scalar_reduction_box;

                                        auto* const push_cmd = create_command<push_command>(nid, transfer_id(tsk.get_id(), bid, reduction.rid), push_box.get_subrange(),
                                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });

                                        if(notification_only) {
                                                generate_epoch_dependencies(push_cmd);
                                        } else {
                                                m_command_buffer_reads[push_cmd->get_cid()][bid] = region_union(m_command_buffer_reads[push_cmd->get_cid()][bid], scalar_reduction_box);
                                                add_dependency(push_cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                                        }

                                        // Mark the reduction result as replicated so we don't generate data transfers to this node
                                        // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
                                        const auto replicated_box = post_reduction_buffer.replicated_regions.get_region_values(scalar_reduction_box);
                                        assert(replicated_box.size() == 1);
                                        for(const auto& [_, nodes] : replicated_box) {
                                                post_reduction_buffer.replicated_regions.update_box(scalar_reduction_box, node_bitset{nodes}.set(nid));
                                        }
                                        number_of_participating_nodes[bid]++; // This node is participating
                                }
                        }
                }
        };

        // Since the local reduction command overwrites the buffer contents that need to be pushed to other nodes, we need to process remote chunks first.
        // TODO: Replace with a C++20 join view once we have upgraded
        process_chunks(chunks_with_requirements.remote_chunks);
        process_chunks(chunks_with_requirements.local_chunks);

        // We currently do not support generating reduction commands on only a subset of nodes, except for the special case of a single command.
        // This is because it is unclear who owns the final result in this case (normally all nodes "own" the result).
        //   => I.e., reducing and using the result on the participating nodes is actually not the problem (this works as intended); the issue only arises
        //      if the result is subsequently required in other tasks. Since we don't have a good way of detecting this condition however, we currently
        //      disallow partial reductions altogether.
        for(auto& [bid, number_of_participating_nodes] : number_of_participating_nodes) {
                // NOTE: This check relies on the fact that we currently only support a single chunk per node for reductions (see assertion above).
                if(number_of_participating_nodes > 1 && number_of_participating_nodes != m_num_nodes) {
                        utils::report_error(error_policy::panic,
                            "Task T{} requires a reduction on buffer B{} that is not performed on all nodes. This is currently not supported. Either "
                            "ensure that all nodes receive a chunk that reads from the buffer, or reduce the data on a single node.",
                            tsk.get_id(), bid);
                }
        }

        // For buffers that were in a pending reduction state and a reduction was generated
        // (i.e., the result was not discarded), set their new state.
        for(auto& [bid, new_state] : post_reduction_buffers) {
                auto& buffer = m_buffers.at(bid);
                if(buffer.pending_reduction.has_value()) { m_completed_reductions.push_back(buffer.pending_reduction->rid); }
                buffer = std::move(new_state);
        }
}

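// Generates a push command for every remote chunk that consumes data whose last writer is a fresh local
// command and that has not already been replicated to that node.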
void command_graph_generator::generate_pushes(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        for(auto& [a_chunk, requirements] : chunks_with_requirements.remote_chunks) {
                const node_id nid = a_chunk.executed_on;

                for(auto& [bid, reqs_by_mode] : requirements) {
                        auto& buffer = m_buffers.at(bid);

                        for(const auto& [mode, req] : reqs_by_mode) {
                                if(!detail::access::mode_traits::is_consumer(mode)) continue;
                                // We generate a separate push command for each last writer command for now, possibly even multiple for partially already-replicated data.
                                // TODO: Can and/or should we consolidate?
                                const auto local_sources = buffer.local_last_writer.get_region_values(req);
                                for(const auto& [local_box, wcs] : local_sources) {
                                        if(!wcs.is_fresh() || wcs.is_replicated()) { continue; }

                                        // Make sure we don't push anything we've already pushed to this node before
                                        box_vector<3> non_replicated_boxes;
                                        for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(local_box)) {
                                                if(nodes.test(nid)) continue;
                                                non_replicated_boxes.push_back(replicated_box);
                                        }

                                        // Merge all connected boxes to determine final set of pushes
                                        const auto push_region = region<3>(std::move(non_replicated_boxes));
                                        for(auto& push_box : push_region.get_boxes()) {
                                                auto* const push_cmd = create_command<push_command>(nid, transfer_id(tsk.get_id(), bid, no_reduction_id), push_box.get_subrange(),
                                                    [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                                                assert(!utils::isa<await_push_command>(m_cdag.get(wcs)) && "Attempting to push non-owned data?!");
                                                add_dependency(push_cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);

                                                // Store the read access for determining anti-dependencies later on
                                                m_command_buffer_reads[push_cmd->get_cid()][bid] = push_box;
                                        }

                                        // Remember that we've replicated this region
                                        for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(push_region)) {
                                                buffer.replicated_regions.update_box(replicated_box, node_bitset{nodes}.set(nid));
                                        }
                                }
                        }
                }
        }
}

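// Generates an await-push command for every region a local chunk consumes whose last writer is not fresh on
// this node, i.e. data that has to be received from a remote node.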
void command_graph_generator::generate_await_pushes(const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        for(auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                for(auto& [bid, reqs_by_mode] : requirements) {
                        auto& buffer = m_buffers.at(bid);

                        for(const auto& [mode, req] : reqs_by_mode) {
                                if(!detail::access::mode_traits::is_consumer(mode)) continue;

                                const auto local_sources = buffer.local_last_writer.get_region_values(req);
                                box_vector<3> missing_part_boxes;
                                for(const auto& [box, wcs] : local_sources) {
                                        if(!box.empty() && !wcs.is_fresh()) { missing_part_boxes.push_back(box); }
                                }

                                // There is data we don't yet have locally. Generate an await push command for it.
                                if(!missing_part_boxes.empty()) {
                                        const region missing_parts(std::move(missing_part_boxes));
                                        assert(m_num_nodes > 1);
                                        auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, no_reduction_id), missing_parts,
                                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                                        generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, missing_parts, ap_cmd);
                                        generate_epoch_dependencies(ap_cmd);
                                        // Remember that we have this data now
                                        buffer.local_last_writer.update_region(missing_parts, {ap_cmd->get_cid(), true /* is_replicated */});
                                }
                        }
                }
        }
}

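// Marks all buffer regions that were written by remote chunks in this task (i.e. written globally but not
// locally) as stale, so that subsequent consumers on this node generate the appropriate await-pushes.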
void command_graph_generator::update_local_buffer_freshness(const task& tsk, const std::unordered_map<buffer_id, region<3>>& per_buffer_local_writes) {
        auto requirements = get_buffer_requirements_for_mapped_access(tsk, subrange<3>(tsk.get_global_offset(), tsk.get_global_size()), tsk.get_global_size());
        // Add requirements for reductions
        for(const auto& reduction : tsk.get_reductions()) {
                // the actual mode is irrelevant as long as it's a producer - TODO have a better query API for task buffer requirements
                requirements[reduction.bid][access_mode::write] = scalar_reduction_box;
        }
        for(auto& [bid, reqs_by_mode] : requirements) {
                box_vector<3> global_write_boxes;
                for(const auto mode : access::producer_modes) {
                        if(reqs_by_mode.count(mode) == 0) continue;
                        const auto& by_mode = reqs_by_mode.at(mode);
                        global_write_boxes.insert(global_write_boxes.end(), by_mode.get_boxes().begin(), by_mode.get_boxes().end());
                }
                region global_writes(std::move(global_write_boxes));
                const auto remote_writes = ([&, bid = bid] {
                        if(auto it = per_buffer_local_writes.find(bid); it != per_buffer_local_writes.end()) {
                                const auto& local_writes = it->second;
                                assert(region_difference(local_writes, global_writes).empty()); // Local writes have to be a subset of global writes
                                return region_difference(global_writes, local_writes);
                        }
                        return std::move(global_writes);
                })(); // IIFE
                auto& buffer = m_buffers.at(bid);

                if(m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = region_union(buffer.initialized_region, global_writes); }

                // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
                auto boxes_and_cids = buffer.local_last_writer.get_region_values(remote_writes);
                for(auto& [box, wcs] : boxes_and_cids) {
                        if(wcs.is_fresh()) {
                                wcs.mark_as_stale();
                                buffer.local_last_writer.update_region(box, wcs);
                        }
                }
        }
}

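// Top-level command generation for compute, host, collective and fence tasks: split the task into chunks and
// assign them to nodes, finalize pending reductions, generate the required pushes and await-pushes, create
// the local execution/fence commands together with their data (anti-)dependencies, and finally update buffer
// freshness and host-object side-effect ordering.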
void command_graph_generator::generate_distributed_commands(const task& tsk) {
        const auto chunks = split_task_and_assign_chunks(tsk);
        const auto chunks_with_requirements = compute_per_chunk_requirements(tsk, chunks);

        resolve_pending_reductions(tsk, chunks_with_requirements);
        generate_pushes(tsk, chunks_with_requirements);
        generate_await_pushes(tsk, chunks_with_requirements);

        // Union of all per-buffer writes on this node, used to determine which parts of a buffer are fresh/stale later on.
        std::unordered_map<buffer_id, region<3>> per_buffer_local_writes;

        // Collect all local chunks for detecting overlapping writes between all local chunks and the union of remote chunks in a distributed manner.
        box_vector<3> local_chunks;

        // Create command for each chunk and resolve local data dependencies.
        for(auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                const node_id nid = a_chunk.executed_on;

                abstract_command* cmd = nullptr;
                if(tsk.get_type() == task_type::fence) {
                        cmd = create_command<fence_command>(tsk.get_id(),
                            [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });
                } else {
                        cmd = create_command<execution_command>(tsk.get_id(), subrange{a_chunk.chnk},
                            [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });

                        // Go over all reductions that are to be performed *during* the execution of this chunk,
                        // not to be confused with any pending reductions that need to be finalized *before* the
                        // execution of this chunk.
                        // If a reduction reads the previous value of the buffer (i.e. w/o property::reduction::initialize_to_identity),
                        // we have to include it in exactly one of the per-node intermediate reductions.
                        for(const auto& reduction : tsk.get_reductions()) {
                                if(nid == reduction_initializer_nid && reduction.init_from_buffer) {
                                        utils::as<execution_command>(cmd)->set_is_reduction_initializer(true);
                                        break;
                                }
                        }
                }

                if(tsk.get_type() == task_type::collective) {
                        // Collective host tasks have an implicit dependency on the previous task in the same collective group,
                        // which is required in order to guarantee they are executed in the same order on every node.
                        auto cgid = tsk.get_collective_group_id();
                        if(auto prev = m_last_collective_commands.find(cgid); prev != m_last_collective_commands.end()) {
                                add_dependency(cmd, m_cdag.get(prev->second), dependency_kind::true_dep, dependency_origin::collective_group_serialization);
                                m_last_collective_commands.erase(prev);
                        }
                        m_last_collective_commands.emplace(cgid, cmd->get_cid());
                }

                local_chunks.push_back(subrange(a_chunk.chnk.offset, a_chunk.chnk.range));

                for(auto& [bid, reqs_by_mode] : requirements) {
                        auto& buffer = m_buffers.at(bid);

                        // Process consuming accesses first, so we don't add dependencies onto our own writes
                        region<3> uninitialized_reads;
                        region<3> all_reads;
                        for(const auto& [mode, req] : reqs_by_mode) {
                                if(!detail::access::mode_traits::is_consumer(mode)) continue;
                                all_reads = region_union(all_reads, req);
                                if(m_policy.uninitialized_read_error != error_policy::ignore
                                    && !bounding_box(buffer.initialized_region).covers(bounding_box(req.get_boxes()))) {
                                        uninitialized_reads = region_union(uninitialized_reads, region_difference(req, buffer.initialized_region));
                                }
                        }

                        if(!all_reads.empty()) {
                                for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(all_reads)) {
                                        if(box.empty()) continue;
                                        assert(wcs.is_fresh() && "Unresolved remote data dependency");
                                        add_dependency(cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);
                                }

                                // Store the read access for determining anti-dependencies later on
                                m_command_buffer_reads[cmd->get_cid()].emplace(bid, std::move(all_reads));
                        }

                        region<3> all_writes;
                        for(const auto& [mode, req] : reqs_by_mode) {
                                if(!detail::access::mode_traits::is_producer(mode)) continue;
                                all_writes = region_union(all_writes, req);
                        }

                        if(!all_writes.empty()) {
                                generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, all_writes, cmd);

                                // Update last writer
                                buffer.local_last_writer.update_region(all_writes, cmd->get_cid());
                                buffer.replicated_regions.update_region(all_writes, node_bitset{});

                                // In case this buffer was in a pending reduction state we discarded the result and need to remove the pending reduction.
                                if(buffer.pending_reduction.has_value()) {
                                        m_completed_reductions.push_back(buffer.pending_reduction->rid);
                                        buffer.pending_reduction = std::nullopt;
                                }

                                per_buffer_local_writes.emplace(bid, std::move(all_writes));
                        }

                        if(!uninitialized_reads.empty()) {
                                utils::report_error(m_policy.uninitialized_read_error,
                                    "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_cid(), m_local_nid,
                                    box(subrange(a_chunk.chnk.offset, a_chunk.chnk.range)), print_task_debug_label(tsk), print_buffer_debug_label(bid),
                                    detail::region(std::move(uninitialized_reads)));
                        }
                }
        }

        // Check for and report overlapping writes between local chunks, and between local and remote chunks.
        if(m_policy.overlapping_write_error != error_policy::ignore) { report_overlapping_writes(tsk, local_chunks); }

        // Mark any buffers that now are in a pending reduction state as such.
        // If there is only one chunk/command, it already implicitly generates the final reduced value
        // and the buffer does not need to be flagged as a pending reduction.
        for(const auto& reduction : tsk.get_reductions()) {
                if(chunks.size() > 1) {
                        m_buffers.at(reduction.bid).pending_reduction = reduction;
                } else {
                        m_completed_reductions.push_back(reduction.rid);
                }
        }

        update_local_buffer_freshness(tsk, per_buffer_local_writes);
        process_task_side_effect_requirements(tsk);
}

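// Adds anti-dependencies from write_cmd onto the commands that still read the regions of write_req, so that
// the new write cannot be reordered before those reads; if a last writer has no such readers, the
// anti-dependency is added onto the writer itself.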
void command_graph_generator::generate_anti_dependencies(
    task_id tid, buffer_id bid, const region_map<write_command_state>& last_writers_map, const region<3>& write_req, abstract_command* write_cmd) {
        const auto last_writers = last_writers_map.get_region_values(write_req);
        for(const auto& [box, wcs] : last_writers) {
                auto* const last_writer_cmd = m_cdag.get(static_cast<command_id>(wcs));
                assert(!utils::isa<task_command>(last_writer_cmd) || utils::as<task_command>(last_writer_cmd)->get_tid() != tid);

                // Add anti-dependencies onto all successors of the writer
                bool has_successors = false;
                for(auto d : last_writer_cmd->get_dependents()) {
                        // Only consider true dependencies
                        if(d.kind != dependency_kind::true_dep) continue;

                        auto* const cmd = d.node;

                        // We might have already generated new commands within the same task that also depend on this; in that case, skip it
                        if(utils::isa<task_command>(cmd) && utils::as<task_command>(cmd)->get_tid() == tid) continue;

                        // So far we don't know whether the dependent actually intersects with the subrange we're writing
                        if(const auto command_reads_it = m_command_buffer_reads.find(cmd->get_cid()); command_reads_it != m_command_buffer_reads.end()) {
                                const auto& command_reads = command_reads_it->second;
                                // The task might be a dependent because of another buffer
                                if(const auto buffer_reads_it = command_reads.find(bid); buffer_reads_it != command_reads.end()) {
                                        if(!region_intersection(write_req, buffer_reads_it->second).empty()) {
                                                has_successors = true;
                                                add_dependency(write_cmd, cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
                                        }
                                }
                        }
                }

                // In some cases (horizons, master node host task, weird discard_* constructs...)
                // the last writer might not have any successors. Just add the anti-dependency onto the writer itself then.
                if(!has_successors) { add_dependency(write_cmd, last_writer_cmd, dependency_kind::anti_dep, dependency_origin::dataflow); }
        }
}

void command_graph_generator::process_task_side_effect_requirements(const task& tsk) {
        const task_id tid = tsk.get_id();
        if(tsk.get_side_effect_map().empty()) return; // skip the loop in the common case
        if(m_cdag.task_command_count(tid) == 0) return;

        for(auto* const cmd : m_cdag.task_commands(tid)) {
                for(const auto& side_effect : tsk.get_side_effect_map()) {
                        const auto [hoid, order] = side_effect;
                        auto& host_object = m_host_objects.at(hoid);

                        if(host_object.last_side_effect.has_value()) {
                                // TODO once we have different side_effect_orders, their interaction will determine the dependency kind
                                add_dependency(cmd, m_cdag.get(*host_object.last_side_effect), dependency_kind::true_dep, dependency_origin::dataflow);
                        }

                        // Simplification: If there are multiple chunks per node, we generate true-dependencies between them in an arbitrary order, when all we really
                        // need is mutual exclusion (i.e. a bi-directional pseudo-dependency).
                        host_object.last_side_effect = cmd->get_cid();
                }
        }
}

void command_graph_generator::set_epoch_for_new_commands(const abstract_command* const epoch_or_horizon) {
        // both an explicit epoch command and an applied horizon can be effective epochs
        assert(utils::isa<epoch_command>(epoch_or_horizon) || utils::isa<horizon_command>(epoch_or_horizon));

        for(auto& [bid, bs] : m_buffers) {
                bs.local_last_writer.apply_to_values([epoch_or_horizon](const write_command_state& wcs) {
                        auto new_wcs = write_command_state(std::max(epoch_or_horizon->get_cid(), static_cast<command_id>(wcs)), wcs.is_replicated());
                        if(!wcs.is_fresh()) new_wcs.mark_as_stale();
                        return new_wcs;
                });
        }
        for(auto& [cgid, cid] : m_last_collective_commands) {
                cid = std::max(epoch_or_horizon->get_cid(), cid);
        }
        for(auto& [_, host_object] : m_host_objects) {
                if(host_object.last_side_effect.has_value()) { host_object.last_side_effect = std::max(epoch_or_horizon->get_cid(), *host_object.last_side_effect); }
        }

        m_epoch_for_new_commands = epoch_or_horizon->get_cid();
}

void command_graph_generator::reduce_execution_front_to(abstract_command* const new_front) {
        const auto previous_execution_front = m_cdag.get_execution_front();
        for(auto* const front_cmd : previous_execution_front) {
                if(front_cmd != new_front) { add_dependency(new_front, front_cmd, dependency_kind::true_dep, dependency_origin::execution_front); }
        }
        assert(m_cdag.get_execution_front().size() == 1 && *m_cdag.get_execution_front().begin() == new_front);
}

void command_graph_generator::generate_epoch_command(const task& tsk) {
        assert(tsk.get_type() == task_type::epoch);
        auto* const epoch = create_command<epoch_command>(
            tsk.get_id(), tsk.get_epoch_action(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });
        set_epoch_for_new_commands(epoch);
        m_current_horizon = no_command;
        // Make the epoch depend on the previous execution front
        reduce_execution_front_to(epoch);
}

void command_graph_generator::generate_horizon_command(const task& tsk) {
        assert(tsk.get_type() == task_type::horizon);
        auto* const horizon =
            create_command<horizon_command>(tsk.get_id(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });

        if(m_current_horizon != static_cast<command_id>(no_command)) {
                // Apply the previous horizon
                set_epoch_for_new_commands(m_cdag.get(m_current_horizon));
        }
        m_current_horizon = horizon->get_cid();

        // Make the horizon depend on the previous execution front
        reduce_execution_front_to(horizon);
}

void command_graph_generator::generate_epoch_dependencies(abstract_command* cmd) {
        // No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs.
        // To guarantee that each node has a transitive true dependency (=temporal dependency) on the epoch, it is enough to add an epoch -> command dependency
        // to any command that has no other true dependencies itself; no graph traversal is necessary. This can be verified by a simple induction proof.

        // As long as the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only
        // commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true
        // predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes
        // in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the
        // dependency chain from the executor perspective.

        if(const auto deps = cmd->get_dependencies();
            std::none_of(deps.begin(), deps.end(), [](const abstract_command::dependency d) { return d.kind == dependency_kind::true_dep; })) {
                assert(cmd->get_cid() != m_epoch_for_new_commands);
                add_dependency(cmd, m_cdag.get(m_epoch_for_new_commands), dependency_kind::true_dep, dependency_origin::last_epoch);
        }
}

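// Erases all commands that precede the given epoch from the CDAG, together with their recorded buffer reads.
// Pruning happens at most once per epoch (tracked via m_epoch_last_pruned_before).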
void command_graph_generator::prune_commands_before(const command_id epoch) {
        if(epoch > m_epoch_last_pruned_before) {
                m_cdag.erase_if([&](abstract_command* cmd) {
                        if(cmd->get_cid() < epoch) {
                                m_command_buffer_reads.erase(cmd->get_cid());
                                return true;
                        }
                        return false;
                });
                m_epoch_last_pruned_before = epoch;
        }
}

std::string command_graph_generator::print_buffer_debug_label(const buffer_id bid) const {
        return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name);
}

} // namespace celerity::detail