• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #863

11 Jan 2023 05:02PM UTC coverage: 86.533% (-0.05%) from 86.582%
#863

push

StellarBot
Merge #6126

6126: Deprecate hpx::parallel::task_block in favor of hpx::experimental::ta… r=hkaiser a=dimitraka



Co-authored-by: kadimitra <kadimitra@ece.auth.gr>

174614 of 201789 relevant lines covered (86.53%)

1954694.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.88
/libs/core/execution/include/hpx/execution/algorithms/schedule_from.hpp
1
//  Copyright (c) 2020 ETH Zurich
2
//  Copyright (c) 2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#pragma once
9

10
#include <hpx/config.hpp>
11
#include <hpx/concepts/concepts.hpp>
12
#include <hpx/datastructures/optional.hpp>
13
#include <hpx/datastructures/tuple.hpp>
14
#include <hpx/datastructures/variant.hpp>
15
#include <hpx/execution_base/completion_scheduler.hpp>
16
#include <hpx/execution_base/completion_signatures.hpp>
17
#include <hpx/execution_base/get_env.hpp>
18
#include <hpx/execution_base/receiver.hpp>
19
#include <hpx/execution_base/sender.hpp>
20
#include <hpx/functional/bind_front.hpp>
21
#include <hpx/functional/detail/tag_fallback_invoke.hpp>
22
#include <hpx/functional/invoke_fused.hpp>
23
#include <hpx/type_support/detail/with_result_of.hpp>
24
#include <hpx/type_support/meta.hpp>
25
#include <hpx/type_support/pack.hpp>
26

27
#include <atomic>
28
#include <cstddef>
29
#include <exception>
30
#include <type_traits>
31
#include <utility>
32

