celerity / celerity-runtime, build 11854130628 (push, via github, by psalz)
15 Nov 2024 09:58AM UTC coverage: 95.102% (-0.06% from 95.163%)
Commit: Update benchmark results for buffer_access_map refactor

2992 of 3394 branches covered (88.16%); branch coverage is included in the aggregate %.
6677 of 6773 relevant lines covered (98.58%)
1294452.81 hits per line

Source File: /src/command_graph_generator.cc (95.76% covered)
#include "command_graph_generator.h"

#include "access_modes.h"
#include "command_graph.h"
#include "grid.h"
#include "intrusive_graph.h"
#include "print_utils.h"
#include "ranges.h"
#include "recorders.h"
#include "region_map.h"
#include "split.h"
#include "task.h"
#include "types.h"
#include "utils.h"

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>


namespace celerity::detail {

command_graph_generator::command_graph_generator(
    const size_t num_nodes, const node_id local_nid, command_graph& cdag, detail::command_recorder* const recorder, const policy_set& policy)
    : m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(&cdag), m_recorder(recorder) {
	if(m_num_nodes > max_num_nodes) {
		throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes));
	}
}

void command_graph_generator::notify_buffer_created(const buffer_id bid, const range<3>& range, bool host_initialized) {
	assert(m_epoch_for_new_commands != nullptr);
	assert(!m_buffers.contains(bid));
	// Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands).
	// This is required when tasks access host-initialized or uninitialized buffers.
	auto& buffer = m_buffers.emplace(std::piecewise_construct, std::tuple(bid), std::tuple(range, m_epoch_for_new_commands, node_bitset().set())).first->second;
	if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = box(subrange({}, range)); }
}

void command_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& debug_name) {
	m_buffers.at(bid).debug_name = debug_name;
}

void command_graph_generator::notify_buffer_destroyed(const buffer_id bid) {
	assert(m_buffers.contains(bid));
	m_buffers.erase(bid);
}

void command_graph_generator::notify_host_object_created(const host_object_id hoid) {
	assert(m_epoch_for_new_commands != nullptr);
	assert(!m_host_objects.contains(hoid));
	m_host_objects.emplace(hoid, m_epoch_for_new_commands);
}

void command_graph_generator::notify_host_object_destroyed(const host_object_id hoid) {
	assert(m_host_objects.contains(hoid));
	m_host_objects.erase(hoid);
}

/// Returns whether an iterator range of commands is topologically sorted, i.e. sequential execution would satisfy all internal dependencies.
template <typename Iterator>
bool is_topologically_sorted(Iterator begin, Iterator end) {
	for(auto check = begin; check != end; ++check) {
		for(const auto dep : (*check)->get_dependencies()) {
			if(std::find_if(std::next(check), end, [dep](const auto& node) { return node == dep.node; }) != end) return false;
		}
	}
	return true;
}

std::vector<const command*> command_graph_generator::build_task(const task& tsk) {
	const auto epoch_to_prune_before = m_epoch_for_new_commands;
	batch current_batch;

	switch(tsk.get_type()) {
	case task_type::epoch: generate_epoch_command(current_batch, tsk); break;
	case task_type::horizon: generate_horizon_command(current_batch, tsk); break;
	case task_type::device_compute:
	case task_type::host_compute:
	case task_type::master_node:
	case task_type::collective:
	case task_type::fence: generate_distributed_commands(current_batch, tsk); break;
	default: throw std::runtime_error("Task type NYI");
	}

	// It is currently undefined to split reduction-producer tasks into multiple chunks on the same node:
	//   - Per-node reduction intermediate results are stored with fixed access to a single backing buffer,
	//     so multiple chunks on the same node will race on this buffer access
	//   - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish
	//     more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative
	//     or non-commutative operations
	if(!tsk.get_reductions().empty()) {
		assert(std::count_if(current_batch.begin(), current_batch.end(), [](const command* cmd) { return utils::isa<task_command>(cmd); }) <= 1);
	}

	// If a new epoch was completed in the CDAG before the current task, we can erase all tracking information from earlier commands.
	// After the epoch (or horizon) command has been executed, the scheduler will then delete all obsolete commands from the CDAG.
	if(epoch_to_prune_before != nullptr) {
		std::erase_if(m_command_buffer_reads, [=](const auto& cid_reads) { return cid_reads.first < epoch_to_prune_before->get_id(); });
	}

	// Check that all commands have been recorded
	if(is_recording()) {
		assert(std::all_of(current_batch.begin(), current_batch.end(), [this](const command* cmd) {
			return std::any_of(m_recorder->get_graph_nodes().begin(), m_recorder->get_graph_nodes().end(),
			    [cmd](const std::unique_ptr<command_record>& rec) { return rec->id == cmd->get_id(); });
		}));
	}

	assert(is_topologically_sorted(current_batch.begin(), current_batch.end()));
	return current_batch;
}

void command_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const {
	const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};

	// Since this check is run distributed on every node, we avoid quadratic behavior by only checking for conflicts between all local chunks and the
	// region-union of remote chunks. This way, every conflict will be reported by at least one node.
	const box<3> global_chunk(subrange(full_chunk.offset, full_chunk.range));
	auto remote_chunks = region_difference(global_chunk, region(box_vector<3>(local_chunks))).into_boxes();

	// detect_overlapping_writes takes a single box_vector, so we concatenate local and global chunks (the order does not matter)
	auto distributed_chunks = std::move(remote_chunks);
	distributed_chunks.insert(distributed_chunks.end(), local_chunks.begin(), local_chunks.end());

	if(const auto overlapping_writes = detect_overlapping_writes(tsk, distributed_chunks); !overlapping_writes.empty()) {
		auto error = fmt::format("{} has overlapping writes between multiple nodes in", print_task_debug_label(tsk, true /* title case */));
		for(const auto& [bid, overlap] : overlapping_writes) {
			fmt::format_to(std::back_inserter(error), " {} {}", print_buffer_debug_label(bid), overlap);
		}
		error += ". Choose a non-overlapping range mapper for this write access or constrain the split via experimental::constrain_split to make the access "
		         "non-overlapping.";
		utils::report_error(m_policy.overlapping_write_error, "{}", error);
	}
}

std::vector<command_graph_generator::assigned_chunk> command_graph_generator::split_task_and_assign_chunks(const task& tsk) const {
	const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};
	const size_t num_chunks = m_num_nodes * m_test_chunk_multiplier;
	const auto chunks = ([&] {
		if(tsk.get_type() == task_type::collective || tsk.get_type() == task_type::fence) {
			std::vector<chunk<3>> chunks;
			for(size_t nid = 0; nid < m_num_nodes; ++nid) {
				chunks.push_back(chunk_cast<3>(chunk<1>{id<1>{tsk.get_type() == task_type::collective ? nid : 0}, ones, {m_num_nodes}}));
			}
			return chunks;
		}
		if(tsk.has_variable_split()) {
			if(tsk.get_hint<experimental::hints::split_1d>() != nullptr) {
				// no-op, keeping this for documentation purposes
			}
			if(tsk.get_hint<experimental::hints::split_2d>() != nullptr) { return split_2d(full_chunk, tsk.get_granularity(), num_chunks); }
			return split_1d(full_chunk, tsk.get_granularity(), num_chunks);
		}
		return std::vector<chunk<3>>{full_chunk};
	})();
	assert(chunks.size() <= num_chunks); // We may have created less than requested
	assert(!chunks.empty());

	// Assign each chunk to a node
	// We assign chunks next to each other to the same worker (if there is more chunks than workers), as this is likely to produce less
	// transfers between tasks than a round-robin assignment (for typical stencil codes).
	// FIXME: This only works if the number of chunks is an integer multiple of the number of workers, e.g. 3 chunks for 2 workers degrades to RR.
	const auto chunks_per_node = std::max<size_t>(1, chunks.size() / m_num_nodes);

	std::vector<assigned_chunk> assigned_chunks;
	for(size_t i = 0; i < chunks.size(); ++i) {
		const node_id nid = (i / chunks_per_node) % m_num_nodes;
		assigned_chunks.push_back({nid, chunks[i]});
	}
	return assigned_chunks;
}

command_graph_generator::buffer_requirements_list command_graph_generator::get_buffer_requirements_for_mapped_access(
    const task& tsk, const subrange<3>& sr) const {
	buffer_requirements_list result;
	const auto& access_map = tsk.get_buffer_access_map();
	for(const buffer_id bid : access_map.get_accessed_buffers()) {
		result.push_back(buffer_requirements{bid, access_map.compute_consumed_region(bid, box<3>(sr)), access_map.compute_produced_region(bid, box<3>(sr))});
	}
	return result;
}

const box<3> empty_reduction_box({0, 0, 0}, {0, 0, 0});
const box<3> scalar_reduction_box({0, 0, 0}, {1, 1, 1});

command_graph_generator::assigned_chunks_with_requirements command_graph_generator::compute_per_chunk_requirements(
    const task& tsk, const std::vector<assigned_chunk>& assigned_chunks) const {
	assigned_chunks_with_requirements result;

	for(const auto& a_chunk : assigned_chunks) {
		const node_id nid = a_chunk.executed_on;
		auto requirements = get_buffer_requirements_for_mapped_access(tsk, a_chunk.chnk);

		// Add read/write requirements for reductions performed in this task.
		for(const auto& reduction : tsk.get_reductions()) {
			// task_manager verifies that there are no reduction <-> write-access conflicts
			assert(std::none_of(
			    requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid && !br.produced.empty(); }));
			auto it = std::find_if(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid; });
			if(it == requirements.end()) { it = requirements.insert(requirements.end(), buffer_requirements{reduction.bid, {}, {}}); }
			it->produced = scalar_reduction_box;
			if(nid == reduction_initializer_nid && reduction.init_from_buffer) { it->consumed = scalar_reduction_box; }
		}

		if(nid == m_local_nid) {
			result.local_chunks.emplace_back(a_chunk, std::move(requirements));
		} else {
			result.remote_chunks.emplace_back(a_chunk, std::move(requirements));
		}
	}

	return result;
}

void command_graph_generator::resolve_pending_reductions(
    batch& current_batch, const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
	auto accessed_buffers = tsk.get_buffer_access_map().get_accessed_buffers();
	// Handle chained reductions (i.e., reductions that combine into a buffer that currently is in a pending reduction state)
	for(const auto& reduction : tsk.get_reductions()) {
		accessed_buffers.insert(reduction.bid);
	}

	for(const auto bid : accessed_buffers) {
		auto& buffer = m_buffers.at(bid);
		if(!buffer.pending_reduction.has_value()) { continue; }
		const auto& reduction = *buffer.pending_reduction;

		const auto local_last_writer_set = buffer.local_last_writer.get_region_values(scalar_reduction_box);
		assert(local_last_writer_set.size() == 1);
		const auto local_last_writer = local_last_writer_set[0].second;

		// Prepare the buffer state for after the reduction has been performed:
		// Keep the current last writer, but mark it as stale, so that if we don't generate a reduction command locally,
		// we'll know to get the data from elsewhere. If we generate a reduction command, this will be overwritten by its command id.
		auto wcs = local_last_writer;
		wcs.mark_as_stale();
		buffer_state post_reduction_state(ones, wcs, node_bitset());
		if(m_policy.uninitialized_read_error != error_policy::ignore) { post_reduction_state.initialized_region = scalar_reduction_box; }

		node_bitset participating_nodes;

		// Since the local reduction command overwrites the buffer contents that need to be pushed to other nodes, we need to process remote chunks first.
		for(const auto& [a_chunk, requirements] : chunks_with_requirements.remote_chunks) {
			if(std::none_of(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == bid && !br.consumed.empty(); })) {
				// This chunk doesn't read from the buffer
				continue;
			}
			participating_nodes.set(a_chunk.executed_on);
		}

		// Generate push command to all participating nodes
		if(participating_nodes.any()) {
			// Push an empty range if we don't have any fresh data on this node. This will then generate an empty pilot that tells the
			// other node's receive_arbiter to not expect a send.
			const bool notification_only = !local_last_writer.is_fresh();
			const auto push_box = notification_only ? empty_reduction_box : scalar_reduction_box;
			assert(participating_nodes.count() == m_num_nodes - 1 || participating_nodes.count() == 1);
			std::vector<std::pair<node_id, region<3>>> regions;
			for(node_id nid = 0; nid < m_num_nodes; ++nid) {
				if(!participating_nodes.test(nid)) continue;
				regions.push_back({nid, push_box});
			}
			auto* const cmd = create_command<push_command>(current_batch, transfer_id(tsk.get_id(), bid, reduction.rid), std::move(regions),
			    [&, bid = bid](const auto& record_debug_info) { record_debug_info(m_buffers.at(bid).debug_name); });
			if(notification_only) {
				generate_epoch_dependencies(cmd);
			} else {
				m_command_buffer_reads[cmd->get_id()][bid] = region_union(m_command_buffer_reads[cmd->get_id()][bid], scalar_reduction_box);
				add_dependency(cmd, local_last_writer, dependency_kind::true_dep, dependency_origin::dataflow);
			}

			// Mark the reduction result as replicated so we don't generate data transfers to any of the participating nodes
			post_reduction_state.replicated_regions.update_box(scalar_reduction_box, participating_nodes);
		}

		// We currently don't support multiple chunks on a single node for reductions (there is also -- for now -- no way to create multiple chunks,
		// as oversubscription is handled by the instruction graph).
		// NOTE: The participating_nodes.count() check below relies on this being true
		assert(chunks_with_requirements.local_chunks.size() <= 1);
		for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
			if(std::none_of(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == bid && !br.consumed.empty(); })) {
				// This chunk doesn't read from the buffer
				continue;
			}

			auto* const ap_cmd = create_command<await_push_command>(current_batch, transfer_id(tsk.get_id(), bid, reduction.rid),
			    scalar_reduction_box.get_subrange(), [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
			generate_epoch_dependencies(ap_cmd);

			auto* const reduce_cmd = create_command<reduction_command>(current_batch, reduction, local_last_writer.is_fresh() /* has_local_contribution */,
			    [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });

			// Only generate a true dependency on the last writer if this node participated in the intermediate result computation.
			if(local_last_writer.is_fresh()) { add_dependency(reduce_cmd, local_last_writer, dependency_kind::true_dep, dependency_origin::dataflow); }
			add_dependency(reduce_cmd, ap_cmd, dependency_kind::true_dep, dependency_origin::dataflow);
			generate_anti_dependencies(tsk, bid, buffer.local_last_writer, scalar_reduction_box, reduce_cmd);

			post_reduction_state.local_last_writer.update_box(scalar_reduction_box, reduce_cmd);
			participating_nodes.set(m_local_nid); // We are participating
		}

		// We currently do not support generating reduction commands on only a subset of nodes, except for the special case of a single command.
		// This is because it is unclear who owns the final result in this case (normally all nodes "own" the result).
		//   => I.e., reducing and using the result on the participating nodes is actually not the problem (this works as intended); the issue only arises
		//      if the result is subsequently required in other tasks. Since we don't have a good way of detecting this condition however, we currently
		//      disallow partial reductions altogether.
		// NOTE: This check relies on the fact that we currently only generate a single chunk per node for reductions (see assertion above).
		if(participating_nodes.count() > 1 && participating_nodes.count() != m_num_nodes) {
			utils::report_error(error_policy::panic,
			    "{} requires a reduction on {} that is not performed on all nodes. This is currently not supported. Either "
			    "ensure that all nodes receive a chunk that reads from the buffer, or reduce the data on a single node.",
			    print_task_debug_label(tsk, true /* title case */), print_buffer_debug_label(bid));
		}

		// For buffers that were in a pending reduction state and a reduction was generated
		// (i.e., the result was not discarded), set their new state.
		if(participating_nodes.count() > 0) {
			m_completed_reductions.push_back(reduction.rid);
			buffer = std::move(post_reduction_state);
		}
	}
}

void command_graph_generator::generate_pushes(batch& current_batch, const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
	struct push_scratch {
		std::unordered_map<node_id, region_builder<3>> target_regions;
		std::unordered_set<command*> depends_on;
	};
	std::unordered_map<buffer_id, push_scratch> per_buffer_pushes;

	for(auto& [a_chunk, requirements] : chunks_with_requirements.remote_chunks) {
		const node_id nid = a_chunk.executed_on;

		for(const auto& [bid, consumed, _] : requirements) {
			if(consumed.empty()) continue;
			auto& buffer = m_buffers.at(bid);

			const auto local_sources = buffer.local_last_writer.get_region_values(consumed);
			for(const auto& [local_box, wcs] : local_sources) {
				if(!wcs.is_fresh() || wcs.is_replicated()) { continue; }

				// Make sure we don't push anything we've already pushed to this node before
				region_builder<3> non_replicated_boxes;
				for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(local_box)) {
					if(nodes.test(nid)) continue;
					non_replicated_boxes.add(replicated_box);
				}

				if(!non_replicated_boxes.empty()) {
					assert(!utils::isa<await_push_command>(wcs.get_command()) && "Attempting to push non-owned data?!");
					auto push_region = std::move(non_replicated_boxes).into_region();
					// Remember that we've replicated this region
					for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(push_region)) {
						buffer.replicated_regions.update_box(replicated_box, node_bitset{nodes}.set(nid));
					}
					auto& scratch = per_buffer_pushes[bid]; // allow default-insert
					scratch.target_regions[nid /* allow default-insert */].add(push_region);
					scratch.depends_on.insert(wcs);
				}
			}
		}
	}

	// Generate push command for each buffer
	for(auto& [bid, scratch] : per_buffer_pushes) {
		region_builder<3> combined_region;
		std::vector<std::pair<node_id, region<3>>> target_regions;
		for(auto& [nid, boxes] : scratch.target_regions) {
			auto region = std::move(boxes).into_region();
			combined_region.add(region);
			target_regions.push_back({nid, std::move(region)});
		}

		auto* const cmd = create_command<push_command>(current_batch, transfer_id(tsk.get_id(), bid, no_reduction_id), std::move(target_regions),
		    [&, bid = bid](const auto& record_debug_info) { record_debug_info(m_buffers.at(bid).debug_name); });
		for(const auto dep : scratch.depends_on) {
			add_dependency(cmd, dep, dependency_kind::true_dep, dependency_origin::dataflow);
		}

		// Store the read access for determining anti-dependencies
		m_command_buffer_reads[cmd->get_id()].emplace(bid, std::move(combined_region).into_region());
	}
}

// TODO: We currently generate an await push command for each local chunk, whereas we only generate a single push command for all remote chunks
void command_graph_generator::generate_await_pushes(batch& current_batch, const task& tsk, const assigned_chunks_with_requirements& chunks_with_requirements) {
	for(auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
		for(auto& [bid, consumed, _] : requirements) {
			if(consumed.empty()) continue;
			auto& buffer = m_buffers.at(bid);

			const auto local_sources = buffer.local_last_writer.get_region_values(consumed);
			region_builder<3> missing_part_boxes;
			for(const auto& [box, wcs] : local_sources) {
				// Note that we initialize all buffers as fresh, so this doesn't trigger for uninitialized reads
				if(!box.empty() && !wcs.is_fresh()) { missing_part_boxes.add(box); }
			}

			// There is data we don't yet have locally. Generate an await push command for it.
			if(!missing_part_boxes.empty()) {
				const auto missing_parts = std::move(missing_part_boxes).into_region();
				assert(m_num_nodes > 1);
				auto* const ap_cmd = create_command<await_push_command>(current_batch, transfer_id(tsk.get_id(), bid, no_reduction_id), missing_parts,
				    [&](const auto& record_debug_info) { record_debug_info(buffer.debug_name); });
				generate_anti_dependencies(tsk, bid, buffer.local_last_writer, missing_parts, ap_cmd);
				generate_epoch_dependencies(ap_cmd);
				// Remember that we have this data now
				buffer.local_last_writer.update_region(missing_parts, {ap_cmd, true /* is_replicated */});
			}
		}
	}
}

void command_graph_generator::update_local_buffer_fresh_regions(const task& tsk, const std::unordered_map<buffer_id, region<3>>& per_buffer_local_writes) {
	buffer_requirements_list requirements;
	for(const auto bid : tsk.get_buffer_access_map().get_accessed_buffers()) {
		const auto& bam = tsk.get_buffer_access_map();
		requirements.push_back({bid, bam.get_task_consumed_region(bid), bam.get_task_produced_region(bid)});
	}
	// Add requirements for reductions
	for(const auto& reduction : tsk.get_reductions()) {
		auto it = std::find_if(requirements.begin(), requirements.end(), [&](const buffer_requirements& br) { return br.bid == reduction.bid; });
		if(it == requirements.end()) { it = requirements.insert(requirements.end(), buffer_requirements{reduction.bid, {}, {}}); }
		it->produced = scalar_reduction_box;
	}
	for(auto& [bid, _, produced] : requirements) {
		region global_writes = produced;
		auto& buffer = m_buffers.at(bid);
		if(m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = region_union(buffer.initialized_region, global_writes); }

		const auto remote_writes = ([&, bid = bid] {
			if(auto it = per_buffer_local_writes.find(bid); it != per_buffer_local_writes.end()) {
				const auto& local_writes = it->second;
				assert(region_difference(local_writes, global_writes).empty()); // Local writes have to be a subset of global writes
				return region_difference(global_writes, local_writes);
			}
			return std::move(global_writes);
		})(); // IIFE

		// TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
		auto boxes_and_cids = buffer.local_last_writer.get_region_values(remote_writes);
		for(auto& [box, wcs] : boxes_and_cids) {
			if(wcs.is_fresh()) {
				wcs.mark_as_stale();
				buffer.local_last_writer.update_box(box, wcs);
			}
		}
	}
}

void command_graph_generator::generate_distributed_commands(batch& current_batch, const task& tsk) {
	const auto chunks = split_task_and_assign_chunks(tsk);
	const auto chunks_with_requirements = compute_per_chunk_requirements(tsk, chunks);

	// Check for and report overlapping writes between local chunks, and between local and remote chunks.
	if(m_policy.overlapping_write_error != error_policy::ignore) {
		box_vector<3> local_chunks;
		for(const auto& [a_chunk, _] : chunks_with_requirements.local_chunks) {
			local_chunks.push_back(box<3>{a_chunk.chnk});
		}
		report_overlapping_writes(tsk, local_chunks);
	}

	resolve_pending_reductions(current_batch, tsk, chunks_with_requirements);
	generate_pushes(current_batch, tsk, chunks_with_requirements);
	generate_await_pushes(current_batch, tsk, chunks_with_requirements);

	// Union of all per-buffer writes on this node, used to determine which parts of a buffer are fresh/stale later on.
	std::unordered_map<buffer_id, region<3>> per_buffer_local_writes;

	// Create command for each local chunk and resolve local data dependencies.
	for(const auto& [a_chunk, requirements] : chunks_with_requirements.local_chunks) {
		command* cmd = nullptr;
		if(tsk.get_type() == task_type::fence) {
			cmd = create_command<fence_command>(current_batch, &tsk,
			    [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });
		} else {
			// Go over all reductions that are to be performed *during* the execution of this chunk,
			// not to be confused with any pending reductions that need to be finalized *before* the
			// execution of this chunk (those have already been handled by resolve_pending_reductions).
			// If a reduction reads the previous value of the buffer (i.e. w/o property::reduction::initialize_to_identity),
			// we have to include it in exactly one of the per-node intermediate reductions.
			const bool is_reduction_initializer = std::any_of(tsk.get_reductions().begin(), tsk.get_reductions().end(),
			    [&](const auto& reduction) { return m_local_nid == reduction_initializer_nid && reduction.init_from_buffer; });
			cmd = create_command<execution_command>(current_batch, &tsk, subrange{a_chunk.chnk}, is_reduction_initializer,
			    [&](const auto& record_debug_info) { record_debug_info(tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }); });
		}

		if(tsk.get_type() == task_type::collective) {
			// Collective host tasks have an implicit dependency on the previous task in the same collective group,
			// which is required in order to guarantee they are executed in the same order on every node.
			auto cgid = tsk.get_collective_group_id();
			if(const auto cg = m_collective_groups.find(cgid); cg != m_collective_groups.end()) {
				add_dependency(cmd, cg->second.last_collective_command, dependency_kind::true_dep, dependency_origin::collective_group_serialization);
				cg->second.last_collective_command = cmd;
			} else {
				m_collective_groups.emplace(cgid, cmd);
			}
		}

		for(const auto& [bid, consumed, produced] : requirements) {
			auto& buffer = m_buffers.at(bid);

			// Process consuming accesses first, so we don't add dependencies onto our own writes
			if(!consumed.empty()) {
				for(const auto& [box, wcs] : buffer.local_last_writer.get_region_values(consumed)) {
					if(box.empty()) continue;
					assert(wcs.is_fresh() && "Unresolved remote data dependency");
					add_dependency(cmd, wcs, dependency_kind::true_dep, dependency_origin::dataflow);
				}

				// Store the read access for determining anti-dependencies later on
				m_command_buffer_reads[cmd->get_id()].emplace(bid, consumed);
			}

			if(!produced.empty()) {
				generate_anti_dependencies(tsk, bid, buffer.local_last_writer, produced, cmd);

				// Update last writer
				buffer.local_last_writer.update_region(produced, cmd);
				buffer.replicated_regions.update_region(produced, node_bitset{});

				// In case this buffer was in a pending reduction state we discarded the result and need to remove the pending reduction.
				if(buffer.pending_reduction.has_value()) {
					m_completed_reductions.push_back(buffer.pending_reduction->rid);
					buffer.pending_reduction = std::nullopt;
				}

				per_buffer_local_writes.emplace(bid, produced);
			}

			if(m_policy.uninitialized_read_error != error_policy::ignore) {
				if(const auto uninitialized_reads = region_difference(consumed, buffer.initialized_region); !uninitialized_reads.empty()) {
					utils::report_error(m_policy.uninitialized_read_error,
					    "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_id(), m_local_nid,
					    box(subrange(a_chunk.chnk.offset, a_chunk.chnk.range)), print_task_debug_label(tsk), print_buffer_debug_label(bid),
					    uninitialized_reads);
				}
			}
		}

		for(const auto& side_effect : tsk.get_side_effect_map()) {
			const auto [hoid, order] = side_effect;
			auto& host_object = m_host_objects.at(hoid);

			if(host_object.last_side_effect != nullptr) {
				// TODO once we have different side_effect_orders, their interaction will determine the dependency kind
				add_dependency(cmd, host_object.last_side_effect, dependency_kind::true_dep, dependency_origin::dataflow);
			}

			// Simplification: If there are multiple chunks per node, we generate true-dependencies between them in an arbitrary order, when all we really
			// need is mutual exclusion (i.e. a bi-directional pseudo-dependency).
			host_object.last_side_effect = cmd;
		}

		generate_epoch_dependencies(cmd);
	}

	// Mark any buffers that now are in a pending reduction state as such.
	// If there is only one chunk/command, it already implicitly generates the final reduced value
	// and the buffer does not need to be flagged as a pending reduction.
	for(const auto& reduction : tsk.get_reductions()) {
		if(chunks.size() > 1) {
			m_buffers.at(reduction.bid).pending_reduction = reduction;
		} else {
			m_completed_reductions.push_back(reduction.rid);
		}
	}
	update_local_buffer_fresh_regions(tsk, per_buffer_local_writes);
}

void command_graph_generator::generate_anti_dependencies(
    const task& tsk, const buffer_id bid, const region_map<write_command_state>& last_writers_map, const region<3>& write_req, command* const write_cmd) //
{
	const auto last_writers = last_writers_map.get_region_values(write_req);
	for(const auto& [box, wcs] : last_writers) {
		auto* const last_writer_cmd = wcs.get_command();
		assert(!utils::isa<task_command>(last_writer_cmd) || utils::as<task_command>(last_writer_cmd)->get_task() != &tsk);

		// Add anti-dependencies onto all successors of the writer
		bool has_successors = false;
		for(auto d : last_writer_cmd->get_dependents()) {
			// Only consider true dependencies
			if(d.kind != dependency_kind::true_dep) continue;

			auto* const cmd = d.node;

			// We might have already generated new commands within the same task that also depend on this; in that case, skip it
			if(utils::isa<task_command>(cmd) && utils::as<task_command>(cmd)->get_task() == &tsk) continue;

			// So far we don't know whether the dependent actually intersects with the subrange we're writing
			if(const auto command_reads_it = m_command_buffer_reads.find(cmd->get_id()); command_reads_it != m_command_buffer_reads.end()) {
				const auto& command_reads = command_reads_it->second;
				// The task might be a dependent because of another buffer
				if(const auto buffer_reads_it = command_reads.find(bid); buffer_reads_it != command_reads.end()) {
					if(!region_intersection(write_req, buffer_reads_it->second).empty()) {
						has_successors = true;
						add_dependency(write_cmd, cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
					}
				}
			}
		}

		// In some cases (horizons, master node host task, weird discard_* constructs...)
		// the last writer might not have any successors. Just add the anti-dependency onto the writer itself then.
		if(!has_successors) { add_dependency(write_cmd, last_writer_cmd, dependency_kind::anti_dep, dependency_origin::dataflow); }
	}
}

void command_graph_generator::set_epoch_for_new_commands(command* const epoch_or_horizon) {
	// both an explicit epoch command and an applied horizon can be effective epochs
	assert(utils::isa<epoch_command>(epoch_or_horizon) || utils::isa<horizon_command>(epoch_or_horizon));

	for(auto& [_, buffer] : m_buffers) {
		buffer.local_last_writer.apply_to_values([epoch_or_horizon](write_command_state wcs) {
			if(epoch_or_horizon->get_id() <= wcs.get_command()->get_id()) return wcs;
			write_command_state new_wcs(epoch_or_horizon, wcs.is_replicated());
			if(!wcs.is_fresh()) new_wcs.mark_as_stale();
			return new_wcs;
		});
	}
	for(auto& [_, host_object] : m_host_objects) {
		if(host_object.last_side_effect != nullptr && host_object.last_side_effect->get_id() < epoch_or_horizon->get_id()) {
			host_object.last_side_effect = epoch_or_horizon;
		}
	}
	for(auto& [cgid, collective_group] : m_collective_groups) {
		if(collective_group.last_collective_command->get_id() < epoch_or_horizon->get_id()) { collective_group.last_collective_command = epoch_or_horizon; }
	}

	m_epoch_for_new_commands = epoch_or_horizon;
}

void command_graph_generator::reduce_execution_front_to(command* const new_front) {
	const auto previous_execution_front = m_execution_front; // modified inside loop through add_dependency
	for(auto* const front_cmd : previous_execution_front) {
		if(front_cmd != new_front) { add_dependency(new_front, front_cmd, dependency_kind::true_dep, dependency_origin::execution_front); }
	}
	assert(m_execution_front.size() == 1 && *m_execution_front.begin() == new_front);
}

void command_graph_generator::generate_epoch_command(batch& current_batch, const task& tsk) {
	assert(tsk.get_type() == task_type::epoch);
	m_cdag->begin_epoch(tsk.get_id());
	auto* const epoch = create_command<epoch_command>(
	    current_batch, &tsk, tsk.get_epoch_action(), std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });
	set_epoch_for_new_commands(epoch);
	m_current_horizon = no_command;
	// Make the epoch depend on the previous execution front
	reduce_execution_front_to(epoch);
}

void command_graph_generator::generate_horizon_command(batch& current_batch, const task& tsk) {
	assert(tsk.get_type() == task_type::horizon);
	m_cdag->begin_epoch(tsk.get_id());
	auto* const new_horizon =
	    create_command<horizon_command>(current_batch, &tsk, std::move(m_completed_reductions), [&](const auto& record_debug_info) { record_debug_info(tsk); });

	if(m_current_horizon != nullptr) {
		// Apply the previous horizon
		set_epoch_for_new_commands(m_current_horizon);
	}
	m_current_horizon = new_horizon;

	// Make the horizon depend on the previous execution front
	reduce_execution_front_to(new_horizon);
}

void command_graph_generator::generate_epoch_dependencies(command* cmd) {
	// No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs.
	// To guarantee that each node has a transitive true dependency (=temporal dependency) on the epoch, it is enough to add an epoch -> command dependency
	// to any command that has no other true dependencies itself and no graph traversal is necessary. This can be verified by a simple induction proof.

	// As long the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only
	// commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true
	// predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes
	// in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the
	// dependency chain from the executor perspective.

	if(const auto deps = cmd->get_dependencies();
	    std::none_of(deps.begin(), deps.end(), [](const command::dependency d) { return d.kind == dependency_kind::true_dep; })) {
		if(!utils::isa<epoch_command>(cmd) || utils::as<epoch_command>(cmd)->get_epoch_action() != epoch_action::init) {
			assert(cmd != m_epoch_for_new_commands);
			add_dependency(cmd, m_epoch_for_new_commands, dependency_kind::true_dep, dependency_origin::last_epoch);
		}
	}
}

std::string command_graph_generator::print_buffer_debug_label(const buffer_id bid) const {
	return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name);
}

} // namespace celerity::detail
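
Aside: the ordering invariant checked by is_topologically_sorted() above is not specific to Celerity commands. A minimal standalone sketch of the same check, using a hypothetical node struct in place of command (not part of the Celerity API), might look like this:

#include <algorithm>
#include <iterator>
#include <vector>

// Hypothetical stand-in for a command: each node lists the nodes it depends on.
struct node {
	std::vector<const node*> dependencies;
};

// Same invariant as the batch check above: for every node in the sequence,
// none of its dependencies may appear at a later position.
bool is_topologically_sorted(const std::vector<const node*>& sequence) {
	for(auto check = sequence.begin(); check != sequence.end(); ++check) {
		for(const node* dep : (*check)->dependencies) {
			if(std::find(std::next(check), sequence.end(), dep) != sequence.end()) return false;
		}
	}
	return true;
}

int main() {
	node a{};         // no dependencies
	node b{{&a}};     // b depends on a
	node c{{&a, &b}}; // c depends on a and b
	const std::vector<const node*> good{&a, &b, &c};
	const std::vector<const node*> bad{&b, &a, &c}; // a appears after its dependent b
	return is_topologically_sorted(good) && !is_topologically_sorted(bad) ? 0 : 1;
}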