
celerity / celerity-runtime / 10216674169

02 Aug 2024 01:45PM UTC coverage: 94.951% (+2.1%) from 92.884%
Triggered by: push (github)

fknorr: Remove experimental::user_benchmarker

user_benchmarker has been obsolete ever since we moved away from
structured logging as the profiler (CPAT) interface.

2978 of 3372 branches covered (88.32%)
Branch coverage is included in the aggregate percentage.
6557 of 6670 relevant lines covered (98.31%)
1534446.4 hits per line

Source File

/src/task_manager.cc: 98.49% of lines covered

#include "task_manager.h"

#include "access_modes.h"
#include "recorders.h"

namespace celerity {
namespace detail {

        task_manager::task_manager(size_t num_collective_nodes, detail::task_recorder* recorder, const policy_set& error_policy)
            : m_num_collective_nodes(num_collective_nodes), m_policy(error_policy), m_task_recorder(recorder) {
                // We manually generate the initial epoch task, which we treat as if it has been reached immediately.
                auto reserve = m_task_buffer.reserve_task_entry(await_free_task_slot_callback());
                auto initial_epoch = task::make_epoch(initial_epoch_task, epoch_action::none);
                if(m_task_recorder != nullptr) { m_task_recorder->record(task_record(*initial_epoch, {})); }
                m_task_buffer.put(std::move(reserve), std::move(initial_epoch));
        }

        void task_manager::notify_buffer_created(const buffer_id bid, const range<3>& range, const bool host_initialized) {
                const auto [iter, inserted] = m_buffers.emplace(bid, range);
                assert(inserted);
                auto& buffer = iter->second;
                if(host_initialized) { buffer.last_writers.update_region(subrange<3>({}, range), m_epoch_for_new_tasks); }
        }

        void task_manager::notify_buffer_debug_name_changed(const buffer_id bid, const std::string& debug_name) { m_buffers.at(bid).debug_name = debug_name; }

        void task_manager::notify_buffer_destroyed(const buffer_id bid) {
                assert(m_buffers.count(bid) != 0);
                m_buffers.erase(bid);
        }
        void task_manager::notify_host_object_created(const host_object_id hoid) { m_host_objects.emplace(hoid, host_object_state()); }

        void task_manager::notify_host_object_destroyed(const host_object_id hoid) {
                assert(m_host_objects.count(hoid) != 0);
                m_host_objects.erase(hoid);
        }

        const task* task_manager::find_task(task_id tid) const { return m_task_buffer.find_task(tid); }

        bool task_manager::has_task(task_id tid) const { return m_task_buffer.has_task(tid); }

        // Note that we assume tasks are not modified after their initial creation, which is why
        // we don't need to worry about thread-safety after returning the task pointer.
        const task* task_manager::get_task(task_id tid) const { return m_task_buffer.get_task(tid); }

        void task_manager::notify_horizon_reached(task_id horizon_tid) {
                // m_latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.

                assert(m_task_buffer.get_task(horizon_tid)->get_type() == task_type::horizon);
                assert(!m_latest_horizon_reached || *m_latest_horizon_reached < horizon_tid);
                assert(m_latest_epoch_reached.get() < horizon_tid);

                if(m_latest_horizon_reached) { m_latest_epoch_reached.set(*m_latest_horizon_reached); }

                m_latest_horizon_reached = horizon_tid;
        }

        void task_manager::notify_epoch_reached(task_id epoch_tid) {
                // m_latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.

                assert(get_task(epoch_tid)->get_type() == task_type::epoch);
                assert(!m_latest_horizon_reached || *m_latest_horizon_reached < epoch_tid);
                assert(m_latest_epoch_reached.get() < epoch_tid);

                m_latest_epoch_reached.set(epoch_tid);
                m_latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself
        }

        void task_manager::await_epoch(task_id epoch) { m_latest_epoch_reached.await(epoch); }

        region<3> get_requirements(const task& tsk, buffer_id bid, const std::vector<cl::sycl::access::mode>& modes) {
                const auto& access_map = tsk.get_buffer_access_map();
                const subrange<3> full_range{tsk.get_global_offset(), tsk.get_global_size()};
                box_vector<3> boxes;
                for(auto m : modes) {
                        const auto req = access_map.get_mode_requirements(bid, m, tsk.get_dimensions(), full_range, tsk.get_global_size());
                        boxes.insert(boxes.end(), req.get_boxes().begin(), req.get_boxes().end());
                }
                return region(std::move(boxes));
        }

        void task_manager::compute_dependencies(task& tsk) {
                using namespace cl::sycl::access;

                const auto& access_map = tsk.get_buffer_access_map();

                auto buffers = access_map.get_accessed_buffers();
                for(const auto& reduction : tsk.get_reductions()) {
                        buffers.emplace(reduction.bid);
                }

                const box<3> scalar_box({0, 0, 0}, {1, 1, 1});

                for(const auto bid : buffers) {
                        auto& buffer = m_buffers.at(bid);
                        const auto modes = access_map.get_access_modes(bid);

                        std::optional<reduction_info> reduction;
                        for(const auto& maybe_reduction : tsk.get_reductions()) {
                                if(maybe_reduction.bid == bid) {
                                        if(reduction) { throw std::runtime_error(fmt::format("Multiple reductions attempt to write buffer {} in task {}", bid, tsk.get_id())); }
                                        reduction = maybe_reduction;
                                }
                        }

                        if(reduction && !modes.empty()) {
                                throw std::runtime_error(
                                    fmt::format("Buffer {} is both required through an accessor and used as a reduction output in task {}", bid, tsk.get_id()));
                        }

                        // Determine reader dependencies
                        if(std::any_of(modes.cbegin(), modes.cend(), detail::access::mode_traits::is_consumer) || (reduction.has_value() && reduction->init_from_buffer)) {
                                auto read_requirements = get_requirements(tsk, bid, {detail::access::consumer_modes.cbegin(), detail::access::consumer_modes.cend()});
                                if(reduction.has_value()) { read_requirements = region_union(read_requirements, scalar_box); }
                                const auto last_writers = buffer.last_writers.get_region_values(read_requirements);

                                box_vector<3> uninitialized_reads;
                                for(const auto& [box, writer] : last_writers) {
                                        // host-initialized buffers are last-written by the current epoch
                                        if(writer.has_value()) {
                                                add_dependency(tsk, *m_task_buffer.get_task(*writer), dependency_kind::true_dep, dependency_origin::dataflow);
                                        } else if(m_policy.uninitialized_read_error != error_policy::ignore) {
                                                uninitialized_reads.push_back(box);
                                        }
                                }
                                if(!uninitialized_reads.empty()) {
                                        utils::report_error(m_policy.uninitialized_read_error,
                                            "{} declares a reading access on uninitialized {} {}. Make sure to construct the accessor with no_init if possible.",
                                            print_task_debug_label(tsk, true /* title case */), print_buffer_debug_label(bid), region(std::move(uninitialized_reads)));
                                }
                        }

                        // Update last writers and determine anti-dependencies
                        if(std::any_of(modes.cbegin(), modes.cend(), detail::access::mode_traits::is_producer) || reduction.has_value()) {
                                auto write_requirements = get_requirements(tsk, bid, {detail::access::producer_modes.cbegin(), detail::access::producer_modes.cend()});
                                if(reduction.has_value()) { write_requirements = region_union(write_requirements, scalar_box); }
                                if(write_requirements.empty()) continue;

                                const auto last_writers = buffer.last_writers.get_region_values(write_requirements);
                                for(auto& p : last_writers) {
                                        if(p.second == std::nullopt) continue;
                                        task* last_writer = m_task_buffer.get_task(*p.second);

                                        // Determine anti-dependencies by looking at all the dependents of the last writing task
                                        bool has_anti_dependents = false;

                                        for(auto dependent : last_writer->get_dependents()) {
                                                if(dependent.node->get_id() == tsk.get_id()) {
                                                        // This can happen
                                                        // - if a task writes to two or more buffers with the same last writer
                                                        // - if the task itself also needs read access to that buffer (R/W access)
                                                        continue;
                                                }
                                                const auto dependent_read_requirements =
                                                    get_requirements(*dependent.node, bid, {detail::access::consumer_modes.cbegin(), detail::access::consumer_modes.cend()});
                                                // Only add an anti-dependency if we are really writing over the region read by this task
                                                if(!region_intersection(write_requirements, dependent_read_requirements).empty()) {
                                                        add_dependency(tsk, *dependent.node, dependency_kind::anti_dep, dependency_origin::dataflow);
                                                        has_anti_dependents = true;
                                                }
                                        }

                                        if(!has_anti_dependents) {
                                                // If no intermediate consumers exist, add an anti-dependency on the last writer directly.
                                                // Note that unless this task is a pure producer, a true dependency will be created and this is a no-op.
                                                // While it might not always make total sense to have anti-dependencies between (pure) producers without an
                                                // intermediate consumer, we at least have a defined behavior, and the thus enforced ordering of tasks
                                                // likely reflects what the user expects.
                                                add_dependency(tsk, *last_writer, dependency_kind::anti_dep, dependency_origin::dataflow);
                                        }
                                }

                                buffer.last_writers.update_region(write_requirements, tsk.get_id());
                        }
                }

                for(const auto& side_effect : tsk.get_side_effect_map()) {
                        const auto [hoid, order] = side_effect;
                        auto& host_object = m_host_objects.at(hoid);
                        if(host_object.last_side_effect.has_value()) {
                                add_dependency(tsk, *m_task_buffer.get_task(*host_object.last_side_effect), dependency_kind::true_dep, dependency_origin::dataflow);
                        }
                        host_object.last_side_effect = tsk.get_id();
                }

                if(auto cgid = tsk.get_collective_group_id(); cgid != 0) {
                        if(auto prev = m_last_collective_tasks.find(cgid); prev != m_last_collective_tasks.end()) {
                                add_dependency(tsk, *m_task_buffer.get_task(prev->second), dependency_kind::true_dep, dependency_origin::collective_group_serialization);
                                m_last_collective_tasks.erase(prev);
                        }
                        m_last_collective_tasks.emplace(cgid, tsk.get_id());
                }

                // Tasks without any other true-dependency must depend on the last epoch to ensure they cannot be re-ordered before the epoch
                if(const auto deps = tsk.get_dependencies();
                    std::none_of(deps.begin(), deps.end(), [](const task::dependency d) { return d.kind == dependency_kind::true_dep; })) {
                        add_dependency(tsk, *m_task_buffer.get_task(m_epoch_for_new_tasks), dependency_kind::true_dep, dependency_origin::last_epoch);
                }
        }

        task& task_manager::register_task_internal(task_ring_buffer::reservation&& reserve, std::unique_ptr<task> task) {
                auto& task_ref = *task;
                assert(task != nullptr);
                m_task_buffer.put(std::move(reserve), std::move(task));
                m_execution_front.insert(&task_ref);
                return task_ref;
        }

        void task_manager::invoke_callbacks(const task* tsk) const {
                for(const auto& cb : m_task_callbacks) {
                        cb(tsk);
                }
                if(m_task_recorder != nullptr) {
                        m_task_recorder->record(task_record(*tsk, [this](const buffer_id bid) { return m_buffers.at(bid).debug_name; }));
                }
        }

        void task_manager::add_dependency(task& depender, task& dependee, dependency_kind kind, dependency_origin origin) {
                assert(&depender != &dependee);
                depender.add_dependency({&dependee, kind, origin});
                m_execution_front.erase(&dependee);
                m_max_pseudo_critical_path_length = std::max(m_max_pseudo_critical_path_length, depender.get_pseudo_critical_path_length());
        }

        bool task_manager::need_new_horizon() const {
                const bool need_seq_horizon = m_max_pseudo_critical_path_length - m_current_horizon_critical_path_length >= m_task_horizon_step_size;
                const bool need_para_horizon = static_cast<int>(m_execution_front.size()) >= m_task_horizon_max_parallelism;
                const bool need_horizon = need_seq_horizon || need_para_horizon;
                CELERITY_TRACE("Horizon decision: {} - seq: {} para: {} - crit_p: {} exec_f: {}", need_horizon, need_seq_horizon, need_para_horizon,
                    m_current_horizon_critical_path_length, m_execution_front.size());
                return need_horizon;
        }

        task& task_manager::reduce_execution_front(task_ring_buffer::reservation&& reserve, std::unique_ptr<task> new_front) {
                // add dependencies from a copy of the front to this task
                const auto current_front = m_execution_front;
                for(task* front_task : current_front) {
                        add_dependency(*new_front, *front_task, dependency_kind::true_dep, dependency_origin::execution_front);
                }
                assert(m_execution_front.empty());
                return register_task_internal(std::move(reserve), std::move(new_front));
        }

        void task_manager::set_epoch_for_new_tasks(const task_id epoch) {
                // apply the new epoch to buffers_last_writers and last_collective_tasks data structs
                for(auto& [_, buffer] : m_buffers) {
                        buffer.last_writers.apply_to_values([epoch](const std::optional<task_id> tid) -> std::optional<task_id> {
                                if(!tid) return tid;
                                return {std::max(epoch, *tid)};
                        });
                }
                for(auto& [_, tid] : m_last_collective_tasks) {
                        tid = std::max(epoch, tid);
                }
                for(auto& [_, host_object] : m_host_objects) {
                        if(host_object.last_side_effect.has_value() && *host_object.last_side_effect < epoch) { host_object.last_side_effect = epoch; }
                }

                m_epoch_for_new_tasks = epoch;
        }

        task_id task_manager::generate_horizon_task() {
                auto reserve = m_task_buffer.reserve_task_entry(await_free_task_slot_callback());
                const auto tid = reserve.get_tid();

                m_current_horizon_critical_path_length = m_max_pseudo_critical_path_length;
                const auto previous_horizon = m_current_horizon;
                m_current_horizon = tid;

                task& new_horizon = reduce_execution_front(std::move(reserve), task::make_horizon(*m_current_horizon));
                if(previous_horizon) { set_epoch_for_new_tasks(*previous_horizon); }

                invoke_callbacks(&new_horizon);
                return tid;
        }

        task_id task_manager::generate_epoch_task(epoch_action action) {
                auto reserve = m_task_buffer.reserve_task_entry(await_free_task_slot_callback());
                const auto tid = reserve.get_tid();

                task& new_epoch = reduce_execution_front(std::move(reserve), task::make_epoch(tid, action));
                compute_dependencies(new_epoch);
                set_epoch_for_new_tasks(tid);

                m_current_horizon = std::nullopt; // this horizon is now behind the epoch_for_new_tasks, so it will never become an epoch itself
                m_current_horizon_critical_path_length = m_max_pseudo_critical_path_length; // the explicit epoch resets the need to create horizons

                invoke_callbacks(&new_epoch);
                return tid;
        }

        task_id task_manager::generate_fence_task(buffer_access_map access_map, side_effect_map side_effects, std::unique_ptr<fence_promise> fence_promise) {
                auto reserve = m_task_buffer.reserve_task_entry(await_free_task_slot_callback());
                const auto tid = reserve.get_tid();
                task& tsk = register_task_internal(std::move(reserve), task::make_fence(tid, std::move(access_map), std::move(side_effects), std::move(fence_promise)));
                compute_dependencies(tsk);
                invoke_callbacks(&tsk);
                return tid;
        }

        task_id task_manager::get_first_in_flight_epoch() const {
                task_id current_horizon = 0;
                task_id latest_epoch = m_latest_epoch_reached.get();
                // we need either one epoch or two horizons that have yet to be executed
                // so that it is possible for task slots to be freed in the future
                for(const auto& tsk : m_task_buffer) {
                        if(tsk->get_id() <= latest_epoch) continue;
                        if(tsk->get_type() == task_type::epoch) {
                                return tsk->get_id();
                        } else if(tsk->get_type() == task_type::horizon) {
                                if(current_horizon) return current_horizon;
                                current_horizon = tsk->get_id();
                        }
                }
                return latest_epoch;
        }

        task_ring_buffer::wait_callback task_manager::await_free_task_slot_callback() {
                return [&](task_id previous_free_tid) {
                        if(get_first_in_flight_epoch() == m_latest_epoch_reached.get()) {
                                // verify that the epoch didn't get reached between the invocation of the callback and the in flight check
                                if(m_latest_epoch_reached.get() < previous_free_tid + 1) {
                                        throw std::runtime_error("Exhausted task slots with no horizons or epochs in flight."
                                                                 "\nLikely due to generating a very large number of tasks with no dependencies.");
                                }
                        }
                        task_id reached_epoch = m_latest_epoch_reached.await(previous_free_tid + 1);
                        m_task_buffer.delete_up_to(reached_epoch);
                };
        }

        std::string task_manager::print_buffer_debug_label(const buffer_id bid) const { return utils::make_buffer_debug_label(bid, m_buffers.at(bid).debug_name); }

} // namespace detail
} // namespace celerity
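
To summarize the dataflow rules that compute_dependencies applies per buffer, the toy sketch below re-implements them with invented, standalone types. It is not the Celerity API, and it deliberately tracks a single last writer per buffer instead of the per-region last writers used above: a consumer gets a true dependency on the last writer, and a producer gets anti-dependencies on any consumers of the previous value, falling back to the previous writer itself when there is no intermediate consumer.

// Minimal, self-contained illustration only; all names here are hypothetical.
#include <cstdio>
#include <optional>
#include <vector>

using task_id = unsigned;

struct toy_buffer_state {
        std::optional<task_id> last_writer;       // task that produced the current contents
        std::vector<task_id> readers_since_write; // consumers of that value
};

enum class dep_kind { true_dep, anti_dep };

static void add_dep(task_id depender, task_id dependee, dep_kind kind) {
        std::printf("task %u -> task %u (%s)\n", depender, dependee, kind == dep_kind::true_dep ? "true" : "anti");
}

static void read_buffer(toy_buffer_state& b, task_id tid) {
        // a reader depends on whoever wrote the value it consumes
        if(b.last_writer) add_dep(tid, *b.last_writer, dep_kind::true_dep);
        b.readers_since_write.push_back(tid);
}

static void write_buffer(toy_buffer_state& b, task_id tid) {
        if(!b.readers_since_write.empty()) {
                // don't overwrite a value before its consumers have read it
                for(const task_id r : b.readers_since_write) add_dep(tid, r, dep_kind::anti_dep);
        } else if(b.last_writer) {
                // no intermediate consumer: order the two producers directly
                add_dep(tid, *b.last_writer, dep_kind::anti_dep);
        }
        b.last_writer = tid;
        b.readers_since_write.clear();
}

int main() {
        toy_buffer_state b;
        write_buffer(b, 1); // producer
        read_buffer(b, 2);  // consumer: true dependency on task 1
        write_buffer(b, 3); // overwriter: anti-dependency on task 2
}

Running it prints one dependency edge per line, mirroring the true_dep/anti_dep classification performed in the listing above.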