• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #863

11 Jan 2023 05:02PM UTC coverage: 86.533% (-0.05%) from 86.582%
#863

push

StellarBot
Merge #6126

6126: Deprecate hpx::parallel::task_block in favor of hpx::experimental::ta… r=hkaiser a=dimitraka



Co-authored-by: kadimitra <kadimitra@ece.auth.gr>

174614 of 201789 relevant lines covered (86.53%)

1954694.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.93
/libs/core/execution/include/hpx/execution/algorithms/let_error.hpp
1
//  Copyright (c) 2021 ETH Zurich
2
//  Copyright (c) 2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#pragma once
9

10
#include <hpx/config.hpp>
11
#include <hpx/assert.hpp>
12
#include <hpx/concepts/concepts.hpp>
13
#include <hpx/datastructures/variant.hpp>
14
#include <hpx/errors/try_catch_exception_ptr.hpp>
15
#include <hpx/execution/algorithms/detail/inject_scheduler.hpp>
16
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
17
#include <hpx/execution/algorithms/run_loop.hpp>
18
#include <hpx/execution_base/completion_scheduler.hpp>
19
#include <hpx/execution_base/completion_signatures.hpp>
20
#include <hpx/execution_base/get_env.hpp>
21
#include <hpx/execution_base/receiver.hpp>
22
#include <hpx/execution_base/sender.hpp>
23
#include <hpx/functional/detail/tag_priority_invoke.hpp>
24
#include <hpx/functional/invoke_fused.hpp>
25
#include <hpx/functional/invoke_result.hpp>
26
#include <hpx/type_support/detail/with_result_of.hpp>
27
#include <hpx/type_support/meta.hpp>
28
#include <hpx/type_support/pack.hpp>
29

30
#include <exception>
31
#include <type_traits>
32
#include <utility>
33

34
namespace hpx::execution::experimental {
35

36
    namespace detail {
37

38
        template <typename PredecessorSender, typename F,
39
            typename Scheduler = no_scheduler>
40
        struct let_error_sender
41
        {
42
            using predecessor_sender_t = std::decay_t<PredecessorSender>;
43

44
            HPX_NO_UNIQUE_ADDRESS predecessor_sender_t predecessor_sender;
45
            HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;
46
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Scheduler> scheduler;
47

48
            // Completion-signature metadata of let_error_sender for the
            // environment Env. It exposes value_types, error_types and
            // sends_stopped, each assembled from the predecessor sender and
            // from the senders the factory F returns for the predecessor's
            // error types.
            //
            // NOTE(review): the descriptions of the meta:: expressions below
            // are read off the helper names (transform, unique, push_back,
            // right_fold, ...); confirm against hpx/type_support/meta.hpp.
            template <typename Env = empty_env>
49
            struct generate_completion_signatures
50
            {
51
                // Maps one predecessor error type to the type F returns when
                // invoked with an lvalue reference to the decayed error type.
                // The defaulted Enable parameter makes the helper ill-formed
                // (SFINAE) when F is not invocable with such an argument.
                template <typename Error,
52
                    typename Enable =
53
                        typename std::enable_if_t<hpx::is_invocable_v<F,
54
                            std::add_lvalue_reference_t<std::decay_t<Error>>>>>
55
                struct successor_sender_types_helper
56
                {
57
                    using type = hpx::util::invoke_result_t<F,
58
                        std::add_lvalue_reference_t<std::decay_t<Error>>>;
59

60
                    // The factory has to produce a sender for every error
                    // type it accepts.
                    static_assert(is_sender_v<type>,
61
                        "let_error expects the invocable sender factory to "
62
                        "return a sender");
63
                };
64

65
                // Convenience alias for the helper above.
                template <typename Error>
66
                using successor_sender_types =
67
                    meta::type<successor_sender_types_helper<Error>>;
68

69
                // Type of the potential values returned from the predecessor
70
                // sender
71
                template <template <typename...> class Tuple = meta::pack,
72
                    template <typename...> class Variant = meta::pack>
73
                using predecessor_value_types =
74
                    value_types_of_t<predecessor_sender_t, Env, Tuple, Variant>;
75

76
                // Type of the potential errors returned from the predecessor
77
                // sender
78
                template <template <typename...> class Variant = meta::pack>
79
                using predecessor_error_types =
80
                    error_types_of_t<predecessor_sender_t, Env, Variant>;
81

82
                // A predecessor with no error types gives let_error nothing
                // to act on; reject it at compile time.
                static_assert(
83
                    !std::is_same_v<predecessor_error_types<>, meta::pack<>>,
84
                    "let_error used with a predecessor that has an empty "
85
                    "error_types. Is let_error misplaced?");
86

87
                // Type of the potential senders returned from the sender
88
                // factory F
89

90
                // clang-format off
91
                // successor_sender_types applied to every predecessor error
                // type, collected into Variant (presumably de-duplicated by
                // meta::unique).
                template <template <typename...> typename Variant = meta::pack>
92
                using sender_types =
93
                    meta::apply<
94
                        meta::transform<
95
                            meta::func1<successor_sender_types>,
96
                            meta::unique<meta::func<Variant>>>,
97
                        predecessor_error_types<>>;
98

99
                // Value types: those of every sender in sender_types<>
                // combined with the value types of the predecessor itself.
                template <template <typename...> typename Tuple,
100
                    template <typename...> typename Variant>
101
                using value_types =
102
                    meta::invoke<
103
                        meta::push_back<meta::uncurry<
104
                            meta::unique<meta::func<Variant>>>>,
105
                        meta::apply<
106
                            meta::transform<
107
                                meta::bind_back<
108
                                    meta::defer<detail::value_types_of>, Env,
109
                                    meta::func<Tuple>>>,
110
                            sender_types<>>,
111
                        predecessor_value_types<Tuple>>;
112

113
                // Error types: those of every sender in sender_types<>, the
                // predecessor's own error types, and std::exception_ptr,
                // which is always passed in as an additional pack.
                template <template <typename...> typename Variant>
114
                using error_types =
115
                    meta::invoke<
116
                        meta::push_back<meta::uncurry<
117
                            meta::unique<meta::func<Variant>>>>,
118
                        meta::apply<
119
                            meta::transform<meta::bind_back<
120
                                meta::defer<detail::error_types_of>, Env>>,
121
                            sender_types<>>,
122
                        predecessor_error_types<>,
123
                        meta::pack<std::exception_ptr>>;
124

125
                // sends_stopped_of of every sender in sender_types<> folded
                // with meta::or_, starting from the predecessor's own
                // sends_stopped_of.
                static constexpr bool sends_stopped = meta::value<
126
                    meta::apply<
127
                        meta::transform<
128
                            meta::bind_back1_func<
129
                                detail::sends_stopped_of, Env>,
130
                            meta::right_fold<
131
                                detail::sends_stopped_of<
132
                                    predecessor_sender_t, Env>,
133
                                meta::func2<meta::or_>>>,
134
                        sender_types<>>>;
135
                // clang-format on
136
            };
137

138
            // get_completion_signatures customization. Only a declaration is
            // given here: the trailing return type names the
            // generate_completion_signatures instantiation for Env.
            template <typename Env>
139
            friend auto tag_invoke(get_completion_signatures_t,
140
                let_error_sender const&, Env) noexcept
141
                -> generate_completion_signatures<Env>;
142

143
            // clang-format off
144
            template <typename CPO, typename Scheduler_ = Scheduler,
145
                HPX_CONCEPT_REQUIRES_(
146
                   !hpx::execution::experimental::is_scheduler_v<Scheduler_> &&
147
                    hpx::execution::experimental::detail::is_receiver_cpo_v<CPO> &&
148
                    hpx::execution::experimental::detail::has_completion_scheduler_v<
149
                        CPO, predecessor_sender_t>
150
                )>
151
            // clang-format on
152
            friend constexpr auto tag_invoke(
153
                hpx::execution::experimental::get_completion_scheduler_t<CPO>
154
                    tag,
155
                let_error_sender const& sender)
156
            {
157
                return tag(sender.predecessor_sender);
158
            }
159

160
            // clang-format off
161
            template <typename CPO, typename Scheduler_ = Scheduler,
162
                HPX_CONCEPT_REQUIRES_(
163
                    hpx::execution::experimental::is_scheduler_v<Scheduler_> &&
164
                    hpx::execution::experimental::detail::is_receiver_cpo_v<CPO>
165
                )>
166
            // clang-format on
167
            friend constexpr auto tag_invoke(
6✔
168
                hpx::execution::experimental::get_completion_scheduler_t<CPO>,
169
                let_error_sender const& sender)
170
            {
171
                return sender.scheduler;
6✔
172
            }
173

174
            // TODO: add forwarding_sender_query
175

176
            template <typename Receiver>
177
            struct operation_state
28✔
178
            {
179
                struct let_error_predecessor_receiver
180
                {
181
                    HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
182
                    HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;
183
                    operation_state& op_state;
184

185
                    template <typename Receiver_, typename F_>
186
                    let_error_predecessor_receiver(
28✔
187
                        Receiver_&& receiver, F_&& f, operation_state& op_state)
188
                      : receiver(HPX_FORWARD(Receiver_, receiver))
28✔
189
                      , f(HPX_FORWARD(F_, f))
28✔
190
                      , op_state(op_state)
28✔
191
                    {
192
                    }
28✔
193

194
                    struct start_visitor
195
                    {
196
                        [[noreturn]] void operator()(hpx::monostate) const
×
197
                        {
198
                            HPX_UNREACHABLE;
×
199
                        }
×
200

201
                        template <typename OperationState_,
202
                            typename = std::enable_if_t<!std::is_same_v<
203
                                std::decay_t<OperationState_>, hpx::monostate>>>
204
                        void operator()(OperationState_& op_state) const
18✔
205
                        {
206
                            hpx::execution::experimental::start(op_state);
18✔
207
                        }
18✔
208
                    };
209

210
                    // Visitor applied to the stored predecessor error. For the
                    // error alternative held by the variant it invokes the
                    // sender factory, connects the resulting sender to the
                    // downstream receiver, stores the operation state in
                    // op_state.successor_op_state and starts it.
                    struct set_error_visitor
211
                    {
212
                        // Downstream receiver and sender factory, held by
                        // value; both are moved from when an error is visited.
                        HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
213
                        HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;
214
                        // Operation state that owns the successor variant.
                        operation_state& op_state;
215

216
                        // The error variant is only visited after an error has
                        // been emplaced, so the empty alternative cannot occur.
                        [[noreturn]] void operator()(hpx::monostate) const
×
217
                        {
218
                            HPX_UNREACHABLE;
×
219
                        }
×
220

221
                        // Handles every alternative other than hpx::monostate.
                        // The error is taken by lvalue reference.
                        template <typename Error,
222
                            typename = std::enable_if_t<!std::is_same_v<
223
                                std::decay_t<Error>, hpx::monostate>>>
224
                        void operator()(Error& error)
18✔
225
                        {
226
                            // Operation state type obtained by connecting the
                            // sender returned by f(error) to a Receiver; used
                            // to select the variant alternative to emplace.
                            using operation_state_type =
227
                                decltype(hpx::execution::experimental::connect(
228
                                    HPX_INVOKE(HPX_MOVE(f), error),
229
                                    std::declval<Receiver>()));
230

231
#if defined(HPX_HAVE_CXX17_COPY_ELISION)
232
                            // with_result_of is used to emplace the operation
233
                            // state returned from connect without any
234
                            // intermediate copy construction (the operation
235
                            // state is not required to be copyable nor movable).
236
                            op_state.successor_op_state
18✔
237
                                .template emplace<operation_state_type>(
18✔
238
                                    hpx::util::detail::with_result_of([&]() {
36✔
239
                                        return hpx::execution::experimental::
18✔
240
                                            connect(
241
                                                HPX_INVOKE(HPX_MOVE(f), error),
18✔
242
                                                HPX_MOVE(receiver));
18✔
243
                                    }));
×
244
#else
245
                            // earlier versions of MSVC don't get copy elision
246
                            // quite right, the operation state must be
247
                            // constructed explicitly directly in place
248
                            op_state.successor_op_state
249
                                .template emplace_f<operation_state_type>(
250
                                    hpx::execution::experimental::connect,
251
                                    HPX_INVOKE(HPX_MOVE(f), error),
252
                                    HPX_MOVE(receiver));
253
#endif
254
                            // Start the successor operation state that was
                            // just stored.
                            hpx::visit(
18✔
255
                                start_visitor{}, op_state.successor_op_state);
18✔
256
                        }
18✔
257
                    };
258

259
                    template <typename Error>
260
                    friend void tag_invoke(set_error_t,
18✔
261
                        let_error_predecessor_receiver&& r,
262
                        Error&& error) noexcept
263
                    {
264
                        hpx::detail::try_catch_exception_ptr(
18✔
265
                            [&]() {
36✔
266
                                // TODO: receiver is moved before the visit, but
267
                                // the invoke inside the visit may throw.
268
                                r.op_state.predecessor_error
18✔
269
                                    .template emplace<Error>(
18✔
270
                                        HPX_FORWARD(Error, error));
18✔
271

272
                                hpx::visit(
18✔
273
                                    set_error_visitor{HPX_MOVE(r.receiver),
54✔
274
                                        HPX_MOVE(r.f), r.op_state},
36✔
275
                                    r.op_state.predecessor_error);
18✔
276
                            },
18✔
277
                            [&](std::exception_ptr ep) {
18✔
278
                                hpx::execution::experimental::set_error(
×
279
                                    HPX_MOVE(r.receiver), HPX_MOVE(ep));
×
280
                            });
×
281
                    }
18✔
282

283
                    friend void tag_invoke(set_stopped_t,
×
284
                        let_error_predecessor_receiver&& r) noexcept
285
                    {
286
                        hpx::execution::experimental::set_stopped(
×
287
                            HPX_MOVE(r.receiver));
×
288
                    };
×
289

290
                    template <typename... Ts,
291
                        typename = std::enable_if_t<hpx::is_invocable_v<
292
                            hpx::execution::experimental::set_value_t,
293
                            Receiver&&, Ts...>>>
294
                    friend void tag_invoke(set_value_t,
10✔
295
                        let_error_predecessor_receiver&& r, Ts&&... ts) noexcept
296
                    {
297
                        hpx::execution::experimental::set_value(
10✔
298
                            HPX_MOVE(r.receiver), HPX_FORWARD(Ts, ts)...);
10✔
299
                    }
10✔
300

301
                    // Pass through the get_env receiver query
302
                    friend auto tag_invoke(
2✔
303
                        get_env_t, let_error_predecessor_receiver const& r)
304
                        -> env_of_t<std::decay_t<Receiver>>
305
                    {
306
                        return hpx::execution::experimental::get_env(
2✔
307
                            r.receiver);
2✔
308
                    }
309
                };
310

311
                // Type of the operation state returned when connecting the
312
                // predecessor sender to the let_error_predecessor_receiver
313
                using predecessor_operation_state_type =
314
                    std::decay_t<connect_result_t<PredecessorSender&&,
315
                        let_error_predecessor_receiver>>;
316

317
                // Type of the potential operation states returned when
318
                // connecting a sender in successor_sender_types to the receiver
319
                // connected to the let_error_sender
320
                //
                // Metafunction: maps a single successor sender type Sender to
                // connect_result_t<Sender, Receiver>, exposed as ::type.
                template <typename Sender>
321
                struct successor_operation_state_types_helper
322
                {
323
                    using type = connect_result_t<Sender, Receiver>;
324
                };
325

326
                template <template <typename...> class Variant>
327
                using successor_operation_state_types =
328
                    hpx::util::detail::transform_t<
329
                        typename generate_completion_signatures<>::
330
                            template sender_types<Variant>,
331
                        successor_operation_state_types_helper>;
332

333
                // Operation state from connecting predecessor sender to
334
                // let_error_predecessor_receiver
335
                predecessor_operation_state_type predecessor_operation_state;
336

337
                // Potential errors returned from the predecessor sender
338
                hpx::util::detail::prepend_t<
339
                    typename generate_completion_signatures<>::
340
                        template predecessor_error_types<hpx::variant>,
341
                    hpx::monostate>
342
                    predecessor_error;
343

344
                // Potential operation states returned when connecting a sender
345
                // in successor_sender_types to the receiver connected to the
346
                // let_error_sender
347
                hpx::util::detail::prepend_t<
348
                    successor_operation_state_types<hpx::variant>,
349
                    hpx::monostate>
350
                    successor_op_state;
351

352
                template <typename PredecessorSender_, typename Receiver_,
353
                    typename F_>
354
                operation_state(PredecessorSender_&& predecessor_sender,
28✔
355
                    Receiver_&& receiver, F_&& f)
356
                  : predecessor_operation_state{
28✔
357
                        hpx::execution::experimental::connect(
28✔
358
                            std::forward<PredecessorSender_>(
28✔
359
                                predecessor_sender),
28✔
360
                            let_error_predecessor_receiver(
28✔
361
                                HPX_FORWARD(Receiver_, receiver),
28✔
362
                                HPX_FORWARD(F_, f), *this))}
28✔
363
                {
364
                }
28✔
365

366
                operation_state(operation_state&&) = delete;
367
                operation_state& operator=(operation_state&&) = delete;
368
                operation_state(operation_state const&) = delete;
369
                operation_state& operator=(operation_state const&) = delete;
370

371
                friend void tag_invoke(start_t, operation_state& os) noexcept
28✔
372
                {
373
                    hpx::execution::experimental::start(
28✔
374
                        os.predecessor_operation_state);
28✔
375
                }
28✔
376
            };
377

378
            template <typename Receiver>
379
            friend auto tag_invoke(
28✔
380
                connect_t, let_error_sender&& s, Receiver&& receiver)
381
            {
382
                return operation_state<Receiver>(HPX_MOVE(s.predecessor_sender),
56✔
383
                    HPX_FORWARD(Receiver, receiver), HPX_MOVE(s.f));
28✔
384
            }
385
        };
386
    }    // namespace detail
387

388
    // let_error and let_stopped are similar to let_value, but where let_value
389
    // works with values sent by the input sender, let_error works with errors,
390
    // and let_stopped is invoked when the "stopped" signal is sent.
391
    inline constexpr struct let_error_t final
392
      : hpx::functional::detail::tag_priority<let_error_t>
393
    {
394
    private:
395
        // clang-format off
396
        template <typename PredecessorSender, typename F,
397
            HPX_CONCEPT_REQUIRES_(
398
                is_sender_v<PredecessorSender> &&
399
                experimental::detail::is_completion_scheduler_tag_invocable_v<
400
                    hpx::execution::experimental::set_value_t,
401
                    PredecessorSender, let_error_t, F
402
                >
403
            )>
404
        // clang-format on
405
        friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
406
            let_error_t, PredecessorSender&& predecessor_sender, F&& f)
407
        {
408
            auto scheduler =
409
                hpx::execution::experimental::get_completion_scheduler<
410
                    hpx::execution::experimental::set_value_t>(
411
                    predecessor_sender);
412

413
            return hpx::functional::tag_invoke(let_error_t{},
414
                HPX_MOVE(scheduler),
415
                HPX_FORWARD(PredecessorSender, predecessor_sender),
416
                HPX_FORWARD(F, f));
417
        }
418

419
        // clang-format off
420
        template <typename PredecessorSender, typename F,
421
            HPX_CONCEPT_REQUIRES_(
422
                hpx::execution::experimental::is_sender_v<PredecessorSender>
423
            )>
424
        // clang-format on
425
        friend constexpr HPX_FORCEINLINE auto tag_invoke(let_error_t,
6✔
426
            hpx::execution::experimental::run_loop_scheduler const& sched,
427
            PredecessorSender&& predecessor_sender, F&& f)
428
        {
429
            return detail::let_error_sender<PredecessorSender, F,
6✔
430
                hpx::execution::experimental::run_loop_scheduler>{
16✔
431
                HPX_FORWARD(PredecessorSender, predecessor_sender),
6✔
432
                HPX_FORWARD(F, f), sched};
12✔
433
        }
434

435
        // clang-format off
436
        template <typename PredecessorSender, typename F,
437
            HPX_CONCEPT_REQUIRES_(
438
                is_sender_v<PredecessorSender>
439
            )>
440
        // clang-format on
441
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
22✔
442
            let_error_t, PredecessorSender&& predecessor_sender, F&& f)
443
        {
444
            return detail::let_error_sender<PredecessorSender, F>{
54✔
445
                HPX_FORWARD(PredecessorSender, predecessor_sender),
22✔
446
                HPX_FORWARD(F, f), detail::no_scheduler{}};
22✔
447
        }
448

449
        // clang-format off
450
        template <typename F, typename Scheduler,
451
            HPX_CONCEPT_REQUIRES_(
452
                hpx::execution::experimental::is_scheduler_v<Scheduler>
453
            )>
454
        // clang-format on
455
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
456
            let_error_t, Scheduler&& scheduler, F&& f)
457
        {
458
            return hpx::execution::experimental::detail::inject_scheduler<
459
                let_error_t, Scheduler, F>{
460
                HPX_FORWARD(Scheduler, scheduler), HPX_FORWARD(F, f)};
461
        }
462

463
        template <typename F>
464
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
22✔
465
            let_error_t, F&& f)
466
        {
467
            return detail::partial_algorithm<let_error_t, F>{HPX_FORWARD(F, f)};
22✔
468
        }
469
    } let_error{};
470
}    // namespace hpx::execution::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc