• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #866

14 Jan 2023 04:16PM UTC coverage: 85.951% (-0.5%) from 86.431%
#866

push

StellarBot
Merge #6134

6134: Adding notification function for parcelports to be called after early parcel handling r=hkaiser a=hkaiser

Parcelports now can override a new function `void initialized()` that will be called after early parcel handling is finished and before the thread pools are operational (i.e. before background work starts).

`@JiakunYan` please let me know if this is what you requested.

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

4 of 4 new or added lines in 2 files covered. (100.0%)

173540 of 201905 relevant lines covered (85.95%)

1871917.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.87
/libs/core/execution/include/hpx/execution/algorithms/start_detached.hpp
1
//  Copyright (c) 2021 ETH Zurich
2
//  Copyright (c) 2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#pragma once
9

10
#include <hpx/config.hpp>
11
#include <hpx/allocator_support/allocator_deleter.hpp>
12
#include <hpx/allocator_support/internal_allocator.hpp>
13
#include <hpx/allocator_support/traits/is_allocator.hpp>
14
#include <hpx/assert.hpp>
15
#include <hpx/concepts/concepts.hpp>
16
#include <hpx/execution/algorithms/detail/inject_scheduler.hpp>
17
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
18
#include <hpx/execution/algorithms/run_loop.hpp>
19
#include <hpx/execution_base/completion_scheduler.hpp>
20
#include <hpx/execution_base/completion_signatures.hpp>
21
#include <hpx/execution_base/operation_state.hpp>
22
#include <hpx/execution_base/receiver.hpp>
23
#include <hpx/execution_base/sender.hpp>
24
#include <hpx/functional/detail/tag_priority_invoke.hpp>
25
#include <hpx/functional/invoke_result.hpp>
26
#include <hpx/modules/memory.hpp>
27
#include <hpx/thread_support/atomic_count.hpp>
28
#include <hpx/type_support/meta.hpp>
29
#include <hpx/type_support/unused.hpp>
30

31
#include <atomic>
32
#include <cstddef>
33
#include <exception>
34
#include <memory>
35
#include <type_traits>
36
#include <utility>
37

38
namespace hpx::execution::experimental {
39

40
    namespace detail {
41

42
        // Base class (parameterized on the deriving type) that owns the
        // operation state created for start_detached.
        //
        // The constructor connects the sender to a start_detached_receiver
        // that references this object and starts the resulting operation
        // right away. The object is reference counted through the
        // intrusive_ptr_add_ref/intrusive_ptr_release friends below; when the
        // count drops to zero the object is destroyed and deallocated as
        // Derived through a copy of the stored allocator.
        //
        // Derived has to provide finish(); the receiver calls it on set_value
        // and set_stopped.
        template <typename Derived, typename Sender, typename Allocator>
43
        struct operation_state_holder_base
3,203✔
44
        {
45
            // Receiver the sender is connected to. It holds an intrusive_ptr
            // to the holder (as Derived).
            struct start_detached_receiver
41,491✔
46
            {
47
                hpx::intrusive_ptr<Derived> op_state;
48

49
                // Errors are not handled: asserts with an explanatory message
                // and then calls std::terminate().
                template <typename Error>
50
                [[noreturn]] friend void tag_invoke(
×
51
                    set_error_t, start_detached_receiver&&, Error&&) noexcept
52
                {
53
                    HPX_ASSERT_MSG(false,
×
54
                        "set_error was called on the receiver of "
55
                        "start_detached, terminating. If you want to allow "
56
                        "errors from the predecessor sender, handle them first "
57
                        "with e.g. let_error.");
58
                    std::terminate();
×
59
                }
×
60

61
                // Stopped: notify the holder through finish(), then drop the
                // receiver's reference to it.
                friend void tag_invoke(
×
62
                    set_stopped_t, start_detached_receiver&& r) noexcept
63
                {
64
                    r.op_state->finish();
×
65
                    r.op_state.reset();
×
66
                };
×
67

68
                // Value: the sent values are ignored; notify the holder
                // through finish(), then drop the receiver's reference to it.
                template <typename... Ts>
69
                friend void tag_invoke(
3,200✔
70
                    set_value_t, start_detached_receiver&& r, Ts&&...) noexcept
71
                {
72
                    r.op_state->finish();
3,200✔
73
                    r.op_state.reset();
3,200✔
74
                }
3,200✔
75
            };
76

77
        protected:
78
            // The user supplied allocator type rebound to Derived.
            using allocator_type = typename std::allocator_traits<
79
                Allocator>::template rebind_alloc<Derived>;
80

81
        private:
82
            // Allocator used by intrusive_ptr_release() to destroy and
            // deallocate this object.
            HPX_NO_UNIQUE_ADDRESS allocator_type alloc;
83
            // Reference count manipulated by the intrusive_ptr hooks below.
            hpx::util::atomic_count count{0};
3,203✔
84

85
            // Result of connecting Sender with start_detached_receiver.
            using operation_state_type =
86
                connect_result_t<Sender, start_detached_receiver>;
87
            std::decay_t<operation_state_type> op_state;
88

89
        public:
90
            // Stores the allocator, connects the sender to a receiver that
            // references this object (cast to Derived) and starts the
            // operation immediately.
            template <typename Sender_>
91
            explicit operation_state_holder_base(
3,203✔
92
                Sender_&& sender, allocator_type const& alloc)
93
              : alloc(alloc)
3,203✔
94
              , op_state(connect(HPX_FORWARD(Sender_, sender),
6,406✔
95
                    start_detached_receiver{static_cast<Derived*>(this)}))
3,203✔
96
            {
97
                hpx::execution::experimental::start(op_state);
3,203✔
98
            }
3,203✔
99

100
        private:
101
            // intrusive_ptr hook: adds one reference.
            friend void intrusive_ptr_add_ref(
3,203✔
102
                operation_state_holder_base* p) noexcept
103
            {
104
                ++p->count;
3,203✔
105
            }
3,203✔
106

107
            // intrusive_ptr hook: removes one reference; once the count
            // reaches zero the object is destroyed and its storage released.
            friend void intrusive_ptr_release(
3,203✔
108
                operation_state_holder_base* p) noexcept
109
            {
110
                if (--p->count == 0)
3,203✔
111
                {
112
                    // Work on a copy of the allocator: p->alloc is a member of
                    // the object that is destroyed below.
                    allocator_type other_alloc(p->alloc);
3,203✔
113
                    std::allocator_traits<allocator_type>::destroy(
3,203✔
114
                        other_alloc, static_cast<Derived*>(p));
3,203✔
115
                    std::allocator_traits<allocator_type>::deallocate(
3,203✔
116
                        other_alloc, static_cast<Derived*>(p), 1);
3,203✔
117
                }
3,203✔
118
            }
3,203✔
119
        };
120

121
        // Holder that needs no extra work on completion: it forwards its
        // constructor arguments to operation_state_holder_base and supplies an
        // empty finish().
        template <typename Sender, typename Allocator>
122
        struct operation_state_holder
3,203✔
123
          : operation_state_holder_base<
124
                operation_state_holder<Sender, Allocator>, Sender, Allocator>
125
        {
126
            using base_type = operation_state_holder_base<
127
                operation_state_holder<Sender, Allocator>, Sender, Allocator>;
128
            using allocator_type = typename base_type::allocator_type;
129

130
            // Forwards the sender and the allocator to the base class, which
            // connects and starts the operation.
            template <typename Sender_>
131
            explicit operation_state_holder(
3,203✔
132
                Sender_&& sender, allocator_type const& alloc)
133
              : base_type(HPX_FORWARD(Sender_, sender), alloc)
3,203✔
134
            {
3,203✔
135
            }
3,203✔
136

137
            // Called by the receiver on set_value/set_stopped; nothing to do.
            static constexpr void finish() noexcept {}
3,200✔
138
        };
139

140
        // Holder bound to a run_loop: the constructor calls run() on the loop
        // obtained from the given run_loop_scheduler, and finish() calls
        // finish() on that same loop.
        //
        // NOTE(review): the base class constructor connects and starts the
        // operation before the `loop` reference member is initialized, while
        // finish() uses `loop`. This looks safe only if the operation cannot
        // complete before loop.run() is called - confirm.
        template <typename Sender, typename Allocator>
141
        struct operation_state_holder_with_run_loop
×
142
          : operation_state_holder_base<
143
                operation_state_holder_with_run_loop<Sender, Allocator>, Sender,
144
                Allocator>
145
        {
146
        private:
147
            // The run loop returned by sched.get_run_loop().
            hpx::execution::experimental::run_loop& loop;
148

149
            using base_type = operation_state_holder_base<
150
                operation_state_holder_with_run_loop<Sender, Allocator>, Sender,
151
                Allocator>;
152

153
        public:
154
            // Starts the operation (through the base class constructor), then
            // runs the scheduler's run loop from within this constructor.
            template <typename Sender_, typename Allocator_>
155
            explicit operation_state_holder_with_run_loop(
×
156
                hpx::execution::experimental::run_loop_scheduler const& sched,
157
                Sender_&& sender, Allocator_ const& alloc)
158
              : base_type(HPX_FORWARD(Sender_, sender), alloc)
×
159
              , loop(sched.get_run_loop())
×
160
            {
×
161
                // Hold an additional reference to this object while the loop
                // runs; it is dropped when this_ goes out of scope at the end
                // of the constructor.
162
                hpx::intrusive_ptr<operation_state_holder_with_run_loop> this_(
×
163
                    this);
164
                loop.run();
×
165
            }
×
166

167
            // Called by the receiver on set_value/set_stopped: calls finish()
            // on the run loop.
            void finish() noexcept
×
168
            {
169
                loop.finish();
×
170
            }
×
171
        };
172
    }    // namespace detail
173

174
    // execution::start_detached is used to eagerly start a sender without the
175
    // caller needing to manage the lifetimes of any objects.
176
    //
177
    // Like ensure_started, but does not return a value; if the provided sender
178
    // sends an error instead of a value, std::terminate is called.
    //
    // The customization point object derives from tag_priority; the friend
    // overloads below are named tag_override_invoke, tag_invoke and
    // tag_fallback_invoke (presumably tried in that order - confirm against
    // tag_priority_invoke.hpp).
179
    inline constexpr struct start_detached_t final
180
      : hpx::functional::detail::tag_priority<start_detached_t>
181
    {
182
    private:
183
        // Overload enabled only if is_completion_scheduler_tag_invocable_v
        // holds for (set_value_t, start_detached_t, Sender, Allocator).
        // Retrieves the sender's set_value completion scheduler and forwards
        // to tag_invoke(start_detached_t, scheduler, sender, allocator).
        // clang-format off
184
        template <typename Sender,
185
            typename Allocator = hpx::util::internal_allocator<>,
186
            HPX_CONCEPT_REQUIRES_(
187
                is_sender_v<Sender> &&
188
                hpx::traits::is_allocator_v<Allocator> &&
189
                experimental::detail::is_completion_scheduler_tag_invocable_v<
190
                    hpx::execution::experimental::set_value_t,
191
                    start_detached_t, Sender, Allocator
192
                >
193
            )>
194
        // clang-format on
195
        friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
196
            start_detached_t, Sender&& sender,
197
            Allocator const& allocator = Allocator{})
198
        {
199
            auto scheduler = get_completion_scheduler<
200
                hpx::execution::experimental::set_value_t>(sender);
201

202
            return hpx::functional::tag_invoke(start_detached_t{},
203
                HPX_MOVE(scheduler), HPX_FORWARD(Sender, sender), allocator);
204
        }
205

206
        // Overload taking a run_loop_scheduler: allocates storage for an
        // operation_state_holder_with_run_loop with the rebound allocator and
        // constructs the holder in place from (sched, sender, alloc).
        // clang-format off
207
        template <typename Sender,
208
            typename Allocator = hpx::util::internal_allocator<>,
209
            HPX_CONCEPT_REQUIRES_(
210
                hpx::execution::experimental::is_sender_v<Sender> &&
211
                hpx::traits::is_allocator_v<Allocator>
212
            )>
213
        // clang-format on
214
        friend constexpr HPX_FORCEINLINE auto tag_invoke(start_detached_t,
×
215
            hpx::execution::experimental::run_loop_scheduler const& sched,
216
            Sender&& sender, Allocator const& allocator = {})
217
        {
218
            using allocator_type = Allocator;
219
            using operation_state_type =
220
                detail::operation_state_holder_with_run_loop<Sender, Allocator>;
221
            using other_allocator = typename std::allocator_traits<
222
                allocator_type>::template rebind_alloc<operation_state_type>;
223
            using allocator_traits = std::allocator_traits<other_allocator>;
224
            using unique_ptr = std::unique_ptr<operation_state_type,
225
                util::allocator_deleter<other_allocator>>;
226

227
            other_allocator alloc(allocator);
×
228
            // The unique_ptr owns the raw allocation until construct() has
            // returned (presumably so the storage is reclaimed if
            // construction throws - confirm allocator_deleter semantics).
            unique_ptr p(allocator_traits::allocate(alloc, 1),
×
229
                hpx::util::allocator_deleter<other_allocator>{alloc});
×
230

231
            allocator_traits::construct(
×
232
                alloc, p.get(), sched, HPX_FORWARD(Sender, sender), alloc);
×
233
            // Give up ownership; the pointer is intentionally discarded, the
            // holder is released through its own reference count.
            HPX_UNUSED(p.release());
×
234
        }
×
235

236
        // Fallback overload for a sender and an optional allocator: allocates
        // storage for an operation_state_holder with the rebound allocator
        // and constructs the holder in place from (sender, alloc). The
        // holder's constructor connects and starts the sender.
        // clang-format off
237
        template <typename Sender,
238
            typename Allocator = hpx::util::internal_allocator<>,
239
            HPX_CONCEPT_REQUIRES_(
240
                is_sender_v<Sender> &&
241
                hpx::traits::is_allocator_v<Allocator>
242
            )>
243
        // clang-format on
244
        friend constexpr HPX_FORCEINLINE void tag_fallback_invoke(
3,203✔
245
            start_detached_t, Sender&& sender,
246
            Allocator const& allocator = Allocator{})
247
        {
248
            using allocator_type = Allocator;
249
            using operation_state_type =
250
                detail::operation_state_holder<Sender, Allocator>;
251
            using other_allocator = typename std::allocator_traits<
252
                allocator_type>::template rebind_alloc<operation_state_type>;
253
            using allocator_traits = std::allocator_traits<other_allocator>;
254
            using unique_ptr = std::unique_ptr<operation_state_type,
255
                util::allocator_deleter<other_allocator>>;
256

257
            other_allocator alloc(allocator);
3,203✔
258
            // The unique_ptr owns the raw allocation until construct() has
            // returned (presumably so the storage is reclaimed if
            // construction throws - confirm allocator_deleter semantics).
            unique_ptr p(allocator_traits::allocate(alloc, 1),
3,203✔
259
                hpx::util::allocator_deleter<other_allocator>{alloc});
3,203✔
260

261
            allocator_traits::construct(
3,203✔
262
                alloc, p.get(), HPX_FORWARD(Sender, sender), alloc);
3,203✔
263
            // Give up ownership; the pointer is intentionally discarded, the
            // holder is released through its own reference count.
            HPX_UNUSED(p.release());
3,203✔
264
        }
3,203✔
265

266
        // Fallback overload for a scheduler and an allocator (no sender):
        // returns a detail::inject_scheduler object holding both.
        // clang-format off
267
        template <typename Scheduler, typename Allocator,
268
            HPX_CONCEPT_REQUIRES_(
269
                hpx::execution::experimental::is_scheduler_v<Scheduler> &&
270
                hpx::traits::is_allocator_v<Allocator>
271
            )>
272
        // clang-format on
273
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
274
            start_detached_t, Scheduler&& scheduler,
275
            Allocator const& allocator = {})
276
        {
277
            return hpx::execution::experimental::detail::inject_scheduler<
278
                start_detached_t, Scheduler, Allocator>{
279
                HPX_FORWARD(Scheduler, scheduler), allocator};
280
        }
281

282
        // Fallback overload without a sender: returns a
        // detail::partial_algorithm holding the allocator (presumably to be
        // combined with a sender later, e.g. via operator| - confirm).
        // clang-format off
283
        template <typename Allocator = hpx::util::internal_allocator<>,
284
            HPX_CONCEPT_REQUIRES_(
285
                hpx::traits::is_allocator_v<Allocator>
286
            )>
287
        // clang-format on
288
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
3✔
289
            start_detached_t, Allocator const& allocator = Allocator{})
290
        {
291
            return detail::partial_algorithm<start_detached_t, Allocator>{
3✔
292
                allocator};
3✔
293
        }
294
    } start_detached{};
295
}    // namespace hpx::execution::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc