• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #868

16 Jan 2023 08:21PM UTC coverage: 86.487%. Remained the same
#868

push

StellarBot
Merge #6137

6137: Adding example of a simple master/slave distributed application r=hkaiser a=hkaiser

The purpose of this example is to demonstrate how HPX actions can be used to build a simple master-slave application. The master (locality 0) assigns work to the slaves (all other localities). Note that if this application is run on one locality only it uses the same locality for the master and the slave functionalities.

The slaves receive a message that encodes how many sub-tasks of a certain type they should spawn locally.


Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

72 of 72 new or added lines in 1 file covered. (100.0%)

174663 of 201952 relevant lines covered (86.49%)

1849169.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.97
/libs/full/resiliency_distributed/tests/performance/replay/async_replay_distributed_validate.cpp
1
//  Copyright (c) 2019 National Technology & Engineering Solutions of Sandia,
2
//                     LLC (NTESS).
3
//  Copyright (c) 2018-2022 Hartmut Kaiser
4
//  Copyright (c) 2018-2019 Adrian Serio
5
//  Copyright (c) 2019-2020 Nikunj Gupta
6
//
7
//  SPDX-License-Identifier: BSL-1.0
8
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
9
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
10

11
#include <hpx/config.hpp>
12
#if !defined(HPX_COMPUTE_DEVICE_CODE)
13

14
#include <hpx/actions_base/plain_action.hpp>
15
#include <hpx/assert.hpp>
16
#include <hpx/hpx_init.hpp>
17
#include <hpx/include/runtime.hpp>
18
#include <hpx/modules/futures.hpp>
19
#include <hpx/modules/resiliency.hpp>
20
#include <hpx/modules/resiliency_distributed.hpp>
21
#include <hpx/modules/testing.hpp>
22

23
#include <algorithm>
24
#include <cassert>
25
#include <cstddef>
26
#include <ctime>
27
#include <iostream>
28
#include <random>
29
#include <vector>
30

31
int universal_ans(std::vector<hpx::id_type> const& f_locales, std::size_t err,
119✔
32
    std::size_t size)
33
{
34
    std::vector<hpx::future<int>> local_tasks;
119✔
35

36
    for (std::size_t i = 0; i < 10; ++i)
1,309✔
37
    {
38
        local_tasks.push_back(hpx::async([size]() {
2,380✔
39
            // Pretending to do some useful work
40
            std::size_t start = hpx::chrono::high_resolution_clock::now();
1,190✔
41

42
            while ((hpx::chrono::high_resolution_clock::now() - start) <
666,135✔
43
                (size * 100))
666,135✔
44
            {
45
            }
46

47
            return 42;
1,190✔
48
        }));
49
    }
1,190✔
50

51
    hpx::wait_all(local_tasks);
119✔
52

53
    std::random_device rd;
119✔
54
    std::mt19937 gen(rd());
119✔
55
    std::uniform_int_distribution<std::size_t> dist(1, 100);
119✔
56

57
    bool is_faulty = false;
119✔
58

59
    // Check if the node is faulty
60
    for (const auto& locale : f_locales)
219✔
61
    {
62
        // Throw a runtime error in case the node is faulty
63
        if (locale == hpx::find_here())
119✔
64
        {
65
            is_faulty = true;
50✔
66
            if (dist(gen) < err * 10)
50✔
67
                throw std::runtime_error("runtime error occurred.");
19✔
68
        }
31✔
69
    }
70

71
    if (!is_faulty && dist(gen) < err)
100✔
72
        throw std::runtime_error("runtime error occurred.");
×
73

74
    return 42;
75
}
119✔
76

77
HPX_PLAIN_ACTION(universal_ans, universal_action)
245✔
78

79
bool validate(int ans)
100✔
80
{
81
    return ans == 42;
100✔
82
}
83

84
int hpx_main(hpx::program_options::variables_map& vm)
1✔
85
{
86
    std::size_t f_nodes = vm["f-nodes"].as<std::size_t>();
1✔
87
    std::size_t err = vm["error"].as<std::size_t>();
1✔
88
    std::size_t size = vm["size"].as<std::size_t>();
1✔
89
    std::size_t num_tasks = vm["num-tasks"].as<std::size_t>();
1✔
90

91
    universal_action ac;
92
    std::vector<hpx::id_type> locales = hpx::find_all_localities();
1✔
93

94
    // Make sure that the number of faulty nodes are less than the number of
95
    // localities we work on.
96
    HPX_ASSERT(f_nodes < locales.size());
1✔
97

98
    // List of faulty nodes
99
    std::vector<hpx::id_type> f_locales;
1✔
100
    std::vector<std::size_t> visited;
1✔
101

102
    // Mark nodes as faulty
103
    for (std::size_t i = 0; i < f_nodes; ++i)
2✔
104
    {
105
        std::size_t num = std::rand() % locales.size();
1✔
106
        while (visited.end() != std::find(visited.begin(), visited.end(), num))
1✔
107
        {
108
            num = std::rand() % locales.size();
×
109
        }
110

111
        f_locales.push_back(locales.at(num));
1✔
112
    }
1✔
113

114
    {
115
        hpx::chrono::high_resolution_timer t;
1✔
116

117
        std::vector<hpx::future<int>> tasks;
1✔
118
        for (std::size_t i = 0; i < num_tasks; ++i)
101✔
119
        {
120
            tasks.push_back(
100✔
121
                hpx::resiliency::experimental::async_replay_validate(
100✔
122
                    locales, &validate, ac, f_locales, err, size));
100✔
123

124
            std::rotate(locales.begin(), locales.begin() + 1, locales.end());
100✔
125
        }
100✔
126

127
        HPX_TEST(!hpx::wait_all_nothrow(tasks));
1✔
128

129
        double elapsed = t.elapsed();
1✔
130
        std::cout << "Replay Validate: " << elapsed << std::endl;
1✔
131
    }
1✔
132

133
    return hpx::finalize();
1✔
134
}
1✔
135

136
int main(int argc, char* argv[])
2✔
137
{
138
    namespace po = hpx::program_options;
139

140
    // Configure application-specific options
141
    po::options_description desc_commandline;
2✔
142

143
    // clang-format off
144
    desc_commandline.add_options()
10✔
145
        ("f-nodes", po::value<std::size_t>()->default_value(1),
2✔
146
            "Number of faulty nodes to be injected")
147
        ("error", po::value<std::size_t>()->default_value(5),
2✔
148
            "Error rates for all nodes. Faulty nodes will have 10x error rates.")
149
        ("size", po::value<std::size_t>()->default_value(200),
2✔
150
            "Grain size of a task")
151
        ("num-tasks", po::value<std::size_t>()->default_value(100),
2✔
152
            "Number of tasks to invoke")
153
        ;
154
    // clang-format on
155

156
    // Initialize and run HPX
157
    hpx::init_params params;
2✔
158
    params.desc_cmdline = desc_commandline;
2✔
159
    return hpx::init(argc, argv, params);
2✔
160
}
2✔
161

162
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc