• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #852

17 Dec 2022 04:43PM UTC coverage: 85.912% (-0.7%) from 86.568%
#852

push

StellarBot
Merge #6106

6106: Modernizing modules of levels 0 to 5 r=hkaiser a=hkaiser

- flyby: HPX_FORWARD/HPX_MOVE now expand to std::forward and std::move if those are implemented as builtin functions

working towards #5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

87 of 87 new or added lines in 24 files covered. (100.0%)

173152 of 201546 relevant lines covered (85.91%)

1910264.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.79
/libs/core/execution/include/hpx/execution/algorithms/schedule_from.hpp
1
//  Copyright (c) 2020 ETH Zurich
2
//  Copyright (c) 2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#pragma once
9

10
#include <hpx/config.hpp>
11
#include <hpx/concepts/concepts.hpp>
12
#include <hpx/datastructures/optional.hpp>
13
#include <hpx/datastructures/tuple.hpp>
14
#include <hpx/datastructures/variant.hpp>
15
#include <hpx/execution_base/completion_scheduler.hpp>
16
#include <hpx/execution_base/completion_signatures.hpp>
17
#include <hpx/execution_base/get_env.hpp>
18
#include <hpx/execution_base/receiver.hpp>
19
#include <hpx/execution_base/sender.hpp>
20
#include <hpx/functional/bind_front.hpp>
21
#include <hpx/functional/detail/tag_fallback_invoke.hpp>
22
#include <hpx/functional/invoke_fused.hpp>
23
#include <hpx/type_support/detail/with_result_of.hpp>
24
#include <hpx/type_support/meta.hpp>
25
#include <hpx/type_support/pack.hpp>
26

27
#include <atomic>
28
#include <cstddef>
29
#include <exception>
30
#include <type_traits>
31
#include <utility>
32

33
namespace hpx::execution::experimental {
34

35
    namespace detail {
36

37
        template <typename Sender, typename Scheduler>
38
        struct schedule_from_sender
81,407✔
39
        {
40
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Sender> predecessor_sender;
41
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Scheduler> scheduler;
42

43
            template <typename Env>
44
            struct generate_completion_signatures
45
            {
46
                template <template <typename...> typename Tuple,
47
                    template <typename...> typename Variant>
48
                using value_types =
49
                    value_types_of_t<Sender, Env, Tuple, Variant>;
50

51
                template <template <typename...> typename Variant>
52
                using predecessor_sender_error_types =
53
                    error_types_of_t<Sender, Env, Variant>;
54

55
                using scheduler_sender_type = hpx::util::invoke_result_t<
56
                    hpx::execution::experimental::schedule_t, Scheduler>;
57

58
                template <template <typename...> typename Variant>
59
                using scheduler_sender_error_types =
60
                    error_types_of_t<scheduler_sender_type, Env, Variant>;
61

62
                template <template <typename...> typename Variant>
63
                using error_types = hpx::util::detail::unique_concat_t<
64
                    predecessor_sender_error_types<Variant>,
65
                    scheduler_sender_error_types<Variant>>;
66

67
                static constexpr bool sends_stopped = false;
68
            };
69

70
            template <typename Env>
71
            friend auto tag_invoke(
72
                get_completion_signatures_t, schedule_from_sender const&, Env)
73
                -> generate_completion_signatures<Env>;
74

75
            // clang-format off
76
            template <typename CPO,
77
                HPX_CONCEPT_REQUIRES_(
78
                    meta::value<meta::one_of<
79
                        std::decay_t<CPO>, set_value_t, set_stopped_t>>
80
                )>
81
            // clang-format on
82
            friend constexpr auto tag_invoke(
33✔
83
                hpx::execution::experimental::get_completion_scheduler_t<CPO>,
84
                schedule_from_sender const& sender)
85
            {
86
                return sender.scheduler;
33✔
87
            }
88

89
            // TODO: add forwarding_sender_query
90

91
            template <typename Receiver>
92
            struct operation_state
3,346✔
93
            {
94
                HPX_NO_UNIQUE_ADDRESS std::decay_t<Scheduler> scheduler;
95
                HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
96

97
                struct predecessor_sender_receiver;
98
                struct scheduler_sender_receiver;
99

100
                using value_type = hpx::util::detail::prepend_t<
101
                    value_types_of_t<Sender, empty_env, decayed_tuple,
102
                        hpx::variant>,
103
                    hpx::monostate>;
104
                value_type ts;
105

106
                using sender_operation_state_type =
107
                    connect_result_t<Sender, predecessor_sender_receiver>;
108
                sender_operation_state_type sender_os;
109

110
                using scheduler_operation_state_type = connect_result_t<
111
                    hpx::util::invoke_result_t<schedule_t, Scheduler>,
112
                    scheduler_sender_receiver>;
113
                hpx::optional<scheduler_operation_state_type>
114
                    scheduler_op_state;
115

116
                template <typename Sender_, typename Scheduler_,
117
                    typename Receiver_>
118
                operation_state(Sender_&& predecessor_sender,
3,393✔
119
                    Scheduler_&& scheduler, Receiver_&& receiver)
120
                  : scheduler(HPX_FORWARD(Scheduler, scheduler))
3,393✔
121
                  , receiver(HPX_FORWARD(Receiver_, receiver))
3,393✔
122
                  , sender_os(hpx::execution::experimental::connect(
6,786✔
123
                        HPX_FORWARD(Sender_, predecessor_sender),
3,393✔
124
                        predecessor_sender_receiver{*this}))
3,393✔
125
                {
126
                }
3,393✔
127

128
                operation_state(operation_state&&) = delete;
129
                operation_state(operation_state const&) = delete;
130
                operation_state& operator=(operation_state&&) = delete;
131
                operation_state& operator=(operation_state const&) = delete;
132

133
                struct predecessor_sender_receiver
134
                {
135
                    operation_state& op_state;
136

137
                    template <typename Error>
138
                    friend void tag_invoke(set_error_t,
2✔
139
                        predecessor_sender_receiver&& r, Error&& error) noexcept
140
                    {
141
                        r.op_state.set_error_predecessor_sender(
4✔
142
                            HPX_FORWARD(Error, error));
2✔
143
                    }
2✔
144

145
                    friend void tag_invoke(
×
146
                        set_stopped_t, predecessor_sender_receiver&& r) noexcept
147
                    {
148
                        r.op_state.set_stopped_predecessor_sender();
×
149
                    }
×
150

151
                    // This typedef is duplicated from the parent struct. The
152
                    // parent typedef is not instantiated early enough for use
153
                    // here.
154
                    using value_type = hpx::util::detail::prepend_t<
155
                        value_types_of_t<Sender, empty_env, decayed_tuple,
156
                            hpx::variant>,
157
                        hpx::monostate>;
158

159
                    template <typename... Ts>
160
                    friend auto tag_invoke(set_value_t,
3,391✔
161
                        predecessor_sender_receiver&& r, Ts&&... ts) noexcept
162
                        -> decltype(std::declval<value_type>()
163
                                        .template emplace<hpx::tuple<Ts...>>(
164
                                            HPX_FORWARD(Ts, ts)...),
165
                            void())
166
                    {
167
                        r.op_state.set_value_predecessor_sender(
6,769✔
168
                            HPX_FORWARD(Ts, ts)...);
3,378✔
169
                    }
3,391✔
170

171
                    // Pass through the get_env receiver query
172
                    friend auto tag_invoke(
17✔
173
                        get_env_t, predecessor_sender_receiver const& r)
174
                        -> env_of_t<std::decay_t<Receiver>>
175
                    {
176
                        return hpx::execution::experimental::get_env(
17✔
177
                            r.op_state.receiver);
17✔
178
                    }
179
                };
180

181
                template <typename Error>
182
                void set_error_predecessor_sender(Error&& error) noexcept
2✔
183
                {
184
                    hpx::execution::experimental::set_error(
2✔
185
                        HPX_MOVE(receiver), HPX_FORWARD(Error, error));
2✔
186
                }
2✔
187

188
                void set_stopped_predecessor_sender() noexcept
×
189
                {
190
                    hpx::execution::experimental::set_stopped(
×
191
                        HPX_MOVE(receiver));
×
192
                }
×
193

194
                template <typename... Us>
195
                void set_value_predecessor_sender(Us&&... us) noexcept
3,391✔
196
                {
197
                    ts.template emplace<hpx::tuple<Us...>>(
6,769✔
198
                        HPX_FORWARD(Us, us)...);
3,378✔
199
#if defined(HPX_HAVE_CXX17_COPY_ELISION) &&                                    \
200
    defined(HPX_HAVE_CXX17_OPTIONAL_COPY_ELISION)
201
                    // with_result_of is used to emplace the operation
202
                    // state returned from connect without any
203
                    // intermediate copy construction (the operation
204
                    // state is not required to be copyable nor movable).
205
                    scheduler_op_state.emplace(
6,782✔
206
                        hpx::util::detail::with_result_of([&]() {
6,782✔
207
                            return hpx::execution::experimental::connect(
3,391✔
208
                                hpx::execution::experimental::schedule(
3,391✔
209
                                    HPX_MOVE(scheduler)),
3,391✔
210
                                scheduler_sender_receiver{*this});
3,391✔
211
                        }));
212
#else
213
                    // MSVC doesn't get copy elision quite right, the operation
214
                    // state must be constructed explicitly directly in place
215
                    scheduler_op_state.emplace_f(
216
                        hpx::execution::experimental::connect,
217
                        hpx::execution::experimental::schedule(
218
                            HPX_MOVE(scheduler)),
219
                        scheduler_sender_receiver{*this});
220
#endif
221
                    hpx::execution::experimental::start(
3,391✔
222
                        scheduler_op_state.value());
3,391✔
223
                }
3,391✔
224

225
                struct scheduler_sender_receiver
226
                {
227
                    operation_state& op_state;
228

229
                    template <typename Error>
230
                    friend void tag_invoke(set_error_t,
×
231
                        scheduler_sender_receiver&& r, Error&& error) noexcept
232
                    {
233
                        r.op_state.set_error_scheduler_sender(
×
234
                            HPX_FORWARD(Error, error));
×
235
                    }
×
236

237
                    friend void tag_invoke(
×
238
                        set_stopped_t, scheduler_sender_receiver&& r) noexcept
239
                    {
240
                        r.op_state.set_stopped_scheduler_sender();
×
241
                    }
×
242

243
                    friend void tag_invoke(
3,391✔
244
                        set_value_t, scheduler_sender_receiver&& r) noexcept
245
                    {
246
                        r.op_state.set_value_scheduler_sender();
3,391✔
247
                    }
3,391✔
248

249
                    // Pass through the get_env receiver query
250
                    friend auto tag_invoke(
×
251
                        get_env_t, scheduler_sender_receiver const& r)
252
                        -> env_of_t<std::decay_t<Receiver>>
253
                    {
254
                        return get_env(r.op_state.receiver);
×
255
                    }
256
                };
257

258
                struct scheduler_sender_value_visitor
11✔
259
                {
260
                    HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
261

262
                    [[noreturn]] void operator()(hpx::monostate) const
×
263
                    {
264
                        HPX_UNREACHABLE;
×
265
                    }
×
266

267
                    template <typename Ts,
268
                        typename = std::enable_if_t<
269
                            !std::is_same_v<std::decay_t<Ts>, hpx::monostate>>>
270
                    void operator()(Ts&& ts)
3,391✔
271
                    {
272
                        hpx::invoke_fused(
3,391✔
273
                            hpx::bind_front(
3,391✔
274
                                hpx::execution::experimental::set_value,
275
                                HPX_MOVE(receiver)),
3,391✔
276
                            HPX_FORWARD(Ts, ts));
3,391✔
277
                    }
3,391✔
278
                };
279

280
                template <typename Error>
281
                void set_error_scheduler_sender(Error&& error) noexcept
×
282
                {
283
                    scheduler_op_state.reset();
×
284
                    hpx::execution::experimental::set_error(
×
285
                        HPX_MOVE(receiver), HPX_FORWARD(Error, error));
×
286
                }
×
287

288
                void set_stopped_scheduler_sender() noexcept
×
289
                {
290
                    scheduler_op_state.reset();
×
291
                    hpx::execution::experimental::set_stopped(
×
292
                        HPX_MOVE(receiver));
×
293
                }
×
294

295
                void set_value_scheduler_sender() noexcept
3,391✔
296
                {
297
                    scheduler_op_state.reset();
3,391✔
298
                    hpx::visit(
3,391✔
299
                        scheduler_sender_value_visitor{HPX_MOVE(receiver)},
3,391✔
300
                        HPX_MOVE(ts));
3,391✔
301
                }
3,391✔
302

303
                friend void tag_invoke(start_t, operation_state& os) noexcept
3,393✔
304
                {
305
                    hpx::execution::experimental::start(os.sender_os);
3,393✔
306
                }
3,393✔
307
            };
308

309
            template <typename Receiver>
310
            friend operation_state<Receiver> tag_invoke(
3,383✔
311
                connect_t, schedule_from_sender&& s, Receiver&& receiver)
312
            {
313
                return {HPX_MOVE(s.predecessor_sender), HPX_MOVE(s.scheduler),
6,766✔
314
                    HPX_FORWARD(Receiver, receiver)};
3,383✔
315
            }
316

317
            template <typename Receiver>
318
            friend operation_state<Receiver> tag_invoke(
10✔
319
                connect_t, schedule_from_sender& s, Receiver&& receiver)
320
            {
321
                return {s.predecessor_sender, s.scheduler,
20✔
322
                    HPX_FORWARD(Receiver, receiver)};
10✔
323
            }
324
        };
325
    }    // namespace detail
326

327
    // execution::schedule_from is used to schedule work dependent on the
328
    // completion of a sender onto a scheduler's associated execution context.
329
    //
330
    // [Note: schedule_from is not meant to be used in user code; it is used in
331
    // the implementation of transfer. -end note]
332
    //
333
    // Senders returned from execution::schedule_from shall not propagate the
334
    // sender queries get_completion_scheduler<CPO> to an input sender. They
335
    // will implement get_completion_scheduler<CPO>, where CPO is one of
336
    // set_value_t and set_stopped_t; this query returns a scheduler equivalent
337
    // to the sch argument from those queries. The
338
    // get_completion_scheduler<set_error_t> is not implemented, as the
339
    // scheduler cannot be guaranteed in case an error is thrown while trying to
340
    // schedule work on the given scheduler object.
341
    HPX_HOST_DEVICE_INLINE_CONSTEXPR_VARIABLE struct schedule_from_t final
342
      : hpx::functional::detail::tag_fallback<schedule_from_t>
343
    {
344
    private:
345
        // clang-format off
346
        template <typename Scheduler, typename Sender,
347
            HPX_CONCEPT_REQUIRES_(
348
                is_sender_v<Sender>
349
            )>
350
        // clang-format on
351
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
3,396✔
352
            schedule_from_t, Scheduler&& scheduler, Sender&& predecessor_sender)
353
        {
354
            return detail::schedule_from_sender<Sender, Scheduler>{
10,187✔
355
                HPX_FORWARD(Sender, predecessor_sender),
3,396✔
356
                HPX_FORWARD(Scheduler, scheduler)};
3,396✔
357
        }
358
    } schedule_from{};
359
}    // namespace hpx::execution::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc