• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #852

17 Dec 2022 04:43PM UTC coverage: 85.912% (-0.7%) from 86.568%
#852

push

StellarBot
Merge #6106

6106: Modernizing modules of levels 0 to 5 r=hkaiser a=hkaiser

- flyby: HPX_FORWARD/HPX_MOVE now expand to std::forward and std::move if those are implemented as builtin functions

working towards #5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

87 of 87 new or added lines in 24 files covered. (100.0%)

173152 of 201546 relevant lines covered (85.91%)

1910264.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.32
/libs/core/execution/include/hpx/execution/algorithms/run_loop.hpp
1
//  Copyright (c) 2022 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#pragma once
8

9
#include <hpx/config.hpp>
10
#include <hpx/assert.hpp>
11
#include <hpx/concepts/concepts.hpp>
12
#include <hpx/errors/try_catch_exception_ptr.hpp>
13
#include <hpx/execution/queries/get_scheduler.hpp>
14
#include <hpx/execution/queries/get_stop_token.hpp>
15
#include <hpx/execution_base/completion_signatures.hpp>
16
#include <hpx/execution_base/execution.hpp>
17
#include <hpx/execution_base/get_env.hpp>
18
#include <hpx/modules/lock_registration.hpp>
19
#include <hpx/synchronization/condition_variable.hpp>
20
#include <hpx/synchronization/spinlock.hpp>
21
#include <hpx/type_support/meta.hpp>
22
#include <hpx/type_support/unused.hpp>
23

24
#include <exception>
25
#include <mutex>
26

27
namespace hpx::execution::experimental {
28

29
    // A run_loop is an execution context on which work can be scheduled. It
30
    // maintains a simple, thread-safe first-in-first-out queue of work. Its
31
    // run() member function removes elements from the queue and executes them
32
    // in a loop on whatever thread of execution calls run().
33
    //
34
    // A run_loop instance has an associated count that corresponds to the
35
    // number of work items that are in its queue. Additionally, a run_loop has
36
    // an associated state that can be one of starting, running, or finishing.
37
    //
38
    // Concurrent invocations of the member functions of run_loop, other than
39
    // run and its destructor, do not introduce data races. The member functions
40
    // pop_front, push_back, and finish execute atomically.
41
    //
42
    // [Note: Implementations are encouraged to use an intrusive queue of
43
    // operation states to hold the work units to make scheduling
44
    // allocation-free. -- end note]
45
    //
46
    class run_loop
47
    {
48
        struct run_loop_opstate_base
49
        {
50
            explicit run_loop_opstate_base(run_loop_opstate_base* tail) noexcept
3,672✔
51
              : next(this)
3,672✔
52
              , tail(tail)
3,672✔
53
            {
54
            }
3,672✔
55

56
            run_loop_opstate_base(run_loop_opstate_base* tail,
×
57
                void (*execute)(run_loop_opstate_base*) noexcept) noexcept
58
              : next(tail)
×
59
              , execute_(execute)
×
60
            {
61
            }
×
62

63
            run_loop_opstate_base(run_loop_opstate_base&&) = delete;
64
            run_loop_opstate_base& operator=(run_loop_opstate_base&&) = delete;
65

66
            run_loop_opstate_base* next;
67
            union
68
            {
69
                void (*execute_)(run_loop_opstate_base*) noexcept;
70
                run_loop_opstate_base* tail;
71
            };
72

73
            void execute() noexcept
×
74
            {
75
                (*execute_)(this);
×
76
            }
×
77
        };
78

79
        template <typename Receiver>
80
        struct run_loop_opstate : run_loop_opstate_base
×
81
        {
82
            run_loop& loop;
83
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
84

85
            template <typename Receiver_>
86
            run_loop_opstate(run_loop_opstate_base* tail, run_loop& loop,
87
                Receiver_&& receiver) noexcept(noexcept(std::
88
                    is_nothrow_constructible_v<std::decay_t<Receiver>,
89
                        Receiver_>))
90
              : run_loop_opstate_base(tail)
91
              , loop(loop)
92
              , receiver(HPX_FORWARD(Receiver_, receiver))
93
            {
94
            }
95

96
            static void execute(run_loop_opstate_base* p) noexcept
×
97
            {
98
                auto& receiver = static_cast<run_loop_opstate*>(p)->receiver;
×
99
                hpx::detail::try_catch_exception_ptr(
×
100
                    [&]() {
×
101
                        if (get_stop_token(get_env(receiver)).stop_requested())
×
102
                        {
103
                            hpx::execution::experimental::set_stopped(
×
104
                                HPX_MOVE(receiver));
×
105
                        }
×
106
                        else
107
                        {
108
                            hpx::execution::experimental::set_value(
×
109
                                HPX_MOVE(receiver));
×
110
                        }
111
                    },
×
112
                    [&](std::exception_ptr ep) {
×
113
                        hpx::execution::experimental::set_error(
×
114
                            HPX_MOVE(receiver), HPX_MOVE(ep));
×
115
                    });
×
116
            }
×
117

118
            explicit run_loop_opstate(run_loop_opstate_base* tail) noexcept
119
              : run_loop_opstate_base(tail)
120
            {
121
            }
122

123
            run_loop_opstate(
×
124
                run_loop_opstate_base* next, run_loop& loop, Receiver r)
125
              : run_loop_opstate_base(next, &execute)
×
126
              , loop(loop)
×
127
              , receiver(HPX_MOVE(r))
×
128
            {
×
129
            }
×
130

131
            friend void tag_invoke(hpx::execution::experimental::start_t,
×
132
                run_loop_opstate& os) noexcept
133
            {
134
                os.start();
×
135
            }
×
136

137
            void start() & noexcept;
138
        };
139

140
    public:
141
        class run_loop_scheduler
142
        {
143
        public:
144
            struct run_loop_sender
145
            {
146
                explicit run_loop_sender(run_loop& loop) noexcept
×
147
                  : loop(loop)
×
148
                {
149
                }
×
150

151
            private:
152
                friend run_loop_scheduler;
153

154
                template <typename Receiver>
155
                using operation_state = run_loop_opstate<Receiver>;
156

157
                template <typename Receiver>
158
                friend operation_state<Receiver> tag_invoke(
×
159
                    hpx::execution::experimental::connect_t,
160
                    run_loop_sender const& s,
161
                    Receiver&& receiver) noexcept(noexcept(std::
162
                        is_nothrow_constructible_v<operation_state<Receiver>,
163
                            run_loop_opstate_base*, run_loop&>))
164
                {
165
                    return operation_state<Receiver>(
×
166
                        &s.loop.head, s.loop, HPX_FORWARD(Receiver, receiver));
×
167
                }
×
168

169
                // clang-format off
170
                template <typename CPO,
171
                    HPX_CONCEPT_REQUIRES_(
172
                        meta::value<meta::one_of<
173
                            std::decay_t<CPO>, set_value_t, set_stopped_t>>
174
                    )>
175
                // clang-format on
176
                friend run_loop_scheduler tag_invoke(
×
177
                    hpx::execution::experimental::get_completion_scheduler_t<
178
                        CPO>,
179
                    run_loop_sender const& s) noexcept
180
                {
181
                    return run_loop_scheduler{s.loop};
×
182
                }
183

184
                using completion_signatures =
185
                    hpx::execution::experimental::completion_signatures<
186
                        hpx::execution::experimental::set_value_t(),
187
                        hpx::execution::experimental::set_error_t(
188
                            std::exception_ptr),
189
                        hpx::execution::experimental::set_stopped_t()>;
190

191
                template <typename Env>
192
                friend auto tag_invoke(
193
                    hpx::execution::experimental::get_completion_signatures_t,
194
                    run_loop_sender const&, Env) noexcept
195
                    -> completion_signatures;
196

197
                run_loop& loop;
198
            };
199

200
            friend run_loop;
201

202
        public:
203
            explicit run_loop_scheduler(run_loop& loop) noexcept
×
204
              : loop(loop)
×
205
            {
206
            }
×
207

208
            run_loop& get_run_loop() const
×
209
            {
210
                return loop;
×
211
            }
212

213
        private:
214
            friend run_loop_sender tag_invoke(
×
215
                hpx::execution::experimental::schedule_t,
216
                run_loop_scheduler const& sched) noexcept
217
            {
218
                return run_loop_sender(sched.loop);
×
219
            }
220

221
            friend constexpr hpx::execution::experimental::
222
                forward_progress_guarantee
223
                tag_invoke(hpx::execution::experimental::
224
                               get_forward_progress_guarantee_t,
225
                    run_loop_scheduler const&) noexcept
226
            {
227
                return hpx::execution::experimental::
228
                    forward_progress_guarantee::parallel;
229
            }
230

231
            friend constexpr bool operator==(run_loop_scheduler const& lhs,
×
232
                run_loop_scheduler const& rhs) noexcept
233
            {
234
                return &lhs.loop == &rhs.loop;
×
235
            }
236
            friend constexpr bool operator!=(run_loop_scheduler const& lhs,
237
                run_loop_scheduler const& rhs) noexcept
238
            {
239
                return !(lhs == rhs);
240
            }
241

242
        private:
243
            run_loop& loop;
244
        };
245

246
    private:
247
        friend struct run_loop_scheduler::run_loop_sender;
248

249
        hpx::spinlock mtx;
250
        hpx::condition_variable cond_var;
251

252
        // MSVC and gcc don't properly handle the friend declaration above
253
#if defined(HPX_MSVC) || defined(HPX_GCC_VERSION)
254
    public:
255
#endif
256
        run_loop_opstate_base head;
257

258
    private:
259
        bool stop = false;
3,672✔
260

261
        void push_back(run_loop_opstate_base* t)
×
262
        {
263
            std::unique_lock l(mtx);
×
264
            stop = false;
×
265
            t->next = &head;
×
266
            head.tail = head.tail->next = t;
×
267
            cond_var.notify_one();
×
268
        }
×
269

270
        run_loop_opstate_base* pop_front()
3,671✔
271
        {
272
            std::unique_lock l(mtx);
3,672✔
273
            cond_var.wait(l, [this] { return head.next != &head || stop; });
10,828✔
274
            if (head.tail == head.next)
3,672✔
275
            {
276
                head.tail = &head;
3,672✔
277
            }
3,672✔
278

279
            // std::exchange(head.next, head.next->next);
280
            auto old_val = HPX_MOVE(head.next);
3,672✔
281
            head.next = HPX_MOVE(head.next->next);
3,672✔
282
            return old_val;
3,672✔
283
        }
3,672✔
284

285
    public:
286
        // [exec.run_loop.ctor] construct/copy/destroy
287
        run_loop() noexcept
3,672✔
288
          : head(&head)
3,672✔
289
        {
290
        }
3,672✔
291

292
        run_loop(run_loop&&) = delete;
293

294
        // If count is not 0 or if state is running, invokes terminate().
295
        // Otherwise, has no effects.
296
        ~run_loop()
3,672✔
297
        {
298
            if (head.next != &head || !stop)
3,672✔
299
            {
300
                std::terminate();
×
301
            }
302
        }
3,672✔
303

304
        // [exec.run_loop.members] Member functions:
305
        run_loop_scheduler get_scheduler()
×
306
        {
307
            return run_loop_scheduler(*this);
×
308
        }
309

310
        void run()
3,671✔
311
        {
312
            // Precondition: state is starting.
313
            //HPX_ASSERT(head.next != &head || !stop);
314
            for (run_loop_opstate_base* t; (t = pop_front()) != &head; /**/)
3,671✔
315
            {
316
                t->execute();
×
317
            }
318
            HPX_ASSERT(stop);    // Postcondition: state is finishing.
3,672✔
319
        }
3,672✔
320

321
        void finish()
3,672✔
322
        {
323
            std::unique_lock l(mtx);
3,672✔
324
            hpx::util::ignore_while_checking<decltype(l)> il(&l);
3,672✔
325
            HPX_UNUSED(il);
3,672✔
326
            stop = true;
3,672✔
327
            cond_var.notify_all();
3,672✔
328
        }
3,672✔
329
    };
330

331
    using run_loop_scheduler = run_loop::run_loop_scheduler;
332

333
    ///////////////////////////////////////////////////////////////////////////
334
    template <typename Receiver>
335
    inline void run_loop::run_loop_opstate<Receiver>::start() & noexcept
×
336
    try
337
    {
338
        loop.push_back(this);
×
339
    }
×
340
    catch (...)
341
    {
342
        set_error(HPX_MOVE(receiver), std::current_exception());
×
343
    }
×
344
}    // namespace hpx::execution::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc