• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #863

11 Jan 2023 05:02PM UTC coverage: 86.533% (-0.05%) from 86.582%
#863

push

StellarBot
Merge #6126

6126: Deprecate hpx::parallel::task_block in favor of hpx::experimental::ta… r=hkaiser a=dimitraka



Co-authored-by: kadimitra <kadimitra@ece.auth.gr>

174614 of 201789 relevant lines covered (86.53%)

1954694.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/libs/core/execution/include/hpx/execution/algorithms/run_loop.hpp
1
//  Copyright (c) 2022 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#pragma once
8

9
#include <hpx/config.hpp>
10
#include <hpx/assert.hpp>
11
#include <hpx/concepts/concepts.hpp>
12
#include <hpx/errors/try_catch_exception_ptr.hpp>
13
#include <hpx/execution/queries/get_scheduler.hpp>
14
#include <hpx/execution/queries/get_stop_token.hpp>
15
#include <hpx/execution_base/completion_signatures.hpp>
16
#include <hpx/execution_base/execution.hpp>
17
#include <hpx/execution_base/get_env.hpp>
18
#include <hpx/modules/lock_registration.hpp>
19
#include <hpx/synchronization/condition_variable.hpp>
20
#include <hpx/synchronization/spinlock.hpp>
21
#include <hpx/type_support/meta.hpp>
22
#include <hpx/type_support/unused.hpp>
23

24
#include <exception>
25
#include <mutex>
26

27
namespace hpx::execution::experimental {
28

29
    // A run_loop is an execution context on which work can be scheduled. It
30
    // maintains a simple, thread-safe first-in-first-out queue of work. Its
31
    // run() member function removes elements from the queue and executes them
32
    // in a loop on whatever thread of execution calls run().
33
    //
34
    // A run_loop instance has an associated count that corresponds to the
35
    // number of work items that are in its queue. Additionally, a run_loop has
36
    // an associated state that can be one of starting, running, or finishing.
37
    //
38
    // Concurrent invocations of the member functions of run_loop, other than
39
    // run and its destructor, do not introduce data races. The member functions
40
    // pop_front, push_back, and finish execute atomically.
41
    //
42
    // [Note: Implementations are encouraged to use an intrusive queue of
43
    // operation states to hold the work units to make scheduling
44
    // allocation-free. -- end note]
45
    //
46
    class run_loop
47
    {
48
        struct run_loop_opstate_base
49
        {
50
            explicit run_loop_opstate_base(run_loop_opstate_base* tail) noexcept
3,743✔
51
              : next(this)
3,743✔
52
              , tail(tail)
3,743✔
53
            {
54
            }
3,743✔
55

56
            run_loop_opstate_base(run_loop_opstate_base* tail,
98✔
57
                void (*execute)(run_loop_opstate_base*) noexcept) noexcept
58
              : next(tail)
98✔
59
              , execute_(execute)
98✔
60
            {
61
            }
98✔
62

63
            run_loop_opstate_base(run_loop_opstate_base&&) = delete;
64
            run_loop_opstate_base& operator=(run_loop_opstate_base&&) = delete;
65

66
            run_loop_opstate_base* next;
67
            union
68
            {
69
                void (*execute_)(run_loop_opstate_base*) noexcept;
70
                run_loop_opstate_base* tail;
71
            };
72

73
            void execute() noexcept
98✔
74
            {
75
                (*execute_)(this);
98✔
76
            }
98✔
77
        };
78

79
        template <typename Receiver>
80
        struct run_loop_opstate : run_loop_opstate_base
13✔
81
        {
82
            run_loop& loop;
83
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
84

85
            template <typename Receiver_>
86
            run_loop_opstate(run_loop_opstate_base* tail, run_loop& loop,
87
                Receiver_&& receiver) noexcept(noexcept(std::
88
                    is_nothrow_constructible_v<std::decay_t<Receiver>,
89
                        Receiver_>))
90
              : run_loop_opstate_base(tail)
91
              , loop(loop)
92
              , receiver(HPX_FORWARD(Receiver_, receiver))
93
            {
94
            }
95

96
            static void execute(run_loop_opstate_base* p) noexcept
98✔
97
            {
98
                auto& receiver = static_cast<run_loop_opstate*>(p)->receiver;
98✔
99
                hpx::detail::try_catch_exception_ptr(
98✔
100
                    [&]() {
196✔
101
                        if (get_stop_token(get_env(receiver)).stop_requested())
98✔
102
                        {
103
                            hpx::execution::experimental::set_stopped(
×
104
                                HPX_MOVE(receiver));
×
105
                        }
×
106
                        else
107
                        {
108
                            hpx::execution::experimental::set_value(
98✔
109
                                HPX_MOVE(receiver));
98✔
110
                        }
111
                    },
98✔
112
                    [&](std::exception_ptr ep) {
98✔
113
                        hpx::execution::experimental::set_error(
×
114
                            HPX_MOVE(receiver), HPX_MOVE(ep));
×
115
                    });
×
116
            }
98✔
117

118
            explicit run_loop_opstate(run_loop_opstate_base* tail) noexcept
119
              : run_loop_opstate_base(tail)
120
            {
121
            }
122

123
            run_loop_opstate(
98✔
124
                run_loop_opstate_base* next, run_loop& loop, Receiver r)
125
              : run_loop_opstate_base(next, &execute)
98✔
126
              , loop(loop)
98✔
127
              , receiver(HPX_MOVE(r))
98✔
128
            {
98✔
129
            }
98✔
130

131
            friend void tag_invoke(hpx::execution::experimental::start_t,
98✔
132
                run_loop_opstate& os) noexcept
133
            {
134
                os.start();
98✔
135
            }
98✔
136

137
            void start() & noexcept;
138
        };
139

140
    public:
141
        class run_loop_scheduler
142
        {
143
        public:
144
            struct run_loop_sender
145
            {
146
                explicit run_loop_sender(run_loop& loop) noexcept
102✔
147
                  : loop(loop)
102✔
148
                {
149
                }
102✔
150

151
            private:
152
                friend run_loop_scheduler;
153

154
                template <typename Receiver>
155
                using operation_state = run_loop_opstate<Receiver>;
156

157
                template <typename Receiver>
158
                friend operation_state<Receiver> tag_invoke(
98✔
159
                    hpx::execution::experimental::connect_t,
160
                    run_loop_sender const& s,
161
                    Receiver&& receiver) noexcept(noexcept(std::
162
                        is_nothrow_constructible_v<operation_state<Receiver>,
163
                            run_loop_opstate_base*, run_loop&>))
164
                {
165
                    return operation_state<Receiver>(
98✔
166
                        &s.loop.head, s.loop, HPX_FORWARD(Receiver, receiver));
98✔
167
                }
×
168

169
                // clang-format off
170
                template <typename CPO,
171
                    HPX_CONCEPT_REQUIRES_(
172
                        meta::value<meta::one_of<
173
                            std::decay_t<CPO>, set_value_t, set_stopped_t>>
174
                    )>
175
                // clang-format on
176
                friend run_loop_scheduler tag_invoke(
31✔
177
                    hpx::execution::experimental::get_completion_scheduler_t<
178
                        CPO>,
179
                    run_loop_sender const& s) noexcept
180
                {
181
                    return run_loop_scheduler{s.loop};
31✔
182
                }
183

184
                using completion_signatures =
185
                    hpx::execution::experimental::completion_signatures<
186
                        hpx::execution::experimental::set_value_t(),
187
                        hpx::execution::experimental::set_error_t(
188
                            std::exception_ptr),
189
                        hpx::execution::experimental::set_stopped_t()>;
190

191
                template <typename Env>
192
                friend auto tag_invoke(
193
                    hpx::execution::experimental::get_completion_signatures_t,
194
                    run_loop_sender const&, Env) noexcept
195
                    -> completion_signatures;
196

197
                run_loop& loop;
198
            };
199

200
            friend run_loop;
201

202
        public:
203
            explicit run_loop_scheduler(run_loop& loop) noexcept
130✔
204
              : loop(loop)
130✔
205
            {
206
            }
130✔
207

208
            run_loop& get_run_loop() const noexcept
121✔
209
            {
210
                return loop;
121✔
211
            }
212

213
        private:
214
            friend run_loop_sender tag_invoke(
102✔
215
                hpx::execution::experimental::schedule_t,
216
                run_loop_scheduler const& sched) noexcept
217
            {
218
                return run_loop_sender(sched.loop);
102✔
219
            }
220

221
            friend constexpr hpx::execution::experimental::
222
                forward_progress_guarantee
223
                tag_invoke(hpx::execution::experimental::
224
                               get_forward_progress_guarantee_t,
225
                    run_loop_scheduler const&) noexcept
226
            {
227
                return hpx::execution::experimental::
228
                    forward_progress_guarantee::parallel;
229
            }
230

231
            friend constexpr bool operator==(run_loop_scheduler const& lhs,
3✔
232
                run_loop_scheduler const& rhs) noexcept
233
            {
234
                return &lhs.loop == &rhs.loop;
3✔
235
            }
236
            friend constexpr bool operator!=(run_loop_scheduler const& lhs,
237
                run_loop_scheduler const& rhs) noexcept
238
            {
239
                return !(lhs == rhs);
240
            }
241

242
        private:
243
            run_loop& loop;
244
        };
245

246
    private:
247
        friend struct run_loop_scheduler::run_loop_sender;
248

249
        hpx::spinlock mtx;
250
        hpx::condition_variable cond_var;
251

252
        // MSVC and gcc don't properly handle the friend declaration above
253
#if defined(HPX_MSVC) || defined(HPX_GCC_VERSION)
254
    public:
255
#endif
256
        run_loop_opstate_base head;
257

258
    private:
259
        bool stop = false;
3,743✔
260

261
        void push_back(run_loop_opstate_base* t)
98✔
262
        {
263
            std::unique_lock l(mtx);
98✔
264
            stop = false;
98✔
265
            t->next = &head;
98✔
266
            head.tail = head.tail->next = t;
98✔
267
            cond_var.notify_one();
98✔
268
        }
98✔
269

270
        run_loop_opstate_base* pop_front()
3,878✔
271
        {
272
            std::unique_lock l(mtx);
3,878✔
273
            cond_var.wait(l, [this] { return head.next != &head || stop; });
11,242✔
274
            if (head.tail == head.next)
3,878✔
275
            {
276
                head.tail = &head;
3,868✔
277
            }
3,868✔
278

279
            // std::exchange(head.next, head.next->next);
280
            auto old_val = HPX_MOVE(head.next);
3,878✔
281
            head.next = HPX_MOVE(head.next->next);
3,878✔
282
            return old_val;
3,878✔
283
        }
3,878✔
284

285
    public:
286
        // [exec.run_loop.ctor] construct/copy/destroy
287
        run_loop() noexcept
3,743✔
288
          : head(&head)
3,743✔
289
        {
290
        }
3,743✔
291

292
        run_loop(run_loop&&) = delete;
293
        run_loop& operator=(run_loop&&) = delete;
294

295
        // If count is not 0 or if state is running, invokes terminate().
296
        // Otherwise, has no effects.
297
        ~run_loop()
3,743✔
298
        {
299
            if (head.next != &head || !stop)
3,743✔
300
            {
301
                std::terminate();
×
302
            }
303
        }
3,743✔
304

305
        // [exec.run_loop.members] Member functions:
306
        run_loop_scheduler get_scheduler()
99✔
307
        {
308
            return run_loop_scheduler(*this);
99✔
309
        }
310

311
        void run()
3,780✔
312
        {
313
            // Precondition: state is starting.
314
            //HPX_ASSERT(head.next != &head || !stop);
315
            for (run_loop_opstate_base* t; (t = pop_front()) != &head; /**/)
3,878✔
316
            {
317
                t->execute();
98✔
318
            }
319
            HPX_ASSERT(stop);    // Postcondition: state is finishing.
3,780✔
320
        }
3,780✔
321

322
        void finish()
3,784✔
323
        {
324
            std::unique_lock l(mtx);
3,784✔
325
            hpx::util::ignore_while_checking<decltype(l)> il(&l);
3,784✔
326
            HPX_UNUSED(il);
3,784✔
327
            stop = true;
3,784✔
328
            cond_var.notify_all();
3,784✔
329
        }
3,784✔
330
    };
331

332
    using run_loop_scheduler = run_loop::run_loop_scheduler;
333

334
    ///////////////////////////////////////////////////////////////////////////
335
    template <typename Receiver>
336
    inline void run_loop::run_loop_opstate<Receiver>::start() & noexcept
98✔
337
    try
338
    {
339
        loop.push_back(this);
98✔
340
    }
98✔
341
    catch (...)
342
    {
343
        set_error(HPX_MOVE(receiver), std::current_exception());
×
344
    }
98✔
345
}    // namespace hpx::execution::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc