
celerity / celerity-runtime, build 8329571092 (push via GitHub), committer fknorr
Commit: "Update benchmark results for IDAG generation"

18 Mar 2024 03:49PM UTC, coverage: 94.63% (+0.7% from 93.968%)

2907 of 3248 branches covered (89.5%); branch coverage is included in the aggregate percentage.
6574 of 6771 relevant lines covered (97.09%), 179871.27 hits per line.
Source File

/src/distributed_graph_generator.cc: 97.89% file coverage

#include "distributed_graph_generator.h"

#include "access_modes.h"
#include "command.h"
#include "command_graph.h"
#include "recorders.h"
#include "split.h"
#include "task.h"
#include "task_manager.h"

namespace celerity::detail {

distributed_graph_generator::distributed_graph_generator(
    const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder, const policy_set& policy)
    : m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(cdag), m_task_mngr(tm), m_recorder(recorder) {
    if(m_num_nodes > max_num_nodes) {
        throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes));
    }

    // Build initial epoch command (this is required to properly handle anti-dependencies on host-initialized buffers).
    // We manually generate the first command; this will be replaced by applied horizons or explicit epochs down the line (see
    // set_epoch_for_new_commands).
    auto* const epoch_cmd = cdag.create<epoch_command>(task_manager::initial_epoch_task, epoch_action::none, std::vector<reduction_id>{});
    epoch_cmd->mark_as_flushed(); // there is no point in flushing the initial epoch command
    if(m_recorder != nullptr) {
        const auto epoch_tsk = tm.get_task(task_manager::initial_epoch_task);
        m_recorder->record(command_record(*epoch_cmd, *epoch_tsk, {}));
    }
    m_epoch_for_new_commands = epoch_cmd->get_cid();
}

void distributed_graph_generator::notify_buffer_created(const buffer_id bid, const range<3>& range, bool host_initialized) {
    m_buffers.emplace(std::piecewise_construct, std::tuple{bid}, std::tuple{range, range});
    if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { m_buffers.at(bid).initialized_region = box(subrange({}, range)); }
    // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands).
    // This is required when tasks access host-initialized or uninitialized buffers.
    m_buffers.at(bid).local_last_writer.update_region(subrange<3>({}, range), m_epoch_for_new_commands);
    m_buffers.at(bid).replicated_regions.update_region(subrange<3>({}, range), node_bitset{}.set());
}

void distributed_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& debug_name) {
    m_buffers.at(bid).debug_name = debug_name;
}

void distributed_graph_generator::notify_buffer_destroyed(const buffer_id bid) {
    assert(m_buffers.count(bid) != 0);
    m_buffers.erase(bid);
}

void distributed_graph_generator::notify_host_object_created(const host_object_id hoid) {
    assert(m_host_objects.count(hoid) == 0);
    m_host_objects.emplace(hoid, host_object_state{m_epoch_for_new_commands});
}

void distributed_graph_generator::notify_host_object_destroyed(const host_object_id hoid) {
    assert(m_host_objects.count(hoid) != 0);
    m_host_objects.erase(hoid);
}

using buffer_requirements_map = std::unordered_map<buffer_id, std::unordered_map<access_mode, region<3>>>;

static buffer_requirements_map get_buffer_requirements_for_mapped_access(const task& tsk, subrange<3> sr, const range<3> global_size) {
    buffer_requirements_map result;
    const auto& access_map = tsk.get_buffer_access_map();
    const auto buffers = access_map.get_accessed_buffers();
    for(const buffer_id bid : buffers) {
        const auto modes = access_map.get_access_modes(bid);
        for(auto m : modes) {
            result[bid][m] = access_map.get_mode_requirements(bid, m, tsk.get_dimensions(), sr, global_size);
        }
    }
    return result;
}

// According to Wikipedia https://en.wikipedia.org/wiki/Topological_sorting#Depth-first_search
std::vector<abstract_command*> sort_topologically(command_set unmarked) {
    command_set temporary_marked;
    command_set permanent_marked;
    std::vector<abstract_command*> sorted(unmarked.size());
    auto sorted_front = sorted.rbegin();

    const auto visit = [&](abstract_command* const cmd, auto& visit /* to allow recursion in lambda */) {
        if(permanent_marked.count(cmd) != 0) return;
        assert(temporary_marked.count(cmd) == 0 && "cyclic command graph");
        unmarked.erase(cmd);
        temporary_marked.insert(cmd);
        for(const auto dep : cmd->get_dependents()) {
            visit(dep.node, visit);
        }
        temporary_marked.erase(cmd);
        permanent_marked.insert(cmd);
        *sorted_front++ = cmd;
    };

    while(!unmarked.empty()) {
        visit(*unmarked.begin(), visit);
    }

    return sorted;
}

command_set distributed_graph_generator::build_task(const task& tsk) {
    assert(m_current_cmd_batch.empty());
    [[maybe_unused]] const auto cmd_count_before = m_cdag.command_count();

    const auto epoch_to_prune_before = m_epoch_for_new_commands;

    switch(tsk.get_type()) {
    case task_type::epoch: generate_epoch_command(tsk); break;
    case task_type::horizon: generate_horizon_command(tsk); break;
    case task_type::device_compute:
    case task_type::host_compute:
    case task_type::master_node:
    case task_type::collective:
    case task_type::fence: generate_distributed_commands(tsk); break;
    default: throw std::runtime_error("Task type NYI");
    }

    // It is currently undefined to split reduction-producer tasks into multiple chunks on the same node:
    //   - Per-node reduction intermediate results are stored with fixed access to a single backing buffer,
    //     so multiple chunks on the same node will race on this buffer access
    //   - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish
    //     more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative
    //     or non-commutative operations
    if(!tsk.get_reductions().empty()) { assert(m_cdag.task_command_count(tsk.get_id()) <= 1); }

    // Commands without any other true-dependency must depend on the active epoch command to ensure they cannot be re-ordered before the epoch.
    // Need to check count b/c for some tasks we may not have generated any commands locally.
    if(m_cdag.task_command_count(tsk.get_id()) > 0) {
        for(auto* const cmd : m_cdag.task_commands(tsk.get_id())) {
            generate_epoch_dependencies(cmd);
        }
    }

    // Check that all commands have been created through create_command
    assert(m_cdag.command_count() - cmd_count_before == m_current_cmd_batch.size());

    // If a new epoch was completed in the CDAG before the current task, prune all predecessor commands of that epoch.
    prune_commands_before(epoch_to_prune_before);

    // If we have a command_recorder, record the current batch of commands
    if(m_recorder != nullptr) {
        for(const auto& cmd : m_current_cmd_batch) {
            m_recorder->record(command_record(*cmd, tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }));
        }
    }

    return std::move(m_current_cmd_batch);
}

void distributed_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const {
    const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};

    // Since this check is run distributed on every node, we avoid quadratic behavior by only checking for conflicts between all local chunks and the
    // region-union of remote chunks. This way, every conflict will be reported by at least one node.
    const box<3> global_chunk(subrange(full_chunk.offset, full_chunk.range));
    auto remote_chunks = region_difference(global_chunk, region(box_vector<3>(local_chunks))).into_boxes();

    // detect_overlapping_writes takes a single box_vector, so we concatenate local and global chunks (the order does not matter)
    auto distributed_chunks = std::move(remote_chunks);
    distributed_chunks.insert(distributed_chunks.end(), local_chunks.begin(), local_chunks.end());

    if(const auto overlapping_writes = detect_overlapping_writes(tsk, distributed_chunks); !overlapping_writes.empty()) {
        auto error = fmt::format("{} has overlapping writes between multiple nodes in", print_task_debug_label(tsk, true /* title case */));
        for(const auto& [bid, overlap] : overlapping_writes) {
            fmt::format_to(std::back_inserter(error), " {} {}", print_buffer_debug_label(bid), overlap);
        }
        error += ". Choose a non-overlapping range mapper for this write access or constrain the split via experimental::constrain_split to make the access "
                 "non-overlapping.";
        utils::report_error(m_policy.overlapping_write_error, "{}", error);
    }
}

void distributed_graph_generator::generate_distributed_commands(const task& tsk) {
    const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};
    const size_t num_chunks = m_num_nodes * 1; // TODO Make configurable
    const auto chunks = ([&] {
        if(tsk.get_type() == task_type::collective || tsk.get_type() == task_type::fence) {
            std::vector<chunk<3>> chunks;
            for(size_t nid = 0; nid < m_num_nodes; ++nid) {
                chunks.push_back(chunk_cast<3>(chunk<1>{id<1>{tsk.get_type() == task_type::collective ? nid : 0}, ones, {m_num_nodes}}));
            }
            return chunks;
        }
        if(tsk.has_variable_split()) {
            if(tsk.get_hint<experimental::hints::split_1d>() != nullptr) {
                // no-op, keeping this for documentation purposes
            }
            if(tsk.get_hint<experimental::hints::split_2d>() != nullptr) { return split_2d(full_chunk, tsk.get_granularity(), num_chunks); }
            return split_1d(full_chunk, tsk.get_granularity(), num_chunks);
        }
        return std::vector<chunk<3>>{full_chunk};
    })();
    assert(chunks.size() <= num_chunks); // We may have created fewer than requested
    assert(!chunks.empty());

    // Assign each chunk to a node
    // We assign chunks next to each other to the same worker (if there are more chunks than workers), as this is likely to produce fewer
    // transfers between tasks than a round-robin assignment (for typical stencil codes).
    // FIXME: This only works if the number of chunks is an integer multiple of the number of workers, e.g. 3 chunks for 2 workers degrades to RR.
    const auto chunks_per_node = std::max<size_t>(1, chunks.size() / m_num_nodes);

    // Union of all per-buffer writes on this node, used to determine which parts of a buffer are fresh/stale later on.
    std::unordered_map<buffer_id, region<3>> per_buffer_local_writes;
    // In case we need to push a region that is overwritten in the same task, we have to defer updating the last writer.
    std::unordered_map<buffer_id, std::vector<std::pair<region<3>, command_id>>> per_buffer_last_writer_update_list;
    // Buffers that currently are in a pending reduction state will receive a new buffer state after a reduction has been generated.
    std::unordered_map<buffer_id, buffer_state> post_reduction_buffers;

    // Remember all generated pushes for determining intra-task anti-dependencies.
    std::vector<push_command*> generated_pushes;

    // Collect all local chunks for detecting overlapping writes between all local chunks and the union of remote chunks in a distributed manner.
    box_vector<3> local_chunks;

    // In the master/worker model, we used to try and find the node best suited for initializing multiple
    // reductions that do not initialize_to_identity based on current data distribution.
    // This is more difficult in a distributed setting, so for now we just hard code it to node 0.
    // TODO: Revisit this at some point.
    const node_id reduction_initializer_nid = 0;

    const box<3> empty_reduction_box({0, 0, 0}, {0, 0, 0});
    const box<3> scalar_reduction_box({0, 0, 0}, {1, 1, 1});

    // Iterate over all chunks, distinguish between local / remote chunks and normal / reduction access.
    //
    // Normal buffer access:
    // - For local chunks, find read requirements on remote data.
    //   Generate a single await push command for each buffer that awaits the entire required region.
    //   This will then be fulfilled by one or more incoming pushes.
    // - For remote chunks, find read requirements intersecting with owned buffer regions.
    //   Generate push commands for those regions.
    //
    // Reduction buffer access:
    // - For local chunks, create a reduction command and a single await_push command that receives the
    //   partial reduction results from all other nodes.
    // - For remote chunks, always create a push command, regardless of whether we have relevant data or not.
    //   This is required because the remote node does not know how many partial reduction results there are.
    for(size_t i = 0; i < chunks.size(); ++i) {
        const node_id nid = (i / chunks_per_node) % m_num_nodes;
        const bool is_local_chunk = nid == m_local_nid;

        auto requirements = get_buffer_requirements_for_mapped_access(tsk, chunks[i], tsk.get_global_size());

        // Add requirements for reductions
        for(const auto& reduction : tsk.get_reductions()) {
            auto rmode = access_mode::discard_write;
            if(nid == reduction_initializer_nid && reduction.init_from_buffer) { rmode = access_mode::read_write; }
#ifndef NDEBUG
            for(auto pmode : access::producer_modes) {
                assert(requirements[reduction.bid].count(pmode) == 0); // task_manager verifies that there are no reduction <-> write-access conflicts
            }
#endif
            requirements[reduction.bid][rmode] = scalar_reduction_box;
        }

        abstract_command* cmd = nullptr;
        if(is_local_chunk) {
            if(tsk.get_type() == task_type::fence) {
                cmd = create_command<fence_command>(tsk.get_id());
            } else {
                cmd = create_command<execution_command>(tsk.get_id(), subrange{chunks[i]});

                // Go over all reductions that are to be performed *during* the execution of this chunk,
                // not to be confused with any pending reductions that need to be finalized *before* the
                // execution of this chunk.
                // If a reduction reads the previous value of the buffer (i.e. w/o property::reduction::initialize_to_identity),
                // we have to include it in exactly one of the per-node intermediate reductions.
                for(const auto& reduction : tsk.get_reductions()) {
                    if(nid == reduction_initializer_nid && reduction.init_from_buffer) {
                        utils::as<execution_command>(cmd)->set_is_reduction_initializer(true);
                        break;
                    }
                }
            }

            if(tsk.get_type() == task_type::collective) {
                // Collective host tasks have an implicit dependency on the previous task in the same collective group,
                // which is required in order to guarantee they are executed in the same order on every node.
                auto cgid = tsk.get_collective_group_id();
                if(auto prev = m_last_collective_commands.find(cgid); prev != m_last_collective_commands.end()) {
                    m_cdag.add_dependency(cmd, m_cdag.get(prev->second), dependency_kind::true_dep, dependency_origin::collective_group_serialization);
                    m_last_collective_commands.erase(prev);
                }
                m_last_collective_commands.emplace(cgid, cmd->get_cid());
            }

            local_chunks.push_back(subrange(chunks[i].offset, chunks[i].range));
        }

        // We use the task id, together with the "chunk id" and the buffer id (stored separately), to match pushes against their corresponding await pushes
        for(auto& [bid, reqs_by_mode] : requirements) {
            auto& buffer = m_buffers.at(bid);
            std::vector<access_mode> required_modes;
            for(const auto mode : detail::access::all_modes) {
                if(auto req_it = reqs_by_mode.find(mode); req_it != reqs_by_mode.end()) {
                    // While uncommon, we do support chunks that don't require access to a particular buffer at all.
                    if(!req_it->second.empty()) { required_modes.push_back(mode); }
                }
            }

            // Don't add reduction commands within the loop to make sure there is at most one reduction command
            // even in the presence of multiple consumer requirements.
            const bool is_pending_reduction = buffer.pending_reduction.has_value();
            const bool generate_reduction =
                is_pending_reduction && std::any_of(required_modes.begin(), required_modes.end(), detail::access::mode_traits::is_consumer);
            if(generate_reduction) {
                // Prepare the buffer state for after the reduction has been performed:
                // Set the current epoch as last writer and mark it as stale so that if we don't generate a reduction command,
                // we'll know to get the data from elsewhere. If we generate a reduction command, this will be overwritten by its command id.
                write_command_state wcs{m_epoch_for_new_commands};
                wcs.mark_as_stale();
                // We just treat this buffer as 1-dimensional, regardless of its actual dimensionality (as it must be unit-sized anyway)
                post_reduction_buffers.emplace(std::piecewise_construct, std::tuple{bid},
                    std::tuple{region_map<write_command_state>{ones, wcs}, region_map<node_bitset>{ones, node_bitset{}}});
            }

            if(is_pending_reduction && !generate_reduction) {
                // TODO the per-node reduction result is discarded - warn user about dead store
            }

            region<3> uninitialized_reads;
            for(const auto mode : required_modes) {
                const auto& req = reqs_by_mode.at(mode);
                if(detail::access::mode_traits::is_consumer(mode)) {
                    if(is_local_chunk && m_policy.uninitialized_read_error != error_policy::ignore
                        && !bounding_box(buffer.initialized_region).covers(bounding_box(req.get_boxes()))) {
                        uninitialized_reads = region_union(uninitialized_reads, region_difference(req, buffer.initialized_region));
                    }

                    if(is_local_chunk) {
                        // Store the read access for determining anti-dependencies later on
                        m_command_buffer_reads[cmd->get_cid()][bid] = region_union(m_command_buffer_reads[cmd->get_cid()][bid], req);
                    }

                    if(is_local_chunk && !is_pending_reduction) {
                        const auto local_sources = buffer.local_last_writer.get_region_values(req);
                        box_vector<3> missing_part_boxes;
                        for(const auto& [box, wcs] : local_sources) {
                            if(box.empty()) continue;
                            if(!wcs.is_fresh()) {
                                missing_part_boxes.push_back(box);
                                continue;
                            }
                            m_cdag.add_dependency(cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);
                        }

                        // There is data we don't yet have locally. Generate an await push command for it.
                        if(!missing_part_boxes.empty()) {
                            const region missing_parts(std::move(missing_part_boxes));
                            assert(m_num_nodes > 1);
                            auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, no_reduction_id), missing_parts);
                            m_cdag.add_dependency(cmd, ap_cmd, dependency_kind::true_dep, dependency_origin::dataflow);
                            generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, missing_parts, ap_cmd);
                            generate_epoch_dependencies(ap_cmd);
                            // Remember that we have this data now
                            buffer.local_last_writer.update_region(missing_parts, {ap_cmd->get_cid(), true /* is_replicated */});
                        }
                    } else if(!is_pending_reduction) {
                        // We generate a separate push command for each last writer command for now, possibly even multiple for partially already-replicated data.
                        // TODO: Can and/or should we consolidate?
                        const auto local_sources = buffer.local_last_writer.get_region_values(req);
                        for(const auto& [local_box, wcs] : local_sources) {
                            if(!wcs.is_fresh() || wcs.is_replicated()) { continue; }

                            // Make sure we don't push anything we've already pushed to this node before
                            box_vector<3> non_replicated_boxes;
                            for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(local_box)) {
                                if(nodes.test(nid)) continue;
                                non_replicated_boxes.push_back(replicated_box);
                            }

                            // Merge all connected boxes to determine final set of pushes
                            const auto push_region = region<3>(std::move(non_replicated_boxes));
                            for(auto& push_box : push_region.get_boxes()) {
                                auto* const push_cmd =
                                    create_command<push_command>(nid, transfer_id(tsk.get_id(), bid, no_reduction_id), push_box.get_subrange());
                                assert(!utils::isa<await_push_command>(m_cdag.get(wcs)) && "Attempting to push non-owned data?!");
                                m_cdag.add_dependency(push_cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);
                                generated_pushes.push_back(push_cmd);

                                // Store the read access for determining anti-dependencies later on
                                m_command_buffer_reads[push_cmd->get_cid()][bid] = push_box;
                            }

                            // Remember that we've replicated this region
                            for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(push_region)) {
                                buffer.replicated_regions.update_box(replicated_box, node_bitset{nodes}.set(nid));
                            }
                        }
                    }
                }

                if(is_local_chunk && detail::access::mode_traits::is_producer(mode)) {
                    // If we are going to insert a reduction command, we will also create a true-dependency chain to the last writer. The new last writer
                    // cid however is not known at this point because the reduction command has not been generated yet. Instead, we simply skip
                    // generating anti-dependencies around this requirement. This might not be valid if (multivariate) reductions ever operate on regions.
                    if(!generate_reduction) { generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, req, cmd); }

                    per_buffer_local_writes[bid] = region_union(per_buffer_local_writes[bid], req);
                    per_buffer_last_writer_update_list[bid].push_back({req, cmd->get_cid()});
                }
            }

            if(!uninitialized_reads.empty()) {
                utils::report_error(m_policy.uninitialized_read_error,
                    "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_cid(), m_local_nid,
                    box(subrange(chunks[i].offset, chunks[i].range)), print_task_debug_label(tsk), print_buffer_debug_label(bid),
                    detail::region(std::move(uninitialized_reads)));
            }

            if(generate_reduction) {
                if(m_policy.uninitialized_read_error != error_policy::ignore) { post_reduction_buffers.at(bid).initialized_region = scalar_reduction_box; }

                const auto& reduction = *buffer.pending_reduction;

                const auto local_last_writer = buffer.local_last_writer.get_region_values(scalar_reduction_box);
                assert(local_last_writer.size() == 1);

                if(is_local_chunk) {
                    auto* const reduce_cmd = create_command<reduction_command>(reduction, local_last_writer[0].second.is_fresh() /* has_local_contribution */);

                    // Only generate a true dependency on the last writer if this node participated in the intermediate result computation.
                    if(local_last_writer[0].second.is_fresh()) {
                        m_cdag.add_dependency(reduce_cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                    }

                    auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, reduction.rid), scalar_reduction_box.get_subrange());
                    m_cdag.add_dependency(reduce_cmd, ap_cmd, dependency_kind::true_dep, dependency_origin::dataflow);
                    generate_epoch_dependencies(ap_cmd);

                    m_cdag.add_dependency(cmd, reduce_cmd, dependency_kind::true_dep, dependency_origin::dataflow);

                    // Reduction command becomes the last writer (this may be overridden if this task also writes to the reduction buffer)
                    post_reduction_buffers.at(bid).local_last_writer.update_box(scalar_reduction_box, reduce_cmd->get_cid());
                } else {
                    // Push an empty range if we don't have any fresh data on this node
                    const bool notification_only = !local_last_writer[0].second.is_fresh();
                    const auto push_box = notification_only ? empty_reduction_box : scalar_reduction_box;

                    auto* const push_cmd = create_command<push_command>(nid, transfer_id(tsk.get_id(), bid, reduction.rid), push_box.get_subrange());
                    generated_pushes.push_back(push_cmd);

                    if(notification_only) {
                        generate_epoch_dependencies(push_cmd);
                    } else {
                        m_command_buffer_reads[push_cmd->get_cid()][bid] = region_union(m_command_buffer_reads[push_cmd->get_cid()][bid], scalar_reduction_box);
                        m_cdag.add_dependency(push_cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                    }

                    // Mark the reduction result as replicated so we don't generate data transfers to this node
                    // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
                    const auto replicated_box = post_reduction_buffers.at(bid).replicated_regions.get_region_values(scalar_reduction_box);
                    assert(replicated_box.size() == 1);
                    for(const auto& [_, nodes] : replicated_box) {
                        post_reduction_buffers.at(bid).replicated_regions.update_box(scalar_reduction_box, node_bitset{nodes}.set(nid));
                    }
                }
            }
        }
    }

    // Check for and report overlapping writes between local chunks, and between local and remote chunks.
    if(m_policy.overlapping_write_error != error_policy::ignore) { report_overlapping_writes(tsk, local_chunks); }

    // For buffers that were in a pending reduction state and a reduction was generated
    // (i.e., the result was not discarded), set their new state.
    for(auto& [bid, new_state] : post_reduction_buffers) {
        auto& buffer = m_buffers.at(bid);
        if(buffer.pending_reduction.has_value()) { m_completed_reductions.push_back(buffer.pending_reduction->rid); }
        buffer = std::move(new_state);
    }

    // Update per-buffer last writers
    // This has to happen after applying post_reduction_buffers to properly support chained reductions.
    for(auto& [bid, updates] : per_buffer_last_writer_update_list) {
        auto& buffer = m_buffers.at(bid);
        for(auto& [req, cid] : updates) {
            buffer.local_last_writer.update_region(req, cid);
            buffer.replicated_regions.update_region(req, node_bitset{});
        }

        // In case this buffer was in a pending reduction state but the result was discarded, remove the pending reduction.
        if(buffer.pending_reduction.has_value()) {
            m_completed_reductions.push_back(buffer.pending_reduction->rid);
            buffer.pending_reduction = std::nullopt;
        }
    }

    // Mark any buffers that now are in a pending reduction state as such.
    // This has to happen after applying post_reduction_buffers and per_buffer_last_writer_update_list
    // to properly support chained reductions.
    // If there is only one chunk/command, it already implicitly generates the final reduced value
    // and the buffer does not need to be flagged as a pending reduction.
    for(const auto& reduction : tsk.get_reductions()) {
        if(chunks.size() > 1) {
            m_buffers.at(reduction.bid).pending_reduction = reduction;

            // In some cases this node may not actually participate in the computation of the
            // intermediate reduction result (because there was no chunk). If so, mark the
            // reduction buffer as stale so we do not use it as input for the final reduction command.
            if(per_buffer_local_writes.count(reduction.bid) == 0) {
                [[maybe_unused]] size_t num_entries = 0;
                m_buffers.at(reduction.bid).local_last_writer.apply_to_values([&num_entries](const write_command_state& wcs) {
                    num_entries++;
                    write_command_state stale_state{wcs};
                    stale_state.mark_as_stale();
                    return stale_state;
                });
                assert(num_entries == 1);
            }
        } else {
            m_completed_reductions.push_back(reduction.rid);
        }
    }

    // Determine potential "intra-task" race conditions.
    // These can happen in rare cases, when the node that pushes a buffer range also writes to that range within the same task.
    // We cannot do this while generating the push command, as we may not have the writing command recorded at that point.
    for(auto* push_cmd : generated_pushes) {
        const auto last_writers = m_buffers.at(push_cmd->get_transfer_id().bid).local_last_writer.get_region_values(region(push_cmd->get_range()));

        for(const auto& [box, wcs] : last_writers) {
            assert(!box.empty()); // If we want to push it, it cannot be empty
            // In general we should only be pushing fresh data.
            // If the push is for a reduction and the data no longer is fresh, it means
            // that we did not generate a reduction command on this node and the data becomes
            // stale after the remote reduction command has been executed.
            assert(wcs.is_fresh() || push_cmd->get_transfer_id().rid != no_reduction_id);
            auto* const writer_cmd = m_cdag.get(wcs);
            assert(writer_cmd != nullptr);

            // We're only interested in writes that happen within the same task as the push
            if(utils::isa<task_command>(writer_cmd) && utils::as<task_command>(writer_cmd)->get_tid() == tsk.get_id()) {
                // In certain situations the push might have a true dependency on the last writer,
                // in that case don't add an anti-dependency (as that would cause a cycle).
                // TODO: Is this still possible? We don't have a unit test exercising this branch...
                if(push_cmd->has_dependency(writer_cmd, dependency_kind::true_dep)) {
                    // This can currently only happen for await_push commands.
                    assert(utils::isa<await_push_command>(writer_cmd));
                    continue;
                }
                m_cdag.add_dependency(writer_cmd, push_cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
            }

            // Reduction commands will overwrite their buffer, so they must anti-depend on their partial-result push-commands
            if(utils::isa<reduction_command>(writer_cmd)
                && utils::as<reduction_command>(writer_cmd)->get_reduction_info().rid == push_cmd->get_transfer_id().rid) {
                m_cdag.add_dependency(writer_cmd, push_cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
            }
        }
    }

    // Determine which local data is fresh/stale based on task-level writes.
    auto requirements = get_buffer_requirements_for_mapped_access(tsk, subrange<3>(tsk.get_global_offset(), tsk.get_global_size()), tsk.get_global_size());
    // Add requirements for reductions
    for(const auto& reduction : tsk.get_reductions()) {
        // the actual mode is irrelevant as long as it's a producer - TODO have a better query API for task buffer requirements
        requirements[reduction.bid][access_mode::write] = scalar_reduction_box;
    }
    for(auto& [bid, reqs_by_mode] : requirements) {
        box_vector<3> global_write_boxes;
        for(const auto mode : access::producer_modes) {
            if(reqs_by_mode.count(mode) == 0) continue;
            const auto& by_mode = reqs_by_mode.at(mode);
            global_write_boxes.insert(global_write_boxes.end(), by_mode.get_boxes().begin(), by_mode.get_boxes().end());
        }
        const region global_writes(std::move(global_write_boxes));
        const auto& local_writes = per_buffer_local_writes[bid];
        assert(region_difference(local_writes, global_writes).empty()); // Local writes have to be a subset of global writes
        const auto remote_writes = region_difference(global_writes, local_writes);
        auto& buffer = m_buffers.at(bid);

        if(m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = region_union(buffer.initialized_region, global_writes); }

        // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
        auto boxes_and_cids = buffer.local_last_writer.get_region_values(remote_writes);
        for(auto& [box, wcs] : boxes_and_cids) {
            if(wcs.is_fresh()) {
                wcs.mark_as_stale();
                buffer.local_last_writer.update_region(box, wcs);
            }
        }
    }

    process_task_side_effect_requirements(tsk);
}

void distributed_graph_generator::generate_anti_dependencies(
    task_id tid, buffer_id bid, const region_map<write_command_state>& last_writers_map, const region<3>& write_req, abstract_command* write_cmd) {
    const auto last_writers = last_writers_map.get_region_values(write_req);
    for(const auto& [box, wcs] : last_writers) {
        auto* const last_writer_cmd = m_cdag.get(static_cast<command_id>(wcs));
        assert(!utils::isa<task_command>(last_writer_cmd) || utils::as<task_command>(last_writer_cmd)->get_tid() != tid);

        // Add anti-dependencies onto all successors of the writer
        bool has_successors = false;
        for(auto d : last_writer_cmd->get_dependents()) {
            // Only consider true dependencies
            if(d.kind != dependency_kind::true_dep) continue;

            auto* const cmd = d.node;

            // We might have already generated new commands within the same task that also depend on this; in that case, skip it
            if(utils::isa<task_command>(cmd) && utils::as<task_command>(cmd)->get_tid() == tid) continue;

            // So far we don't know whether the dependent actually intersects with the subrange we're writing
            if(const auto command_reads_it = m_command_buffer_reads.find(cmd->get_cid()); command_reads_it != m_command_buffer_reads.end()) {
                const auto& command_reads = command_reads_it->second;
                // The task might be a dependent because of another buffer
                if(const auto buffer_reads_it = command_reads.find(bid); buffer_reads_it != command_reads.end()) {
                    if(!region_intersection(write_req, buffer_reads_it->second).empty()) {
                        has_successors = true;
                        m_cdag.add_dependency(write_cmd, cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
                    }
                }
            }
        }

        // In some cases (horizons, master node host task, weird discard_* constructs...)
        // the last writer might not have any successors. Just add the anti-dependency onto the writer itself then.
        if(!has_successors) { m_cdag.add_dependency(write_cmd, last_writer_cmd, dependency_kind::anti_dep, dependency_origin::dataflow); }
    }
}

void distributed_graph_generator::process_task_side_effect_requirements(const task& tsk) {
    const task_id tid = tsk.get_id();
    if(tsk.get_side_effect_map().empty()) return; // skip the loop in the common case
    if(m_cdag.task_command_count(tid) == 0) return;

    for(auto* const cmd : m_cdag.task_commands(tid)) {
        for(const auto& side_effect : tsk.get_side_effect_map()) {
            const auto [hoid, order] = side_effect;
            auto& host_object = m_host_objects.at(hoid);

            if(host_object.last_side_effect.has_value()) {
                // TODO once we have different side_effect_orders, their interaction will determine the dependency kind
                m_cdag.add_dependency(cmd, m_cdag.get(*host_object.last_side_effect), dependency_kind::true_dep, dependency_origin::dataflow);
            }

            // Simplification: If there are multiple chunks per node, we generate true-dependencies between them in an arbitrary order, when all we really
            // need is mutual exclusion (i.e. a bi-directional pseudo-dependency).
            host_object.last_side_effect = cmd->get_cid();
        }
    }
}

void distributed_graph_generator::set_epoch_for_new_commands(const abstract_command* const epoch_or_horizon) {
    // both an explicit epoch command and an applied horizon can be effective epochs
    assert(utils::isa<epoch_command>(epoch_or_horizon) || utils::isa<horizon_command>(epoch_or_horizon));

    for(auto& [bid, bs] : m_buffers) {
        bs.local_last_writer.apply_to_values([epoch_or_horizon](const write_command_state& wcs) {
            auto new_wcs = write_command_state(std::max(epoch_or_horizon->get_cid(), static_cast<command_id>(wcs)), wcs.is_replicated());
            if(!wcs.is_fresh()) new_wcs.mark_as_stale();
            return new_wcs;
        });
    }
    for(auto& [cgid, cid] : m_last_collective_commands) {
        cid = std::max(epoch_or_horizon->get_cid(), cid);
    }
    for(auto& [_, host_object] : m_host_objects) {
        if(host_object.last_side_effect.has_value()) { host_object.last_side_effect = std::max(epoch_or_horizon->get_cid(), *host_object.last_side_effect); }
    }

    m_epoch_for_new_commands = epoch_or_horizon->get_cid();
}

void distributed_graph_generator::reduce_execution_front_to(abstract_command* const new_front) {
    const auto previous_execution_front = m_cdag.get_execution_front();
    for(auto* const front_cmd : previous_execution_front) {
        if(front_cmd != new_front) { m_cdag.add_dependency(new_front, front_cmd, dependency_kind::true_dep, dependency_origin::execution_front); }
    }
    assert(m_cdag.get_execution_front().size() == 1 && *m_cdag.get_execution_front().begin() == new_front);
}

void distributed_graph_generator::generate_epoch_command(const task& tsk) {
    assert(tsk.get_type() == task_type::epoch);
    auto* const epoch = create_command<epoch_command>(tsk.get_id(), tsk.get_epoch_action(), std::move(m_completed_reductions));
    set_epoch_for_new_commands(epoch);
    m_current_horizon = no_command;
    // Make the epoch depend on the previous execution front
    reduce_execution_front_to(epoch);
}

void distributed_graph_generator::generate_horizon_command(const task& tsk) {
    assert(tsk.get_type() == task_type::horizon);
    auto* const horizon = create_command<horizon_command>(tsk.get_id(), std::move(m_completed_reductions));

    if(m_current_horizon != static_cast<command_id>(no_command)) {
        // Apply the previous horizon
        set_epoch_for_new_commands(m_cdag.get(m_current_horizon));
    }
    m_current_horizon = horizon->get_cid();

    // Make the horizon depend on the previous execution front
    reduce_execution_front_to(horizon);
}

void distributed_graph_generator::generate_epoch_dependencies(abstract_command* cmd) {
    // No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs.
    // To guarantee that each node has a transitive true dependency (=temporal dependency) on the epoch, it is enough to add an epoch -> command dependency
    // to any command that has no other true dependencies itself and no graph traversal is necessary. This can be verified by a simple induction proof.

    // As long as the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only
    // commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true
    // predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes
    // in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the
    // dependency chain from the executor perspective.

    if(const auto deps = cmd->get_dependencies();
        std::none_of(deps.begin(), deps.end(), [](const abstract_command::dependency d) { return d.kind == dependency_kind::true_dep; })) {
        assert(cmd->get_cid() != m_epoch_for_new_commands);
        m_cdag.add_dependency(cmd, m_cdag.get(m_epoch_for_new_commands), dependency_kind::true_dep, dependency_origin::last_epoch);
    }
}

void distributed_graph_generator::prune_commands_before(const command_id epoch) {
    if(epoch > m_epoch_last_pruned_before) {
        m_cdag.erase_if([&](abstract_command* cmd) {
            if(cmd->get_cid() < epoch) {
                m_command_buffer_reads.erase(cmd->get_cid());
                return true;
            }
            return false;
        });
        m_epoch_last_pruned_before = epoch;
    }
}

std::string distributed_graph_generator::print_buffer_debug_label(const buffer_id bid) const {
    return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name);
}

} // namespace celerity::detail