• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #852

17 Dec 2022 04:43PM UTC coverage: 85.912% (-0.7%) from 86.568%
#852

push

StellarBot
Merge #6106

6106: Modernizing modules of levels 0 to 5 r=hkaiser a=hkaiser

- flyby: HPX_FORWARD/HPX_MOVE now expand to std::forward and std::move if those are implemented as builtin functions

working towards #5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

87 of 87 new or added lines in 24 files covered. (100.0%)

173152 of 201546 relevant lines covered (85.91%)

1910264.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.94
/libs/core/execution/include/hpx/execution/algorithms/as_sender.hpp
1
//  Copyright (c) 2021 ETH Zurich
2
//  Copyright (c) 2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#pragma once
9

10
#include <hpx/assert.hpp>
11
#include <hpx/concepts/concepts.hpp>
12
#include <hpx/errors/try_catch_exception_ptr.hpp>
13
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
14
#include <hpx/execution_base/completion_signatures.hpp>
15
#include <hpx/execution_base/operation_state.hpp>
16
#include <hpx/execution_base/receiver.hpp>
17
#include <hpx/execution_base/sender.hpp>
18
#include <hpx/functional/tag_invoke.hpp>
19
#include <hpx/futures/detail/future_data.hpp>
20
#include <hpx/futures/future.hpp>
21
#include <hpx/futures/traits/acquire_shared_state.hpp>
22

23
#include <exception>
24
#include <type_traits>
25
#include <utility>
26

27
namespace hpx::execution::experimental {
28

29
    namespace detail {
30

31
        ///////////////////////////////////////////////////////////////////////////
32
        // Operation state for sender compatibility
33
        // Operation state created when an as_sender sender is connected to a
        // receiver. It stores a decayed copy of the receiver together with the
        // future; when started, it reports the future's outcome to the
        // receiver through set_value or set_error.
        template <typename Receiver, typename Future>
34
        class as_sender_operation_state
26✔
35
        {
36
        private:
37
            using receiver_type = std::decay_t<Receiver>;
38
            using future_type = std::decay_t<Future>;
39
            using result_type = typename future_type::result_type;
40

41
        public:
42
            // Stores the forwarded receiver and the future, which is taken by
            // value and moved into the member.
            template <typename Receiver_>
43
            as_sender_operation_state(Receiver_&& r, future_type f)
26✔
44
              : receiver_(HPX_FORWARD(Receiver_, r))
26✔
45
              , future_(HPX_MOVE(f))
26✔
46
            {
47
            }
26✔
48

49
            // Copying and moving are disabled. Note that the completion
            // callback created in start_helper captures `this`.
            as_sender_operation_state(as_sender_operation_state&&) = delete;
50
            as_sender_operation_state& operator=(
51
                as_sender_operation_state&&) = delete;
52
            as_sender_operation_state(
53
                as_sender_operation_state const&) = delete;
54
            as_sender_operation_state& operator=(
55
                as_sender_operation_state const&) = delete;
56

57
            // start_t customization: delegates to start_helper.
            friend void tag_invoke(hpx::execution::experimental::start_t,
26✔
58
                as_sender_operation_state& os) noexcept
59
            {
60
                os.start_helper();
26✔
61
            }
26✔
62

63
        private:
64
            // Implements start. The work is passed to
            // try_catch_exception_ptr as two callables: the first performs
            // the start logic, the second forwards an exception_ptr to
            // set_error on the receiver.
            // NOTE(review): presumably try_catch_exception_ptr invokes the
            // first callable and hands anything it throws to the second -
            // confirm against hpx/errors/try_catch_exception_ptr.hpp.
            void start_helper() & noexcept
26✔
65
            {
66
                hpx::detail::try_catch_exception_ptr(
26✔
67
                    [&]() {
52✔
68
                        auto state = traits::detail::get_shared_state(future_);
26✔
69

70
                        // A future without shared state is reported as a
                        // no_state exception.
                        if (!state)
26✔
71
                        {
72
                            HPX_THROW_EXCEPTION(no_state,
4✔
73
                                "as_sender_operation_state::start",
74
                                "the future has no valid shared state");
75
                        }
76

77
                        // Delivers the outcome of future_ to the receiver:
                        // a value goes to set_value (without arguments when
                        // result_type is void, otherwise with future_.get()),
                        // an exception goes to set_error. If the future
                        // reports neither, nothing is signalled.
                        auto on_completed = [this]() mutable {
44✔
78
                            if (future_.has_value())
22✔
79
                            {
80
                                if constexpr (std::is_void_v<result_type>)
81
                                {
82
                                    hpx::execution::experimental::set_value(
6✔
83
                                        HPX_MOVE(receiver_));
6✔
84
                                }
85
                                else
86
                                {
87
                                    hpx::execution::experimental::set_value(
15✔
88
                                        HPX_MOVE(receiver_), future_.get());
15✔
89
                                }
90
                            }
21✔
91
                            else if (future_.has_exception())
1✔
92
                            {
93
                                hpx::execution::experimental::set_error(
1✔
94
                                    HPX_MOVE(receiver_),
1✔
95
                                    future_.get_exception_ptr());
1✔
96
                            }
1✔
97
                        };
22✔
98

99
                        // Already ready: complete inline (else branch below).
                        // Otherwise call execute_deferred, re-check, and
                        // either complete inline or register on_completed
                        // with the shared state.
                        if (!state->is_ready(std::memory_order_relaxed))
22✔
100
                        {
101
                            state->execute_deferred();
12✔
102

103
                            // execute_deferred might have made the future ready
104
                            if (!state->is_ready(std::memory_order_relaxed))
12✔
105
                            {
106
                                // The operation state has to be kept alive until
107
                                // set_value is called, which means that we don't
108
                                // need to move receiver and future into the
109
                                // on_completed callback.
110
                                state->set_on_completed(HPX_MOVE(on_completed));
12✔
111
                            }
12✔
112
                            else
113
                            {
114
                                on_completed();
×
115
                            }
116
                        }
12✔
117
                        else
118
                        {
119
                            on_completed();
10✔
120
                        }
121
                    },
30✔
122
                    [&](std::exception_ptr ep) {
30✔
123
                        hpx::execution::experimental::set_error(
4✔
124
                            HPX_MOVE(receiver_), HPX_MOVE(ep));
4✔
125
                    });
4✔
126
            }
26✔
127

128
            // Decayed copy of the connected receiver; moved from when a
            // completion signal is sent.
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver_;
129
            // The future whose outcome is forwarded to the receiver.
            future_type future_;
130
        };
131

132
        // Common base of the as_sender senders: stores the decayed future and
        // defines the nested completion_signatures type describing the
        // value and error types and whether a stopped signal is sent.
        template <typename Future>
133
        struct as_sender_sender_base
36✔
134
        {
135
            using result_type = typename std::decay_t<Future>::result_type;
136

137
            std::decay_t<Future> future_;
138

139
            // Sender compatibility
140
            // Primary template: a single tuple holding result_type.
            // NOTE(review): the unnamed first parameter is always void
            // (see completion_signatures below); presumably it exists so the
            // void case can be a partial specialization at class scope.
            template <typename, typename T>
141
            struct completion_signatures_base
142
            {
143
                template <template <typename...> typename Tuple,
144
                    template <typename...> typename Variant>
145
                using value_types = Variant<Tuple<result_type>>;
146
            };
147

148
            // Specialization for a void result: a single empty tuple.
            template <typename T>
149
            struct completion_signatures_base<T, void>
150
            {
151
                template <template <typename...> typename Tuple,
152
                    template <typename...> typename Variant>
153
                using value_types = Variant<Tuple<>>;
154
            };
155

156
            // Picks value_types based on result_type; the only error type is
            // std::exception_ptr and sends_stopped is false.
            struct completion_signatures
157
              : completion_signatures_base<void, result_type>
158
            {
159
                template <template <typename...> typename Variant>
160
                using error_types = Variant<std::exception_ptr>;
161

162
                static constexpr bool sends_stopped = false;
163
            };
164
        };
165

166
        template <typename Future>
167
        struct as_sender_sender;
168

169
        // Sender wrapping an hpx::future<T>. Copy operations are deleted, so
        // the sender is move-only, and connect is provided for rvalue senders
        // only.
        template <typename T>
170
        struct as_sender_sender<hpx::future<T>>
20✔
171
          : public as_sender_sender_base<hpx::future<T>>
172
        {
173
            using future_type = hpx::future<T>;
174
            using base_type = as_sender_sender_base<hpx::future<T>>;
175
            using base_type::future_;
176

177
            // Initializes the base (and with it future_) from the forwarded
            // argument. The enable_if removes this template from overload
            // resolution when the argument decays to as_sender_sender itself.
            template <typename Future,
178
                typename = std::enable_if_t<!std::is_same<std::decay_t<Future>,
179
                    as_sender_sender>::value>>
180
            explicit as_sender_sender(Future&& future)
14✔
181
              : base_type{HPX_FORWARD(Future, future)}
14✔
182
            {
14✔
183
            }
14✔
184

185
            as_sender_sender(as_sender_sender&&) = default;
6✔
186
            as_sender_sender& operator=(as_sender_sender&&) = default;
187
            as_sender_sender(as_sender_sender const&) = delete;
188
            as_sender_sender& operator=(as_sender_sender const&) = delete;
189

190
            // connect_t customization for rvalue senders: builds the
            // operation state from the forwarded receiver and moves the
            // stored future into it.
            template <typename Receiver>
191
            friend as_sender_operation_state<Receiver, future_type> tag_invoke(
14✔
192
                connect_t, as_sender_sender&& s, Receiver&& receiver)
193
            {
194
                return {HPX_FORWARD(Receiver, receiver), HPX_MOVE(s.future_)};
14✔
195
            }
×
196
        };
197

198
        // Sender wrapping an hpx::shared_future<T>. Copy and move operations
        // are defaulted, and connect is provided for both rvalue and
        // non-const lvalue senders.
        template <typename T>
199
        struct as_sender_sender<hpx::shared_future<T>>
8✔
200
          : as_sender_sender_base<hpx::shared_future<T>>
201
        {
202
            using future_type = hpx::shared_future<T>;
203
            using base_type = as_sender_sender_base<hpx::shared_future<T>>;
204
            using base_type::future_;
205

206
            // Initializes the base (and with it future_) from the forwarded
            // argument. The enable_if removes this template from overload
            // resolution when the argument decays to as_sender_sender itself.
            template <typename Future,
207
                typename = std::enable_if_t<!std::is_same<std::decay_t<Future>,
208
                    as_sender_sender>::value>>
209
            explicit as_sender_sender(Future&& future)
6✔
210
              : base_type{HPX_FORWARD(Future, future)}
6✔
211
            {
6✔
212
            }
6✔
213

214
            as_sender_sender(as_sender_sender&&) = default;
2✔
215
            as_sender_sender& operator=(as_sender_sender&&) = default;
216
            as_sender_sender(as_sender_sender const&) = default;
217
            as_sender_sender& operator=(as_sender_sender const&) = default;
218

219
            // connect_t customization for rvalue senders: the stored
            // shared_future is moved into the operation state.
            template <typename Receiver>
220
            friend as_sender_operation_state<Receiver, future_type> tag_invoke(
6✔
221
                connect_t, as_sender_sender&& s, Receiver&& receiver)
222
            {
223
                return {HPX_FORWARD(Receiver, receiver), HPX_MOVE(s.future_)};
6✔
224
            }
×
225

226
            // connect_t customization for lvalue senders: the stored
            // shared_future is copied into the operation state, so the
            // sender keeps its own copy.
            template <typename Receiver>
227
            friend as_sender_operation_state<Receiver, future_type> tag_invoke(
6✔
228
                connect_t, as_sender_sender& s, Receiver&& receiver)
229
            {
230
                return {HPX_FORWARD(Receiver, receiver), s.future_};
6✔
231
            }
×
232
        };
233
    }    // namespace detail
234

235
    // The as_sender CPO can be used to adapt any HPX future as a sender. The
236
    // value provided by the future will be used to call set_value on the
237
    // connected receiver once the future has become ready. If the future is
238
    // exceptional, set_error will be invoked on the connected receiver.
239
    //
240
    // The difference to keep_future is that as_sender propagates the value
241
    // stored in the future while keep_future will propagate the future instance
242
    // itself.
243
    inline constexpr struct as_sender_t final
244
    {
245
        // Overload taking a future. It is constrained on
        // hpx::traits::is_future_v of the decayed argument type and returns
        // a detail::as_sender_sender<std::decay_t<Future>> constructed from
        // the forwarded future.
        // clang-format off
246
        template <typename Future,
247
            HPX_CONCEPT_REQUIRES_(
248
                hpx::traits::is_future_v<std::decay_t<Future>>
249
            )>
250
        // clang-format on
251
        constexpr HPX_FORCEINLINE auto operator()(Future&& future) const
20✔
252
        {
253
            return detail::as_sender_sender<std::decay_t<Future>>(
20✔
254
                HPX_FORWARD(Future, future));
20✔
255
        }
256

257
        // Nullary overload: returns a default-constructed
        // detail::partial_algorithm<as_sender_t>.
        // NOTE(review): presumably this enables pipe-style use such as
        // `future | as_sender()` - confirm against partial_algorithm.hpp.
        constexpr HPX_FORCEINLINE auto operator()() const
258
        {
259
            return detail::partial_algorithm<as_sender_t>{};
260
        }
261
    } as_sender{};
262
}    // namespace hpx::execution::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc