celerity / celerity-runtime — build 11380705670
17 Oct 2024 07:54 AM UTC — coverage: 95.038% (-0.3%) from 95.308%

Pull Request #293 (github, fknorr): Only poll events of instructions that are actively executing

3029 of 3436 branches covered (88.15%); branch coverage is included in the aggregate percentage.
20 of 21 new or added lines in 2 files covered (95.24%).
21 existing lines in 5 files are now uncovered.
6662 of 6761 relevant lines covered (98.54%), at 1480649.62 hits per line.

Source file: /src/command_graph_generator.cc — 97.89% covered.
The uncovered lines in this file are the max_num_nodes guard in the constructor, the "Task type NYI" default case in build_task, and the await_push true-dependency fallback in the intra-task race check.
#include "command_graph_generator.h"

#include "access_modes.h"
#include "command.h"
#include "command_graph.h"
#include "recorders.h"
#include "split.h"
#include "task.h"
#include "task_manager.h"

namespace celerity::detail {

command_graph_generator::command_graph_generator(
    const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder, const policy_set& policy)
    : m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(cdag), m_task_mngr(tm), m_recorder(recorder) {
    if(m_num_nodes > max_num_nodes) {
        throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes));
    }

    // Build the initial epoch command (this is required to properly handle anti-dependencies on host-initialized buffers).
    // We manually generate the first command; it will be replaced by applied horizons or explicit epochs down the line (see set_epoch_for_new_commands).
    auto* const epoch_cmd = cdag.create<epoch_command>(task_manager::initial_epoch_task, epoch_action::none, std::vector<reduction_id>{});
    if(m_recorder != nullptr) {
        const auto epoch_tsk = tm.get_task(task_manager::initial_epoch_task);
        m_recorder->record(command_record(*epoch_cmd, *epoch_tsk, {}));
    }
    m_epoch_for_new_commands = epoch_cmd->get_cid();
}

void command_graph_generator::notify_buffer_created(const buffer_id bid, const range<3>& range, bool host_initialized) {
    m_buffers.emplace(std::piecewise_construct, std::tuple{bid}, std::tuple{range, range});
    if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { m_buffers.at(bid).initialized_region = box(subrange({}, range)); }
    // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands).
    // This is required when tasks access host-initialized or uninitialized buffers.
    m_buffers.at(bid).local_last_writer.update_region(subrange<3>({}, range), m_epoch_for_new_commands);
    m_buffers.at(bid).replicated_regions.update_region(subrange<3>({}, range), node_bitset{}.set());
}

void command_graph_generator::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& debug_name) {
    m_buffers.at(bid).debug_name = debug_name;
}

void command_graph_generator::notify_buffer_destroyed(const buffer_id bid) {
    assert(m_buffers.count(bid) != 0);
    m_buffers.erase(bid);
}

void command_graph_generator::notify_host_object_created(const host_object_id hoid) {
    assert(m_host_objects.count(hoid) == 0);
    m_host_objects.emplace(hoid, host_object_state{m_epoch_for_new_commands});
}

void command_graph_generator::notify_host_object_destroyed(const host_object_id hoid) {
    assert(m_host_objects.count(hoid) != 0);
    m_host_objects.erase(hoid);
}

using buffer_requirements_map = std::unordered_map<buffer_id, std::unordered_map<access_mode, region<3>>>;

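// Evaluates the task's range mappers for the given chunk (subrange) and collects the resulting buffer
// requirements, grouped by buffer id and access mode.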
static buffer_requirements_map get_buffer_requirements_for_mapped_access(const task& tsk, subrange<3> sr, const range<3> global_size) {
    buffer_requirements_map result;
    const auto& access_map = tsk.get_buffer_access_map();
    const auto buffers = access_map.get_accessed_buffers();
    for(const buffer_id bid : buffers) {
        const auto modes = access_map.get_access_modes(bid);
        for(auto m : modes) {
            result[bid][m] = access_map.get_mode_requirements(bid, m, tsk.get_dimensions(), sr, global_size);
        }
    }
    return result;
}

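// Standard depth-first topological sort over the dependency graph: each command is only emitted after all of its
// dependents have been visited, and the result vector is filled from the back (via sorted.rbegin()), so every
// command ends up before all of its dependents in the returned order.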
// According to Wikipedia https://en.wikipedia.org/wiki/Topological_sorting#Depth-first_search
std::vector<abstract_command*> sort_topologically(command_set unmarked) {
    command_set temporary_marked;
    command_set permanent_marked;
    std::vector<abstract_command*> sorted(unmarked.size());
    auto sorted_front = sorted.rbegin();

    const auto visit = [&](abstract_command* const cmd, auto& visit /* to allow recursion in lambda */) {
        if(permanent_marked.count(cmd) != 0) return;
        assert(temporary_marked.count(cmd) == 0 && "cyclic command graph");
        unmarked.erase(cmd);
        temporary_marked.insert(cmd);
        for(const auto dep : cmd->get_dependents()) {
            visit(dep.node, visit);
        }
        temporary_marked.erase(cmd);
        permanent_marked.insert(cmd);
        *sorted_front++ = cmd;
    };

    while(!unmarked.empty()) {
        visit(*unmarked.begin(), visit);
    }

    return sorted;
}

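// Entry point for turning a single task into this node's commands. Epoch and horizon tasks receive dedicated
// commands; all other task types go through generate_distributed_commands. Afterwards, commands without a true
// dependency are anchored to the current epoch, commands preceding a completed epoch are pruned, and the newly
// generated batch is recorded (if a recorder is attached) and returned.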
command_set command_graph_generator::build_task(const task& tsk) {
    assert(m_current_cmd_batch.empty());
    [[maybe_unused]] const auto cmd_count_before = m_cdag.command_count();

    const auto epoch_to_prune_before = m_epoch_for_new_commands;

    switch(tsk.get_type()) {
    case task_type::epoch: generate_epoch_command(tsk); break;
    case task_type::horizon: generate_horizon_command(tsk); break;
    case task_type::device_compute:
    case task_type::host_compute:
    case task_type::master_node:
    case task_type::collective:
    case task_type::fence: generate_distributed_commands(tsk); break;
    default: throw std::runtime_error("Task type NYI");
    }

    // It is currently undefined to split reduction-producer tasks into multiple chunks on the same node:
    //   - Per-node reduction intermediate results are stored with fixed access to a single backing buffer,
    //     so multiple chunks on the same node will race on this buffer access.
    //   - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish
    //     more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative
    //     or non-commutative operations.
    if(!tsk.get_reductions().empty()) { assert(m_cdag.task_command_count(tsk.get_id()) <= 1); }

    // Commands without any other true-dependency must depend on the active epoch command to ensure they cannot be re-ordered before the epoch.
    // We need to check the count because for some tasks we may not have generated any commands locally.
    if(m_cdag.task_command_count(tsk.get_id()) > 0) {
        for(auto* const cmd : m_cdag.task_commands(tsk.get_id())) {
            generate_epoch_dependencies(cmd);
        }
    }

    // Check that all commands have been created through create_command.
    assert(m_cdag.command_count() - cmd_count_before == m_current_cmd_batch.size());

    // If a new epoch was completed in the CDAG before the current task, prune all predecessor commands of that epoch.
    prune_commands_before(epoch_to_prune_before);

    // If we have a command_recorder, record the current batch of commands.
    if(m_recorder != nullptr) {
        for(const auto& cmd : m_current_cmd_batch) {
            m_recorder->record(command_record(*cmd, tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }));
        }
    }

    return std::move(m_current_cmd_batch);
}

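// Checks whether multiple chunks of this task write to overlapping buffer regions and reports any such overlap
// according to the configured overlapping-write error policy.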
void command_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const {
    const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};

    // Since this check is run distributed on every node, we avoid quadratic behavior by only checking for conflicts between all local chunks and the
    // region-union of remote chunks. This way, every conflict will be reported by at least one node.
    const box<3> global_chunk(subrange(full_chunk.offset, full_chunk.range));
    auto remote_chunks = region_difference(global_chunk, region(box_vector<3>(local_chunks))).into_boxes();

    // detect_overlapping_writes takes a single box_vector, so we concatenate local and remote chunks (the order does not matter)
    auto distributed_chunks = std::move(remote_chunks);
    distributed_chunks.insert(distributed_chunks.end(), local_chunks.begin(), local_chunks.end());

    if(const auto overlapping_writes = detect_overlapping_writes(tsk, distributed_chunks); !overlapping_writes.empty()) {
        auto error = fmt::format("{} has overlapping writes between multiple nodes in", print_task_debug_label(tsk, true /* title case */));
        for(const auto& [bid, overlap] : overlapping_writes) {
            fmt::format_to(std::back_inserter(error), " {} {}", print_buffer_debug_label(bid), overlap);
        }
        error += ". Choose a non-overlapping range mapper for this write access or constrain the split via experimental::constrain_split to make the access "
            "non-overlapping.";
        utils::report_error(m_policy.overlapping_write_error, "{}", error);
    }
}

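// Splits the task into chunks, assigns them to nodes and generates the corresponding commands: execution (or
// fence) commands for chunks assigned to this node, push / await-push commands for data that needs to move
// between nodes, and reduction commands where pending reductions must be finalized. Buffer freshness,
// replication and last-writer state is updated along the way.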
void command_graph_generator::generate_distributed_commands(const task& tsk) {
    const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};
    const size_t num_chunks = m_num_nodes * 1; // TODO Make configurable
    const auto chunks = ([&] {
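        // Collective host tasks receive one unit-sized chunk per node (the node id acts as the chunk offset),
        // while fence tasks replicate the same unit-sized chunk on every node. All other tasks are either split
        // into num_chunks pieces (1D or 2D, depending on hints) or executed as a single full-size chunk.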
        if(tsk.get_type() == task_type::collective || tsk.get_type() == task_type::fence) {
            std::vector<chunk<3>> chunks;
            for(size_t nid = 0; nid < m_num_nodes; ++nid) {
                chunks.push_back(chunk_cast<3>(chunk<1>{id<1>{tsk.get_type() == task_type::collective ? nid : 0}, ones, {m_num_nodes}}));
            }
            return chunks;
        }
        if(tsk.has_variable_split()) {
            if(tsk.get_hint<experimental::hints::split_1d>() != nullptr) {
                // no-op, keeping this for documentation purposes
            }
            if(tsk.get_hint<experimental::hints::split_2d>() != nullptr) { return split_2d(full_chunk, tsk.get_granularity(), num_chunks); }
            return split_1d(full_chunk, tsk.get_granularity(), num_chunks);
        }
        return std::vector<chunk<3>>{full_chunk};
    })();
    assert(chunks.size() <= num_chunks); // We may have created less than requested
    assert(!chunks.empty());

    // Assign each chunk to a node.
    // We assign chunks next to each other to the same worker (if there are more chunks than workers), as this is likely to produce fewer
    // transfers between tasks than a round-robin assignment (for typical stencil codes).
    // FIXME: This only works if the number of chunks is an integer multiple of the number of workers, e.g. 3 chunks for 2 workers degrades to round-robin.
    const auto chunks_per_node = std::max<size_t>(1, chunks.size() / m_num_nodes);

    // Union of all per-buffer writes on this node, used to determine which parts of a buffer are fresh/stale later on.
    std::unordered_map<buffer_id, region<3>> per_buffer_local_writes;
    // In case we need to push a region that is overwritten in the same task, we have to defer updating the last writer.
    std::unordered_map<buffer_id, std::vector<std::pair<region<3>, command_id>>> per_buffer_last_writer_update_list;
    // Buffers that currently are in a pending reduction state will receive a new buffer state after a reduction has been generated.
    std::unordered_map<buffer_id, buffer_state> post_reduction_buffers;

    // Remember all generated pushes for determining intra-task anti-dependencies.
    std::vector<push_command*> generated_pushes;

    // Collect all local chunks for detecting overlapping writes between all local chunks and the union of remote chunks in a distributed manner.
    box_vector<3> local_chunks;

    // In the master/worker model, we used to try and find the node best suited for initializing multiple
    // reductions that do not initialize_to_identity based on the current data distribution.
    // This is more difficult in a distributed setting, so for now we just hard-code it to node 0.
    // TODO: Revisit this at some point.
    const node_id reduction_initializer_nid = 0;

    const box<3> empty_reduction_box({0, 0, 0}, {0, 0, 0});
    const box<3> scalar_reduction_box({0, 0, 0}, {1, 1, 1});

    // Iterate over all chunks, distinguish between local / remote chunks and normal / reduction access.
    //
    // Normal buffer access:
    // - For local chunks, find read requirements on remote data.
    //   Generate a single await push command for each buffer that awaits the entire required region.
    //   This will then be fulfilled by one or more incoming pushes.
    // - For remote chunks, find read requirements intersecting with owned buffer regions.
    //   Generate push commands for those regions.
    //
    // Reduction buffer access:
    // - For local chunks, create a reduction command and a single await_push command that receives the
    //   partial reduction results from all other nodes.
    // - For remote chunks, always create a push command, regardless of whether we have relevant data or not.
    //   This is required because the remote node does not know how many partial reduction results there are.
    for(size_t i = 0; i < chunks.size(); ++i) {
        const node_id nid = (i / chunks_per_node) % m_num_nodes;
        const bool is_local_chunk = nid == m_local_nid;

        auto requirements = get_buffer_requirements_for_mapped_access(tsk, chunks[i], tsk.get_global_size());

        // Add requirements for reductions
        for(const auto& reduction : tsk.get_reductions()) {
            auto rmode = access_mode::discard_write;
            if(nid == reduction_initializer_nid && reduction.init_from_buffer) { rmode = access_mode::read_write; }
#ifndef NDEBUG
            for(auto pmode : access::producer_modes) {
                assert(requirements[reduction.bid].count(pmode) == 0); // task_manager verifies that there are no reduction <-> write-access conflicts
            }
#endif
            requirements[reduction.bid][rmode] = scalar_reduction_box;
        }

        abstract_command* cmd = nullptr;
        if(is_local_chunk) {
            if(tsk.get_type() == task_type::fence) {
                cmd = create_command<fence_command>(tsk.get_id());
            } else {
                cmd = create_command<execution_command>(tsk.get_id(), subrange{chunks[i]});

                // Go over all reductions that are to be performed *during* the execution of this chunk,
                // not to be confused with any pending reductions that need to be finalized *before* the
                // execution of this chunk.
                // If a reduction reads the previous value of the buffer (i.e. w/o property::reduction::initialize_to_identity),
                // we have to include it in exactly one of the per-node intermediate reductions.
                for(const auto& reduction : tsk.get_reductions()) {
                    if(nid == reduction_initializer_nid && reduction.init_from_buffer) {
                        utils::as<execution_command>(cmd)->set_is_reduction_initializer(true);
                        break;
                    }
                }
            }

            if(tsk.get_type() == task_type::collective) {
                // Collective host tasks have an implicit dependency on the previous task in the same collective group,
                // which is required in order to guarantee they are executed in the same order on every node.
                auto cgid = tsk.get_collective_group_id();
                if(auto prev = m_last_collective_commands.find(cgid); prev != m_last_collective_commands.end()) {
                    m_cdag.add_dependency(cmd, m_cdag.get(prev->second), dependency_kind::true_dep, dependency_origin::collective_group_serialization);
                    m_last_collective_commands.erase(prev);
                }
                m_last_collective_commands.emplace(cgid, cmd->get_cid());
            }

            local_chunks.push_back(subrange(chunks[i].offset, chunks[i].range));
        }

        // We use the task id, together with the "chunk id" and the buffer id (stored separately) to match pushes against their corresponding await pushes
        for(auto& [bid, reqs_by_mode] : requirements) {
            auto& buffer = m_buffers.at(bid);
            std::vector<access_mode> required_modes;
            for(const auto mode : detail::access::all_modes) {
                if(auto req_it = reqs_by_mode.find(mode); req_it != reqs_by_mode.end()) {
                    // While uncommon, we do support chunks that don't require access to a particular buffer at all.
                    if(!req_it->second.empty()) { required_modes.push_back(mode); }
                }
            }

            // Don't add reduction commands within the loop to make sure there is at most one reduction command
            // even in the presence of multiple consumer requirements.
            const bool is_pending_reduction = buffer.pending_reduction.has_value();
            const bool generate_reduction =
                is_pending_reduction && std::any_of(required_modes.begin(), required_modes.end(), detail::access::mode_traits::is_consumer);
            if(generate_reduction) {
                // Prepare the buffer state for after the reduction has been performed:
                // Set the current epoch as last writer and mark it as stale so that if we don't generate a reduction command,
                // we'll know to get the data from elsewhere. If we generate a reduction command, this will be overwritten by its command id.
                write_command_state wcs{m_epoch_for_new_commands};
                wcs.mark_as_stale();
                // We just treat this buffer as 1-dimensional, regardless of its actual dimensionality (as it must be unit-sized anyway)
                post_reduction_buffers.emplace(std::piecewise_construct, std::tuple{bid},
                    std::tuple{region_map<write_command_state>{ones, wcs}, region_map<node_bitset>{ones, node_bitset{}}});
            }

            if(is_pending_reduction && !generate_reduction) {
                // TODO the per-node reduction result is discarded - warn user about dead store
            }

            region<3> uninitialized_reads;
            for(const auto mode : required_modes) {
                const auto& req = reqs_by_mode.at(mode);
                if(detail::access::mode_traits::is_consumer(mode)) {
                    if(is_local_chunk && m_policy.uninitialized_read_error != error_policy::ignore
                        && !bounding_box(buffer.initialized_region).covers(bounding_box(req.get_boxes()))) {
                        uninitialized_reads = region_union(uninitialized_reads, region_difference(req, buffer.initialized_region));
                    }

                    if(is_local_chunk) {
                        // Store the read access for determining anti-dependencies later on
                        m_command_buffer_reads[cmd->get_cid()][bid] = region_union(m_command_buffer_reads[cmd->get_cid()][bid], req);
                    }

                    if(is_local_chunk && !is_pending_reduction) {
                        const auto local_sources = buffer.local_last_writer.get_region_values(req);
                        box_vector<3> missing_part_boxes;
                        for(const auto& [box, wcs] : local_sources) {
                            if(box.empty()) continue;
                            if(!wcs.is_fresh()) {
                                missing_part_boxes.push_back(box);
                                continue;
                            }
                            m_cdag.add_dependency(cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);
                        }

                        // There is data we don't yet have locally. Generate an await push command for it.
                        if(!missing_part_boxes.empty()) {
                            const region missing_parts(std::move(missing_part_boxes));
                            assert(m_num_nodes > 1);
                            auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, no_reduction_id), missing_parts);
                            m_cdag.add_dependency(cmd, ap_cmd, dependency_kind::true_dep, dependency_origin::dataflow);
                            generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, missing_parts, ap_cmd);
                            generate_epoch_dependencies(ap_cmd);
                            // Remember that we have this data now
                            buffer.local_last_writer.update_region(missing_parts, {ap_cmd->get_cid(), true /* is_replicated */});
                        }
                    } else if(!is_pending_reduction) {
                        // We generate a separate push command for each last writer command for now, possibly even multiple for partially already-replicated data.
                        // TODO: Can and/or should we consolidate?
                        const auto local_sources = buffer.local_last_writer.get_region_values(req);
                        for(const auto& [local_box, wcs] : local_sources) {
                            if(!wcs.is_fresh() || wcs.is_replicated()) { continue; }

                            // Make sure we don't push anything we've already pushed to this node before
                            box_vector<3> non_replicated_boxes;
                            for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(local_box)) {
                                if(nodes.test(nid)) continue;
                                non_replicated_boxes.push_back(replicated_box);
                            }

                            // Merge all connected boxes to determine the final set of pushes
                            const auto push_region = region<3>(std::move(non_replicated_boxes));
                            for(auto& push_box : push_region.get_boxes()) {
                                auto* const push_cmd =
                                    create_command<push_command>(nid, transfer_id(tsk.get_id(), bid, no_reduction_id), push_box.get_subrange());
                                assert(!utils::isa<await_push_command>(m_cdag.get(wcs)) && "Attempting to push non-owned data?!");
                                m_cdag.add_dependency(push_cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);
                                generated_pushes.push_back(push_cmd);

                                // Store the read access for determining anti-dependencies later on
                                m_command_buffer_reads[push_cmd->get_cid()][bid] = push_box;
                            }

                            // Remember that we've replicated this region
                            for(const auto& [replicated_box, nodes] : buffer.replicated_regions.get_region_values(push_region)) {
                                buffer.replicated_regions.update_box(replicated_box, node_bitset{nodes}.set(nid));
                            }
                        }
                    }
                }

                if(is_local_chunk && detail::access::mode_traits::is_producer(mode)) {
                    // If we are going to insert a reduction command, we will also create a true-dependency chain to the last writer. The new last writer
                    // cid, however, is not known at this point because the reduction command has not been generated yet. Instead, we simply skip
                    // generating anti-dependencies around this requirement. This might not be valid if (multivariate) reductions ever operate on regions.
                    if(!generate_reduction) { generate_anti_dependencies(tsk.get_id(), bid, buffer.local_last_writer, req, cmd); }

                    per_buffer_local_writes[bid] = region_union(per_buffer_local_writes[bid], req);
                    per_buffer_last_writer_update_list[bid].push_back({req, cmd->get_cid()});
                }
            }

            if(!uninitialized_reads.empty()) {
                utils::report_error(m_policy.uninitialized_read_error,
                    "Command C{} on N{}, which executes {} of {}, reads {} {}, which has not been written by any node.", cmd->get_cid(), m_local_nid,
                    box(subrange(chunks[i].offset, chunks[i].range)), print_task_debug_label(tsk), print_buffer_debug_label(bid),
                    detail::region(std::move(uninitialized_reads)));
            }

            if(generate_reduction) {
                if(m_policy.uninitialized_read_error != error_policy::ignore) { post_reduction_buffers.at(bid).initialized_region = scalar_reduction_box; }

                const auto& reduction = *buffer.pending_reduction;

                const auto local_last_writer = buffer.local_last_writer.get_region_values(scalar_reduction_box);
                assert(local_last_writer.size() == 1);

                if(is_local_chunk) {
                    auto* const reduce_cmd = create_command<reduction_command>(reduction, local_last_writer[0].second.is_fresh() /* has_local_contribution */);

                    // Only generate a true dependency on the last writer if this node participated in the intermediate result computation.
                    if(local_last_writer[0].second.is_fresh()) {
                        m_cdag.add_dependency(reduce_cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                    }

                    auto* const ap_cmd = create_command<await_push_command>(transfer_id(tsk.get_id(), bid, reduction.rid), scalar_reduction_box.get_subrange());
                    m_cdag.add_dependency(reduce_cmd, ap_cmd, dependency_kind::true_dep, dependency_origin::dataflow);
                    generate_epoch_dependencies(ap_cmd);

                    m_cdag.add_dependency(cmd, reduce_cmd, dependency_kind::true_dep, dependency_origin::dataflow);

                    // The reduction command becomes the last writer (this may be overridden if this task also writes to the reduction buffer)
                    post_reduction_buffers.at(bid).local_last_writer.update_box(scalar_reduction_box, reduce_cmd->get_cid());
                } else {
                    // Push an empty range if we don't have any fresh data on this node
                    const bool notification_only = !local_last_writer[0].second.is_fresh();
                    const auto push_box = notification_only ? empty_reduction_box : scalar_reduction_box;

                    auto* const push_cmd = create_command<push_command>(nid, transfer_id(tsk.get_id(), bid, reduction.rid), push_box.get_subrange());
                    generated_pushes.push_back(push_cmd);

                    if(notification_only) {
                        generate_epoch_dependencies(push_cmd);
                    } else {
                        m_command_buffer_reads[push_cmd->get_cid()][bid] = region_union(m_command_buffer_reads[push_cmd->get_cid()][bid], scalar_reduction_box);
                        m_cdag.add_dependency(push_cmd, m_cdag.get(local_last_writer[0].second), dependency_kind::true_dep, dependency_origin::dataflow);
                    }

                    // Mark the reduction result as replicated so we don't generate data transfers to this node
                    // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
                    const auto replicated_box = post_reduction_buffers.at(bid).replicated_regions.get_region_values(scalar_reduction_box);
                    assert(replicated_box.size() == 1);
                    for(const auto& [_, nodes] : replicated_box) {
                        post_reduction_buffers.at(bid).replicated_regions.update_box(scalar_reduction_box, node_bitset{nodes}.set(nid));
                    }
                }
            }
        }
    }

    // Check for and report overlapping writes between local chunks, and between local and remote chunks.
    if(m_policy.overlapping_write_error != error_policy::ignore) { report_overlapping_writes(tsk, local_chunks); }

    // For buffers that were in a pending reduction state and a reduction was generated
    // (i.e., the result was not discarded), set their new state.
    for(auto& [bid, new_state] : post_reduction_buffers) {
        auto& buffer = m_buffers.at(bid);
        if(buffer.pending_reduction.has_value()) { m_completed_reductions.push_back(buffer.pending_reduction->rid); }
        buffer = std::move(new_state);
    }

    // Update per-buffer last writers.
    // This has to happen after applying post_reduction_buffers to properly support chained reductions.
    for(auto& [bid, updates] : per_buffer_last_writer_update_list) {
        auto& buffer = m_buffers.at(bid);
        for(auto& [req, cid] : updates) {
            buffer.local_last_writer.update_region(req, cid);
            buffer.replicated_regions.update_region(req, node_bitset{});
        }

        // In case this buffer was in a pending reduction state but the result was discarded, remove the pending reduction.
        if(buffer.pending_reduction.has_value()) {
            m_completed_reductions.push_back(buffer.pending_reduction->rid);
            buffer.pending_reduction = std::nullopt;
        }
    }

    // Mark any buffers that now are in a pending reduction state as such.
    // This has to happen after applying post_reduction_buffers and per_buffer_last_writer_update_list
    // to properly support chained reductions.
    // If there is only one chunk/command, it already implicitly generates the final reduced value
    // and the buffer does not need to be flagged as a pending reduction.
    for(const auto& reduction : tsk.get_reductions()) {
        if(chunks.size() > 1) {
            m_buffers.at(reduction.bid).pending_reduction = reduction;

            // In some cases this node may not actually participate in the computation of the
            // intermediate reduction result (because there was no chunk). If so, mark the
            // reduction buffer as stale so we do not use it as input for the final reduction command.
            if(per_buffer_local_writes.count(reduction.bid) == 0) {
                [[maybe_unused]] size_t num_entries = 0;
                m_buffers.at(reduction.bid).local_last_writer.apply_to_values([&num_entries](const write_command_state& wcs) {
                    num_entries++;
                    write_command_state stale_state{wcs};
                    stale_state.mark_as_stale();
                    return stale_state;
                });
                assert(num_entries == 1);
            }
        } else {
            m_completed_reductions.push_back(reduction.rid);
        }
    }

    // Determine potential "intra-task" race conditions.
    // These can happen in rare cases, when the node that pushes a buffer range also writes to that range within the same task.
    // We cannot do this while generating the push command, as we may not have the writing command recorded at that point.
    for(auto* push_cmd : generated_pushes) {
        const auto last_writers = m_buffers.at(push_cmd->get_transfer_id().bid).local_last_writer.get_region_values(region(push_cmd->get_range()));

        for(const auto& [box, wcs] : last_writers) {
            assert(!box.empty()); // If we want to push it, it cannot be empty
            // In general we should only be pushing fresh data.
            // If the push is for a reduction and the data is no longer fresh, it means
            // that we did not generate a reduction command on this node and the data becomes
            // stale after the remote reduction command has been executed.
            assert(wcs.is_fresh() || push_cmd->get_transfer_id().rid != no_reduction_id);
            auto* const writer_cmd = m_cdag.get(wcs);
            assert(writer_cmd != nullptr);

            // We're only interested in writes that happen within the same task as the push
            if(utils::isa<task_command>(writer_cmd) && utils::as<task_command>(writer_cmd)->get_tid() == tsk.get_id()) {
                // In certain situations the push might have a true dependency on the last writer;
                // in that case don't add an anti-dependency (as that would cause a cycle).
                // TODO: Is this still possible? We don't have a unit test exercising this branch...
                if(push_cmd->has_dependency(writer_cmd, dependency_kind::true_dep)) {
                    // This can currently only happen for await_push commands.
                    assert(utils::isa<await_push_command>(writer_cmd));
                    continue;
                }
                m_cdag.add_dependency(writer_cmd, push_cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
            }

            // Reduction commands will overwrite their buffer, so they must anti-depend on their partial-result push commands
            if(utils::isa<reduction_command>(writer_cmd)
                && utils::as<reduction_command>(writer_cmd)->get_reduction_info().rid == push_cmd->get_transfer_id().rid) {
                m_cdag.add_dependency(writer_cmd, push_cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
            }
        }
    }

    // Determine which local data is fresh/stale based on task-level writes.
    auto requirements = get_buffer_requirements_for_mapped_access(tsk, subrange<3>(tsk.get_global_offset(), tsk.get_global_size()), tsk.get_global_size());
    // Add requirements for reductions
    for(const auto& reduction : tsk.get_reductions()) {
        // The actual mode is irrelevant as long as it's a producer - TODO have a better query API for task buffer requirements
        requirements[reduction.bid][access_mode::write] = scalar_reduction_box;
    }
    for(auto& [bid, reqs_by_mode] : requirements) {
        box_vector<3> global_write_boxes;
        for(const auto mode : access::producer_modes) {
            if(reqs_by_mode.count(mode) == 0) continue;
            const auto& by_mode = reqs_by_mode.at(mode);
            global_write_boxes.insert(global_write_boxes.end(), by_mode.get_boxes().begin(), by_mode.get_boxes().end());
        }
        const region global_writes(std::move(global_write_boxes));
        const auto& local_writes = per_buffer_local_writes[bid];
        assert(region_difference(local_writes, global_writes).empty()); // Local writes have to be a subset of global writes
        const auto remote_writes = region_difference(global_writes, local_writes);
        auto& buffer = m_buffers.at(bid);

        if(m_policy.uninitialized_read_error != error_policy::ignore) { buffer.initialized_region = region_union(buffer.initialized_region, global_writes); }

        // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
        auto boxes_and_cids = buffer.local_last_writer.get_region_values(remote_writes);
        for(auto& [box, wcs] : boxes_and_cids) {
            if(wcs.is_fresh()) {
                wcs.mark_as_stale();
                buffer.local_last_writer.update_region(box, wcs);
            }
        }
    }

    process_task_side_effect_requirements(tsk);
}

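// For a command that writes `write_req` to buffer `bid`, adds anti-dependencies onto all commands that read any
// part of the previously written data. If the last writer has no readers, the anti-dependency is added onto the
// writer itself.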
void command_graph_generator::generate_anti_dependencies(
    task_id tid, buffer_id bid, const region_map<write_command_state>& last_writers_map, const region<3>& write_req, abstract_command* write_cmd) {
    const auto last_writers = last_writers_map.get_region_values(write_req);
    for(const auto& [box, wcs] : last_writers) {
        auto* const last_writer_cmd = m_cdag.get(static_cast<command_id>(wcs));
        assert(!utils::isa<task_command>(last_writer_cmd) || utils::as<task_command>(last_writer_cmd)->get_tid() != tid);

        // Add anti-dependencies onto all successors of the writer
        bool has_successors = false;
        for(auto d : last_writer_cmd->get_dependents()) {
            // Only consider true dependencies
            if(d.kind != dependency_kind::true_dep) continue;

            auto* const cmd = d.node;

            // We might have already generated new commands within the same task that also depend on this; in that case, skip it
            if(utils::isa<task_command>(cmd) && utils::as<task_command>(cmd)->get_tid() == tid) continue;

            // So far we don't know whether the dependent actually intersects with the subrange we're writing
            if(const auto command_reads_it = m_command_buffer_reads.find(cmd->get_cid()); command_reads_it != m_command_buffer_reads.end()) {
                const auto& command_reads = command_reads_it->second;
                // The task might be a dependent because of another buffer
                if(const auto buffer_reads_it = command_reads.find(bid); buffer_reads_it != command_reads.end()) {
                    if(!region_intersection(write_req, buffer_reads_it->second).empty()) {
                        has_successors = true;
                        m_cdag.add_dependency(write_cmd, cmd, dependency_kind::anti_dep, dependency_origin::dataflow);
                    }
                }
            }
        }

        // In some cases (horizons, master node host task, weird discard_* constructs...)
        // the last writer might not have any successors. Just add the anti-dependency onto the writer itself then.
        if(!has_successors) { m_cdag.add_dependency(write_cmd, last_writer_cmd, dependency_kind::anti_dep, dependency_origin::dataflow); }
    }
}

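// Serializes all commands of this task that affect the same host object: each one receives a true dependency on
// the previous side-effect command and then becomes the new last side effect for that host object.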
void command_graph_generator::process_task_side_effect_requirements(const task& tsk) {
    const task_id tid = tsk.get_id();
    if(tsk.get_side_effect_map().empty()) return; // skip the loop in the common case
    if(m_cdag.task_command_count(tid) == 0) return;

    for(auto* const cmd : m_cdag.task_commands(tid)) {
        for(const auto& side_effect : tsk.get_side_effect_map()) {
            const auto [hoid, order] = side_effect;
            auto& host_object = m_host_objects.at(hoid);

            if(host_object.last_side_effect.has_value()) {
                // TODO once we have different side_effect_orders, their interaction will determine the dependency kind
                m_cdag.add_dependency(cmd, m_cdag.get(*host_object.last_side_effect), dependency_kind::true_dep, dependency_origin::dataflow);
            }

            // Simplification: If there are multiple chunks per node, we generate true-dependencies between them in an arbitrary order, when all we really
            // need is mutual exclusion (i.e. a bi-directional pseudo-dependency).
            host_object.last_side_effect = cmd->get_cid();
        }
    }
}

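// Makes an epoch (or an applied horizon) the effective epoch for newly generated commands: every tracked
// last-writer, collective-group and side-effect command id that is older than the epoch is replaced by the
// epoch's command id.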
void command_graph_generator::set_epoch_for_new_commands(const abstract_command* const epoch_or_horizon) {
    // both an explicit epoch command and an applied horizon can be effective epochs
    assert(utils::isa<epoch_command>(epoch_or_horizon) || utils::isa<horizon_command>(epoch_or_horizon));

    for(auto& [bid, bs] : m_buffers) {
        bs.local_last_writer.apply_to_values([epoch_or_horizon](const write_command_state& wcs) {
            auto new_wcs = write_command_state(std::max(epoch_or_horizon->get_cid(), static_cast<command_id>(wcs)), wcs.is_replicated());
            if(!wcs.is_fresh()) new_wcs.mark_as_stale();
            return new_wcs;
        });
    }
    for(auto& [cgid, cid] : m_last_collective_commands) {
        cid = std::max(epoch_or_horizon->get_cid(), cid);
    }
    for(auto& [_, host_object] : m_host_objects) {
        if(host_object.last_side_effect.has_value()) { host_object.last_side_effect = std::max(epoch_or_horizon->get_cid(), *host_object.last_side_effect); }
    }

    m_epoch_for_new_commands = epoch_or_horizon->get_cid();
}

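// Collapses the execution front to a single command by adding true dependencies from `new_front` onto every
// command currently in the front.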
void command_graph_generator::reduce_execution_front_to(abstract_command* const new_front) {
    const auto previous_execution_front = m_cdag.get_execution_front();
    for(auto* const front_cmd : previous_execution_front) {
        if(front_cmd != new_front) { m_cdag.add_dependency(new_front, front_cmd, dependency_kind::true_dep, dependency_origin::execution_front); }
    }
    assert(m_cdag.get_execution_front().size() == 1 && *m_cdag.get_execution_front().begin() == new_front);
}

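// An explicit epoch immediately becomes the effective epoch for new commands and absorbs the current execution
// front; it also carries the list of reductions completed so far.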
void command_graph_generator::generate_epoch_command(const task& tsk) {
    assert(tsk.get_type() == task_type::epoch);
    auto* const epoch = create_command<epoch_command>(tsk.get_id(), tsk.get_epoch_action(), std::move(m_completed_reductions));
    set_epoch_for_new_commands(epoch);
    m_current_horizon = no_command;
    // Make the epoch depend on the previous execution front
    reduce_execution_front_to(epoch);
}

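// Unlike an epoch, a horizon only becomes the effective epoch once the *next* horizon is generated: the
// previously recorded horizon is applied here, and the new one is remembered for later application.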
void command_graph_generator::generate_horizon_command(const task& tsk) {
    assert(tsk.get_type() == task_type::horizon);
    auto* const horizon = create_command<horizon_command>(tsk.get_id(), std::move(m_completed_reductions));

    if(m_current_horizon != static_cast<command_id>(no_command)) {
        // Apply the previous horizon
        set_epoch_for_new_commands(m_cdag.get(m_current_horizon));
    }
    m_current_horizon = horizon->get_cid();

    // Make the horizon depend on the previous execution front
    reduce_execution_front_to(horizon);
}

void command_graph_generator::generate_epoch_dependencies(abstract_command* cmd) {
    // No command must be re-ordered before its last preceding epoch to enforce the barrier semantics of epochs.
    // To guarantee that each node has a transitive true dependency (= temporal dependency) on the epoch, it is enough to add an epoch -> command dependency
    // to any command that has no other true dependencies itself; no graph traversal is necessary. This can be verified by a simple induction proof.

    // As long as the first epoch is present in the graph, all transitive dependencies will be visible and the initial epoch commands (tid 0) are the only
    // commands with no true predecessor. As soon as the first epoch is pruned through the horizon mechanism however, more than one node with no true
    // predecessor can appear (when visualizing the graph). This does not violate the ordering constraint however, because all "free-floating" nodes
    // in that snapshot had a true-dependency chain to their predecessor epoch at the point they were flushed, which is sufficient for following the
    // dependency chain from the executor perspective.

    if(const auto deps = cmd->get_dependencies();
        std::none_of(deps.begin(), deps.end(), [](const abstract_command::dependency d) { return d.kind == dependency_kind::true_dep; })) {
        assert(cmd->get_cid() != m_epoch_for_new_commands);
        m_cdag.add_dependency(cmd, m_cdag.get(m_epoch_for_new_commands), dependency_kind::true_dep, dependency_origin::last_epoch);
    }
}

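// Erases all commands that precede the given epoch from the CDAG and drops their recorded buffer reads, so the
// command graph does not grow indefinitely.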
void command_graph_generator::prune_commands_before(const command_id epoch) {
    if(epoch > m_epoch_last_pruned_before) {
        m_cdag.erase_if([&](abstract_command* cmd) {
            if(cmd->get_cid() < epoch) {
                m_command_buffer_reads.erase(cmd->get_cid());
                return true;
            }
            return false;
        });
        m_epoch_last_pruned_before = epoch;
    }
}

std::string command_graph_generator::print_buffer_debug_label(const buffer_id bid) const {
    return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name);
}

} // namespace celerity::detail