• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #848

07 Dec 2022 11:00PM UTC coverage: 86.456% (+0.6%) from 85.835%
#848

push

StellarBot
Merge #6096

6096: Forking Boost.Tokenizer r=hkaiser a=hkaiser

- flyby: remove more Boost headers that are not needed anymore

Working towards #3440 

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

525 of 525 new or added lines in 20 files covered. (100.0%)

173087 of 200202 relevant lines covered (86.46%)

1845223.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.2
/libs/full/resiliency_distributed/include/hpx/resiliency_distributed/async_replay_distributed.hpp
1
//  Copyright (c) 2019-2020 Nikunj Gupta
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#pragma once
8

9
#include <hpx/config.hpp>
10
#if !defined(HPX_COMPUTE_DEVICE_CODE)
11

12
#include <hpx/resiliency/resiliency_cpos.hpp>
13
#include <hpx/resiliency/util.hpp>
14

15
#include <hpx/assert.hpp>
16
#include <hpx/async_distributed/async.hpp>
17
#include <hpx/futures/future.hpp>
18
#include <hpx/type_support/pack.hpp>
19

20
#include <cstddef>
21
#include <exception>
22
#include <memory>
23
#include <stdexcept>
24
#include <tuple>
25
#include <type_traits>
26
#include <utility>
27
#include <vector>
28

29
namespace hpx { namespace resiliency { namespace experimental {
30

31
    ///////////////////////////////////////////////////////////////////////////
32
    namespace detail {
33

34
        ///////////////////////////////////////////////////////////////////////
35
        template <typename Result, typename Pred, typename Action,
36
            typename Tuple>
37
        struct distributed_async_replay_helper
2,202✔
38
          : std::enable_shared_from_this<
39
                distributed_async_replay_helper<Result, Pred, Action, Tuple>>
40
        {
41
            template <typename Pred_, typename Action_, typename Tuple_>
42
            distributed_async_replay_helper(
2,202✔
43
                Pred_&& pred, Action_&& action, Tuple_&& tuple)
44
              : pred_(HPX_FORWARD(Pred_, pred))
2,202✔
45
              , action_(HPX_FORWARD(Action_, action))
2,202✔
46
              , t_(HPX_FORWARD(Tuple_, tuple))
2,202✔
47
            {
2,202✔
48
            }
2,202✔
49

50
            template <std::size_t... Is>
51
            hpx::future<Result> invoke_distributed(
3,257✔
52
                hpx::id_type id, hpx::util::index_pack<Is...>)
53
            {
54
                return hpx::async(action_, id, std::get<Is>(t_)...);
3,257✔
55
            }
56

57
            hpx::future<Result> call(
3,257✔
58
                const std::vector<hpx::id_type>& ids, std::size_t iteration = 0)
59
            {
60
                hpx::future<Result> f = invoke_distributed(ids.at(iteration),
6,514✔
61
                    hpx::util::make_index_pack<
3,257✔
62
                        std::tuple_size<Tuple>::value>{});
3,257✔
63

64
                // attach a continuation that will relaunch the task, if
65
                // necessary
66
                auto this_ = this->shared_from_this();
3,257✔
67
                return f.then(hpx::launch::sync,
3,257✔
68
                    [this_ = HPX_MOVE(this_), ids, iteration](
16,285✔
69
                        hpx::future<Result>&& f) {
70
                        if (f.has_exception())
3,257✔
71
                        {
72
                            // rethrow abort_replay_exception, if caught
73
                            auto ex = rethrow_on_abort_replay(f);
1,059✔
74

75
                            // execute the task again if an error occurred and
76
                            // this was not the last attempt
77
                            if (iteration != ids.size() - 1)
1,059✔
78
                            {
79
                                return this_->call(ids, iteration + 1);
1,055✔
80
                            }
81

82
                            // rethrow exception if the number of replays has
83
                            // been exhausted
84
                            std::rethrow_exception(ex);
4✔
85
                        }
1,059✔
86

87
                        auto&& result = f.get();
2,198✔
88

89
                        if (!HPX_INVOKE(this_->pred_, result))
2,198✔
90
                        {
91
                            // execute the task again if an error occurred and
92
                            // this was not the last attempt
93
                            if (iteration != ids.size() - 1)
×
94
                            {
95
                                return this_->call(ids, iteration + 1);
×
96
                            }
97

98
                            // throw aborting exception as attempts were
99
                            // exhausted
100
                            throw abort_replay_exception();
×
101
                        }
102

103
                        if (iteration != ids.size())
2,198✔
104
                        {
105
                            // return result
106
                            return hpx::make_ready_future(HPX_MOVE(result));
2,198✔
107
                        }
108

109
                        // throw aborting exception as attempts were
110
                        // exhausted
111
                        throw abort_replay_exception();
×
112
                    });
3,257✔
113
            }
3,257✔
114

115
            Pred pred_;
116
            Action action_;
117
            Tuple t_;
118
        };
119

120
        template <typename Result, typename Pred, typename Action,
121
            typename... Ts>
122
        std::shared_ptr<distributed_async_replay_helper<Result,
123
            typename std::decay<Pred>::type, typename std::decay<Action>::type,
124
            std::tuple<typename std::decay<Ts>::type...>>>
125
        make_distributed_async_replay_helper(
2,202✔
126
            Pred&& pred, Action&& action, Ts&&... ts)
127
        {
128
            using return_type = distributed_async_replay_helper<Result,
129
                typename std::decay<Pred>::type,
130
                typename std::decay<Action>::type,
131
                std::tuple<typename std::decay<Ts>::type...>>;
132

133
            return std::make_shared<return_type>(HPX_FORWARD(Pred, pred),
4,404✔
134
                HPX_FORWARD(Action, action),
2,202✔
135
                std::make_tuple(HPX_FORWARD(Ts, ts)...));
2,202✔
136
        }
×
137
    }    // namespace detail
138

139
    ///////////////////////////////////////////////////////////////////////////
140
    // Asynchronously launch given Action \a action on locality \a id.
141
    // Repeat launching on error exactly \a n times (except if
142
    // abort_replay_exception is thrown).
143
    template <typename Action, typename... Ts>
144
    hpx::future<typename hpx::util::detail::invoke_deferred_result<Action,
145
        hpx::id_type, Ts...>::type>
146
    tag_invoke(async_replay_t, const std::vector<hpx::id_type>& ids,
1,101✔
147
        Action&& action, Ts&&... ts)
148
    {
149
        HPX_ASSERT(ids.size() > 0);
1,101✔
150

151
        using result_type =
152
            typename hpx::util::detail::invoke_deferred_result<Action,
153
                hpx::id_type, Ts...>::type;
154

155
        auto helper = detail::make_distributed_async_replay_helper<result_type>(
1,101✔
156
            detail::replay_validator{}, HPX_FORWARD(Action, action),
1,101✔
157
            HPX_FORWARD(Ts, ts)...);
1,100✔
158

159
        return helper->call(ids);
1,101✔
160
    }
1,101✔
161

162
    ///////////////////////////////////////////////////////////////////////////
163
    // Asynchronously launch given Action \a action on locality \a id.
164
    // Repeat launching on error exactly \a n times (except if
165
    // abort_replay_exception is thrown).
166
    template <typename Pred, typename Action, typename... Ts>
167
    hpx::future<typename hpx::util::detail::invoke_deferred_result<Action,
168
        hpx::id_type, Ts...>::type>
169
    tag_invoke(async_replay_validate_t, const std::vector<hpx::id_type>& ids,
1,101✔
170
        Pred&& pred, Action&& action, Ts&&... ts)
171
    {
172
        HPX_ASSERT(ids.size() > 0);
1,101✔
173

174
        using result_type =
175
            typename hpx::util::detail::invoke_deferred_result<Action,
176
                hpx::id_type, Ts...>::type;
177

178
        auto helper = detail::make_distributed_async_replay_helper<result_type>(
1,101✔
179
            HPX_FORWARD(Pred, pred), HPX_FORWARD(Action, action),
1,101✔
180
            HPX_FORWARD(Ts, ts)...);
1,100✔
181

182
        return helper->call(ids);
1,101✔
183
    }
1,101✔
184

185
}}}    // namespace hpx::resiliency::experimental
186

187
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc