• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #870

19 Jan 2023 10:31PM UTC coverage: 85.97% (-0.4%) from 86.397%
#870

push

hkaiser
Lessen restrictions on used CUDA version

173618 of 201952 relevant lines covered (85.97%)

1977502.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.25
/libs/core/execution/include/hpx/execution/algorithms/bulk.hpp
1
//  Copyright (c) 2020 ETH Zurich
2
//  Copyright (c) 2021-2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#pragma once
9

10
#include <hpx/config.hpp>
11
#include <hpx/concepts/concepts.hpp>
12
#include <hpx/datastructures/tuple.hpp>
13
#include <hpx/datastructures/variant.hpp>
14
#include <hpx/errors/try_catch_exception_ptr.hpp>
15
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
16
#include <hpx/execution/algorithms/then.hpp>
17
#include <hpx/execution_base/completion_scheduler.hpp>
18
#include <hpx/execution_base/completion_signatures.hpp>
19
#include <hpx/execution_base/receiver.hpp>
20
#include <hpx/execution_base/sender.hpp>
21
#include <hpx/functional/detail/tag_priority_invoke.hpp>
22
#include <hpx/functional/invoke_result.hpp>
23
#include <hpx/iterator_support/counting_shape.hpp>
24
#include <hpx/type_support/pack.hpp>
25

26
#include <exception>
27
#include <iterator>
28
#include <type_traits>
29
#include <utility>
30

31
namespace hpx::execution::experimental {
32

33
    ///////////////////////////////////////////////////////////////////////////
34
    namespace detail {
35

36
        template <typename Sender, typename Shape, typename F>
37
        struct bulk_sender
4✔
38
        {
39
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Sender> sender;
40
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Shape> shape;
41
            HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;
42

43
            template <typename Env>
44
            struct generate_completion_signatures
45
            {
46
                template <template <typename...> typename Tuple,
47
                    template <typename...> typename Variant>
48
                using value_types =
49
                    value_types_of_t<Sender, Env, Tuple, Variant>;
50

51
                template <template <typename...> typename Variant>
52
                using error_types = hpx::util::detail::unique_concat_t<
53
                    error_types_of_t<Sender, Env, Variant>,
54
                    Variant<std::exception_ptr>>;
55

56
                static constexpr bool sends_stopped = false;
57
            };
58

59
            template <typename Env>
60
            friend auto tag_invoke(
61
                get_completion_signatures_t, bulk_sender const&, Env) noexcept
62
                -> generate_completion_signatures<Env>;
63

64
            // clang-format off
65
            template <typename CPO,
66
                HPX_CONCEPT_REQUIRES_(
67
                    hpx::execution::experimental::detail::is_receiver_cpo_v<CPO> &&
68
                    hpx::execution::experimental::detail::has_completion_scheduler_v<
69
                        CPO, std::decay_t<Sender>>
70
                )>
71
            // clang-format on
72
            friend constexpr auto tag_invoke(
×
73
                hpx::execution::experimental::get_completion_scheduler_t<CPO>
74
                    tag,
75
                bulk_sender const& sender)
76
            {
77
                return tag(sender.sender);
×
78
            }
79

80
            template <typename Receiver>
81
            struct bulk_receiver
×
82
            {
83
                HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
84
                HPX_NO_UNIQUE_ADDRESS std::decay_t<Shape> shape;
85
                HPX_NO_UNIQUE_ADDRESS std::decay_t<F> f;
86

87
                template <typename Receiver_, typename Shape_, typename F_>
88
                bulk_receiver(Receiver_&& receiver, Shape_&& shape, F_&& f)
34✔
89
                  : receiver(HPX_FORWARD(Receiver_, receiver))
34✔
90
                  , shape(HPX_FORWARD(Shape_, shape))
34✔
91
                  , f(HPX_FORWARD(F_, f))
34✔
92
                {
93
                }
34✔
94

95
                template <typename Error>
96
                friend void tag_invoke(
2✔
97
                    set_error_t, bulk_receiver&& r, Error&& error) noexcept
98
                {
99
                    hpx::execution::experimental::set_error(
2✔
100
                        HPX_MOVE(r.receiver), HPX_FORWARD(Error, error));
2✔
101
                }
2✔
102

103
                friend void tag_invoke(
×
104
                    set_stopped_t, bulk_receiver&& r) noexcept
105
                {
106
                    hpx::execution::experimental::set_stopped(
×
107
                        HPX_MOVE(r.receiver));
×
108
                }
×
109

110
                template <typename... Ts>
111
                void set_value(Ts&&... ts)
32✔
112
                {
113
                    hpx::detail::try_catch_exception_ptr(
32✔
114
                        [&]() {
64✔
115
                            for (auto const& s : shape)
1,485✔
116
                            {
117
                                HPX_INVOKE(f, s, ts...);
1,453✔
118
                            }
119
                            hpx::execution::experimental::set_value(
29✔
120
                                HPX_MOVE(receiver), HPX_FORWARD(Ts, ts)...);
29✔
121
                        },
29✔
122
                        [&](std::exception_ptr ep) {
35✔
123
                            hpx::execution::experimental::set_error(
3✔
124
                                HPX_MOVE(receiver), HPX_MOVE(ep));
3✔
125
                        });
3✔
126
                }
32✔
127

128
                template <typename... Ts>
129
                friend auto tag_invoke(
32✔
130
                    set_value_t, bulk_receiver&& r, Ts&&... ts) noexcept
131
                    -> decltype(hpx::execution::experimental::set_value(
132
                                    std::declval<std::decay_t<Receiver>&&>(),
133
                                    HPX_FORWARD(Ts, ts)...),
134
                        void())
135
                {
136
                    // set_value is in a member function only because of a
137
                    // compiler bug in GCC 7. When the body of set_value is
138
                    // inlined here compilation fails with an internal compiler
139
                    // error.
140
                    r.set_value(HPX_FORWARD(Ts, ts)...);
32✔
141
                }
32✔
142
            };
143

144
            template <typename Receiver>
145
            friend auto tag_invoke(
34✔
146
                connect_t, bulk_sender&& s, Receiver&& receiver)
147
            {
148
                return hpx::execution::experimental::connect(HPX_MOVE(s.sender),
68✔
149
                    bulk_receiver<Receiver>(HPX_FORWARD(Receiver, receiver),
68✔
150
                        HPX_MOVE(s.shape), HPX_MOVE(s.f)));
34✔
151
            }
152

153
            template <typename Receiver>
154
            friend auto tag_invoke(
155
                connect_t, bulk_sender& s, Receiver&& receiver)
156
            {
157
                return hpx::execution::experimental::connect(s.sender,
158
                    bulk_receiver<Receiver>(
159
                        HPX_FORWARD(Receiver, receiver), s.shape, s.f));
160
            }
161
        };
162
    }    // namespace detail
163

164
    ///////////////////////////////////////////////////////////////////////////
165
    //
166
    // execution::bulk is used to run a task repeatedly for every index in an
167
    // index space.
168
    //
169
    // Returns a sender describing the task of invoking the provided function
170
    // with every index in the provided shape along with the values sent by the
171
    // input sender. The returned sender completes once all invocations have
172
    // completed, or an error has occurred. If it completes by sending values,
173
    // they are equivalent to those sent by the input sender.
174
    //
175
    // No instance of function will begin executing until the returned sender is
176
    // started. Each invocation of function runs in an execution agent whose
177
    // forward progress guarantees are determined by the scheduler on which they
178
    // are run. All agents created by a single use of bulk execute with the same
179
    // guarantee. This allows, for instance, a scheduler to execute all
180
    // invocations of the function in parallel.
181
    //
182
    // The bulk operation is intended to be used at the point where the number
183
    // of agents to be created is known and provided to bulk via its shape
184
    // parameter. For some parallel computations, the number of agents to be
185
    // created may be a function of the input data or dynamic conditions of the
186
    // execution environment. In such cases, bulk can be combined with
187
    // additional operations such as let_value to deliver dynamic shape
188
    // information to the bulk operation.
189
    //
190
    inline constexpr struct bulk_t final
191
      : hpx::functional::detail::tag_priority<bulk_t>
192
    {
193
    private:
194
        // clang-format off
195
        template <typename Sender, typename Shape, typename F,
196
            HPX_CONCEPT_REQUIRES_(
197
                is_sender_v<Sender> &&
198
                experimental::detail::is_completion_scheduler_tag_invocable_v<
199
                    hpx::execution::experimental::set_value_t, Sender,
200
                    bulk_t, Shape, F
201
                >
202
            )>
203
        // clang-format on
204
        friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
305✔
205
            bulk_t, Sender&& sender, Shape const& shape, F&& f)
206
        {
207
            auto scheduler =
208
                hpx::execution::experimental::get_completion_scheduler<
305✔
209
                    hpx::execution::experimental::set_value_t>(sender);
305✔
210

211
            return hpx::functional::tag_invoke(bulk_t{}, HPX_MOVE(scheduler),
610✔
212
                HPX_FORWARD(Sender, sender), shape, HPX_FORWARD(F, f));
305✔
213
        }
214

215
        // clang-format off
216
        template <typename Sender, typename Shape, typename F,
217
            HPX_CONCEPT_REQUIRES_(
218
                is_sender_v<Sender> &&
219
                std::is_integral_v<Shape>
220
            )>
221
        // clang-format on
222
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
20✔
223
            bulk_t, Sender&& sender, Shape const& shape, F&& f)
224
        {
225
            return detail::bulk_sender<Sender, hpx::util::counting_shape<Shape>,
20✔
226
                F>{HPX_FORWARD(Sender, sender),
33✔
227
                hpx::util::counting_shape(shape), HPX_FORWARD(F, f)};
33✔
228
        }
229

230
        // clang-format off
231
        template <typename Sender, typename Shape, typename F,
232
            HPX_CONCEPT_REQUIRES_(
233
                is_sender_v<Sender> &&
234
                !std::is_integral_v<std::decay_t<Shape>>
235
            )>
236
        // clang-format on
237
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
×
238
            bulk_t, Sender&& sender, Shape&& shape, F&& f)
239
        {
240
            return detail::bulk_sender<Sender, Shape, F>{
×
241
                HPX_FORWARD(Sender, sender), HPX_FORWARD(Shape, shape),
×
242
                HPX_FORWARD(F, f)};
×
243
        }
244

245
        template <typename Shape, typename F>
246
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
18✔
247
            bulk_t, Shape&& shape, F&& f)
248
        {
249
            return detail::partial_algorithm<bulk_t, Shape, F>{
18✔
250
                HPX_FORWARD(Shape, shape), HPX_FORWARD(F, f)};
18✔
251
        }
252
    } bulk{};
253
}    // namespace hpx::execution::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc