• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

STEllAR-GROUP / hpx / #866

14 Jan 2023 04:16PM UTC coverage: 85.951% (-0.5%) from 86.431%
#866

push

StellarBot
Merge #6134

6134: Adding notification function for parcelports to be called after early parcel handling r=hkaiser a=hkaiser

Parcelports now can override a new function `void initialized()` that will be called after early parcel handling is finished and before the thread pools are operational (i.e. before background work starts).

`@JiakunYan` please let me know if this is what you requested.

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

4 of 4 new or added lines in 2 files covered. (100.0%)

173540 of 201905 relevant lines covered (85.95%)

1871917.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.32
/libs/core/execution/include/hpx/execution/algorithms/run_loop.hpp
1
//  Copyright (c) 2022 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#pragma once
8

9
#include <hpx/config.hpp>
10
#include <hpx/assert.hpp>
11
#include <hpx/concepts/concepts.hpp>
12
#include <hpx/errors/try_catch_exception_ptr.hpp>
13
#include <hpx/execution/queries/get_scheduler.hpp>
14
#include <hpx/execution/queries/get_stop_token.hpp>
15
#include <hpx/execution_base/completion_signatures.hpp>
16
#include <hpx/execution_base/execution.hpp>
17
#include <hpx/execution_base/get_env.hpp>
18
#include <hpx/modules/lock_registration.hpp>
19
#include <hpx/synchronization/condition_variable.hpp>
20
#include <hpx/synchronization/spinlock.hpp>
21
#include <hpx/type_support/meta.hpp>
22
#include <hpx/type_support/unused.hpp>
23

24
#include <exception>
25
#include <mutex>
26

27
namespace hpx::execution::experimental {
28

29
    // A run_loop is an execution context on which work can be scheduled. It
30
    // maintains a simple, thread-safe first-in-first-out queue of work. Its
31
    // run() member function removes elements from the queue and executes them
32
    // in a loop on whatever thread of execution calls run().
33
    //
34
    // A run_loop instance has an associated count that corresponds to the
35
    // number of work items that are in its queue. Additionally, a run_loop has
36
    // an associated state that can be one of starting, running, or finishing.
37
    //
38
    // Concurrent invocations of the member functions of run_loop, other than
39
    // run and its destructor, do not introduce data races. The member functions
40
    // pop_front, push_back, and finish execute atomically.
41
    //
42
    // [Note: Implementations are encouraged to use an intrusive queue of
43
    // operation states to hold the work units to make scheduling
44
    // allocation-free. -- end note]
45
    //
46
    class run_loop
47
    {
48
        // Intrusive node of the run_loop's circular work queue. The same type
        // is used for the queue's sentinel node (run_loop::head) and as the
        // base class of every operation state that gets queued.
        struct run_loop_opstate_base
        {
            // Sentinel form: the node links to itself and the union holds the
            // given tail pointer.
            explicit run_loop_opstate_base(run_loop_opstate_base* tail) noexcept
              : next{this}
              , tail{tail}
            {
            }

            // Work-item form: the node links to the given node and the union
            // holds the function that execute() will invoke.
            run_loop_opstate_base(run_loop_opstate_base* tail,
                void (*execute)(run_loop_opstate_base*) noexcept) noexcept
              : next{tail}
              , execute_{execute}
            {
            }

            // Nodes are linked by address, so they must never be relocated.
            run_loop_opstate_base(run_loop_opstate_base&&) = delete;
            run_loop_opstate_base& operator=(run_loop_opstate_base&&) = delete;

            // Successor in the circular list.
            run_loop_opstate_base* next;

            // Exactly one of the two members is meaningful, depending on which
            // constructor created the node.
            union
            {
                // work item: type-erased entry point of the operation
                void (*execute_)(run_loop_opstate_base*) noexcept;
                // sentinel: last node of the queue
                run_loop_opstate_base* tail;
            };

            // Runs the stored entry point on this node. Only valid for nodes
            // created with the work-item constructor.
            void execute() noexcept
            {
                execute_(this);
            }
        };
78

79
        template <typename Receiver>
80
        struct run_loop_opstate : run_loop_opstate_base
×
81
        {
82
            run_loop& loop;
83
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
84

85
            template <typename Receiver_>
86
            run_loop_opstate(run_loop_opstate_base* tail, run_loop& loop,
87
                Receiver_&& receiver) noexcept(noexcept(std::
88
                    is_nothrow_constructible_v<std::decay_t<Receiver>,
89
                        Receiver_>))
90
              : run_loop_opstate_base(tail)
91
              , loop(loop)
92
              , receiver(HPX_FORWARD(Receiver_, receiver))
93
            {
94
            }
95

96
            static void execute(run_loop_opstate_base* p) noexcept
×
97
            {
98
                auto& receiver = static_cast<run_loop_opstate*>(p)->receiver;
×
99
                hpx::detail::try_catch_exception_ptr(
×
100
                    [&]() {
×
101
                        if (get_stop_token(get_env(receiver)).stop_requested())
×
102
                        {
103
                            hpx::execution::experimental::set_stopped(
×
104
                                HPX_MOVE(receiver));
×
105
                        }
×
106
                        else
107
                        {
108
                            hpx::execution::experimental::set_value(
×
109
                                HPX_MOVE(receiver));
×
110
                        }
111
                    },
×
112
                    [&](std::exception_ptr ep) {
×
113
                        hpx::execution::experimental::set_error(
×
114
                            HPX_MOVE(receiver), HPX_MOVE(ep));
×
115
                    });
×
116
            }
×
117

118
            explicit run_loop_opstate(run_loop_opstate_base* tail) noexcept
119
              : run_loop_opstate_base(tail)
120
            {
121
            }
122

123
            run_loop_opstate(
×
124
                run_loop_opstate_base* next, run_loop& loop, Receiver r)
125
              : run_loop_opstate_base(next, &execute)
×
126
              , loop(loop)
×
127
              , receiver(HPX_MOVE(r))
×
128
            {
×
129
            }
×
130

131
            friend void tag_invoke(hpx::execution::experimental::start_t,
×
132
                run_loop_opstate& os) noexcept
133
            {
134
                os.start();
×
135
            }
×
136

137
            void start() & noexcept;
138
        };
139

140
    public:
141
        class run_loop_scheduler
142
        {
143
        public:
144
            struct run_loop_sender
145
            {
146
                explicit run_loop_sender(run_loop& loop) noexcept
×
147
                  : loop(loop)
×
148
                {
149
                }
×
150

151
            private:
152
                friend run_loop_scheduler;
153

154
                template <typename Receiver>
155
                using operation_state = run_loop_opstate<Receiver>;
156

157
                template <typename Receiver>
158
                friend operation_state<Receiver> tag_invoke(
×
159
                    hpx::execution::experimental::connect_t,
160
                    run_loop_sender const& s,
161
                    Receiver&& receiver) noexcept(noexcept(std::
162
                        is_nothrow_constructible_v<operation_state<Receiver>,
163
                            run_loop_opstate_base*, run_loop&>))
164
                {
165
                    return operation_state<Receiver>(
×
166
                        &s.loop.head, s.loop, HPX_FORWARD(Receiver, receiver));
×
167
                }
×
168

169
                // clang-format off
170
                template <typename CPO,
171
                    HPX_CONCEPT_REQUIRES_(
172
                        meta::value<meta::one_of<
173
                            std::decay_t<CPO>, set_value_t, set_stopped_t>>
174
                    )>
175
                // clang-format on
176
                friend run_loop_scheduler tag_invoke(
×
177
                    hpx::execution::experimental::get_completion_scheduler_t<
178
                        CPO>,
179
                    run_loop_sender const& s) noexcept
180
                {
181
                    return run_loop_scheduler{s.loop};
×
182
                }
183

184
                using completion_signatures =
185
                    hpx::execution::experimental::completion_signatures<
186
                        hpx::execution::experimental::set_value_t(),
187
                        hpx::execution::experimental::set_error_t(
188
                            std::exception_ptr),
189
                        hpx::execution::experimental::set_stopped_t()>;
190

191
                template <typename Env>
192
                friend auto tag_invoke(
193
                    hpx::execution::experimental::get_completion_signatures_t,
194
                    run_loop_sender const&, Env) noexcept
195
                    -> completion_signatures;
196

197
                run_loop& loop;
198
            };
199

200
            friend run_loop;
201

202
        public:
203
            explicit run_loop_scheduler(run_loop& loop) noexcept
×
204
              : loop(loop)
×
205
            {
206
            }
×
207

208
            run_loop& get_run_loop() const noexcept
×
209
            {
210
                return loop;
×
211
            }
212

213
        private:
214
            friend run_loop_sender tag_invoke(
×
215
                hpx::execution::experimental::schedule_t,
216
                run_loop_scheduler const& sched) noexcept
217
            {
218
                return run_loop_sender(sched.loop);
×
219
            }
220

221
            friend constexpr hpx::execution::experimental::
222
                forward_progress_guarantee
223
                tag_invoke(hpx::execution::experimental::
224
                               get_forward_progress_guarantee_t,
225
                    run_loop_scheduler const&) noexcept
226
            {
227
                return hpx::execution::experimental::
228
                    forward_progress_guarantee::parallel;
229
            }
230

231
            friend constexpr bool operator==(run_loop_scheduler const& lhs,
×
232
                run_loop_scheduler const& rhs) noexcept
233
            {
234
                return &lhs.loop == &rhs.loop;
×
235
            }
236
            friend constexpr bool operator!=(run_loop_scheduler const& lhs,
237
                run_loop_scheduler const& rhs) noexcept
238
            {
239
                return !(lhs == rhs);
240
            }
241

242
        private:
243
            run_loop& loop;
244
        };
245

246
    private:
247
        friend struct run_loop_scheduler::run_loop_sender;
248

249
        hpx::spinlock mtx;
250
        hpx::condition_variable cond_var;
251

252
        // MSVC and gcc don't properly handle the friend declaration above
253
#if defined(HPX_MSVC) || defined(HPX_GCC_VERSION)
254
    public:
255
#endif
256
        run_loop_opstate_base head;
257

258
    private:
259
        bool stop = false;
3,569✔
260

261
        // Appends `t` to the back of the intrusive work queue, clears the
        // `stop` flag, and wakes one waiter on `cond_var`. The whole update is
        // performed while holding `mtx`.
        void push_back(run_loop_opstate_base* t)
        {
            std::unique_lock lock(mtx);
            stop = false;

            // the list is circular: the newest item points back at the
            // sentinel
            t->next = &head;

            // hook the item behind the current tail (which is the sentinel
            // itself for an empty queue), then make it the new tail
            head.tail->next = t;
            head.tail = t;

            cond_var.notify_one();
        }
×
269

270
        // Blocks until the queue is non-empty or `stop` is set, then unlinks
        // and returns the node following the sentinel. If the queue is empty
        // at that point, the returned node is the sentinel (&head) itself.
        run_loop_opstate_base* pop_front()
        {
            std::unique_lock lock(mtx);
            cond_var.wait(
                lock, [this] { return stop || head.next != &head; });

            run_loop_opstate_base* const front = head.next;

            // removing the last queued node leaves the sentinel as the tail
            if (head.tail == front)
            {
                head.tail = &head;
            }

            head.next = front->next;
            return front;
        }
3,569✔
284

285
    public:
286
        // [exec.run_loop.ctor] construct/copy/destroy
287
        run_loop() noexcept
3,569✔
288
          : head(&head)
3,569✔
289
        {
290
        }
3,569✔
291

292
        run_loop(run_loop&&) = delete;
293
        run_loop& operator=(run_loop&&) = delete;
294

295
        // If count is not 0 or if state is running, invokes terminate().
296
        // Otherwise, has no effects.
297
        ~run_loop()
3,569✔
298
        {
299
            if (head.next != &head || !stop)
3,569✔
300
            {
301
                std::terminate();
×
302
            }
303
        }
3,569✔
304

305
        // [exec.run_loop.members] Member functions:
306
        // Returns a scheduler handle that refers to this loop.
        run_loop_scheduler get_scheduler()
        {
            return run_loop_scheduler{*this};
        }
310

311
        void run()
3,569✔
312
        {
313
            // Precondition: state is starting.
314
            //HPX_ASSERT(head.next != &head || !stop);
315
            for (run_loop_opstate_base* t; (t = pop_front()) != &head; /**/)
3,569✔
316
            {
317
                t->execute();
×
318
            }
319
            HPX_ASSERT(stop);    // Postcondition: state is finishing.
3,569✔
320
        }
3,569✔
321

322
        void finish()
3,569✔
323
        {
324
            std::unique_lock l(mtx);
3,569✔
325
            hpx::util::ignore_while_checking<decltype(l)> il(&l);
3,569✔
326
            HPX_UNUSED(il);
3,569✔
327
            stop = true;
3,569✔
328
            cond_var.notify_all();
3,569✔
329
        }
3,569✔
330
    };
331

332
    using run_loop_scheduler = run_loop::run_loop_scheduler;
333

334
    ///////////////////////////////////////////////////////////////////////////
335
    template <typename Receiver>
336
    inline void run_loop::run_loop_opstate<Receiver>::start() & noexcept
×
337
    try
338
    {
339
        loop.push_back(this);
×
340
    }
×
341
    catch (...)
342
    {
343
        set_error(HPX_MOVE(receiver), std::current_exception());
×
344
    }
×
345
}    // namespace hpx::execution::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc