• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

celerity / celerity-runtime / 10325124557

09 Aug 2024 08:11PM UTC coverage: 95.087% (+0.02%) from 95.07%
10325124557

push

github

fknorr
Update benchmark results for Tracy integration

3014 of 3414 branches covered (88.28%)

Branch coverage included in aggregate %.

6624 of 6722 relevant lines covered (98.54%)

1474033.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.16
/include/thread_queue.h
1
#pragma once
2

3
#include "async_event.h"
4
#include "double_buffered_queue.h"
5
#include "named_threads.h"
6
#include "tracy.h"
7
#include "utils.h"
8

9
#include <chrono>
10
#include <future>
11
#include <thread>
12
#include <type_traits>
13
#include <variant>
14

15
namespace celerity::detail {
16

17
/// A single-thread job queue accepting functors and returning events that conditionally forward job results.
18
class thread_queue {
19
  public:
20
        /// Constructs a null thread queue that cannot receive jobs.
21
        thread_queue() = default;
22

23
        /// Spawns a new thread queue with the given thread name. If `enable_profiling` is set to `true`, completed events from this thread queue will report a
24
        /// non-nullopt duration.
25
        explicit thread_queue(std::string thread_name, const bool enable_profiling = false) : m_impl(new impl(std::move(thread_name), enable_profiling)) {}
446!
26

27
        // thread_queue is movable, but not copyable.
28
        thread_queue(const thread_queue&) = delete;
29
        thread_queue(thread_queue&&) = default;
30✔
30
        thread_queue& operator=(const thread_queue&) = delete;
31
        thread_queue& operator=(thread_queue&&) = default;
32

33
        /// Destruction will await all submitted and pending jobs.
34
        ~thread_queue() {
476✔
35
                if(m_impl != nullptr) {
476✔
36
                        m_impl->queue.push(job{} /* termination */);
446✔
37
                        m_impl->thread.join();
446✔
38
                }
39
        }
476✔
40

41
        /// Submit a job to the thread queue.
42
        /// `fn` must take no arguments and return either `void` or a type convertible to `void *`, which will be forwarded as the result into the event.
43
        template <typename Fn>
44
        async_event submit(Fn&& fn) {
3,708✔
45
                static_assert(std::is_void_v<std::invoke_result_t<std::decay_t<Fn>>> || std::is_convertible_v<std::invoke_result_t<std::decay_t<Fn>>, void*>,
46
                    "job function must return either void or a pointer convertible to void*");
47
                assert(m_impl != nullptr);
3,708✔
48
                job job(std::forward<Fn>(fn));
3,708✔
49
                auto evt = make_async_event<thread_queue::event>(job.promise.get_future());
3,708✔
50
                m_impl->queue.push(std::move(job));
3,708✔
51
                return evt;
7,416✔
52
        }
3,708✔
53

54
  private:
55
        friend struct thread_queue_testspy;
56

57
        /// The object passed through std::future from queue thread to owner thread
58
        struct completion {
59
                void* result = nullptr;
60
                std::optional<std::chrono::nanoseconds> execution_time;
61
        };
62

63
        struct job {
64
                std::function<void*()> fn;
65
                std::promise<completion> promise;
66

67
                job() = default; // empty (default-constructed) fn signals termination
446✔
68

69
                /// Constructor overload for `fn` returning `void`.
70
                template <typename Fn, std::enable_if_t<std::is_void_v<std::invoke_result_t<std::decay_t<Fn>>>, int> = 0>
71
                job(Fn&& fn) : fn([fn = std::forward<Fn>(fn)]() mutable { return std::invoke(fn), nullptr; }) {}
7,410✔
72

73
                /// Constructor overload for `fn` returning `void*`.
74
                template <typename Fn, std::enable_if_t<std::is_convertible_v<std::invoke_result_t<std::decay_t<Fn>>, void*>, int> = 0>
75
                job(Fn&& fn) : fn([fn = std::forward<Fn>(fn)]() mutable { return std::invoke(fn); }) {}
6✔
76
        };
77

78
        class event : public async_event_impl {
79
          public:
80
                explicit event(std::future<completion> future) : m_state(std::move(future)) {}
3,708✔
81

82
                bool is_complete() override { return get_completed() != nullptr; }
2,594,408✔
83

84
                void* get_result() override {
286✔
85
                        const auto completed = get_completed();
286✔
86
                        assert(completed);
286✔
87
                        return completed->result;
286✔
88
                }
89

90
                std::optional<std::chrono::nanoseconds> get_native_execution_time() override {
3,256✔
91
                        const auto completed = get_completed();
3,256✔
92
                        assert(completed);
3,256✔
93
                        return completed->execution_time;
3,256✔
94
                }
95

96
          private:
97
                // As the result from std::future can only be retrieved once and std::shared_future is not functionally necessary here, we replace the future by its
98
                // completion as soon as the first query succeeds.
99
                std::variant<std::future<completion>, completion> m_state;
100

101
                completion* get_completed() {
2,597,950✔
102
                        if(const auto completed = std::get_if<completion>(&m_state)) return completed;
2,597,950✔
103
                        if(auto& future = std::get<std::future<completion>>(m_state); future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
2,594,406✔
104
                                return &m_state.emplace<completion>(future.get());
3,708✔
105
                        }
106
                        return nullptr;
2,590,699✔
107
                }
108
        };
109

110
        // pimpl'd to keep thread_queue movable
111
        struct impl {
112
                double_buffered_queue<job> queue;
113
                const bool enable_profiling;
114
                std::thread thread;
115

116
                explicit impl(std::string name, const bool enable_profiling) : enable_profiling(enable_profiling), thread(&impl::thread_main, this, std::move(name)) {}
446✔
117

118
                void execute(job& job) const {
3,708✔
119
                        std::chrono::steady_clock::time_point start;
3,708✔
120
                        if(enable_profiling) { start = std::chrono::steady_clock::now(); }
3,708✔
121

122
                        completion completion;
3,708✔
123
                        completion.result = job.fn();
3,708✔
124

125
                        if(enable_profiling) {
3,708✔
126
                                const auto end = std::chrono::steady_clock::now();
4✔
127
                                completion.execution_time = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
4✔
128
                        }
129
                        job.promise.set_value(completion);
3,708✔
130
                }
3,708✔
131

132
                void loop() {
446✔
133
                        for(;;) {
134
                                queue.wait_while_empty();
3,971✔
135
                                for(auto& job : queue.pop_all()) {
7,679✔
136
                                        if(!job.fn) return;
4,600✔
137
                                        execute(job);
3,708✔
138
                                }
139
                        }
3,525✔
140
                }
141

142
                void thread_main(const std::string& name) {
446✔
143
                        set_thread_name(get_current_thread_handle(), name);
446✔
144
                        CELERITY_DETAIL_TRACY_SET_THREAD_NAME_AND_ORDER(tracy_detail::leak_name(name), tracy_detail::thread_order::thread_queue);
145

146
                        try {
147
                                loop();
446✔
148
                        } catch(std::exception& e) { //
×
149
                                utils::panic("exception in {}: {}", name, e.what());
×
150
                        } catch(...) { //
×
151
                                utils::panic("exception in {}", name);
×
152
                        }
×
153
                }
446✔
154
        };
155

156
        std::unique_ptr<impl> m_impl;
157
};
158

159
} // namespace celerity::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc