• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

celerity / celerity-runtime / 12009901531

25 Nov 2024 12:20PM UTC coverage: 94.92% (+0.009%) from 94.911%
12009901531

push

github

fknorr
Add missing includes and consistently order them

We can't add the misc-include-cleaner lint because it causes too many
false positives with "interface headers" such as sycl.hpp.

3190 of 3626 branches covered (87.98%)

Branch coverage included in aggregate %.

7049 of 7161 relevant lines covered (98.44%)

1242183.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.16
/include/thread_queue.h
1
#pragma once
2

3
#include "async_event.h"
4
#include "double_buffered_queue.h"
5
#include "named_threads.h"
6
#include "tracy.h"
7
#include "utils.h"
8

9
#include <cassert>
10
#include <chrono>
11
#include <exception>
12
#include <functional>
13
#include <future>
14
#include <memory>
15
#include <optional>
16
#include <string>
17
#include <thread>
18
#include <type_traits>
19
#include <utility>
20
#include <variant>
21

22

23
namespace celerity::detail {
24

25
/// A single-thread job queue accepting functors and returning events that conditionally forward job results.
26
class thread_queue {
27
  public:
28
        /// Constructs a null thread queue that cannot receive jobs.
29
        thread_queue() = default;
30

31
        /// Spawns a new thread queue with the given thread name. If `enable_profiling` is set to `true`, completed events from this thread queue will report a
32
        /// non-nullopt duration.
33
        explicit thread_queue(std::string thread_name, const bool enable_profiling = false) : m_impl(new impl(std::move(thread_name), enable_profiling)) {}
520!
34

35
        // thread_queue is movable, but not copyable.
36
        thread_queue(const thread_queue&) = delete;
37
        thread_queue(thread_queue&&) = default;
31✔
38
        thread_queue& operator=(const thread_queue&) = delete;
39
        thread_queue& operator=(thread_queue&&) = default;
40

41
        /// Destruction will await all submitted and pending jobs.
42
        ~thread_queue() {
551✔
43
                if(m_impl != nullptr) {
551✔
44
                        m_impl->queue.push(job{} /* termination */);
520✔
45
                        m_impl->thread.join();
520✔
46
                }
47
        }
551✔
48

49
        /// Submit a job to the thread queue.
50
        /// `fn` must take no arguments and return either `void` or a type convertible to `void *`, which will be forwarded as the result into the event.
51
        template <typename Fn>
52
        async_event submit(Fn&& fn) {
2,612✔
53
                static_assert(std::is_void_v<std::invoke_result_t<std::decay_t<Fn>>> || std::is_convertible_v<std::invoke_result_t<std::decay_t<Fn>>, void*>,
54
                    "job function must return either void or a pointer convertible to void*");
55
                assert(m_impl != nullptr);
2,612✔
56
                job job(std::forward<Fn>(fn));
2,612✔
57
                auto evt = make_async_event<thread_queue::event>(job.promise.get_future());
2,612✔
58
                m_impl->queue.push(std::move(job));
2,612✔
59
                return evt;
5,224✔
60
        }
2,612✔
61

62
  private:
63
        friend struct thread_queue_testspy;
64

65
        /// The object passed through std::future from queue thread to owner thread
66
        struct completion {
67
                void* result = nullptr;
68
                std::optional<std::chrono::nanoseconds> execution_time;
69
        };
70

71
        struct job {
72
                std::function<void*()> fn;
73
                std::promise<completion> promise;
74

75
                job() = default; // empty (default-constructed) fn signals termination
520✔
76

77
                /// Constructor overload for `fn` returning `void`.
78
                template <typename Fn, std::enable_if_t<std::is_void_v<std::invoke_result_t<std::decay_t<Fn>>>, int> = 0>
79
                job(Fn&& fn) : fn([fn = std::forward<Fn>(fn)]() mutable { return std::invoke(fn), nullptr; }) {}
5,217✔
80

81
                /// Constructor overload for `fn` returning `void*`.
82
                template <typename Fn, std::enable_if_t<std::is_convertible_v<std::invoke_result_t<std::decay_t<Fn>>, void*>, int> = 0>
83
                job(Fn&& fn) : fn([fn = std::forward<Fn>(fn)]() mutable { return std::invoke(fn); }) {}
6✔
84
        };
85

86
        class event : public async_event_impl {
87
          public:
88
                explicit event(std::future<completion> future) : m_state(std::move(future)) {}
2,612✔
89

90
                bool is_complete() override { return get_completed() != nullptr; }
2,462,425✔
91

92
                void* get_result() override {
286✔
93
                        const auto completed = get_completed();
286✔
94
                        assert(completed);
286✔
95
                        return completed->result;
286✔
96
                }
97

98
                std::optional<std::chrono::nanoseconds> get_native_execution_time() override {
1,213✔
99
                        const auto completed = get_completed();
1,213✔
100
                        assert(completed);
1,213✔
101
                        return completed->execution_time;
1,213✔
102
                }
103

104
          private:
105
                // As the result from std::future can only be retrieved once and std::shared_future is not functionally necessary here, we replace the future by its
106
                // completion as soon as the first query succeeds.
107
                std::variant<std::future<completion>, completion> m_state;
108

109
                completion* get_completed() {
2,463,924✔
110
                        if(const auto completed = std::get_if<completion>(&m_state)) return completed;
2,463,924✔
111
                        if(auto& future = std::get<std::future<completion>>(m_state); future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
2,462,423✔
112
                                return &m_state.emplace<completion>(future.get());
1,645✔
113
                        }
114
                        return nullptr;
2,460,779✔
115
                }
116
        };
117

118
        // pimpl'd to keep thread_queue movable
119
        struct impl {
120
                double_buffered_queue<job> queue;
121
                const bool enable_profiling;
122
                std::thread thread;
123

124
                explicit impl(std::string name, const bool enable_profiling) : enable_profiling(enable_profiling), thread(&impl::thread_main, this, std::move(name)) {}
520✔
125

126
                void execute(job& job) const {
2,612✔
127
                        std::chrono::steady_clock::time_point start;
2,612✔
128
                        if(enable_profiling) { start = std::chrono::steady_clock::now(); }
2,612✔
129

130
                        completion completion;
2,612✔
131
                        completion.result = job.fn();
2,612✔
132

133
                        if(enable_profiling) {
2,612✔
134
                                const auto end = std::chrono::steady_clock::now();
38✔
135
                                completion.execution_time = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
38✔
136
                        }
137
                        job.promise.set_value(completion);
2,612✔
138
                }
2,611✔
139

140
                void loop() {
520✔
141
                        for(;;) {
142
                                queue.wait_while_empty();
3,099✔
143
                                for(auto& job : queue.pop_all()) {
5,711✔
144
                                        if(!job.fn) return;
3,651✔
145
                                        execute(job);
2,611✔
146
                                }
147
                        }
2,579✔
148
                }
149

150
                void thread_main(const std::string& name) {
520✔
151
                        set_thread_name(get_current_thread_handle(), name);
520✔
152
                        CELERITY_DETAIL_TRACY_SET_THREAD_NAME_AND_ORDER(tracy_detail::leak_name(name), tracy_detail::thread_order::thread_queue);
153

154
                        try {
155
                                loop();
519✔
156
                        } catch(std::exception& e) { //
×
157
                                utils::panic("exception in {}: {}", name, e.what());
×
158
                        } catch(...) { //
×
159
                                utils::panic("exception in {}", name);
×
160
                        }
×
161
                }
520✔
162
        };
163

164
        std::unique_ptr<impl> m_impl;
165
};
166

167
} // namespace celerity::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc