• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #851

16 Dec 2022 12:27PM CUT coverage: 86.568% (+0.2%) from 86.404%
#851

push

StellarBot
Merge #6104

6104: Adding parameters API: measure_iteration r=hkaiser a=hkaiser

- split `get_chunk_size` CP into `measure_iteration` and modified `get_chunk_size`
- introducing a breaking change to `get_chunk_size` and `processing_units_count` customization point, those now expect the time for one iteration

The new APIs are:
```
    template <typename Target, typename F>
    hpx::chrono::steady_duration measure_iteration(
        Target&&, F&&, std::size_t num_tasks);

    template <typename Target>
    std::size_t processing_units_count(
        Target&&, hpx::chrono::steady_duration const&, std::size_t num_tasks);

    template <typename Target>
    std::size_t get_chunk_size(Target&&,
        hpx::chrono::steady_duration const&, std::size_t num_cores,
        std::size_t num_tasks);
```
This also moves all executor parameter objects to `namespace hpx::execution::experimental`. 

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

209 of 209 new or added lines in 34 files covered. (100.0%)

174475 of 201546 relevant lines covered (86.57%)

1888115.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.48
/libs/full/resiliency_distributed/include/hpx/resiliency_distributed/async_replicate_distributed.hpp
1
//  Copyright (c) 2019 National Technology & Engineering Solutions of Sandia,
2
//                     LLC (NTESS).
3
//  Copyright (c) 2018-2020 Hartmut Kaiser
4
//  Copyright (c) 2018-2019 Adrian Serio
5
//  Copyright (c) 2019-2020 Nikunj Gupta
6
//
7
//  SPDX-License-Identifier: BSL-1.0
8
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
9
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
10

11
#pragma once
12

13
#include <hpx/config.hpp>
14
#if !defined(HPX_COMPUTE_DEVICE_CODE)
15

16
#include <hpx/resiliency/resiliency_cpos.hpp>
17
#include <hpx/resiliency/util.hpp>
18

19
#include <hpx/assert.hpp>
20
#include <hpx/async_distributed/async.hpp>
21
#include <hpx/futures/future.hpp>
22
#include <hpx/modules/async_local.hpp>
23

24
#include <cstddef>
25
#include <exception>
26
#include <stdexcept>
27
#include <utility>
28
#include <vector>
29

30
namespace hpx { namespace resiliency { namespace experimental {
31

32
    ///////////////////////////////////////////////////////////////////////////
33
    namespace detail {
34

35
        ///////////////////////////////////////////////////////////////////////
36
        template <typename Vote, typename Pred, typename Action, typename... Ts>
37
        hpx::future<typename hpx::util::detail::invoke_deferred_result<Action,
38
            hpx::id_type, Ts...>::type>
39
        async_replicate_vote_validate(const std::vector<hpx::id_type>& ids,
4,404✔
40
            Vote&& vote, Pred&& pred, Action&& action, Ts&&... ts)
41
        {
42
            using result_type =
43
                typename hpx::util::detail::invoke_deferred_result<Action,
44
                    hpx::id_type, Ts...>::type;
45

46
            // launch given function n times
47
            std::vector<hpx::future<result_type>> results;
4,404✔
48
            results.reserve(ids.size());
4,404✔
49

50
            for (std::size_t i = 0; i != ids.size(); ++i)
17,612✔
51
            {
52
                results.emplace_back(hpx::async(action, ids.at(i), ts...));
13,208✔
53
            }
13,208✔
54

55
            // wait for all threads to finish executing and hand every result
56
            // passing the predicate to the voter, properly handle exceptions
57
            return hpx::dataflow(
4,404✔
58
                // do not schedule new thread for the lambda
59
                hpx::launch::sync,
60
                [pred = HPX_FORWARD(Pred, pred), vote = HPX_FORWARD(Vote, vote),
48,444✔
61
                    ids](
4,404✔
62
                    std::vector<hpx::future<result_type>>&& results) mutable
63
                -> result_type {
64
                    // Store all valid results
65
                    std::vector<result_type> valid_results;
4,404✔
66
                    valid_results.reserve(ids.size());
4,404✔
67

68
                    std::exception_ptr ex;
4,404✔
69

70
                    for (auto&& f : HPX_MOVE(results))
17,612✔
71
                    {
72
                        if (f.has_exception())
13,208✔
73
                        {
74
                            // rethrow abort_replicate_exception, if caught
75
                            ex = detail::rethrow_on_abort_replicate(f);
6,063✔
76
                        }
6,063✔
77
                        else
78
                        {
79
                            auto&& result = f.get();
7,145✔
80
                            if (HPX_INVOKE(pred, result))
7,145✔
81
                            {
82
                                valid_results.emplace_back(HPX_MOVE(result));
7,145✔
83
                            }
7,145✔
84
                        }
85
                    }
86

87
                    if (!valid_results.empty())
4,404✔
88
                    {
89
                        return HPX_INVOKE(
4,404✔
90
                            HPX_FORWARD(Vote, vote), HPX_MOVE(valid_results));
91
                    }
92

93
                    if (bool(ex))
×
94
                        std::rethrow_exception(ex);
×
95

96
                    // throw aborting exception if no correct results were produced
97
                    throw abort_replicate_exception{};
×
98
                },
4,404✔
99
                HPX_MOVE(results));
100
        }
4,404✔
101
    }    // namespace detail
102

103
    ///////////////////////////////////////////////////////////////////////////
104
    // Asynchronously launch given \a action exactly \a n times on \a n
105
    // different localities, where \a n is the size of the vector of \a ids.
106
    // Verify the result of those invocations using the given predicate \a pred.
107
    // Run all the valid results against the user provided voting function
108
    // \a vote. Return a future to the voted output.
109
    template <typename Vote, typename Pred, typename Action, typename... Ts>
110
    hpx::future<typename hpx::util::detail::invoke_deferred_result<Action,
111
        hpx::id_type, Ts...>::type>
112
    tag_invoke(async_replicate_vote_validate_t,
1,101✔
113
        const std::vector<hpx::id_type>& ids, Vote&& vote, Pred&& pred,
114
        Action&& action, Ts&&... ts)
115
    {
116
        HPX_ASSERT(ids.size() > 0);
1,101✔
117

118
        return detail::async_replicate_vote_validate(ids,
2,202✔
119
            HPX_FORWARD(Vote, vote), HPX_FORWARD(Pred, pred),
1,101✔
120
            HPX_FORWARD(Action, action), HPX_FORWARD(Ts, ts)...);
1,101✔
121
    }
122

123
    ///////////////////////////////////////////////////////////////////////////
124
    // Asynchronously launch given \a action exactly \a n times on \a n
125
    // different localities, where \a n is the size of the vector of \a ids.
126
    // Verify the results using the default detail::replicate_validator.
127
    // Run all the valid results against the user provided voting function
128
    // \a vote. Return a future to the voted output.
129
    template <typename Vote, typename Action, typename... Ts>
130
    hpx::future<typename hpx::util::detail::invoke_deferred_result<Action,
131
        hpx::id_type, Ts...>::type>
132
    tag_invoke(async_replicate_vote_t, const std::vector<hpx::id_type>& ids,
1,101✔
133
        Vote&& vote, Action&& action, Ts&&... ts)
134
    {
135
        HPX_ASSERT(ids.size() > 0);
1,101✔
136

137
        return detail::async_replicate_vote_validate(ids,
2,202✔
138
            HPX_FORWARD(Vote, vote), detail::replicate_validator{},
1,101✔
139
            HPX_FORWARD(Action, action), HPX_FORWARD(Ts, ts)...);
1,101✔
140
    }
141

142
    ///////////////////////////////////////////////////////////////////////////
143
    // Asynchronously launch given \a action exactly \a n times on \a n
144
    // different localities, where \a n is the size of the vector of \a ids.
145
    // Verify the result of those invocations using the given predicate \a pred.
146
    // Run all the valid results against the default detail::replicate_voter.
147
    // Return a future to the voted output.
148
    template <typename Pred, typename Action, typename... Ts>
149
    hpx::future<typename hpx::util::detail::invoke_deferred_result<Action,
150
        hpx::id_type, Ts...>::type>
151
    tag_invoke(async_replicate_validate_t, const std::vector<hpx::id_type>& ids,
1,101✔
152
        Pred&& pred, Action&& action, Ts&&... ts)
153
    {
154
        HPX_ASSERT(ids.size() > 0);
1,101✔
155

156
        return detail::async_replicate_vote_validate(ids,
2,202✔
157
            detail::replicate_voter{}, HPX_FORWARD(Pred, pred),
1,101✔
158
            HPX_FORWARD(Action, action), HPX_FORWARD(Ts, ts)...);
1,101✔
159
    }
160

161
    ///////////////////////////////////////////////////////////////////////////
162
    // Asynchronously launch given \a action exactly \a n times on \a n
163
    // different localities, where \a n is the size of the vector of \a ids.
164
    // Verify the results using the default detail::replicate_validator and
165
    // run all the valid results against the default detail::replicate_voter.
166
    // Return a future to the voted output.
167
    template <typename Action, typename... Ts>
168
    hpx::future<typename hpx::util::detail::invoke_deferred_result<Action,
169
        hpx::id_type, Ts...>::type>
170
    tag_invoke(async_replicate_t, const std::vector<hpx::id_type>& ids,
1,101✔
171
        Action&& action, Ts&&... ts)
172
    {
173
        HPX_ASSERT(ids.size() > 0);
1,101✔
174

175
        return detail::async_replicate_vote_validate(ids,
2,202✔
176
            detail::replicate_voter{}, detail::replicate_validator{},
177
            HPX_FORWARD(Action, action), HPX_FORWARD(Ts, ts)...);
1,101✔
178
    }
179

180
}}}    // namespace hpx::resiliency::experimental
181

182
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc