celerity / celerity-runtime / build 11665018161 (push, via GitHub)
04 Nov 2024 01:17PM UTC. Coverage: 95.167% (-0.08%) from 95.251%.
Commit by fknorr: "Update benchmark results for region_builder"

3016 of 3412 branches covered (88.39%). Branch coverage is included in the aggregate %.
6691 of 6788 relevant lines covered (98.57%), 1312478.22 hits per line.

Source file: /src/command_graph_generator.cc (96.11%)
#include "command_graph_generator.h"

#include "access_modes.h"
#include "command_graph.h"
#include "grid.h"
#include "intrusive_graph.h"
#include "print_utils.h"
#include "ranges.h"
#include "recorders.h"
#include "region_map.h"
#include "split.h"
#include "task.h"
#include "types.h"
#include "utils.h"

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>


namespace celerity::detail {

command_graph_generator::command_graph_generator(
    const size_t num_nodes, const node_id local_nid, command_graph& cdag, detail::command_recorder* const recorder, const policy_set& policy)
    : m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(&cdag), m_recorder(recorder) {
        if(m_num_nodes > max_num_nodes) {
                throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes));
        }
}

void command_graph_generator::notify_buffer_created(const buffer_id bid, const range<3>& range, bool host_initialized) {
        assert(m_epoch_for_new_commands != nullptr);
        assert(!m_buffers.contains(bid));
        // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands).
        // This is required when tasks access host-initialized or uninitialized buffers.
        auto& buffer = m_buffers.emplace(std::piecewise_construct, std::tuple(bid), std::tuple(range, m_epoch_for_new_commands, node_bitset().set())).first->second;
        if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = box(subrange({}, range)); }
}

void command_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& debug_name) {
        m_buffers.at(bid).debug_name = debug_name;
}

void command_graph_generator::notify_buffer_destroyed(const buffer_id bid) {
        assert(m_buffers.contains(bid));
        m_buffers.erase(bid);
}

void command_graph_generator::notify_host_object_created(const host_object_id hoid) {
        assert(m_epoch_for_new_commands != nullptr);
        assert(!m_host_objects.contains(hoid));
        m_host_objects.emplace(hoid, m_epoch_for_new_commands);
}

void command_graph_generator::notify_host_object_destroyed(const host_object_id hoid) {
        assert(m_host_objects.contains(hoid));
        m_host_objects.erase(hoid);
}

/// Returns whether an iterator range of commands is topologically sorted, i.e. sequential execution would satisfy all internal dependencies.
template <typename Iterator>
bool is_topologically_sorted(Iterator begin, Iterator end) {
        for(auto check = begin; check != end; ++check) {
                for(const auto dep : (*check)->get_dependencies()) {
                        if(std::find_if(std::next(check), end, [dep](const auto& node) { return node == dep.node; }) != end) return false;
                }
        }
        return true;
}
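
// For example, a batch [A, B] in which B depends on A passes the check, because executing the
// commands front to back satisfies B's dependency; a batch [B, A] fails, because A, a dependency
// of B, would only run after B. build_task() asserts this property for every batch it returns.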

std::vector<const command*> command_graph_generator::build_task(const task& tsk) {
        const auto epoch_to_prune_before = m_epoch_for_new_commands;
        batch current_batch;

        switch(tsk.get_type()) {
        case task_type::epoch: generate_epoch_command(current_batch, tsk); break;
        case task_type::horizon: generate_horizon_command(current_batch, tsk); break;
        case task_type::device_compute:
        case task_type::host_compute:
        case task_type::master_node:
        case task_type::collective:
        case task_type::fence: generate_distributed_commands(current_batch, tsk); break;
        default: throw std::runtime_error("Task type NYI");
        }

        // It is currently undefined to split reduction-producer tasks into multiple chunks on the same node:
        //   - Per-node reduction intermediate results are stored with fixed access to a single backing buffer,
        //     so multiple chunks on the same node will race on this buffer access
        //   - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish
        //     more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative
        //     or non-commutative operations
        if(!tsk.get_reductions().empty()) {
                assert(std::count_if(current_batch.begin(), current_batch.end(), [](const command* cmd) { return utils::isa<task_command>(cmd); }) <= 1);
        }

        // If a new epoch was completed in the CDAG before the current task, we can erase all tracking information from earlier commands.
        // After the epoch (or horizon) command has been executed, the scheduler will then delete all obsolete commands from the CDAG.
        if(epoch_to_prune_before != nullptr) {
                std::erase_if(m_command_buffer_reads, [=](const auto& cid_reads) { return cid_reads.first < epoch_to_prune_before->get_id(); });
        }

        // Check that all commands have been recorded
        if(is_recording()) {
                assert(std::all_of(current_batch.begin(), current_batch.end(), [this](const command* cmd) {
                        return std::any_of(m_recorder->get_graph_nodes().begin(), m_recorder->get_graph_nodes().end(),
                            [cmd](const std::unique_ptr<command_record>& rec) { return rec->id == cmd->get_id(); });
                }));
        }

        assert(is_topologically_sorted(current_batch.begin(), current_batch.end()));
        return current_batch;
}

void command_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const {
        const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};

        // Since this check is run independently on every node, we avoid quadratic behavior by only checking for conflicts between all local chunks and the
        // region-union of remote chunks. This way, every conflict will be reported by at least one node.
        const box<3> global_chunk(subrange(full_chunk.offset, full_chunk.range));
        auto remote_chunks = region_difference(global_chunk, region(box_vector<3>(local_chunks))).into_boxes();

        // detect_overlapping_writes takes a single box_vector, so we concatenate local and remote chunks (the order does not matter)
        auto distributed_chunks = std::move(remote_chunks);
        distributed_chunks.insert(distributed_chunks.end(), local_chunks.begin(), local_chunks.end());

        if(const auto overlapping_writes = detect_overlapping_writes(tsk, distributed_chunks); !overlapping_writes.empty()) {
                auto error = fmt::format("{} has overlapping writes between multiple nodes in", print_task_debug_label(tsk, true /* title case */));
                for(const auto& [bid, overlap] : overlapping_writes) {
                        fmt::format_to(std::back_inserter(error), " {} {}", print_buffer_debug_label(bid), overlap);
                }
                error += ". Choose a non-overlapping range mapper for this write access or constrain the split via experimental::constrain_split to make the access "
                         "non-overlapping.";
                utils::report_error(m_policy.overlapping_write_error, "{}", error);
        }
}
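
// In practice this means: if node N0 plans to write box B0 locally and the remaining nodes together
// write a region that overlaps B0, N0 sees the overlap between B0 and the remote union and reports it;
// overlaps between two local chunks are caught directly, since all local chunks are passed individually.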

std::vector<command_graph_generator::assigned_chunk> command_graph_generator::split_task_and_assign_chunks(const task& tsk) const {
        const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};
        const size_t num_chunks = m_num_nodes * m_test_chunk_multiplier;
        const auto chunks = ([&] {
                if(tsk.get_type() == task_type::collective || tsk.get_type() == task_type::fence) {
                        std::vector<chunk<3>> chunks;
                        for(size_t nid = 0; nid < m_num_nodes; ++nid) {
                                chunks.push_back(chunk_cast<3>(chunk<1>{id<1>{tsk.get_type() == task_type::collective ? nid : 0}, ones, {m_num_nodes}}));
                        }
                        return chunks;
                }
                if(tsk.has_variable_split()) {
                        if(tsk.get_hint<experimental::hints::split_1d>() != nullptr) {
                                // no-op, keeping this for documentation purposes
                        }
                        if(tsk.get_hint<experimental::hints::split_2d>() != nullptr) { return split_2d(full_chunk, tsk.get_granularity(), num_chunks); }
                        return split_1d(full_chunk, tsk.get_granularity(), num_chunks);
                }
                return std::vector<chunk<3>>{full_chunk};
        })();
        assert(chunks.size() <= num_chunks); // We may have created fewer chunks than requested
        assert(!chunks.empty());

        // Assign each chunk to a node
        // We assign adjacent chunks to the same worker (if there are more chunks than workers), as this is likely to produce fewer
        // transfers between tasks than a round-robin assignment (for typical stencil codes).
        // FIXME: This only works if the number of chunks is an integer multiple of the number of workers, e.g. 3 chunks for 2 workers degrades to RR.
        const auto chunks_per_node = std::max<size_t>(1, chunks.size() / m_num_nodes);

        std::vector<assigned_chunk> assigned_chunks;
        for(size_t i = 0; i < chunks.size(); ++i) {
                const node_id nid = (i / chunks_per_node) % m_num_nodes;
                assigned_chunks.push_back({nid, chunks[i]});
        }
        return assigned_chunks;
}
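
// Example: with 2 nodes and 4 chunks, chunks_per_node is 2 and nid = (i / 2) % 2 assigns chunks
// 0,1 to node 0 and chunks 2,3 to node 1 (contiguous). With 2 nodes and 3 chunks, chunks_per_node
// is max(1, 3 / 2) = 1 and the assignment becomes 0, 1, 0, i.e. it degrades to round-robin as
// noted in the FIXME above.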

command_graph_generator::buffer_requirements_list command_graph_generator::get_buffer_requirements_for_mapped_access(
    const task& tsk, const subrange<3>& sr, const range<3> global_size) const {
        buffer_requirements_list result;
        const auto& access_map = tsk.get_buffer_access_map();
        for(const buffer_id bid : access_map.get_accessed_buffers()) {
                region_builder<3> consumed;
                region_builder<3> produced;
                for(const auto m : access_map.get_access_modes(bid)) {
                        const auto req = access_map.get_mode_requirements(bid, m, tsk.get_dimensions(), sr, global_size);
                        if(detail::access::mode_traits::is_consumer(m)) { consumed.add(req); }
                        if(detail::access::mode_traits::is_producer(m)) { produced.add(req); } // not else: `access_mode::write` is both a consumer and a producer
                }
                result.push_back(buffer_requirements{bid, std::move(consumed).into_region(), std::move(produced).into_region()});
        }
        return result;
}
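
// For instance, an access_mode::read access only contributes to `consumed`, while access_mode::write
// contributes to both regions as noted above; a discard-style write (which promises not to read the
// previous contents) would only contribute to `produced`.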

const box<3> empty_reduction_box({0, 0, 0}, {0, 0, 0});
const box<3> scalar_reduction_box({0, 0, 0}, {1, 1, 1});

command_graph_generator::assigned_chunks_with_requirements command_graph_generator::compute_per_chunk_requirements(
    const task& tsk, const std::vector<assigned_chunk>& assigned_chunks) const {
        assigned_chunks_with_requirements result;

        for(const auto& a_chunk : assigned_chunks) {
                const node_id nid = a_chunk.executed_on;
                auto requirements = get_buffer_requirements_for_mapped_access(tsk, a_chunk.chnk, tsk.get_global_size());

                // Add read/write requirements for reductions performed in this task.
                for(const auto& reduction : tsk.get_reductions()) {
                        // task_manager verifies that there are no reduction <-> write-access conflicts
                        assert(std::none_of(
                            requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid && !br.produced.empty(); }));
                        auto it = std::find_if(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid; });
                        if(it == requirements.end()) { it = requirements.insert(requirements.end(), buffer_requirements{reduction.bid, {}, {}}); }
                        it->produced = scalar_reduction_box;
                        if(nid == reduction_initializer_nid && reduction.init_from_buffer) { it->consumed = scalar_reduction_box; }
                }

                if(nid == m_local_nid) {
                        result.local_chunks.emplace_back(a_chunk, std::move(requirements));
                } else {
                        result.remote_chunks.emplace_back(a_chunk, std::move(requirements));
                }
        }

        return result;
}

void command_graph_generator::resolve_pending_reductions(
    batch& current_batch, const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        auto accessed_buffers = tsk.get_buffer_access_map().get_accessed_buffers();
        // Handle chained reductions (i.e., reductions that combine into a buffer that currently is in a pending reduction state)
        for(const auto& reduction : tsk.get_reductions()) {
                accessed_buffers.insert(reduction.bid);
        }

        for(const auto bid : accessed_buffers) {
                auto& buffer = m_buffers.at(bid);
                if(!buffer.pending_reduction.has_value()) { continue; }
                const auto& reduction = *buffer.pending_reduction;

                const auto local_last_writer_set = buffer.local_last_writer.get_region_values(scalar_reduction_box);
                assert(local_last_writer_set.size() == 1);
                const auto local_last_writer = local_last_writer_set[0].second;

                // Prepare the buffer state for after the reduction has been performed:
                // Keep the current last writer, but mark it as stale, so that if we don't generate a reduction command locally,
                // we'll know to get the data from elsewhere. If we generate a reduction command, this will be overwritten by its command id.
                auto wcs = local_last_writer;
                wcs.mark_as_stale();
                buffer_state post_reduction_state(ones, wcs, node_bitset());
                if(m_policy.uninitialized_read_error != error_policy::ignore) { post_reduction_state.initialized_region = scalar_reduction_box; }

                node_bitset participating_nodes;

                // Since the local reduction command overwrites the buffer contents that need to be pushed to other nodes, we need to process remote chunks first.
                for(const auto& [a_chunk, requirements] : chunks_with_requirements.remote_chunks) {
                        if(std::none_of(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == bid && !br.consumed.empty(); })) {
                                // This chunk doesn't read from the buffer
                                continue;
                        }
                        participating_nodes.set(a_chunk.executed_on);
                }

                // Generate push command to all participating nodes
                if(participating_nodes.any()) {
                        // Push an empty range if we don't have any fresh data on this node. This will then generate an empty pilot that tells the
                        // other node's receive_arbiter to not expect a send.
                        const bool notification_only = !local_last_writer.is_fresh();
                        const auto push_box = notification_only ? empty_reduction_box : scalar_reduction_box;
                        assert(participating_nodes.count() == m_num_nodes - 1 || participating_nodes.count() == 1);
                        std::vector<std::pair<node_id, region<3>>> regions;
                        for(node_id nid = 0; nid < m_num_nodes; ++nid) {
                                if(!participating_nodes.test(nid)) continue;
                                regions.push_back({nid, push_box});
                        }
                        auto* const cmd = create_command<push_command>(current_batch, transfer_id(tsk.get_id(), bid, reduction.rid), std::move(regions),
                            [&, bid = bid](const auto& record_debug_info) { record_debug_info(m_buffers.at(bid).debug_name); });
                        if(notification_only) {
                                generate_epoch_dependencies(cmd);
                        } else {
                                m_command_buffer_reads[cmd->get_id()][bid] = region_union(m_command_buffer_reads[cmd->get_id()][bid], scalar_reduction_box);
                                add_dependency(cmd, local_last_writer, dependency_kind::true_dep, dependency_origin::dataflow);
                        }

                        // Mark the reduction result as replicated so we don't generate data transfers to any of the participating nodes
                        post_reduction_state.replicated_regions.update_box(scalar_reduction_box, participating_nodes);
                }

                // We currently don't support multiple chunks on a single node for reductions (there is also -- for now -- no way to create multiple chunks,
                // as oversubscription is handled by the instruction graph).
                // NOTE: The participating_nodes.count() check below relies on this being true
                assert(chunks_with_requirements.local_chunks.size() <= 1);
                for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                        if(std::none_of(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == bid && !br.consumed.empty(); })) {
                                // This chunk doesn't read from the buffer
                                continue;
                        }

                        auto* const ap_cmd = create_command<await_push_command>(current_batch, transfer_id(tsk.get_id(), bid, reduction.rid),
                            scalar_reduction_box.get_subrange(), [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                        generate_epoch_dependencies(ap_cmd);

                        auto* const reduce_cmd = create_command<reduction_command>(current_batch, reduction, local_last_writer.is_fresh() /* has_local_contribution */,
                            [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });

                        // Only generate a true dependency on the last writer if this node participated in the intermediate result computation.
                        if(local_last_writer.is_fresh()) { add_dependency(reduce_cmd, local_last_writer, dependency_kind::true_dep, dependency_origin::dataflow); }
                        add_dependency(reduce_cmd, ap_cmd, dependency_kind::true_dep, dependency_origin::dataflow);
                        generate_anti_dependencies(tsk, bid, buffer.local_last_writer, scalar_reduction_box, reduce_cmd);

                        post_reduction_state.local_last_writer.update_box(scalar_reduction_box, reduce_cmd);
                        participating_nodes.set(m_local_nid); // We are participating
                }

                // We currently do not support generating reduction commands on only a subset of nodes, except for the special case of a single command.
                // This is because it is unclear who owns the final result in this case (normally all nodes "own" the result).
                //   => I.e., reducing and using the result on the participating nodes is actually not the problem (this works as intended); the issue only arises
                //      if the result is subsequently required in other tasks. Since we don't have a good way of detecting this condition however, we currently
                //      disallow partial reductions altogether.
                // NOTE: This check relies on the fact that we currently only generate a single chunk per node for reductions (see assertion above).
                if(participating_nodes.count() > 1 && participating_nodes.count() != m_num_nodes) {
                        utils::report_error(error_policy::panic,
                            "{} requires a reduction on {} that is not performed on all nodes. This is currently not supported. Either "
                            "ensure that all nodes receive a chunk that reads from the buffer, or reduce the data on a single node.",
                            print_task_debug_label(tsk, true /* title case */), print_buffer_debug_label(bid));
                }

                // For buffers that were in a pending reduction state and a reduction was generated
                // (i.e., the result was not discarded), set their new state.
                if(participating_nodes.count() > 0) {
                        m_completed_reductions.push_back(reduction.rid);
                        buffer = std::move(post_reduction_state);
                }
        }
}
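
// Example: with 4 nodes, if only nodes 0 and 1 read the pending reduction result in this task,
// participating_nodes.count() == 2 != m_num_nodes and the partial-reduction error above is raised.
// If exactly one node reads it, that node alone finalizes the reduction (count() == 1 is allowed),
// and if no chunk reads it at all, the buffer simply stays in its pending reduction state.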

void command_graph_generator::generate_pushes(batch& current_batch, const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        struct push_scratch {
                std::unordered_map<node_id, region_builder<3>> target_regions;
                std::unordered_set<command*> depends_on;
        };
        std::unordered_map<buffer_id, push_scratch> per_buffer_pushes;

        for(auto& [a_chunk, requirements] : chunks_with_requirements.remote_chunks) {
                const node_id nid = a_chunk.executed_on;

                for(const auto& [bid, consumed, _] : requirements) {
                        if(consumed.empty()) continue;
                        auto& buffer = m_buffers.at(bid);

                        const auto local_sources = buffer.local_last_writer.get_region_values(consumed);
                        for(const auto& [local_box, wcs] : local_sources) {
                                if(!wcs.is_fresh() || wcs.is_replicated()) { continue; }

                                // Make sure we don't push anything we've already pushed to this node before
                                region_builder<3> non_replicated_boxes;
                                for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(local_box)) {
                                        if(nodes.test(nid)) continue;
                                        non_replicated_boxes.add(replicated_box);
                                }

                                if(!non_replicated_boxes.empty()) {
                                        assert(!utils::isa<await_push_command>(wcs.get_command()) && "Attempting to push non-owned data?!");
                                        auto push_region = std::move(non_replicated_boxes).into_region();
                                        // Remember that we've replicated this region
                                        for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(push_region)) {
                                                buffer.replicated_regions.update_box(replicated_box, node_bitset{nodes}.set(nid));
                                        }
                                        auto& scratch = per_buffer_pushes[bid]; // allow default-insert
                                        scratch.target_regions[nid /* allow default-insert */].add(push_region);
                                        scratch.depends_on.insert(wcs);
                                }
                        }
                }
        }

        // Generate push command for each buffer
        for(auto& [bid, scratch] : per_buffer_pushes) {
                region_builder<3> combined_region;
                std::vector<std::pair<node_id, region<3>>> target_regions;
                for(auto& [nid, boxes] : scratch.target_regions) {
                        auto region = std::move(boxes).into_region();
                        combined_region.add(region);
                        target_regions.push_back({nid, std::move(region)});
                }

                auto* const cmd = create_command<push_command>(current_batch, transfer_id(tsk.get_id(), bid, no_reduction_id), std::move(target_regions),
                    [&, bid = bid](const auto& record_debug_info) { record_debug_info(m_buffers.at(bid).debug_name); });
                for(const auto dep : scratch.depends_on) {
                        add_dependency(cmd, dep, dependency_kind::true_dep, dependency_origin::dataflow);
                }

                // Store the read access for determining anti-dependencies
                m_command_buffer_reads[cmd->get_id()].emplace(bid, std::move(combined_region).into_region());
        }
}
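
// The replicated_regions tracking above is what keeps pushes incremental: if an earlier task already
// pushed a box of this buffer to node N, that box carries N in its node_bitset, so a later remote read
// on N only receives the still-missing remainder instead of the full consumed region again.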

// TODO: We currently generate an await push command for each local chunk, whereas we only generate a single push command for all remote chunks
void command_graph_generator::generate_await_pushes(batch& current_batch, const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
        for(auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                for(auto& [bid, consumed, _] : requirements) {
                        if(consumed.empty()) continue;
                        auto& buffer = m_buffers.at(bid);

                        const auto local_sources = buffer.local_last_writer.get_region_values(consumed);
                        region_builder<3> missing_part_boxes;
                        for(const auto& [box, wcs] : local_sources) {
                                // Note that we initialize all buffers as fresh, so this doesn't trigger for uninitialized reads
                                if(!box.empty() && !wcs.is_fresh()) { missing_part_boxes.add(box); }
                        }

                        // There is data we don't yet have locally. Generate an await push command for it.
                        if(!missing_part_boxes.empty()) {
                                const auto missing_parts = std::move(missing_part_boxes).into_region();
                                assert(m_num_nodes > 1);
                                auto* const ap_cmd = create_command<await_push_command>(current_batch, transfer_id(tsk.get_id(), bid, no_reduction_id), missing_parts,
                                    [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
                                generate_anti_dependencies(tsk, bid, buffer.local_last_writer, missing_parts, ap_cmd);
                                generate_epoch_dependencies(ap_cmd);
                                // Remember that we have this data now
                                buffer.local_last_writer.update_region(missing_parts, {ap_cmd, true /* is_replicated */});
                        }
                }
        }
}

void command_graph_generator::update_local_buffer_fresh_regions(const task& tsk, const std::unordered_map<buffer_id, region<3>>& per_buffer_local_writes) {
        auto requirements = get_buffer_requirements_for_mapped_access(tsk, subrange<3>(tsk.get_global_offset(), tsk.get_global_size()), tsk.get_global_size());
        // Add requirements for reductions
        for(const auto& reduction : tsk.get_reductions()) {
                auto it = std::find_if(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid; });
                if(it == requirements.end()) { it = requirements.insert(requirements.end(), buffer_requirements{reduction.bid, {}, {}}); }
                it->produced = scalar_reduction_box;
        }
        for(auto& [bid, _, produced] : requirements) {
                region global_writes = produced;
                auto& buffer = m_buffers.at(bid);
                if(m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = region_union(buffer.initialized_region, global_writes); }

                const auto remote_writes = ([&, bid = bid] {
                        if(auto it = per_buffer_local_writes.find(bid); it != per_buffer_local_writes.end()) {
                                const auto& local_writes = it->second;
                                assert(region_difference(local_writes, global_writes).empty()); // Local writes have to be a subset of global writes
                                return region_difference(global_writes, local_writes);
                        }
                        return std::move(global_writes);
                })(); // IIFE

                // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
                auto boxes_and_cids = buffer.local_last_writer.get_region_values(remote_writes);
                for(auto& [box, wcs] : boxes_and_cids) {
                        if(wcs.is_fresh()) {
                                wcs.mark_as_stale();
                                buffer.local_last_writer.update_region(box, wcs);
                        }
                }
        }
}
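
// Example: if a task writes a buffer's full range globally but the local chunk only produced the left
// half, remote_writes = global_writes - local_writes is the right half. Its last-writer entries are
// marked stale here, so a later task that reads the right half will trigger an await-push in
// generate_await_pushes() instead of consuming outdated local data.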

void command_graph_generator::generate_distributed_commands(batch& current_batch, const task& tsk) {
        const auto chunks = split_task_and_assign_chunks(tsk);
        const auto chunks_with_requirements = compute_per_chunk_requirements(tsk, chunks);

        // Check for and report overlapping writes between local chunks, and between local and remote chunks.
        if(m_policy.overlapping_write_error != error_policy::ignore) {
                box_vector<3> local_chunks;
                for(const auto& [a_chunk, _] : chunks_with_requirements.local_chunks) {
                        local_chunks.push_back(box<3>{a_chunk.chnk});
                }
                report_overlapping_writes(tsk, local_chunks);
        }

        resolve_pending_reductions(current_batch, tsk, chunks_with_requirements);
        generate_pushes(current_batch, tsk, chunks_with_requirements);
        generate_await_pushes(current_batch, tsk, chunks_with_requirements);

        // Union of all per-buffer writes on this node, used to determine which parts of a buffer are fresh/stale later on.
        std::unordered_map<buffer_id, region<3>> per_buffer_local_writes;

        // Create command for each local chunk and resolve local data dependencies.
        for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
                command* cmd = nullptr;
                if(tsk.get_type() == task_type::fence) {
                        cmd = create_command<fence_command>(current_batch, &tsk,
                            [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });
                } else {
                        // Go over all reductions that are to be performed *during* the execution of this chunk,
                        // not to be confused with any pending reductions that need to be finalized *before* the
                        // execution of this chunk (those have already been handled by resolve_pending_reductions).
                        // If a reduction reads the previous value of the buffer (i.e. w/o property::reduction::initialize_to_identity),
                        // we have to include it in exactly one of the per-node intermediate reductions.
                        const bool is_reduction_initializer = std::any_of(tsk.get_reductions().begin(), tsk.get_reductions().end(),
                            [&](const auto& reduction) { return m_local_nid == reduction_initializer_nid && reduction.init_from_buffer; });
                        cmd = create_command<execution_command>(current_batch, &tsk, subrange{a_chunk.chnk}, is_reduction_initializer,
                            [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });
                }

                if(tsk.get_type() == task_type::collective) {
                        // Collective host tasks have an implicit dependency on the previous task in the same collective group,
                        // which is required in order to guarantee they are executed in the same order on every node.
                        auto cgid = tsk.get_collective_group_id();
                        if(const auto cg = m_collective_groups.find(cgid); cg != m_collective_groups.end()) {
                                add_dependency(cmd, cg->second.last_collective_command, dependency_kind::true_dep, dependency_origin::collective_group_serialization);
                                cg->second.last_collective_command = cmd;
                        } else {
                                m_collective_groups.emplace(cgid, cmd);
                        }
                }

                for(const auto& [bid, consumed, produced] : requirements) {
                        auto& buffer = m_buffers.at(bid);

                        // Process consuming accesses first, so we don't add dependencies onto our own writes
                        if(!consumed.empty()) {
                                for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(consumed)) {
                                        if(box.empty()) continue;
                                        assert(wcs.is_fresh() && "Unresolved remote data dependency");
                                        add_dependency(cmd, wcs, dependency_kind::true_dep, dependency_origin::dataflow);
                                }

                                // Store the read access for determining anti-dependencies later on
                                m_command_buffer_reads[cmd->get_id()].emplace(bid, consumed);
                        }

                        if(!produced.empty()) {
                                generate_anti_dependencies(tsk, bid, buffer.local_last_writer, produced, cmd);

                                // Update last writer
                                buffer.local_last_writer.update_region(produced, cmd);
                                buffer.replicated_regions.update_region(produced, node_bitset{});

                                // In case this buffer was in a pending reduction state we discarded the result and need to remove the pending reduction.
                                if(buffer.pending_reduction.has_value()) {
                                        m_completed_reductions.push_back(buffer.pending_reduction->rid);
                                        buffer.pending_reduction = std::nullopt;
                                }

                                per_buffer_local_writes.emplace(bid, produced);
                        }

                        if(m_policy.uninitialized_read_error != error_policy::ignore) {
                                if(const auto uninitialized_reads = region_difference(consumed, buffer.initialized_region); !uninitialized_reads.empty()) {
                                        utils::report_error(m_policy.uninitialized_read_error,
                                            "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_id(), m_local_nid,
                                            box(subrange(a_chunk.chnk.offset, a_chunk.chnk.range)), print_task_debug_label(tsk), print_buffer_debug_label(bid),
                                            uninitialized_reads);
                                }
                        }
                }

                for(const auto& side_effect : tsk.get_side_effect_map()) {
                        const auto [hoid, order] = side_effect;
                        auto& host_object = m_host_objects.at(hoid);

                        if(host_object.last_side_effect != nullptr) {
                                // TODO once we have different side_effect_orders, their interaction will determine the dependency kind
                                add_dependency(cmd, host_object.last_side_effect, dependency_kind::true_dep, dependency_origin::dataflow);
                        }

                        // Simplification: If there are multiple chunks per node, we generate true-dependencies between them in an arbitrary order, when all we really
                        // need is mutual exclusion (i.e. a bi-directional pseudo-dependency).
                        host_object.last_side_effect = cmd;
                }

                generate_epoch_dependencies(cmd);
        }

        // Mark any buffers that now are in a pending reduction state as such.
        // If there is only one chunk/command, it already implicitly generates the final reduced value
        // and the buffer does not need to be flagged as a pending reduction.
        for(const auto& reduction : tsk.get_reductions()) {
                if(chunks.size() > 1) {
                        m_buffers.at(reduction.bid).pending_reduction = reduction;
                } else {
                        m_completed_reductions.push_back(reduction.rid);
                }
        }
        update_local_buffer_fresh_regions(tsk, per_buffer_local_writes);
}

void command_graph_generator::generate_anti_dependencies(
    const task& tsk, const buffer_id bid, const region_map<write_command_state>& last_writers_map, const region<3>& write_req, command* const write_cmd) //
{
        const auto last_writers = last_writers_map.get_region_values(write_req);
        for(const auto& [box, wcs] : last_writers) {
                auto* const last_writer_cmd = wcs.get_command();
                assert(!utils::isa<task_command>(last_writer_cmd) || utils::as<task_command>(last_writer_cmd)->get_task() != &tsk);

                // Add anti-dependencies onto all successors of the writer
                bool has_successors = false;
                for(auto d : last_writer_cmd->get_dependents()) {
                        // Only consider true dependencies
                        if(d.kind != dependency_kind::true_dep) continue;

                        auto* const cmd = d.node;

                        // We might have already generated new commands within the same task that also depend on this; in that case, skip it
                        if(utils::isa<task_command>(cmd) && utils::as<task_command>(cmd)->get_task() == &tsk) continue;

                        // So far we don't know whether the dependent actually intersects with the subrange we're writing
                        if(const auto command_reads_it = m_command_buffer_reads.find(cmd->get_id()); command_reads_it != m_command_buffer_reads.end()) {
                                const auto& command_reads = command_reads_it->second;
                                // The task might be a dependent because of another buffer
                                if(const auto buffer_reads_it = command_reads.find(bid); buffer_reads_it != command_reads.end()) {
                                        if(!region_intersection(write_req, buffer_reads_it->second).empty()) {
                                                has_successors = true;
                                                add_dependency(write_cmd, cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
                                        }
                                }
                        }
                }

                // In some cases (horizons, master node host task, weird discard_* constructs...)
                // the last writer might not have any successors. Just add the anti-dependency onto the writer itself then.
                if(!has_successors) { add_dependency(write_cmd, last_writer_cmd, dependency_kind::anti_dep, dependency_origin::dataflow); }
        }
}

619
void command_graph_generator::set_epoch_for_new_commands(command* const epoch_or_horizon) {
2,064✔
620
        // both an explicit epoch command and an applied horizon can be effective epochs
621
        assert(utils::isa<epoch_command>(epoch_or_horizon) || utils::isa<horizon_command>(epoch_or_horizon));
2,064✔
622

623
        for(auto& [_, buffer] : m_buffers) {
3,011✔
624
                buffer.local_last_writer.apply_to_values([epoch_or_horizon](write_command_state wcs) {
947✔
625
                        if(epoch_or_horizon->get_id() <= wcs.get_command()->get_id()) return wcs;
2,064✔
626
                        write_command_state new_wcs(epoch_or_horizon, wcs.is_replicated());
1,031✔
627
                        if(!wcs.is_fresh()) new_wcs.mark_as_stale();
1,031✔
628
                        return new_wcs;
1,031✔
629
                });
630
        }
631
        for(auto& [_, host_object] : m_host_objects) {
2,131✔
632
                if(host_object.last_side_effect != nullptr && host_object.last_side_effect->get_id() < epoch_or_horizon->get_id()) {
67!
633
                        host_object.last_side_effect = epoch_or_horizon;
45✔
634
                }
635
        }
636
        for(auto& [cgid, collective_group] : m_collective_groups) {
2,146✔
637
                if(collective_group.last_collective_command->get_id() < epoch_or_horizon->get_id()) { collective_group.last_collective_command = epoch_or_horizon; }
82!
638
        }
639

640
        m_epoch_for_new_commands = epoch_or_horizon;
2,064✔
641
}
2,064✔
642

643
void command_graph_generator::reduce_execution_front_to(command* const new_front) {
2,126✔
644
        const auto previous_execution_front = m_execution_front; // modified inside loop through add_dependency
2,126✔
645
        for(auto* const front_cmd : previous_execution_front) {
6,791✔
646
                if(front_cmd != new_front) { add_dependency(new_front, front_cmd, dependency_kind::true_dep, dependency_origin::execution_front); }
4,665✔
647
        }
648
        assert(m_execution_front.size() == 1 && *m_execution_front.begin() == new_front);
2,126✔
649
}
4,252✔
650

651
void command_graph_generator::generate_epoch_command(batch& current_batch, const task& tsk) {
1,318✔
652
        assert(tsk.get_type() == task_type::epoch);
1,318✔
653
        m_cdag->begin_epoch(tsk.get_id());
1,318✔
654
        auto* const epoch = create_command<epoch_command>(
2,636✔
655
            current_batch, &tsk, tsk.get_epoch_action(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });
1,927✔
656
        set_epoch_for_new_commands(epoch);
1,318✔
657
        m_current_horizon = no_command;
1,318✔
658
        // Make the epoch depend on the previous execution front
659
        reduce_execution_front_to(epoch);
1,318✔
660
}
1,318✔
661

662
void command_graph_generator::generate_horizon_command(batch& current_batch, const task& tsk) {
808✔
663
        assert(tsk.get_type() == task_type::horizon);
808✔
664
        m_cdag->begin_epoch(tsk.get_id());
808✔
665
        auto* const new_horizon =
666
            create_command<horizon_command>(current_batch, &tsk, std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });
1,047✔
667

668
        if(m_current_horizon != nullptr) {
808✔
669
                // Apply the previous horizon
670
                set_epoch_for_new_commands(m_current_horizon);
746✔
671
        }
672
        m_current_horizon = new_horizon;
808✔
673

674
        // Make the horizon depend on the previous execution front
675
        reduce_execution_front_to(new_horizon);
808✔
676
}
808✔
677

678
void command_graph_generator::generate_epoch_dependencies(command* cmd) {
3,345✔
679
        // No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs.
680
        // To guarantee that each node has a transitive true dependency (=temporal dependency) on the epoch, it is enough to add an epoch -> command dependency
681
        // to any command that has no other true dependencies itself and no graph traversal is necessary. This can be verified by a simple induction proof.
682

683
        // As long the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only
684
        // commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true
685
        // predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes
686
        // in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the
687
        // dependency chain from the executor perspective.
688

689
        if(const auto deps = cmd->get_dependencies();
3,345✔
690
            std::none_of(deps.begin(), deps.end(), [](const command::dependency d) { return d.kind == dependency_kind::true_dep; })) {
6,596✔
691
                if(!utils::isa<epoch_command>(cmd) || utils::as<epoch_command>(cmd)->get_epoch_action() != epoch_action::init) {
1,138!
692
                        assert(cmd != m_epoch_for_new_commands);
1,138✔
693
                        add_dependency(cmd, m_epoch_for_new_commands, dependency_kind::true_dep, dependency_origin::last_epoch);
1,138✔
694
                }
695
        }
696
}
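
// Example: an await-push or notification-only push command created above has no true data dependencies
// of its own, so it receives a true dependency on the current effective epoch here; an execution command
// that already depends on a last writer needs no extra edge, since it is ordered after the epoch
// transitively through that writer.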

std::string command_graph_generator::print_buffer_debug_label(const buffer_id bid) const {
        return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name);
}

} // namespace celerity::detail