33
namespace hpx::execution::experimental {
34

35
    namespace detail {
36

37
        template <typename Sender, typename Scheduler>
38
        struct schedule_from_sender
81,284✔
39
        {
40
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Sender> predecessor_sender;
41
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Scheduler> scheduler;
42

43
            template <typename Env>
44
            struct generate_completion_signatures
45
            {
46
                template <template <typename...> typename Tuple,
47
                    template <typename...> typename Variant>
48
                using value_types =
49
                    value_types_of_t<Sender, Env, Tuple, Variant>;
50

51
                template <template <typename...> typename Variant>
52
                using predecessor_sender_error_types =
53
                    error_types_of_t<Sender, Env, Variant>;
54

55
                using scheduler_sender_type = hpx::util::invoke_result_t<
56
                    hpx::execution::experimental::schedule_t, Scheduler>;
57

58
                template <template <typename...> typename Variant>
59
                using scheduler_sender_error_types =
60
                    error_types_of_t<scheduler_sender_type, Env, Variant>;
61

62
                template <template <typename...> typename Variant>
63
                using error_types = hpx::util::detail::unique_concat_t<
64
                    predecessor_sender_error_types<Variant>,
65
                    scheduler_sender_error_types<Variant>>;
66

67
                static constexpr bool sends_stopped = false;
68
            };
69

70
            template <typename Env>
71
            friend auto tag_invoke(
72
                get_completion_signatures_t, schedule_from_sender const&, Env)
73
                -> generate_completion_signatures<Env>;
74

75
            // clang-format off
76
            template <typename CPO,
77
                HPX_CONCEPT_REQUIRES_(
78
                    meta::value<meta::one_of<
79
                        std::decay_t<CPO>, set_value_t, set_stopped_t>>
80
                )>
81
            // clang-format on
82
            friend constexpr auto tag_invoke(
75✔
83
                hpx::execution::experimental::get_completion_scheduler_t<CPO>,
84
                schedule_from_sender const& sender)
85
            {
86
                return sender.scheduler;
75✔
87
            }
88

89
            // TODO: add forwarding_sender_query
90

91
            template <typename Receiver>
92
            struct operation_state
3,366✔
93
            {
94
                HPX_NO_UNIQUE_ADDRESS std::decay_t<Scheduler> scheduler;
95
                HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
96

97
                struct predecessor_sender_receiver;
98
                struct scheduler_sender_receiver;
99

100
                using value_type = hpx::util::detail::prepend_t<
101
                    value_types_of_t<Sender, empty_env, decayed_tuple,
102
                        hpx::variant>,
103
                    hpx::monostate>;
104
                value_type ts;
105

106
                using sender_operation_state_type =
107
                    connect_result_t<Sender, predecessor_sender_receiver>;
108
                sender_operation_state_type sender_os;
109

110
                using scheduler_operation_state_type = connect_result_t<
111
                    hpx::util::invoke_result_t<schedule_t, Scheduler>,
112
                    scheduler_sender_receiver>;
113
                hpx::optional<scheduler_operation_state_type>
114
                    scheduler_op_state;
115

116
                template <typename Sender_, typename Scheduler_,
117
                    typename Receiver_>
118
                operation_state(Sender_&& predecessor_sender,
3,445✔
119
                    Scheduler_&& scheduler, Receiver_&& receiver)
120
                  : scheduler(HPX_FORWARD(Scheduler, scheduler))
3,445✔
121
                  , receiver(HPX_FORWARD(Receiver_, receiver))
3,445✔
122
                  , sender_os(hpx::execution::experimental::connect(
6,890✔
123
                        HPX_FORWARD(Sender_, predecessor_sender),
3,445✔
124
                        predecessor_sender_receiver{*this}))
3,445✔
125
                {
126
                }
3,445✔
127

128
                operation_state(operation_state&&) = delete;
129
                operation_state(operation_state const&) = delete;
130
                operation_state& operator=(operation_state&&) = delete;
131
                operation_state& operator=(operation_state const&) = delete;
132

133
                struct predecessor_sender_receiver
134
                {
135
                    operation_state& op_state;
136

137
                    template <typename Error>
138
                    friend void tag_invoke(set_error_t,
2✔
139
                        predecessor_sender_receiver&& r, Error&& error) noexcept
140
                    {
141
                        r.op_state.set_error_predecessor_sender(
4✔
142
                            HPX_FORWARD(Error, error));
2✔
143
                    }
2✔
144

145
                    friend void tag_invoke(
×
146
                        set_stopped_t, predecessor_sender_receiver&& r) noexcept
147
                    {
148
                        r.op_state.set_stopped_predecessor_sender();
×
149
                    }
×
150

151
                    // This typedef is duplicated from the parent struct. The
152
                    // parent typedef is not instantiated early enough for use
153
                    // here.
154
                    using value_type = hpx::util::detail::prepend_t<
155
                        value_types_of_t<Sender, empty_env, decayed_tuple,
156
                            hpx::variant>,
157
                        hpx::monostate>;
158

159
                    template <typename... Ts>
160
                    friend auto tag_invoke(set_value_t,
3,443✔
161
                        predecessor_sender_receiver&& r, Ts&&... ts) noexcept
162
                        -> decltype(std::declval<value_type>()
163
                                        .template emplace<hpx::tuple<Ts...>>(
164
                                            HPX_FORWARD(Ts, ts)...),
165
                            void())
166
                    {
167
                        r.op_state.set_value_predecessor_sender(
6,863✔
168
                            HPX_FORWARD(Ts, ts)...);
3,420✔
169
                    }
3,443✔
170

171
                    // Pass through the get_env receiver query
172
                    friend auto tag_invoke(
18✔
173
                        get_env_t, predecessor_sender_receiver const& r)
174
                        -> env_of_t<std::decay_t<Receiver>>
175
                    {
176
                        return hpx::execution::experimental::get_env(
18✔
177
                            r.op_state.receiver);
18✔
178
                    }
179
                };
180

181
                template <typename Error>
182
                void set_error_predecessor_sender(Error&& error) noexcept
2✔
183
                {
184
                    hpx::execution::experimental::set_error(
2✔
185
                        HPX_MOVE(receiver), HPX_FORWARD(Error, error));
2✔
186
                }
2✔
187

188
                void set_stopped_predecessor_sender() noexcept
×
189
                {
190
                    hpx::execution::experimental::set_stopped(
×
191
                        HPX_MOVE(receiver));
×
192
                }
×
193

194
                template <typename... Us>
195
                void set_value_predecessor_sender(Us&&... us) noexcept
3,443✔
196
                {
197
                    ts.template emplace<hpx::tuple<Us...>>(
6,863✔
198
                        HPX_FORWARD(Us, us)...);
3,420✔
199
#if defined(HPX_HAVE_CXX17_COPY_ELISION) &&                                    \
200
    defined(HPX_HAVE_CXX17_OPTIONAL_COPY_ELISION)
201
                    // with_result_of is used to emplace the operation
202
                    // state returned from connect without any
203
                    // intermediate copy construction (the operation
204
                    // state is not required to be copyable nor movable).
205
                    scheduler_op_state.emplace(
6,886✔
206
                        hpx::util::detail::with_result_of([&]() {
6,886✔
207
                            return hpx::execution::experimental::connect(
3,443✔
208
                                hpx::execution::experimental::schedule(
3,443✔
209
                                    HPX_MOVE(scheduler)),
3,443✔
210
                                scheduler_sender_receiver{*this});
3,443✔
211
                        }));
212
#else
213
                    // earlier versions of MSVC don't get copy elision quite
214
                    // right, the operation state must be constructed explicitly
215
                    // directly in place
216
                    scheduler_op_state.emplace_f(
217
                        hpx::execution::experimental::connect,
218
                        hpx::execution::experimental::schedule(
219
                            HPX_MOVE(scheduler)),
220
                        scheduler_sender_receiver{*this});
221
#endif
222
                    hpx::execution::experimental::start(
3,443✔
223
                        scheduler_op_state.value());
3,443✔
224
                }
3,443✔
225

226
                struct scheduler_sender_receiver
227
                {
228
                    operation_state& op_state;
229

230
                    template <typename Error>
231
                    friend void tag_invoke(set_error_t,
×
232
                        scheduler_sender_receiver&& r, Error&& error) noexcept
233
                    {
234
                        r.op_state.set_error_scheduler_sender(
×
235
                            HPX_FORWARD(Error, error));
×
236
                    }
×
237

238
                    friend void tag_invoke(
×
239
                        set_stopped_t, scheduler_sender_receiver&& r) noexcept
240
                    {
241
                        r.op_state.set_stopped_scheduler_sender();
×
242
                    }
×
243

244
                    friend void tag_invoke(
3,443✔
245
                        set_value_t, scheduler_sender_receiver&& r) noexcept
246
                    {
247
                        r.op_state.set_value_scheduler_sender();
3,443✔
248
                    }
3,443✔
249

250
                    // Pass through the get_env receiver query
251
                    friend auto tag_invoke(
61✔
252
                        get_env_t, scheduler_sender_receiver const& r)
253
                        -> env_of_t<std::decay_t<Receiver>>
254
                    {
255
                        return get_env(r.op_state.receiver);
61✔
256
                    }
257
                };
258

259
                struct scheduler_sender_value_visitor
21✔
260
                {
261
                    HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
262

263
                    [[noreturn]] void operator()(hpx::monostate) const
×
264
                    {
265
                        HPX_UNREACHABLE;
×
266
                    }
×
267

268
                    template <typename Ts,
269
                        typename = std::enable_if_t<
270
                            !std::is_same_v<std::decay_t<Ts>, hpx::monostate>>>
271
                    void operator()(Ts&& ts)
3,443✔
272
                    {
273
                        hpx::invoke_fused(
3,443✔
274
                            hpx::bind_front(
3,443✔
275
                                hpx::execution::experimental::set_value,
276
                                HPX_MOVE(receiver)),
3,443✔
277
                            HPX_FORWARD(Ts, ts));
3,443✔
278
                    }
3,443✔
279
                };
280

281
                template <typename Error>
282
                void set_error_scheduler_sender(Error&& error) noexcept
×
283
                {
284
                    scheduler_op_state.reset();
×
285
                    hpx::execution::experimental::set_error(
×
286
                        HPX_MOVE(receiver), HPX_FORWARD(Error, error));
×
287
                }
×
288

289
                void set_stopped_scheduler_sender() noexcept
×
290
                {
291
                    scheduler_op_state.reset();
×
292
                    hpx::execution::experimental::set_stopped(
×
293
                        HPX_MOVE(receiver));
×
294
                }
×
295

296
                void set_value_scheduler_sender() noexcept
3,443✔
297
                {
298
                    scheduler_op_state.reset();
3,443✔
299
                    hpx::visit(
3,443✔
300
                        scheduler_sender_value_visitor{HPX_MOVE(receiver)},
3,443✔
301
                        HPX_MOVE(ts));
3,443✔
302
                }
3,443✔
303

304
                friend void tag_invoke(start_t, operation_state& os) noexcept
3,445✔
305
                {
306
                    hpx::execution::experimental::start(os.sender_os);
3,445✔
307
                }
3,445✔
308
            };
309

310
            template <typename Receiver>
311
            friend operation_state<Receiver> tag_invoke(
3,425✔
312
                connect_t, schedule_from_sender&& s, Receiver&& receiver)
313
            {
314
                return {HPX_MOVE(s.predecessor_sender), HPX_MOVE(s.scheduler),
6,850✔
315
                    HPX_FORWARD(Receiver, receiver)};
3,425✔
316
            }
317

318
            template <typename Receiver>
319
            friend operation_state<Receiver> tag_invoke(
20✔
320
                connect_t, schedule_from_sender& s, Receiver&& receiver)
321
            {
322
                return {s.predecessor_sender, s.scheduler,
40✔
323
                    HPX_FORWARD(Receiver, receiver)};
20✔
324
            }
325
        };
326
    }    // namespace detail
327

328
    // execution::schedule_from is used to schedule work dependent on the
329
    // completion of a sender onto a scheduler's associated execution context.
330
    //
331
    // [Note: schedule_from is not meant to be used in user code; it is used in
332
    // the implementation of transfer. -end note]
333
    //
334
    // Senders returned from execution::schedule_from shall not propagate the
335
    // sender queries get_completion_scheduler<CPO> to an input sender. They
336
    // will implement get_completion_scheduler<CPO>, where CPO is one of
337
    // set_value_t and set_stopped_t; this query returns a scheduler equivalent
338
    // to the sch argument from those queries. The
339
    // get_completion_scheduler<set_error_t> is not implemented, as the
340
    // scheduler cannot be guaranteed in case an error is thrown while trying to
341
    // schedule work on the given scheduler object.
342
    HPX_HOST_DEVICE_INLINE_CONSTEXPR_VARIABLE struct schedule_from_t final
343
      : hpx::functional::detail::tag_fallback<schedule_from_t>
344
    {
345
    private:
346
        // clang-format off
347
        template <typename Scheduler, typename Sender,
348
            HPX_CONCEPT_REQUIRES_(
349
                is_sender_v<Sender>
350
            )>
351
        // clang-format on
352
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
3,451✔
353
            schedule_from_t, Scheduler&& scheduler, Sender&& predecessor_sender)
354
        {
355
            return detail::schedule_from_sender<Sender, Scheduler>{
10,352✔
356
                HPX_FORWARD(Sender, predecessor_sender),
3,451✔
357
                HPX_FORWARD(Scheduler, scheduler)};
3,451✔
358
        }
359
    } schedule_from{};
360
}    // namespace hpx::execution::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc