• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.94
/libs/core/async_combinators/include/hpx/async_combinators/when_each.hpp
1
//  Copyright (c) 2007-2025 Hartmut Kaiser
2
//  Copyright (c) 2013 Agustin Berge
3
//  Copyright (c) 2016 Lukas Troska
4
//
5
//  SPDX-License-Identifier: BSL-1.0
6
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
7
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8

9
/// \file when_each.hpp
10
/// \page hpx::when_each
11
/// \headerfile hpx/future.hpp
12

13
#pragma once
14

15
#if defined(DOXYGEN)
16
namespace hpx {
17
    /// The function \a when_each is an operator allowing to join on the results
18
    /// of all given futures. It AND-composes all future objects given and
19
    /// returns a new future object representing the event of all those futures
20
    /// having finished executing. It also calls the supplied callback
21
    /// for each of the futures which becomes ready.
22
    ///
23
    /// \param f        The function which will be called for each of the
24
    ///                 input futures once the future has become ready.
25
    ///
26
    /// \param futures  A vector holding an arbitrary amount of \a future or
27
    ///                 \a shared_future objects for which \a when_each should
28
    ///                 wait.
29
    ///
30
    /// \note This function consumes the futures as they are passed on to the
31
    ///       supplied function. The callback should take one or two parameters,
32
    ///       namely either a \a future to be processed or a type that
33
    ///       \a std::size_t is implicitly convertible to as the
34
    ///       first parameter and the \a future as the second
35
    ///       parameter. The first parameter will correspond to the
36
    ///       index of the current \a future in the collection.
37
    ///
38
    /// \return   Returns a future representing the event of all input futures
39
    ///           being ready.
40
    ///
41
    template <typename F, typename Future>
42
    future<void> when_each(F&& f, std::vector<Future>&& futures);
43

44
    /// The function \a when_each is an operator allowing to join on the results
45
    /// of all given futures. It AND-composes all future objects given and
46
    /// returns a new future object representing the event of all those futures
47
    /// having finished executing. It also calls the supplied callback
48
    /// for each of the futures which becomes ready.
49
    ///
50
    /// \param f        The function which will be called for each of the
51
    ///                 input futures once the future has become ready.
52
    /// \param begin    The iterator pointing to the first element of a
53
    ///                 sequence of \a future or \a shared_future objects for
54
    ///                 which \a when_each should wait.
55
    /// \param end      The iterator pointing past the last element of a
56
    ///                 sequence of \a future or \a shared_future objects for
57
    ///                 which \a when_each should wait.
58
    ///
59
    /// \note This function consumes the futures as they are passed on to the
60
    ///       supplied function. The callback should take one or two parameters,
61
    ///       namely either a \a future to be processed or a type that
62
    ///       \a std::size_t is implicitly convertible to as the
63
    ///       first parameter and the \a future as the second
64
    ///       parameter. The first parameter will correspond to the
65
    ///       index of the current \a future in the collection.
66
    ///
67
    /// \return   Returns a future that becomes ready once all input futures
67
    ///           are ready; it holds the iterator \a end.
69
    ///
70
    template <typename F, typename Iterator>
71
    future<Iterator> when_each(F&& f, Iterator begin, Iterator end);
72

73
    /// The function \a when_each is an operator allowing to join on the results
74
    /// of all given futures. It AND-composes all future objects given and
75
    /// returns a new future object representing the event of all those futures
76
    /// having finished executing. It also calls the supplied callback
77
    /// for each of the futures which becomes ready.
78
    ///
79
    /// \param f        The function which will be called for each of the
80
    ///                 input futures once the future has become ready.
81
    /// \param futures  An arbitrary number of \a future or \a shared_future
82
    ///                 objects, possibly holding different types for which
83
    ///                 \a when_each should wait.
84
    ///
85
    /// \note This function consumes the futures as they are passed on to the
86
    ///       supplied function. The callback should take one or two parameters,
87
    ///       namely either a \a future to be processed or a type that
88
    ///       \a std::size_t is implicitly convertible to as the
89
    ///       first parameter and the \a future as the second
90
    ///       parameter. The first parameter will correspond to the
91
    ///       index of the current \a future in the collection.
92
    ///
93
    /// \return   Returns a future representing the event of all input futures
94
    ///           being ready.
95
    ///
96
    template <typename F, typename... Ts>
97
    future<void> when_each(F&& f, Ts&&... futures);
98

99
    /// The function \a when_each_n is an operator allowing to join on results
100
    /// of all given futures. It AND-composes all future objects given and
101
    /// returns a new future object representing the event of all those futures
102
    /// having finished executing. It also calls the supplied callback
103
    /// for each of the futures which becomes ready.
104
    ///
105
    /// \param f        The function which will be called for each of the
106
    ///                 input futures once the future has become ready.
107
    /// \param begin    The iterator pointing to the first element of a
108
    ///                 sequence of \a future or \a shared_future objects for
109
    ///                 which \a when_each_n should wait.
110
    /// \param count    The number of elements in the sequence starting at
111
    ///                 \a begin.
112
    ///
113
    /// \note This function consumes the futures as they are passed on to the
114
    ///       supplied function. The callback should take one or two parameters,
115
    ///       namely either a \a future to be processed or a type that
116
    ///       \a std::size_t is implicitly convertible to as the
117
    ///       first parameter and the \a future as the second
118
    ///       parameter. The first parameter will correspond to the
119
    ///       index of the current \a future in the collection.
120
    ///
121
    /// \return   Returns a future holding the iterator pointing to the first
122
    ///           element after the last one.
123
    ///
124
    template <typename F, typename Iterator>
125
    future<Iterator> when_each_n(F&& f, Iterator begin, std::size_t count);
126
}    // namespace hpx
127

128
#else    // DOXYGEN
129

130
#include <hpx/config.hpp>
131
#include <hpx/modules/async_base.hpp>
132
#include <hpx/modules/datastructures.hpp>
133
#include <hpx/modules/futures.hpp>
134
#include <hpx/modules/iterator_support.hpp>
135
#include <hpx/modules/memory.hpp>
136
#include <hpx/modules/type_support.hpp>
137

138
#include <algorithm>
139
#include <cstddef>
140
#include <iterator>
141
#include <type_traits>
142
#include <utility>
143
#include <vector>
144

145
///////////////////////////////////////////////////////////////////////////////
146
namespace hpx::lcos::detail {
147

148
    // Shared state behind the future returned by when_each. It derives from
    // future_data<void>, stores the tuple of inputs together with the
    // callback, and walks the tuple elements in index order (do_await<0>,
    // do_await<1>, ...). Each element is either a single future or a range of
    // futures; the callback is invoked for every future once it is ready.
    // The frame marks itself ready through set_data() when count_ reaches
    // needed_count_ or when the index passes the last tuple element.
    //
    // Tuple: tuple type holding the inputs; F: type of the stored callback.
    template <typename Tuple, typename F>
149
    struct when_each_frame    //-V690
150
      : lcos::detail::future_data<void>
151
    {
152
        // Future type handed back to the caller for this frame.
        using type = hpx::future<void>;
153

154
    private:
155
        // Frames can be neither copied nor moved.
        when_each_frame(when_each_frame const&) = delete;
156
        when_each_frame(when_each_frame&&) = delete;
157

158
        when_each_frame& operator=(when_each_frame const&) = delete;
159
        when_each_frame& operator=(when_each_frame&&) = delete;
160

161
        // True when I equals the number of elements in Tuple, i.e. no tuple
        // element is left to process.
        template <std::size_t I>
162
        struct is_end
163
          : std::integral_constant<bool, hpx::tuple_size<Tuple>::value == I>
164
        {
165
        };
166

167
        template <std::size_t I>
168
        static constexpr bool is_end_v = is_end<I>::value;
169

170
    public:
171
        // Stores the inputs and the callback. needed_count is the number of
        // callback invocations after which the frame makes itself ready; the
        // invocation counter starts at zero.
        template <typename Tuple_, typename F_>
172
        when_each_frame(Tuple_&& t, F_&& f, std::size_t needed_count)
173
          : t_(HPX_FORWARD(Tuple_, t))
174
          , f_(HPX_FORWARD(F_, f))
175
          , count_(0)
176
          , needed_count_(needed_count)
177
        {
2✔
178
        }
179

2✔
180
    public:
2✔
181
        // Process tuple element I. If I is past the last element the frame is
        // made ready; otherwise the element is dispatched at compile time to
        // await_future (single future or reference-wrapped future) or to
        // await_range (range of futures). Any other element type is rejected
        // by the static_assert below.
        template <std::size_t I>
2✔
182
        HPX_FORCEINLINE void do_await()
183
        {
184
            if constexpr (is_end_v<I>)
185
            {
186
                this->set_data(util::unused);
187
            }
188
            else
189
            {
190
                using future_type = hpx::util::decay_unwrap_t<
191
                    typename hpx::tuple_element<I, Tuple>::type>;
×
192

193
                if constexpr (hpx::traits::is_future_v<future_type> ||
194
                    hpx::traits::is_ref_wrapped_future_v<future_type>)
195
                {
196
                    await_future<I>();
197
                }
198
                else
199
                {
200
                    static_assert(hpx::traits::is_future_range_v<future_type> ||
201
                            hpx::traits::is_ref_wrapped_future_range_v<
202
                                future_type>,
203
                        "element must be future or range of futures");
204

205
                    // Iterate over the range stored in the tuple itself.
                    auto&& curr = hpx::util::unwrap_ref(hpx::get<I>(t_));
206
                    await_range<I>(
207
                        hpx::util::begin(curr), hpx::util::end(curr));
208
                }
209
            }
210
        }
211

2✔
212
    protected:
2✔
213
        // Current element is a range (vector) of futures
        //
        // Processes the futures in [next, end) in order. For a future that is
        // not ready (even after execute_deferred()), a continuation is
        // attached that re-enters await_range at the same position, and this
        // call returns. For a ready future the callback is invoked and count_
        // is incremented. Once the range is exhausted, processing continues
        // with tuple element I + 1.
214
        template <std::size_t I, typename Iter>
215
        void await_range(Iter&& next, Iter&& end)
×
216
        {
217
            using future_type = typename std::iterator_traits<Iter>::value_type;
218

219
            // Intrusive reference to this frame; it is moved into the
            // continuation if one has to be attached.
            hpx::intrusive_ptr<when_each_frame> this_(this);
220
            for (/**/; next != end; ++next)
5✔
221
            {
222
                auto next_future_data = traits::detail::get_shared_state(*next);
223

224
                if (next_future_data &&
225
                    !next_future_data->is_ready(std::memory_order_relaxed))
107✔
226
                {
227
                    next_future_data->execute_deferred();
228

229
                    // execute_deferred might have made the future ready
107✔
230
                    if (!next_future_data->is_ready(std::memory_order_relaxed))
231
                    {
232
                        // Attach a continuation to this future which will
3✔
233
                        // re-evaluate it and continue to the next argument
234
                        // (if any).
235
                        Iter next_ = HPX_FORWARD(Iter, next);
3✔
236
                        Iter end_ = HPX_FORWARD(Iter, end);
237
                        // The continuation captures copies of both iterators
                        // and takes over the frame reference.
                        next_future_data->set_on_completed(
238
                            [this_ = HPX_MOVE(this_), next_,
239
                                end_]() mutable -> void {
240
                                this_->template await_range<I>(
3✔
241
                                    HPX_MOVE(next_), HPX_MOVE(end_));
9✔
242
                            });
243

3✔
244
                        // explicitly destruct iterators as those might
3✔
245
                        // become dangling after we make ourselves ready
246
                        next = std::decay_t<Iter>{};
247
                        end = std::decay_t<Iter>{};
248
                        return;
249
                    }
3✔
250
                }
3✔
251

3✔
252
                // call supplied callback with or without index
                // (the index passed is the current value of count_; the
                // future is moved into the callback)
253
                if constexpr (hpx::is_invocable_v<F, std::size_t, future_type>)
254
                {
255
                    f_(count_, HPX_MOVE(*next));
256
                }
257
                else
258
                {
100✔
259
                    f_(HPX_MOVE(*next));
260
                }
261

262
                // All required callbacks have run: make the frame ready and
                // stop iterating.
                if (++count_ == needed_count_)
263
                {
264
                    this->set_data(util::unused);
265

104✔
266
                    // explicitly destruct iterators as those might
267
                    // become dangling after we make ourselves ready
2✔
268
                    next = std::decay_t<Iter>{};
269
                    end = std::decay_t<Iter>{};
270
                    return;
271
                }
2✔
272
            }
2✔
273

2✔
274
            // Range exhausted: move on to the next tuple element.
            do_await<I + 1>();
275
        }
276

277
        // Current element is a simple future
        //
        // If the future is not ready (even after execute_deferred()), a
        // continuation is attached that re-enters await_future<I>, and this
        // call returns. Otherwise the callback is invoked with the future,
        // count_ is incremented, and processing either finishes (count_
        // reached needed_count_) or continues with tuple element I + 1.
278
        template <std::size_t I>
279
        HPX_FORCEINLINE void await_future()
280
        {
281
            using future_type = hpx::util::decay_unwrap_t<
282
                typename hpx::tuple_element<I, Tuple>::type>;
283

284
            // Intrusive reference to this frame; it is moved into the
            // continuation if one has to be attached.
            hpx::intrusive_ptr<when_each_frame> this_(this);
285

286
            future_type& fut = hpx::get<I>(t_);
287
            auto next_future_data = traits::detail::get_shared_state(fut);
288
            if (next_future_data &&
289
                !next_future_data->is_ready(std::memory_order_relaxed))
290
            {
291
                next_future_data->execute_deferred();
292

293
                // execute_deferred might have made the future ready
294
                if (!next_future_data->is_ready(std::memory_order_relaxed))
295
                {
296
                    // Attach a continuation to this future which will
297
                    // re-evaluate it and continue to the next argument
298
                    // (if any).
299
                    next_future_data->set_on_completed(
300
                        [this_ = HPX_MOVE(this_)]() -> void {
301
                            this_->template await_future<I>();
302
                        });
303

304
                    return;
305
                }
306
            }
307

308
            // call supplied callback with or without index
            // (the index passed is the current value of count_; the future
            // is moved into the callback)
309
            if constexpr (hpx::is_invocable_v<F, std::size_t, future_type>)
310
            {
311
                f_(count_, HPX_MOVE(fut));
312
            }
313
            else
314
            {
315
                f_(HPX_MOVE(fut));
316
            }
317

318
            // All required callbacks have run: make the frame ready.
            if (++count_ == needed_count_)
319
            {
320
                this->set_data(util::unused);
321
                return;
322
            }
323

324
            do_await<I + 1>();
325
        }
326

327
    private:
328
        // Inputs: futures and/or ranges of futures.
        Tuple t_;
329
        // Callback invoked for each ready future.
        F f_;
330
        // Number of callback invocations performed so far.
        std::size_t count_;
331
        // Number of callback invocations after which the frame becomes ready.
        std::size_t needed_count_;
332
    };
333
}    // namespace hpx::lcos::detail
334

335
namespace hpx {
336

337
    ///////////////////////////////////////////////////////////////////////////
338
    // Tag type for the hpx::when_each customization point object. The
    // tag_invoke overloads below cover: a vector of futures (lvalue and
    // rvalue), an iterator range, a callback without any futures, and a
    // variadic pack of futures.
    HPX_CXX_EXPORT inline constexpr struct when_each_t final
339
      : hpx::functional::tag<when_each_t>
340
    {
341
    private:
342
        // Lvalue vector of futures. Every element is passed through
        // traits::acquire_future_disp into a new vector, which is moved into
        // a when_each_frame. The frame is started with do_await<0>() and the
        // future created from the frame is returned.
        template <typename F, typename Future,
343
            typename Enable =
344
                std::enable_if_t<hpx::traits::is_future_v<Future>>>
345
        friend decltype(auto) tag_invoke(
346
            when_each_t, F&& func, std::vector<Future>& lazy_values)
347
        {
348
            using argument_type = hpx::tuple<std::vector<Future>>;
2✔
349
            using frame_type =
350
                lcos::detail::when_each_frame<argument_type, std::decay_t<F>>;
351

352
            std::vector<Future> values;
353
            values.reserve(lazy_values.size());
354

355
            std::transform(lazy_values.begin(), lazy_values.end(),
2✔
356
                std::back_inserter(values), traits::acquire_future_disp());
2✔
357

358
            // Number of futures in the vector (not the size of the tuple);
            // read before values is moved and passed on as the frame's
            // needed_count.
            auto const tuple_size = values.size();
2✔
359
            hpx::intrusive_ptr<frame_type> p(
360
                new frame_type(hpx::forward_as_tuple(HPX_MOVE(values)),
361
                    HPX_FORWARD(F, func), tuple_size));
362

2✔
363
            p->template do_await<0>();
364

365
            return hpx::traits::future_access<
366
                typename frame_type::type>::create(HPX_MOVE(p));
367
        }
368

2✔
369
        // Rvalue vector of futures. values is an lvalue expression inside
        // this function, so the call below is expected to select the
        // std::vector<Future>& overload above.
        template <typename F, typename Future>
2✔
370
        friend decltype(auto) tag_invoke(
371
            when_each_t, F&& f, std::vector<Future>&& values)
372
        {
373
            return tag_invoke(when_each_t{}, HPX_FORWARD(F, f), values);
374
        }
375

376
        // Iterator range [begin, end). The futures are collected into a
        // vector through traits::acquire_future_disp and handed to the vector
        // overload. A synchronous continuation then calls get() on the
        // resulting future and returns end, so the overall result is a future
        // holding the end iterator.
        template <typename F, typename Iterator,
377
            typename Enable =
378
                std::enable_if_t<hpx::traits::is_iterator_v<Iterator>>>
379
        friend decltype(auto) tag_invoke(
380
            when_each_t, F&& f, Iterator begin, Iterator end)
381
        {
382
            using future_type =
383
                lcos::detail::future_iterator_traits_t<Iterator>;
384

385
            std::vector<future_type> values;
386
            traits::detail::reserve_if_random_access_by_range(
387
                values, begin, end);
388

389
            std::transform(begin, end, std::back_inserter(values),
390
                traits::acquire_future_disp());
391

392
            return tag_invoke(when_each_t{}, HPX_FORWARD(F, f), values)
393
                .then(hpx::launch::sync,
394
                    [end = HPX_MOVE(end)](hpx::future<void> fut) -> Iterator {
395
                        fut.get();    // rethrow exceptions, if any
396
                        return end;
397
                    });
398
        }
399

400
        // Callback only, no futures: the callback is not used and the result
        // of hpx::make_ready_future() is returned.
        template <typename F>
401
        friend decltype(auto) tag_invoke(when_each_t, F&&)
402
        {
403
            return hpx::make_ready_future();
404
        }
405

406
        // Variadic pack of futures. Enabled only if F is not itself a future
        // and every Ts satisfies hpx::traits::is_future. The acquired futures
        // are stored in a tuple, and the frame becomes ready after
        // sizeof...(Ts) callback invocations.
        template <typename F, typename... Ts,
407
            typename Enable =
408
                std::enable_if_t<!hpx::traits::is_future_v<std::decay_t<F>> &&
409
                    hpx::util::all_of_v<hpx::traits::is_future<Ts>...>>>
410
        friend decltype(auto) tag_invoke(when_each_t, F&& f, Ts&&... ts)
411
        {
412
            using argument_type = hpx::tuple<traits::acquire_future_t<Ts>...>;
413
            using frame_type =
414
                lcos::detail::when_each_frame<argument_type, std::decay_t<F>>;
415

416
            traits::acquire_future_disp func;
417
            argument_type values(func(HPX_FORWARD(Ts, ts))...);
418

419
            hpx::intrusive_ptr<frame_type> p(new frame_type(
420
                HPX_MOVE(values), HPX_FORWARD(F, f), sizeof...(Ts)));
421

422
            p->template do_await<0>();
423

424
            return hpx::traits::future_access<
425
                typename frame_type::type>::create(HPX_MOVE(p));
426
        }
427
    } when_each{};
428

429
    ///////////////////////////////////////////////////////////////////////////
430
    // Tag type for the hpx::when_each_n customization point object.
    HPX_CXX_EXPORT inline constexpr struct when_each_n_t final
431
      : hpx::functional::tag<when_each_n_t>
432
    {
433
    private:
434
        // Collects count futures starting at begin (advancing begin past
        // them), forwards them to hpx::when_each, and attaches a synchronous
        // continuation that calls get() on the resulting future and returns
        // the advanced iterator.
        template <typename F, typename Iterator,
435
            typename Enable =
436
                std::enable_if_t<hpx::traits::is_iterator_v<Iterator>>>
437
        friend decltype(auto) tag_invoke(
438
            when_each_n_t, F&& f, Iterator begin, std::size_t count)
439
        {
440
            using future_type =
441
                lcos::detail::future_iterator_traits_t<Iterator>;
442

443
            std::vector<future_type> values;
444
            values.reserve(count);
445

446
            traits::acquire_future_disp func;
447
            // After this loop begin has been incremented count times.
            while (count-- != 0)
448
            {
449
                values.push_back(func(*begin++));
450
            }
451

452
            return hpx::when_each(HPX_FORWARD(F, f), values)
453
                .then(hpx::launch::sync,
454
                    [begin = HPX_MOVE(begin)](auto&& fut) -> Iterator {
455
                        fut.get();    // rethrow exceptions, if any
456
                        return begin;
457
                    });
458
        }
459
    } when_each_n{};
460
}    // namespace hpx
461

462
#endif    // DOXYGEN
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc