• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #848

07 Dec 2022 11:00PM CUT coverage: 86.456% (+0.6%) from 85.835%
#848

push

StellarBot
Merge #6096

6096: Forking Boost.Tokenizer r=hkaiser a=hkaiser

- flyby: remove more Boost headers that are not needed anymore

Working towards #3440 

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

525 of 525 new or added lines in 20 files covered. (100.0%)

173087 of 200202 relevant lines covered (86.46%)

1845223.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.46
/libs/full/resiliency_distributed/examples/async_replicate_distributed.cpp
1
//  Copyright (c) 2019 National Technology & Engineering Solutions of Sandia,
2
//                     LLC (NTESS).
3
//  Copyright (c) 2018-2019 Hartmut Kaiser
4
//  Copyright (c) 2018-2019 Adrian Serio
5
//  Copyright (c) 2019-2020 Nikunj Gupta
6
//  SPDX-License-Identifier: BSL-1.0
7
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
8
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9

10
#include <hpx/config.hpp>
11
#if !defined(HPX_COMPUTE_DEVICE_CODE)
12

13
#include <hpx/actions_base/plain_action.hpp>
14
#include <hpx/assert.hpp>
15
#include <hpx/hpx_init.hpp>
16
#include <hpx/include/runtime.hpp>
17
#include <hpx/modules/futures.hpp>
18
#include <hpx/modules/resiliency_distributed.hpp>
19
#include <hpx/modules/testing.hpp>
20

21
#include <cstddef>
22
#include <iostream>
23
#include <random>
24
#include <vector>
25

26
int universal_ans(std::vector<hpx::id_type> f_locales, std::size_t size)
6,000✔
27
{
28
    // Pretending to do some useful work
29
    std::size_t start = hpx::chrono::high_resolution_clock::now();
6,000✔
30

31
    while ((hpx::chrono::high_resolution_clock::now() - start) < (size * 100))
3,603,756✔
32
    {
33
    }
34

35
    // Check if the node is faulty
36
    for (const auto& locale : f_locales)
12,000✔
37
    {
38
        // Throw a runtime error in case the node is faulty
39
        if (locale == hpx::find_here())
6,000✔
40
            throw std::runtime_error("runtime error occurred.");
×
41
    }
42

43
    return 42;
6,000✔
44
}
×
45

46
HPX_PLAIN_ACTION(universal_ans, universal_action)
18,004✔
47

48
bool validate(int ans)
3,000✔
49
{
50
    return ans == 42;
3,000✔
51
}
52

53
int vote(std::vector<int>&& results)
2,000✔
54
{
55
    return results.at(0);
2,000✔
56
}
57

58
int hpx_main(hpx::program_options::variables_map& vm)
1✔
59
{
60
    std::size_t f_nodes = vm["f-nodes"].as<std::size_t>();
1✔
61
    std::size_t size = vm["size"].as<std::size_t>();
1✔
62
    std::size_t num_tasks = vm["num-tasks"].as<std::size_t>();
1✔
63
    std::size_t num_replications = vm["num-replications"].as<std::size_t>();
1✔
64

65
    universal_action ac;
66
    std::vector<hpx::id_type> locales = hpx::find_all_localities();
1✔
67

68
    // Make sure that the number of faulty nodes is less than the number of
69
    // localities we work on.
70
    HPX_ASSERT(f_nodes < locales.size());
1✔
71

72
    // List of faulty nodes
73
    std::vector<hpx::id_type> f_locales;
1✔
74
    std::vector<std::size_t> visited;
1✔
75

76
    // Mark nodes as faulty
77
    for (std::size_t i = 0; i < f_nodes; ++i)
2✔
78
    {
79
        std::size_t num = std::rand() % locales.size();
1✔
80
        while (visited.end() != std::find(visited.begin(), visited.end(), num))
1✔
81
        {
82
            num = std::rand() % locales.size();
×
83
        }
84

85
        f_locales.push_back(locales.at(num));
1✔
86
    }
1✔
87

88
    {
89
        hpx::chrono::high_resolution_timer t;
1✔
90

91
        std::vector<hpx::future<int>> tasks;
1✔
92
        for (std::size_t i = 0; i < num_tasks; ++i)
1,001✔
93
        {
94
            std::vector<hpx::id_type> ids;
1,000✔
95
            ids.reserve(num_replications);
1,000✔
96
            if (num_replications < locales.size())
1,000✔
97
            {
98
                for (auto& locale : locales)
×
99
                    ids.push_back(locale);
×
100
            }
×
101
            else
102
            {
103
                for (auto& locale : locales)
3,000✔
104
                    ids.push_back(locale);
2,000✔
105

106
                for (std::size_t i = 0; i < num_replications - ids.size(); ++i)
2,000✔
107
                {
108
                    ids.emplace_back(locales.at(i));
1,000✔
109
                }
1,000✔
110
            }
111

112
            tasks.push_back(hpx::resiliency::experimental::async_replicate(
1,000✔
113
                ids, ac, f_locales, size));
114

115
            std::rotate(locales.begin(), locales.begin() + 1, locales.end());
1,000✔
116
        }
1,000✔
117

118
        hpx::wait_all(tasks);
1✔
119

120
        double elapsed = t.elapsed();
1✔
121
        std::cout << "Replicate: " << elapsed << std::endl;
1✔
122
    }
1✔
123

124
    {
125
        hpx::chrono::high_resolution_timer t;
1✔
126

127
        std::vector<hpx::future<int>> tasks;
1✔
128
        for (std::size_t i = 0; i < num_tasks; ++i)
1,001✔
129
        {
130
            std::vector<hpx::id_type> ids;
1,000✔
131
            ids.reserve(num_replications);
1,000✔
132
            if (num_replications < locales.size())
1,000✔
133
            {
134
                for (auto& locale : locales)
×
135
                    ids.push_back(locale);
×
136
            }
×
137
            else
138
            {
139
                for (auto& locale : locales)
3,000✔
140
                    ids.push_back(locale);
2,000✔
141

142
                for (std::size_t i = 0; i < num_replications - ids.size(); ++i)
2,000✔
143
                {
144
                    ids.emplace_back(locales.at(i));
1,000✔
145
                }
1,000✔
146
            }
147

148
            tasks.push_back(
1,000✔
149
                hpx::resiliency::experimental::async_replicate_validate(
1,000✔
150
                    ids, &validate, ac, f_locales, size));
1,000✔
151

152
            std::rotate(locales.begin(), locales.begin() + 1, locales.end());
1,000✔
153
        }
1,000✔
154

155
        hpx::wait_all(tasks);
1✔
156

157
        double elapsed = t.elapsed();
1✔
158
        std::cout << "Replicate Validate: " << elapsed << std::endl;
1✔
159
    }
1✔
160

161
    {
162
        hpx::chrono::high_resolution_timer t;
1✔
163

164
        std::vector<hpx::future<int>> tasks;
1✔
165
        for (std::size_t i = 0; i < num_tasks; ++i)
1,001✔
166
        {
167
            std::vector<hpx::id_type> ids;
1,000✔
168
            ids.reserve(num_replications);
1,000✔
169
            if (num_replications < locales.size())
1,000✔
170
            {
171
                for (auto& locale : locales)
×
172
                    ids.push_back(locale);
×
173
            }
×
174
            else
175
            {
176
                for (auto& locale : locales)
3,000✔
177
                    ids.push_back(locale);
2,000✔
178

179
                for (std::size_t i = 0; i < num_replications - ids.size(); ++i)
2,000✔
180
                {
181
                    ids.emplace_back(locales.at(i));
1,000✔
182
                }
1,000✔
183
            }
184

185
            tasks.push_back(hpx::resiliency::experimental::async_replicate_vote(
1,000✔
186
                ids, &vote, ac, f_locales, size));
1,000✔
187

188
            std::rotate(locales.begin(), locales.begin() + 1, locales.end());
1,000✔
189
        }
1,000✔
190

191
        hpx::wait_all(tasks);
1✔
192

193
        double elapsed = t.elapsed();
1✔
194
        std::cout << "Replicate Vote: " << elapsed << std::endl;
1✔
195
    }
1✔
196

197
    {
198
        hpx::chrono::high_resolution_timer t;
1✔
199

200
        std::vector<hpx::future<int>> tasks;
1✔
201
        for (std::size_t i = 0; i < num_tasks; ++i)
1,001✔
202
        {
203
            std::vector<hpx::id_type> ids;
1,000✔
204
            ids.reserve(num_replications);
1,000✔
205
            if (num_replications < locales.size())
1,000✔
206
            {
207
                for (auto& locale : locales)
×
208
                    ids.push_back(locale);
×
209
            }
×
210
            else
211
            {
212
                for (auto& locale : locales)
3,000✔
213
                    ids.push_back(locale);
2,000✔
214

215
                for (std::size_t i = 0; i < num_replications - ids.size(); ++i)
2,000✔
216
                {
217
                    ids.emplace_back(locales.at(i));
1,000✔
218
                }
1,000✔
219
            }
220

221
            tasks.push_back(
1,000✔
222
                hpx::resiliency::experimental::async_replicate_vote_validate(
1,000✔
223
                    ids, &vote, &validate, ac, f_locales, size));
1,000✔
224

225
            std::rotate(locales.begin(), locales.begin() + 1, locales.end());
1,000✔
226
        }
1,000✔
227

228
        hpx::wait_all(tasks);
1✔
229

230
        double elapsed = t.elapsed();
1✔
231
        std::cout << "Replicate Vote Validate: " << elapsed << std::endl;
1✔
232
    }
1✔
233

234
    return hpx::finalize();
1✔
235
}
1✔
236

237
int main(int argc, char* argv[])
1✔
238
{
239
    // Configure application-specific options
240
    hpx::program_options::options_description desc_commandline;
1✔
241

242
    namespace po = hpx::program_options;
243

244
    // clang-format off
245
    desc_commandline.add_options()
5✔
246
        ("f-nodes", po::value<std::size_t>()->default_value(1),
1✔
247
            "Number of faulty nodes to be injected")
248
        ("size", po::value<std::size_t>()->default_value(200),
1✔
249
            "Grain size of a task")
250
        ("num-tasks", po::value<std::size_t>()->default_value(1000),
1✔
251
            "Number of tasks to invoke")
252
        ("num-replications", po::value<std::size_t>()->default_value(3),
1✔
253
            "Total number of replicates for a task (including the task itself)")
254
    ;
255
    // clang-format on
256

257
    // Initialize and run HPX
258
    hpx::init_params params;
1✔
259
    params.desc_cmdline = desc_commandline;
1✔
260
    return hpx::init(argc, argv, params);
1✔
261
}
1✔
262

263
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc