celerity / celerity-runtime · build 9945915519

15 Jul 2024 08:07PM UTC · coverage: 93.077% (-1.3%) from 94.362%

Event: push · CI: github · Committer: fknorr

Rename existing backend / executor -> legacy_backend / legacy_executor

Names 'backend' and 'executor' will be re-used, but we want to keep the
old APIs around in the meantime to keep changesets small.

3188 of 3687 branches covered (86.47%)

Branch coverage included in aggregate %.

17 of 23 new or added lines in 6 files covered. (73.91%)

95 existing lines in 8 files now uncovered.

7232 of 7508 relevant lines covered (96.32%)

169246.64 hits per line
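
(Aggregate arithmetic: branch coverage is folded into the headline figure, i.e. (7232 + 3188) covered / (7508 + 3687) total = 10420 / 11195 = 93.077%.)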

Source File: /src/legacy_executor.cc · 79.89%

#include "legacy_executor.h"

#include <queue>

#include "closure_hydrator.h"
#include "distr_queue.h"
#include "frame.h"
#include "log.h"
#include "mpi_support.h"
#include "named_threads.h"

// TODO: Get rid of this. (This could potentially even cause deadlocks on large clusters)
constexpr size_t MAX_CONCURRENT_JOBS = 20;

namespace celerity {
namespace detail {
        void duration_metric::resume() {
                assert(!m_running);
                m_current_start = m_clock.now();
                m_running = true;
        }

        void duration_metric::pause() {
                assert(m_running);
                m_duration += std::chrono::duration_cast<std::chrono::microseconds>(m_clock.now() - m_current_start);
                m_running = false;
        }
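
        // Usage note (illustrative; not part of the original file): a duration_metric
        // accumulates wall-clock time across resume()/pause() pairs. E.g.:
        //   duration_metric idle;
        //   idle.resume(); /* phase 1 */ idle.pause();
        //   idle.resume(); /* phase 2 */ idle.pause();
        //   idle.get();    // total of both phases, as std::chrono::microseconds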

        legacy_executor::legacy_executor(const size_t num_nodes, const node_id local_nid, host_queue& h_queue, device_queue& d_queue, task_manager& tm,
            buffer_manager& buffer_mngr, reduction_manager& reduction_mngr)
            : m_local_nid(local_nid), m_h_queue(h_queue), m_d_queue(d_queue), m_task_mngr(tm), m_buffer_mngr(buffer_mngr), m_reduction_mngr(reduction_mngr) {
                m_btm = std::make_unique<buffer_transfer_manager>(num_nodes);
                m_metrics.initial_idle.resume();
        }

        void legacy_executor::startup() {
                m_exec_thrd = std::thread(&legacy_executor::run, this);
                set_thread_name(m_exec_thrd.native_handle(), "cy-executor");
        }

        void legacy_executor::shutdown() {
                if(m_exec_thrd.joinable()) { m_exec_thrd.join(); }

                CELERITY_DEBUG("Executor initial idle time = {}us, compute idle time = {}us, starvation time = {}us", m_metrics.initial_idle.get().count(),
                    m_metrics.device_idle.get().count(), m_metrics.starvation.get().count());
        }

        void legacy_executor::run() {
                closure_hydrator::make_available();
                bool done = false;

                while(!done || !m_jobs.empty()) {
                        // Bail if a device error occurred.
                        if(m_running_device_compute_jobs > 0) { m_d_queue.get_sycl_queue().throw_asynchronous(); }

                        // We poll transfers from here (in the same thread, interleaved with job updates),
                        // as it allows us to omit any sort of locking when interacting with the BTM through jobs.
                        // This actually makes quite a big difference, especially for lots of small transfers.
                        // The BTM uses non-blocking MPI routines internally, making this a relatively cheap operation.
                        m_btm->poll();

                        std::vector<command_id> ready_jobs;
                        for(auto it = m_jobs.begin(); it != m_jobs.end();) {
                                auto& job_handle = it->second;

                                if(job_handle.unsatisfied_dependencies > 0) {
                                        ++it;
                                        continue;
                                }

                                if(!job_handle.job->is_running()) {
                                        if(std::find(ready_jobs.cbegin(), ready_jobs.cend(), it->first) == ready_jobs.cend()) { ready_jobs.push_back(it->first); }
                                        ++it;
                                        continue;
                                }

                                if(!job_handle.job->is_done()) {
                                        job_handle.job->update();
                                        ++it;
                                        continue;
                                }

                                for(const auto& d : job_handle.dependents) {
                                        assert(m_jobs.count(d) == 1);
                                        m_jobs[d].unsatisfied_dependencies--;
                                        if(m_jobs[d].unsatisfied_dependencies == 0) { ready_jobs.push_back(d); }
                                }

                                if(utils::isa<device_execute_job>(job_handle.job.get())) {
                                        m_running_device_compute_jobs--;
                                } else if(const auto epoch = dynamic_cast<epoch_job*>(job_handle.job.get()); epoch && epoch->get_epoch_action() == epoch_action::shutdown) {
                                        assert(m_command_queue.empty());
                                        done = true;
                                }

                                it = m_jobs.erase(it);
                        }

                        // Process newly available jobs
                        if(!ready_jobs.empty()) {
                                // Make sure to start any push jobs before other jobs, as on some platforms copying data from a compute device while
                                // also reading it from within a kernel is not supported. To avoid stalling other nodes, we thus perform the push first.
                                std::sort(ready_jobs.begin(), ready_jobs.end(),
                                    [this](command_id a, command_id b) { return m_jobs[a].cmd == command_type::push && m_jobs[b].cmd != command_type::push; });
                                for(command_id cid : ready_jobs) {
                                        auto* job = m_jobs.at(cid).job.get();
                                        job->start();
                                        job->update();
                                        if(utils::isa<device_execute_job>(job)) { m_running_device_compute_jobs++; }
                                }
                        }

                        if(m_jobs.size() < MAX_CONCURRENT_JOBS) {
                                // TODO: Double-buffer command queue?
                                std::unique_lock lk(m_command_queue_mutex);
                                if(!m_command_queue.empty()) {
                                        auto pkg = std::move(m_command_queue.front());
                                        lk.unlock();
                                        const auto handled = handle_command(pkg);
                                        lk.lock();
                                        if(handled) {
                                                m_command_queue.pop();
                                        } else {
                                                // In case the command couldn't be handled, put it back into the queue.
                                                m_command_queue.front() = std::move(pkg);
                                                continue;
                                        }
                                }
                        }

                        if(m_first_command_received) { update_metrics(); }
                }

                assert(m_running_device_compute_jobs == 0);
                closure_hydrator::teardown();
        }
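
        // Note (summary comment; not part of the original file): the loop above is a
        // reference-counted topological scheduler. Each job_handle carries an
        // unsatisfied_dependencies counter; a job becomes ready once its counter hits
        // zero, and completing a job decrements the counter of each dependent. E.g.
        // with A -> B, A -> C, {B, C} -> D, finishing A makes B and C ready, while D
        // only becomes ready after both B and C have completed (counter 2 -> 0).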

        bool legacy_executor::handle_command(const command_pkg& pkg) {
                // A worker might receive a task command before creating the corresponding task graph node
                if(const auto tid = pkg.get_tid()) {
                        if(!m_task_mngr.has_task(*tid)) { return false; }
                }

                switch(pkg.get_command_type()) {
                case command_type::horizon: create_job<horizon_job>(pkg, m_task_mngr); break;
                case command_type::epoch: create_job<epoch_job>(pkg, m_task_mngr); break;
                case command_type::push: create_job<push_job>(pkg, *m_btm, m_buffer_mngr); break;
                case command_type::await_push: create_job<await_push_job>(pkg, *m_btm); break;
                case command_type::reduction: create_job<reduction_job>(pkg, m_reduction_mngr); break;
                case command_type::execution:
                        if(m_task_mngr.get_task(pkg.get_tid().value())->get_execution_target() == execution_target::host) {
                                create_job<host_execute_job>(pkg, m_h_queue, m_task_mngr, m_buffer_mngr);
                        } else {
                                create_job<device_execute_job>(pkg, m_d_queue, m_task_mngr, m_buffer_mngr, m_reduction_mngr, m_local_nid);
                        }
                        break;
                case command_type::fence: create_job<fence_job>(pkg, m_task_mngr); break;
                default: assert(!"Unexpected command");
                }
                return true;
        }
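
        // Note (summary comment; not part of the original file): returning false here
        // feeds the retry path in run(): the command_pkg is moved back to the front of
        // m_command_queue and retried on a later loop iteration, once the task manager
        // has registered the corresponding task.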

        void legacy_executor::update_metrics() {
                if(m_running_device_compute_jobs == 0) {
                        if(!m_metrics.device_idle.is_running()) { m_metrics.device_idle.resume(); }
                } else {
                        if(m_metrics.device_idle.is_running()) { m_metrics.device_idle.pause(); }
                }
                if(m_jobs.empty()) {
                        if(!m_metrics.starvation.is_running()) { m_metrics.starvation.resume(); }
                } else {
                        if(m_metrics.starvation.is_running()) { m_metrics.starvation.pause(); }
                }
        }
} // namespace detail
} // namespace celerity
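
For context, a minimal driver sketch of the lifecycle implemented above. This is illustrative only: in Celerity these objects are owned and wired up by the runtime, and the queues, managers, and node parameters shown here stand in for whatever the caller already holds.

        // Hypothetical driver (assumed names for the surrounding objects):
        celerity::detail::legacy_executor exec(num_nodes, local_nid, h_queue, d_queue, tm, buffer_mngr, reduction_mngr);
        exec.startup();  // spawns the "cy-executor" thread, which enters run()
        // ... the runtime enqueues command_pkgs, which run() drains via handle_command() ...
        // run() exits after an epoch_job with epoch_action::shutdown completes.
        exec.shutdown(); // joins the thread and logs idle/starvation metrics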