• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

celerity / celerity-runtime / 10074265948

24 Jul 2024 09:38AM UTC coverage: 92.909% (-0.2%) from 93.126%
10074265948

push

github

fknorr
Introduce new backend and executor infrastructure

'executor' has two implementations: live_executor (for normal execution)
and dry_run_executor (for dry runs). The live_executor maintains the state
that persists between instructions, such as memory allocations, but
delegates all state-management complexity to out_of_order_engine and
receive_arbiter.

'backend' is an abstract interface for executing any operation that
might touch backend-allocated memory. The interface itself is
independent of SYCL, and the tie-in happens with the sycl_backend
derived class. Backend specialization for CUDA nd-copies is achieved
by instantiating either a sycl_generic_backend or a sycl_cuda_backend.

3305 of 3839 branches covered (86.09%)

Branch coverage included in aggregate %.

509 of 553 new or added lines in 15 files covered. (92.04%)

2 existing lines in 2 files now uncovered.

7753 of 8063 relevant lines covered (96.16%)

180915.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.53
/include/thread_queue.h
1
#pragma once
2

3
#include "async_event.h"
4
#include "double_buffered_queue.h"
5
#include "named_threads.h"
6
#include "utils.h"
7

8
#include <chrono>
9
#include <future>
10
#include <thread>
11
#include <type_traits>
12
#include <variant>
13

14
namespace celerity::detail {
15

16
/// A single-thread job queue accepting functors and returning events that conditionally forward job results.
17
class thread_queue {
18
  public:
19
        /// Constructs a null thread queue that cannot receive jobs.
20
        thread_queue() = default;
21

22
        /// Spawns a new thread queue with the given thread name. If `enable_profiling` is set to `true`, completed events from this thread queue will report a
23
        /// non-nullopt duration.
24
        explicit thread_queue(std::string thread_name, const bool enable_profiling = false) : m_impl(new impl(std::move(thread_name), enable_profiling)) {}
60!
25

26
        // thread_queue is movable, but not copyable.
27
        thread_queue(const thread_queue&) = delete;
UNCOV
28
        thread_queue(thread_queue&&) = default;
×
29
        thread_queue& operator=(const thread_queue&) = delete;
30
        thread_queue& operator=(thread_queue&&) = default;
31

32
        /// Destruction will await all submitted and pending jobs.
33
        ~thread_queue() {
60✔
34
                if(m_impl != nullptr) {
60!
35
                        m_impl->queue.push(job{} /* termination */);
60✔
36
                        m_impl->thread.join();
60✔
37
                }
38
        }
60✔
39

40
        /// Submit a job to the thread queue.
41
        /// `fn` must take no arguments and return either `void` or a type convertible to `void *`, which will be forwarded as the result into the event.
42
        template <typename Fn>
43
        async_event submit(Fn&& fn) {
289✔
44
                static_assert(std::is_void_v<std::invoke_result_t<std::decay_t<Fn>>> || std::is_convertible_v<std::invoke_result_t<std::decay_t<Fn>>, void*>,
45
                    "job function must return either void or a pointer convertible to void*");
46
                assert(m_impl != nullptr);
289✔
47
                job job(std::forward<Fn>(fn));
289✔
48
                auto evt = make_async_event<thread_queue::event>(job.promise.get_future());
289✔
49
                m_impl->queue.push(std::move(job));
289✔
50
                return evt;
578✔
51
        }
289✔
52

53
  private:
54
        friend struct thread_queue_testspy;
55

56
        /// The object passed through std::future from queue thread to owner thread
57
        struct completion {
58
                void* result = nullptr;
59
                std::optional<std::chrono::nanoseconds> execution_time;
60
        };
61

62
        struct job {
63
                std::function<void*()> fn;
64
                std::promise<completion> promise;
65

66
                job() = default; // empty (default-constructed) fn signals termination
60✔
67

68
                /// Constructor overload for `fn` returning `void`.
69
                template <typename Fn, std::enable_if_t<std::is_void_v<std::invoke_result_t<std::decay_t<Fn>>>, int> = 0>
70
                job(Fn&& fn) : fn([fn = std::forward<Fn>(fn)]() mutable { return std::invoke(fn), nullptr; }) {}
572✔
71

72
                /// Constructor overload for `fn` returning `void*`.
73
                template <typename Fn, std::enable_if_t<std::is_convertible_v<std::invoke_result_t<std::decay_t<Fn>>, void*>, int> = 0>
74
                job(Fn&& fn) : fn([fn = std::forward<Fn>(fn)]() mutable { return std::invoke(fn); }) {}
6✔
75
        };
76

77
        class event : public async_event_impl {
78
          public:
79
                explicit event(std::future<completion> future) : m_state(std::move(future)) {}
289✔
80

81
                bool is_complete() override { return get_completed() != nullptr; }
537,026✔
82

83
                void* get_result() override {
286✔
84
                        const auto completed = get_completed();
286✔
85
                        assert(completed);
286✔
86
                        return completed->result;
286✔
87
                }
88

89
                std::optional<std::chrono::nanoseconds> get_native_execution_time() override {
11✔
90
                        const auto completed = get_completed();
11✔
91
                        assert(completed);
11✔
92
                        return completed->execution_time;
11✔
93
                }
94

95
          private:
96
                // As the result from std::future can only be retrieved once and std::shared_future is not functionally necessary here, we replace the future by its
97
                // completion as soon as the first query succeeds.
98
                std::variant<std::future<completion>, completion> m_state;
99

100
                completion* get_completed() {
537,323✔
101
                        if(const auto completed = std::get_if<completion>(&m_state)) return completed;
537,323✔
102
                        if(auto& future = std::get<std::future<completion>>(m_state); future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
537,025✔
103
                                return &m_state.emplace<completion>(future.get());
289✔
104
                        }
105
                        return nullptr;
536,736✔
106
                }
107
        };
108

109
        // pimpl'd to keep thread_queue movable
110
        struct impl {
111
                double_buffered_queue<job> queue;
112
                const bool enable_profiling;
113
                std::thread thread;
114

115
                explicit impl(std::string name, const bool enable_profiling) : enable_profiling(enable_profiling), thread(&impl::thread_main, this, std::move(name)) {}
60✔
116

117
                void execute(job& job) const {
289✔
118
                        std::chrono::steady_clock::time_point start;
289✔
119
                        if(enable_profiling) { start = std::chrono::steady_clock::now(); }
289✔
120

121
                        completion completion;
289✔
122
                        completion.result = job.fn();
289✔
123

124
                        if(enable_profiling) {
289✔
125
                                const auto end = std::chrono::steady_clock::now();
4✔
126
                                completion.execution_time = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
4✔
127
                        }
128
                        job.promise.set_value(completion);
289✔
129
                }
289✔
130

131
                void loop() {
60✔
132
                        for(;;) {
133
                                queue.wait_while_empty();
344✔
134
                                for(auto& job : queue.pop_all()) {
633✔
135
                                        if(!job.fn) return;
409✔
136
                                        execute(job);
289✔
137
                                }
138
                        }
284✔
139
                }
140

141
                void thread_main(const std::string& name) {
60✔
142
                        set_thread_name(get_current_thread_handle(), name);
60✔
143

144
                        try {
145
                                loop();
60✔
146
                        } catch(std::exception& e) { //
×
147
                                utils::panic("exception in {}: {}", name, e.what());
×
148
                        } catch(...) { //
×
149
                                utils::panic("exception in {}", name);
×
150
                        }
×
151
                }
60✔
152
        };
153

154
        std::unique_ptr<impl> m_impl;
155
};
156

157
} // namespace celerity::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc