• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #852

17 Dec 2022 04:43PM UTC coverage: 85.912% (-0.7%) from 86.568%
#852

push

StellarBot
Merge #6106

6106: Modernizing modules of levels 0 to 5 r=hkaiser a=hkaiser

- flyby: HPX_FORWARD/HPX_MOVE now expand to std::forward and std::move if those are implemented as builtin functions

working towards #5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

87 of 87 new or added lines in 24 files covered. (100.0%)

173152 of 201546 relevant lines covered (85.91%)

1910264.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.25
/libs/core/execution/include/hpx/execution/algorithms/bulk.hpp
1
//  Copyright (c) 2020 ETH Zurich
2
//  Copyright (c) 2021-2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#pragma once
9

10
#include <hpx/config.hpp>
11
#include <hpx/concepts/concepts.hpp>
12
#include <hpx/datastructures/tuple.hpp>
13
#include <hpx/datastructures/variant.hpp>
14
#include <hpx/errors/try_catch_exception_ptr.hpp>
15
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
16
#include <hpx/execution/algorithms/then.hpp>
17
#include <hpx/execution_base/completion_scheduler.hpp>
18
#include <hpx/execution_base/completion_signatures.hpp>
19
#include <hpx/execution_base/receiver.hpp>
20
#include <hpx/execution_base/sender.hpp>
21
#include <hpx/functional/detail/tag_priority_invoke.hpp>
22
#include <hpx/functional/invoke_result.hpp>
23
#include <hpx/iterator_support/counting_shape.hpp>
24
#include <hpx/type_support/pack.hpp>
25

26
#include <exception>
27
#include <iterator>
28
#include <type_traits>
29
#include <utility>
30

31
namespace hpx::execution::experimental {
32

33
    ///////////////////////////////////////////////////////////////////////////
34
    namespace detail {
35
        template <typename Sender, typename Shape, typename F>
36
        struct bulk_sender
4✔
37
        {
38
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Sender> sender;
39
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Shape> shape;
40
            HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;
41

42
            template <typename Env>
43
            struct generate_completion_signatures
44
            {
45
                template <template <typename...> typename Tuple,
46
                    template <typename...> typename Variant>
47
                using value_types =
48
                    value_types_of_t<Sender, Env, Tuple, Variant>;
49

50
                template <template <typename...> typename Variant>
51
                using error_types = hpx::util::detail::unique_concat_t<
52
                    error_types_of_t<Sender, Env, Variant>,
53
                    Variant<std::exception_ptr>>;
54

55
                static constexpr bool sends_stopped = false;
56
            };
57

58
            template <typename Env>
59
            friend auto tag_invoke(
60
                get_completion_signatures_t, bulk_sender const&, Env) noexcept
61
                -> generate_completion_signatures<Env>;
62

63
            // clang-format off
64
            template <typename CPO,
65
                HPX_CONCEPT_REQUIRES_(
66
                    hpx::execution::experimental::detail::is_receiver_cpo_v<CPO> &&
67
                    hpx::execution::experimental::detail::has_completion_scheduler_v<
68
                        CPO, std::decay_t<Sender>>
69
                )>
70
            // clang-format on
71
            friend constexpr auto tag_invoke(
×
72
                hpx::execution::experimental::get_completion_scheduler_t<CPO>
73
                    tag,
74
                bulk_sender const& sender)
75
            {
76
                return tag(sender.sender);
×
77
            }
78

79
            template <typename Receiver>
80
            struct bulk_receiver
×
81
            {
82
                HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
83
                HPX_NO_UNIQUE_ADDRESS std::decay_t<Shape> shape;
84
                HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;
85

86
                template <typename Receiver_, typename Shape_, typename F_>
87
                bulk_receiver(Receiver_&& receiver, Shape_&& shape, F_&& f)
34✔
88
                  : receiver(HPX_FORWARD(Receiver_, receiver))
34✔
89
                  , shape(HPX_FORWARD(Shape_, shape))
34✔
90
                  , f(HPX_FORWARD(F_, f))
34✔
91
                {
92
                }
34✔
93

94
                template <typename Error>
95
                friend void tag_invoke(
2✔
96
                    set_error_t, bulk_receiver&& r, Error&& error) noexcept
97
                {
98
                    hpx::execution::experimental::set_error(
2✔
99
                        HPX_MOVE(r.receiver), HPX_FORWARD(Error, error));
2✔
100
                }
2✔
101

102
                friend void tag_invoke(
×
103
                    set_stopped_t, bulk_receiver&& r) noexcept
104
                {
105
                    hpx::execution::experimental::set_stopped(
×
106
                        HPX_MOVE(r.receiver));
×
107
                }
×
108

109
                template <typename... Ts>
110
                void set_value(Ts&&... ts)
32✔
111
                {
112
                    hpx::detail::try_catch_exception_ptr(
32✔
113
                        [&]() {
64✔
114
                            for (auto const& s : shape)
1,485✔
115
                            {
116
                                HPX_INVOKE(f, s, ts...);
1,453✔
117
                            }
118
                            hpx::execution::experimental::set_value(
29✔
119
                                HPX_MOVE(receiver), HPX_FORWARD(Ts, ts)...);
29✔
120
                        },
29✔
121
                        [&](std::exception_ptr ep) {
35✔
122
                            hpx::execution::experimental::set_error(
3✔
123
                                HPX_MOVE(receiver), HPX_MOVE(ep));
3✔
124
                        });
3✔
125
                }
32✔
126

127
                template <typename... Ts>
128
                friend auto tag_invoke(
32✔
129
                    set_value_t, bulk_receiver&& r, Ts&&... ts) noexcept
130
                    -> decltype(hpx::execution::experimental::set_value(
131
                                    std::declval<std::decay_t<Receiver>&&>(),
132
                                    HPX_FORWARD(Ts, ts)...),
133
                        void())
134
                {
135
                    // set_value is in a member function only because of a
136
                    // compiler bug in GCC 7. When the body of set_value is
137
                    // inlined here compilation fails with an internal compiler
138
                    // error.
139
                    r.set_value(HPX_FORWARD(Ts, ts)...);
32✔
140
                }
32✔
141
            };
142

143
            template <typename Receiver>
144
            friend auto tag_invoke(
34✔
145
                connect_t, bulk_sender&& s, Receiver&& receiver)
146
            {
147
                return hpx::execution::experimental::connect(HPX_MOVE(s.sender),
68✔
148
                    bulk_receiver<Receiver>(HPX_FORWARD(Receiver, receiver),
68✔
149
                        HPX_MOVE(s.shape), HPX_MOVE(s.f)));
34✔
150
            }
151

152
            template <typename Receiver>
153
            friend auto tag_invoke(
154
                connect_t, bulk_sender& s, Receiver&& receiver)
155
            {
156
                return hpx::execution::experimental::connect(s.sender,
157
                    bulk_receiver<Receiver>(
158
                        HPX_FORWARD(Receiver, receiver), s.shape, s.f));
159
            }
160
        };
161
    }    // namespace detail
162

163
    ///////////////////////////////////////////////////////////////////////////
164
    //
165
    // execution::bulk is used to run a task repeatedly for every index in an
166
    // index space.
167
    //
168
    // Returns a sender describing the task of invoking the provided function
169
    // with every index in the provided shape along with the values sent by the
170
    // input sender. The returned sender completes once all invocations have
171
    // completed, or an error has occurred. If it completes by sending values,
172
    // they are equivalent to those sent by the input sender.
173
    //
174
    // No instance of function will begin executing until the returned sender is
175
    // started. Each invocation of function runs in an execution agent whose
176
    // forward progress guarantees are determined by the scheduler on which they
177
    // are run. All agents created by a single use of bulk execute with the same
178
    // guarantee. This allows, for instance, a scheduler to execute all
179
    // invocations of the function in parallel.
180
    //
181
    // The bulk operation is intended to be used at the point where the number
182
    // of agents to be created is known and provided to bulk via its shape
183
    // parameter. For some parallel computations, the number of agents to be
184
    // created may be a function of the input data or dynamic conditions of the
185
    // execution environment. In such cases, bulk can be combined with
186
    // additional operations such as let_value to deliver dynamic shape
187
    // information to the bulk operation.
188
    //
189
    inline constexpr struct bulk_t final
190
      : hpx::functional::detail::tag_priority<bulk_t>
191
    {
192
    private:
193
        // clang-format off
194
        template <typename Sender, typename Shape, typename F,
195
            HPX_CONCEPT_REQUIRES_(
196
                is_sender_v<Sender> &&
197
                experimental::detail::is_completion_scheduler_tag_invocable_v<
198
                    hpx::execution::experimental::set_value_t, Sender,
199
                    bulk_t, Shape, F
200
                >
201
            )>
202
        // clang-format on
203
        friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
305✔
204
            bulk_t, Sender&& sender, Shape const& shape, F&& f)
205
        {
206
            auto scheduler =
207
                hpx::execution::experimental::get_completion_scheduler<
305✔
208
                    hpx::execution::experimental::set_value_t>(sender);
305✔
209

210
            return hpx::functional::tag_invoke(bulk_t{}, HPX_MOVE(scheduler),
610✔
211
                HPX_FORWARD(Sender, sender), shape, HPX_FORWARD(F, f));
305✔
212
        }
213

214
        // clang-format off
215
        template <typename Sender, typename Shape, typename F,
216
            HPX_CONCEPT_REQUIRES_(
217
                is_sender_v<Sender> &&
218
                std::is_integral_v<Shape>
219
            )>
220
        // clang-format on
221
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
20✔
222
            bulk_t, Sender&& sender, Shape const& shape, F&& f)
223
        {
224
            return detail::bulk_sender<Sender, hpx::util::counting_shape<Shape>,
20✔
225
                F>{HPX_FORWARD(Sender, sender),
33✔
226
                hpx::util::counting_shape(shape), HPX_FORWARD(F, f)};
33✔
227
        }
228

229
        // clang-format off
230
        template <typename Sender, typename Shape, typename F,
231
            HPX_CONCEPT_REQUIRES_(
232
                is_sender_v<Sender> &&
233
                !std::is_integral_v<std::decay_t<Shape>>
234
            )>
235
        // clang-format on
236
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
×
237
            bulk_t, Sender&& sender, Shape&& shape, F&& f)
238
        {
239
            return detail::bulk_sender<Sender, Shape, F>{
×
240
                HPX_FORWARD(Sender, sender), HPX_FORWARD(Shape, shape),
×
241
                HPX_FORWARD(F, f)};
×
242
        }
243

244
        template <typename Shape, typename F>
245
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
17✔
246
            bulk_t, Shape&& shape, F&& f)
247
        {
248
            return detail::partial_algorithm<bulk_t, Shape, F>{
17✔
249
                HPX_FORWARD(Shape, shape), HPX_FORWARD(F, f)};
17✔
250
        }
251
    } bulk{};
252
}    // namespace hpx::execution::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc