• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/libs/core/async_combinators/include/hpx/async_combinators/when_some.hpp
1
//  Copyright (c) 2007-2022 Hartmut Kaiser
2
//  Copyright (c) 2013 Agustin Berge
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
/// \file when_some.hpp
9
/// \page hpx::when_some
10
/// \headerfile hpx/future.hpp
11

12
#pragma once
13

14
#if defined(DOXYGEN)
15
namespace hpx {
16

17
    ///////////////////////////////////////////////////////////////////////////
18
    /// Result type for \a when_some, contains a sequence of futures and
19
    /// indices pointing to ready futures.
20
    template <typename Sequence>
    struct when_some_result
    {
        /// Indices of those futures in \a futures that have become ready
        std::vector<std::size_t> indices;

        /// The futures exactly as they were handed to \a hpx::when_some
        Sequence futures;
    };
29

30
    /// The function \a when_some is an operator allowing to join on the result
31
    /// of all given futures. It AND-composes all future objects given and
32
    /// returns a new future object representing the same list of futures
33
    /// after n of them finished executing.
34
    ///
35
    /// \param n        [in] The number of futures out of the arguments which
36
    ///                 have to become ready in order for the returned future
37
    ///                 to get ready.
38
    /// \param first    [in] The iterator pointing to the first element of a
39
    ///                 sequence of \a future or \a shared_future objects for
40
    ///                 which \a when_some should wait.
41
    /// \param last     [in] The iterator pointing one past the last element of a
42
    ///                 sequence of \a future or \a shared_future objects for
43
    ///                 which \a when_some should wait.
44
    ///
45
    /// \note The future returned by the function \a when_some becomes ready
46
    ///       when at least \a n argument futures have become ready.
47
    ///
48
    /// \return   Returns a when_some_result holding the same list of futures
49
    ///           as has been passed to when_some and indices pointing to
50
    ///           ready futures.
51
    ///           - future<when_some_result<Container<future<R>>>>: If the input
52
    ///             cardinality is unknown at compile time and the futures
53
    ///             are all of the same type. The order of the futures in the
54
    ///             output container will be the same as given by the input
55
    ///             iterator.
56
    ///
57
    /// \note Calling this version of \a when_some where first == last, returns
58
    ///       a future with an empty container that is immediately ready.
59
    ///       Each future and shared_future is waited upon and then copied into
60
    ///       the collection of the output (returned) future, maintaining the
61
    ///       order of the futures in the input collection.
62
    ///       The future returned by \a when_some will not throw an exception,
63
    ///       but the futures held in the output collection may.
64
    ///
65
    template <typename InputIter,
66
        typename Container = vector<
67
            future<typename std::iterator_traits<InputIter>::value_type>>>
68
    future<when_some_result<Container>> when_some(
69
        std::size_t n, Iterator first, Iterator last);
70

71
    /// The function \a when_some is an operator allowing to join on the result
72
    /// of all given futures. It AND-composes all future objects given and
73
    /// returns a new future object representing the same list of futures
74
    /// after n of them finished executing.
75
    ///
76
    /// \param n        [in] The number of futures out of the arguments which
77
    ///                 have to become ready in order for the returned future
78
    ///                 to get ready.
79
    /// \param futures  [in] A container holding an arbitrary amount of \a future
80
    ///                 or \a shared_future objects for which \a when_some
81
    ///                 should wait.
82
    ///
83
    /// \note The future returned by the function \a when_some becomes ready
84
    ///       when at least \a n argument futures have become ready.
85
    ///
86
    /// \return   Returns a when_some_result holding the same list of futures
87
    ///           as has been passed to when_some and indices pointing to
88
    ///           ready futures.
89
    ///           - future<when_some_result<Container<future<R>>>>: If the input
90
    ///             cardinality is unknown at compile time and the futures
91
    ///             are all of the same type. The order of the futures in the
92
    ///             output container will be the same as given by the input
93
    ///             iterator.
94
    ///
95
    /// \note Each future and shared_future is waited upon and then copied into
96
    ///       the collection of the output (returned) future, maintaining the
97
    ///       order of the futures in the input collection.
98
    ///       The future returned by \a when_some will not throw an exception,
99
    ///       but the futures held in the output collection may.
100
    ///
101
    template <typename Range>
102
    future<when_some_result<Range>> when_some(std::size_t n, Range&& futures);
103

104
    /// The function \a when_some is an operator allowing to join on the result
105
    /// of all given futures. It AND-composes all future objects given and
106
    /// returns a new future object representing the same list of futures
107
    /// after n of them finished executing.
108
    ///
109
    /// \param n        [in] The number of futures out of the arguments which
110
    ///                 have to become ready in order for the returned future
111
    ///                 to get ready.
112
    /// \param futures  [in] An arbitrary number of \a future or \a shared_future
113
    ///                 objects, possibly holding different types for which
114
    ///                 \a when_some should wait.
115
    ///
116
    /// \note The future returned by the function \a when_some becomes ready
117
    ///       when at least \a n argument futures have become ready.
118
    ///
119
    /// \return   Returns a when_some_result holding the same list of futures
120
    ///           as has been passed to when_some and indices pointing to
120
    ///           ready futures.
122
    ///           - future<when_some_result<tuple<future<T0>, future<T1>...>>>:
123
    ///             If inputs are fixed in number and are of heterogeneous
124
    ///             types. The inputs can be any arbitrary number of future
125
    ///             objects.
126
    ///           - future<when_some_result<tuple<>>> if \a when_some is
127
    ///             called with zero arguments.
128
    ///             The returned future will be initially ready.
129
    ///
130
    /// \note Each future and shared_future is waited upon and then copied into
131
    ///       the collection of the output (returned) future, maintaining the
132
    ///       order of the futures in the input collection.
133
    ///       The future returned by \a when_some will not throw an exception,
134
    ///       but the futures held in the output collection may.
135
    ///
136
    template <typename... Ts>
137
    future<when_some_result<tuple<future<T>...>>> when_some(
138
        std::size_t n, Ts&&... futures);
139

140
    /// The function \a when_some_n is an operator allowing to join on the result
141
    /// of all given futures. It AND-composes all future objects given and
142
    /// returns a new future object representing the same list of futures
143
    /// after n of them finished executing.
144
    ///
145
    /// \param n        [in] The number of futures out of the arguments which
146
    ///                 have to become ready in order for the returned future
147
    ///                 to get ready.
148
    /// \param first    [in] The iterator pointing to the first element of a
149
    ///                 sequence of \a future or \a shared_future objects for
150
    ///                 which \a when_some_n should wait.
151
    /// \param count    [in] The number of elements in the sequence starting at
152
    ///                 \a first.
153
    ///
154
    /// \note The future returned by the function \a when_some_n becomes ready
155
    ///       when at least \a n argument futures have become ready.
156
    ///
157
    /// \return   Returns a when_some_result holding the same list of futures
158
    ///           as has been passed to when_some and indices pointing to
159
    ///           ready futures.
160
    ///           - future<when_some_result<Container<future<R>>>>: If the input
161
    ///             cardinality is unknown at compile time and the futures
162
    ///             are all of the same type. The order of the futures in the
163
    ///             output container will be the same as given by the input
164
    ///             iterator.
165
    ///
166
    /// \note Calling this version of \a when_some_n where count == 0, returns
167
    ///       a future with the same elements as the arguments that is
168
    ///       immediately ready. Possibly none of the futures in that container
169
    ///       are ready.
170
    ///       Each future and shared_future is waited upon and then copied into
171
    ///       the collection of the output (returned) future, maintaining the
172
    ///       order of the futures in the input collection.
173
    ///       The future returned by \a when_some_n will not throw an exception,
174
    ///       but the futures held in the output collection may.
175
    ///
176
    template <typename InputIter,
177
        typename Container = vector<
178
            future<typename std::iterator_traits<InputIter>::value_type>>>
179
    future<when_some_result<Container>> when_some_n(
180
        std::size_t n, Iterator first, std::size_t count);
181
}    // namespace hpx
182

183
#else    // DOXYGEN
184

185
#include <hpx/config.hpp>
186
#include <hpx/assert.hpp>
187
#include <hpx/modules/datastructures.hpp>
188
#include <hpx/modules/errors.hpp>
189
#include <hpx/modules/functional.hpp>
190
#include <hpx/modules/futures.hpp>
191
#include <hpx/modules/tag_invoke.hpp>
192
#include <hpx/modules/type_support.hpp>
193
#include <hpx/modules/util.hpp>
194

195
#include <algorithm>
196
#include <atomic>
197
#include <cstddef>
198
#include <iterator>
199
#include <memory>
200
#include <mutex>
201
#include <type_traits>
202
#include <utility>
203
#include <vector>
204

205
///////////////////////////////////////////////////////////////////////////////
206
namespace hpx {
207

208
    /// Result type of \a hpx::when_some: the sequence of futures together
    /// with the indices recorded for the futures that became ready.
    HPX_CXX_EXPORT template <typename Sequence>
    struct when_some_result
    {
        when_some_result() = default;

        /// Takes over \a seq by move; \a indices starts out empty.
        explicit when_some_result(Sequence&& seq) noexcept
          : indices{}
          , futures(HPX_MOVE(seq))
        {
        }

        std::vector<std::size_t> indices;
        Sequence futures;
    };
222
}    // namespace hpx
223

224
namespace hpx::lcos::detail {
225

226
    ///////////////////////////////////////////////////////////////////////
227
    template <typename Sequence>
228
    struct when_some;
229

230
    template <typename Sequence>
231
    struct set_when_some_callback_impl
232
    {
233
        explicit set_when_some_callback_impl(when_some<Sequence>& when) noexcept
234
          : when_(when)
235
          , idx_(0)
236
        {
237
        }
238

239
        template <typename Future>
240
        std::enable_if_t<traits::is_future_v<Future>> operator()(
241
            Future& future) const
242
        {
243
            std::size_t counter = when_.count_.load(std::memory_order_seq_cst);
244
            if (counter < when_.needed_count_)
245
            {
246
                // handle future only if not enough futures are ready
247
                // yet also, do not touch any futures which are already
248
                // ready
249

250
                auto shared_state = traits::detail::get_shared_state(future);
251

252
                if (shared_state &&
253
                    !shared_state->is_ready(std::memory_order_relaxed))
254
                {
255
                    shared_state->execute_deferred();
256

257
                    // execute_deferred might have made the future ready
258
                    if (!shared_state->is_ready(std::memory_order_relaxed))
259
                    {
260
                        shared_state->set_on_completed(util::deferred_call(
261
                            &detail::when_some<Sequence>::on_future_ready,
262
                            when_.shared_from_this(), idx_,
263
                            hpx::execution_base::this_thread::agent()));
264
                        ++idx_;
265

266
                        return;
267
                    }
268
                }
269

270
                {
271
                    using mutex_type =
272
                        typename detail::when_some<Sequence>::mutex_type;
273
                    std::lock_guard<mutex_type> l(when_.mtx_);
274
                    when_.values_.indices.push_back(idx_);
275
                }
276

277
                if (when_.count_.fetch_add(1) + 1 == when_.needed_count_)
278
                {
279
                    when_.goal_reached_on_calling_thread_.store(
280
                        true, std::memory_order_release);
281
                }
282
            }
283
            ++idx_;
284
        }
285

286
        template <typename Sequence_>
287
        HPX_FORCEINLINE std::enable_if_t<traits::is_future_range_v<Sequence_>>
288
        operator()(Sequence_& sequence) const
289
        {
290
            apply(sequence);
291
        }
292

293
        template <typename Tuple, std::size_t... Is>
294
        HPX_FORCEINLINE void apply(Tuple& tuple, util::index_pack<Is...>) const
295
        {
296
            ((*this)(hpx::get<Is>(tuple)), ...);
297
        }
298

299
        template <typename... Ts>
300
        HPX_FORCEINLINE void apply(hpx::tuple<Ts...>& sequence) const
301
        {
302
            apply(sequence, util::make_index_pack_t<sizeof...(Ts)>());
303
        }
304

305
        template <typename Sequence_>
306
        HPX_FORCEINLINE void apply(Sequence_& sequence) const
307
        {
308
            std::for_each(sequence.begin(), sequence.end(), *this);
309
        }
310

311
        detail::when_some<Sequence>& when_;
312
        mutable std::size_t idx_;
313
    };
314

315
    template <typename Sequence>
316
    HPX_FORCEINLINE void set_on_completed_callback(
317
        detail::when_some<Sequence>& when)
318
    {
319
        set_when_some_callback_impl<Sequence> callback(when);
320
        callback.apply(when.values_.futures);
321
    }
322

323
    template <typename Sequence>
324
    struct when_some
325
      : std::enable_shared_from_this<when_some<Sequence>>    //-V690
326
    {
327
        using mutex_type = hpx::spinlock;
328

329
    public:
330
        void on_future_ready(
331
            std::size_t idx, hpx::execution_base::agent_ref ctx)
332
        {
333
            std::size_t const new_count = count_.fetch_add(1) + 1;
334
            if (new_count <= needed_count_)
335
            {
336
                {
337
                    std::lock_guard<mutex_type> l(this->mtx_);
338
                    values_.indices.push_back(idx);
339
                }
340

341
                if (new_count == needed_count_)
342
                {
343
                    if (ctx != hpx::execution_base::this_thread::agent())
344
                    {
345
                        ctx.resume();
346
                    }
347
                    else
348
                    {
349
                        goal_reached_on_calling_thread_.store(
350
                            true, std::memory_order_release);
351
                    }
352
                }
353
            }
354
        }
355

356
    private:
357
        when_some(when_some const&) = delete;
358
        when_some(when_some&&) = delete;
359

360
        when_some& operator=(when_some const&) = delete;
361
        when_some& operator=(when_some&&) = delete;
362

363
    public:
364
        using argument_type = Sequence;
365

366
        when_some(argument_type&& values, std::size_t n) noexcept
367
          : values_(HPX_MOVE(values))
368
          , count_(0)
369
          , needed_count_(n)
370
          , goal_reached_on_calling_thread_(false)
371
        {
372
        }
373

374
        when_some_result<Sequence> operator()()
375
        {
376
            // set callback functions to executed when future is ready
377
            set_on_completed_callback(*this);
378

379
            // if all the requested futures are already set, our
380
            // callback above has already been called often enough, otherwise
381
            // we suspend ourselves
382
            if (!goal_reached_on_calling_thread_.load(
383
                    std::memory_order_acquire))
384
            {
385
                // wait for any of the futures to return to become ready
386
                hpx::execution_base::this_thread::suspend(
387
                    "hpx::lcos::detail::when_some::operator()");
388
            }
389

390
            // at least N futures should be ready
391
            HPX_ASSERT(count_.load(std::memory_order_acquire) >= needed_count_);
392

393
            return HPX_MOVE(values_);
394
        }
395

396
        mutable mutex_type mtx_;
397
        when_some_result<Sequence> values_;
398
        std::atomic<std::size_t> count_;
399
        std::size_t needed_count_;
400
        std::atomic<bool> goal_reached_on_calling_thread_;
401
    };
402
}    // namespace hpx::lcos::detail
403

404
namespace hpx {
405

406
    ///////////////////////////////////////////////////////////////////////////
407
    // Customization point object behind hpx::when_some. Each tag_invoke
    // overload below handles one way of passing the futures.
    HPX_CXX_EXPORT inline constexpr struct when_some_t final
      : hpx::functional::tag<when_some_t>
    {
    private:
        // Overload taking a range of futures.
        //  - n == 0: ready future holding a default-constructed result
        //  - n > number of futures: exceptional future (bad_parameter)
        //  - otherwise: posts a task that runs a detail::when_some frame
        template <typename Range,
            typename Enable =
                std::enable_if_t<traits::is_future_range_v<Range>>>
        friend auto tag_invoke(when_some_t, std::size_t n, Range&& lazy_values)
        {
            using sequence_type = std::decay_t<Range>;
            using outcome_type = when_some_result<sequence_type>;

            if (n == 0)
            {
                return hpx::make_ready_future(outcome_type());
            }

            sequence_type acquired =
                traits::acquire_future<sequence_type>()(lazy_values);

            if (acquired.size() < n)
            {
                return hpx::make_exceptional_future<outcome_type>(
                    HPX_GET_EXCEPTION(hpx::error::bad_parameter,
                        "hpx::when_some",
                        "number of results to wait for is out of bounds"));
            }

            auto frame =
                std::make_shared<lcos::detail::when_some<sequence_type>>(
                    HPX_MOVE(acquired), n);

            // the task keeps the frame alive through the captured shared_ptr
            lcos::local::futures_factory<outcome_type()> task(
                [frame = HPX_MOVE(frame)]() -> outcome_type {
                    return (*frame)();
                });

            auto pending = task.get_future();
            task.post();

            return pending;
        }

        // Overload taking an iterator pair: collects the futures into a
        // std::vector and forwards to the range overload.
        template <typename Iterator,
            typename Enable =
                std::enable_if_t<hpx::traits::is_iterator_v<Iterator>>>
        friend decltype(auto) tag_invoke(
            when_some_t, std::size_t n, Iterator begin, Iterator end)
        {
            using future_type =
                lcos::detail::future_iterator_traits_t<Iterator>;

            std::vector<future_type> collected;
            traits::detail::reserve_if_random_access_by_range(
                collected, begin, end);

            std::transform(begin, end, std::back_inserter(collected),
                traits::acquire_future_disp());

            return tag_invoke(when_some_t{}, n, HPX_MOVE(collected));
        }

        // Overload taking no futures: only n == 0 can be satisfied, any
        // other n yields an exceptional future (bad_parameter).
        friend decltype(auto) tag_invoke(when_some_t, std::size_t n)
        {
            using outcome_type = when_some_result<hpx::tuple<>>;

            if (n != 0)
            {
                return hpx::make_exceptional_future<outcome_type>(
                    HPX_GET_EXCEPTION(hpx::error::bad_parameter,
                        "hpx::when_some",
                        "number of results to wait for is out of bounds"));
            }

            return hpx::make_ready_future(outcome_type());
        }

        ///////////////////////////////////////////////////////////////////////////
        // Overload taking the futures as individual arguments; disabled for
        // a single range argument, which the range overload handles.
        //  - n == 0: ready future holding a default-constructed result
        //  - n > number of arguments: exceptional future (bad_parameter)
        //  - otherwise: posts a task that runs a detail::when_some frame
        template <typename T, typename... Ts,
            typename Enable = std::enable_if_t<!(
                traits::is_future_range_v<T> && sizeof...(Ts) == 0)>>
        friend auto tag_invoke(when_some_t, std::size_t n, T&& t, Ts&&... ts)
        {
            using sequence_type = hpx::tuple<traits::acquire_future_t<T>,
                traits::acquire_future_t<Ts>...>;
            using outcome_type = when_some_result<sequence_type>;

            if (n == 0)
            {
                return hpx::make_ready_future(outcome_type());
            }

            if (n > 1 + sizeof...(Ts))
            {
                return hpx::make_exceptional_future<outcome_type>(
                    HPX_GET_EXCEPTION(hpx::error::bad_parameter,
                        "hpx::when_some",
                        "number of results to wait for is out of bounds"));
            }

            traits::acquire_future_disp acquire;
            sequence_type acquired(
                acquire(HPX_FORWARD(T, t)), acquire(HPX_FORWARD(Ts, ts))...);

            auto frame =
                std::make_shared<lcos::detail::when_some<sequence_type>>(
                    HPX_MOVE(acquired), n);

            // the task keeps the frame alive through the captured shared_ptr
            lcos::local::futures_factory<outcome_type()> task(
                [frame = HPX_MOVE(frame)]() -> outcome_type {
                    return (*frame)();
                });

            auto pending = task.get_future();
            task.post();

            return pending;
        }
    } when_some{};
520

521
    ///////////////////////////////////////////////////////////////////////////
522
    // Customization point object behind hpx::when_some_n.
    HPX_CXX_EXPORT inline constexpr struct when_some_n_t final
      : hpx::functional::tag<when_some_n_t>
    {
    private:
        // Collects \a count futures starting at \a begin into a std::vector
        // and forwards them, together with \a n, to hpx::when_some.
        template <typename Iterator,
            typename Enable =
                std::enable_if_t<hpx::traits::is_iterator_v<Iterator>>>
        friend decltype(auto) tag_invoke(
            when_some_n_t, std::size_t n, Iterator begin, std::size_t count)
        {
            using future_type =
                lcos::detail::future_iterator_traits_t<Iterator>;

            std::vector<future_type> collected;
            collected.reserve(count);

            traits::acquire_future_disp acquire;
            for (std::size_t remaining = count; remaining != 0; --remaining)
            {
                collected.push_back(acquire(*begin++));
            }

            return hpx::when_some(n, HPX_MOVE(collected));
        }
    } when_some_n{};
546
}    // namespace hpx
547

548
#endif    // DOXYGEN
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc