• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #868

16 Jan 2023 08:21PM UTC coverage: 86.487%. Remained the same
#868

push

StellarBot
Merge #6137

6137: Adding example of a simple master/slave distributed application r=hkaiser a=hkaiser

The purpose of this example is to demonstrate how HPX actions can be used to build a simple master-slave application. The master (locality 0) assigns work to the slaves (all other localities). Note that if this application is run on one locality only it uses the same locality for the master and the slave functionalities.

The slaves receive a message that encodes how many sub-tasks of a certain type they should spawn locally.


Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

72 of 72 new or added lines in 1 file covered. (100.0%)

174663 of 201952 relevant lines covered (86.49%)

1849169.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.86
/libs/core/threading/tests/unit/jthread2.cpp
1
//  Copyright (c) 2020 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
//  Parts of this code were inspired by https://github.com/josuttis/jthread. The
8
//  original code was published by Nicolai Josuttis and Lewis Baker under the
9
//  Creative Commons Attribution 4.0 International License
10
//  (http://creativecommons.org/licenses/by/4.0/).
11

12
#include <hpx/local/init.hpp>
13
#include <hpx/modules/testing.hpp>
14
#include <hpx/modules/threading.hpp>
15

16
#include <atomic>
17
#include <chrono>
18
#include <utility>
19
#include <vector>
20

21
////////////////////////////////////////////////////////////////////////////////
22
void test_interrupt_by_destructor()
1✔
23
{
24
    auto interval = std::chrono::milliseconds(200);
1✔
25
    bool was_interrupted = false;
1✔
26

27
    {
28
        hpx::jthread t([interval, &was_interrupted](hpx::stop_token stoken) {
2✔
29
            HPX_TEST(!stoken.stop_requested());
1✔
30
            try
31
            {
32
                // loop until interrupted (at most 40 times the interval)
33
                for (int i = 0; i < 40; ++i)
5✔
34
                {
35
                    if (stoken.stop_requested())
5✔
36
                    {
37
                        throw "interrupted";
1✔
38
                    }
39

40
                    hpx::this_thread::sleep_for(interval);
4✔
41
                }
4✔
42
                HPX_TEST(false);
×
43
            }
1✔
44
            catch (std::exception&)
45
            {
46
                // "interrupted" not derived from std::exception
47
                HPX_TEST(false);
×
48
            }
×
49
            catch (const char*)
50
            {
51
                HPX_TEST(stoken.stop_requested());
1✔
52
                was_interrupted = true;
1✔
53
            }
1✔
54
            catch (...)
55
            {
56
                HPX_TEST(false);
×
57
            }
1✔
58
        });
2✔
59

60
        HPX_TEST(!t.get_stop_source().stop_requested());
1✔
61

62
        // call destructor after 4 times the interval (should signal the interrupt)
63
        hpx::this_thread::sleep_for(4 * interval);
1✔
64
        HPX_TEST(!t.get_stop_source().stop_requested());
1✔
65
    }
1✔
66

67
    // key HPX_TESTion: signaled interrupt was processed
68
    HPX_TEST(was_interrupted);
1✔
69
}
1✔
70

71
///////////////////////////////////////////////////////////////////////////////
72
void test_interrupt_started_thread()
1✔
73
{
74
    auto interval = std::chrono::milliseconds(200);
1✔
75

76
    {
77
        bool interrupted = false;
1✔
78
        hpx::jthread t([interval, &interrupted](hpx::stop_token stoken) {
2✔
79
            try
80
            {
81
                // loop until interrupted (at most 40 times the interval)
82
                for (int i = 0; i < 40; ++i)
5✔
83
                {
84
                    if (stoken.stop_requested())
5✔
85
                    {
86
                        throw "interrupted";
1✔
87
                    }
88
                    hpx::this_thread::sleep_for(interval);
4✔
89
                }
4✔
90
                HPX_TEST(false);
×
91
            }
1✔
92
            catch (...)
93
            {
94
                interrupted = true;
1✔
95
            }
1✔
96
        });
2✔
97

98
        hpx::this_thread::sleep_for(4 * interval);
1✔
99
        t.request_stop();
1✔
100
        HPX_TEST(t.get_stop_source().stop_requested());
1✔
101
        t.join();
1✔
102
        HPX_TEST(interrupted);
1✔
103
    }
1✔
104
}
1✔
105

106
///////////////////////////////////////////////////////////////////////////////
107
void test_interrupt_started_thread_with_subthread()
1✔
108
{
109
    auto interval = std::chrono::milliseconds(200);
1✔
110

111
    {
112
        hpx::jthread t([interval](hpx::stop_token stoken) {
2✔
113
            hpx::jthread t2([interval, stoken] {
7✔
114
                while (!stoken.stop_requested())
5✔
115
                {
116
                    hpx::this_thread::sleep_for(interval);
4✔
117
                }
118
            });
1✔
119

120
            while (!stoken.stop_requested())
5✔
121
            {
122
                hpx::this_thread::sleep_for(interval);
4✔
123
            }
124
        });
1✔
125

126
        hpx::this_thread::sleep_for(4 * interval);
1✔
127
        t.request_stop();
1✔
128
        HPX_TEST(t.get_stop_source().stop_requested());
1✔
129
        t.join();
1✔
130
    }
1✔
131
}
1✔
132

133
////////////////////////////////////////////////////////////////////////////////
134
void test_basic_api_with_func()
1✔
135
{
136
    hpx::stop_source ssource;
1✔
137
    HPX_TEST(ssource.stop_possible());
1✔
138
    HPX_TEST(!ssource.stop_requested());
1✔
139

140
    {
141
        hpx::jthread t([]() {});
2✔
142
        ssource = t.get_stop_source();
1✔
143
        HPX_TEST(ssource.stop_possible());
1✔
144
        HPX_TEST(!ssource.stop_requested());
1✔
145
        hpx::this_thread::sleep_for(std::chrono::milliseconds(500));
1✔
146
    }
1✔
147

148
    HPX_TEST(ssource.stop_possible());
1✔
149
    HPX_TEST(ssource.stop_requested());
1✔
150
}
1✔
151

152
////////////////////////////////////////////////////////////////////////////////
153
void test_exchange_token()
1✔
154
{
155
    auto interval = std::chrono::milliseconds(500);
1✔
156

157
    {
158
        std::atomic<hpx::stop_token*> pstoken(nullptr);
1✔
159
        hpx::jthread t([&pstoken](hpx::stop_token sstoken) {
2✔
160
            auto act_token = sstoken;
1✔
161
            int num_interrupts = 0;
1✔
162
            try
163
            {
164
                for (int i = 0; num_interrupts < 2 && i < 500; ++i)
501✔
165
                {
166
                    // if we get a new interrupt token from the caller, take it
167
                    if (pstoken.load() != nullptr)
500✔
168
                    {
169
                        act_token = *pstoken;
×
170
                        if (act_token.stop_requested())
×
171
                        {
172
                            ++num_interrupts;
×
173
                        }
×
174
                        pstoken.store(nullptr);
×
175
                    }
×
176
                    hpx::this_thread::sleep_for(std::chrono::microseconds(100));
500✔
177
                }
500✔
178
            }
1✔
179
            catch (...)
180
            {
181
                HPX_TEST(false);
×
182
            }
×
183
        });
1✔
184

185
        hpx::this_thread::sleep_for(interval);
1✔
186
        t.request_stop();
1✔
187

188
        hpx::this_thread::sleep_for(interval);
1✔
189
        hpx::stop_token it;
1✔
190
        pstoken.store(&it);
1✔
191

192
        hpx::this_thread::sleep_for(interval);
1✔
193
        auto ssource2 = hpx::stop_source{};
1✔
194
        it = hpx::stop_token{ssource2.get_token()};
1✔
195
        pstoken.store(&it);
1✔
196

197
        hpx::this_thread::sleep_for(interval);
1✔
198
        ssource2.request_stop();
1✔
199

200
        hpx::this_thread::sleep_for(interval);
1✔
201
    }
1✔
202
}
1✔
203

204
///////////////////////////////////////////////////////////////////////////////
205
void test_concurrent_interrupt()
1✔
206
{
207
    int num_threads = 30;
1✔
208
    hpx::stop_source is;
1✔
209

210
    {
211
        hpx::jthread t1([it = is.get_token()](hpx::stop_token stoken) {
7✔
212
            try
213
            {
214
                bool stop_requested = false;
1✔
215
                for (int i = 0; !it.stop_requested(); ++i)
600✔
216
                {
217
                    // should never switch back once requested
218
                    if (stoken.stop_requested())
599✔
219
                    {
220
                        stop_requested = true;
42✔
221
                    }
42✔
222
                    else
223
                    {
224
                        HPX_TEST(!stop_requested);
557✔
225
                    }
226
                    hpx::this_thread::sleep_for(std::chrono::microseconds(100));
599✔
227
                }
599✔
228
                HPX_TEST(stop_requested);
1✔
229
            }
1✔
230
            catch (...)
231
            {
232
                HPX_TEST(false);
×
233
            }
×
234
        });
1✔
235

236
        hpx::this_thread::sleep_for(std::chrono::milliseconds(500));
1✔
237

238
        // starts thread concurrently calling request_stop() for the same token
239
        std::vector<hpx::jthread> tv;
1✔
240
        int num_requested_stops = 0;
1✔
241
        for (int i = 0; i < num_threads; ++i)
31✔
242
        {
243
            hpx::this_thread::sleep_for(std::chrono::microseconds(100));
30✔
244
            hpx::jthread t([&t1, &num_requested_stops] {
60✔
245
                for (int i = 0; i < 13; ++i)
420✔
246
                {
247
                    // only first call to request_stop should return true
248
                    num_requested_stops += (t1.request_stop() ? 1 : 0);
390✔
249
                    HPX_TEST(!t1.request_stop());
390✔
250
                    hpx::this_thread::sleep_for(std::chrono::microseconds(10));
390✔
251
                }
390✔
252
            });
30✔
253
            tv.push_back(std::move(t));
30✔
254
        }
30✔
255

256
        for (auto& t : tv)
31✔
257
        {
258
            t.join();
30✔
259
        }
260

261
        // only one request to request_stop() should have returned true
262
        HPX_TEST_EQ(num_requested_stops, 1);
1✔
263
        is.request_stop();
1✔
264
    }
1✔
265
}
1✔
266

267
///////////////////////////////////////////////////////////////////////////////
268
void test_jthread_move()
1✔
269
{
270
    {
271
        bool interrupt_signalled = false;
1✔
272
        hpx::jthread t{[&interrupt_signalled](hpx::stop_token st) {
2✔
273
            while (!st.stop_requested())
1✔
274
            {
275
                hpx::this_thread::sleep_for(std::chrono::milliseconds(100));
×
276
            }
277
            if (st.stop_requested())
1✔
278
            {
279
                interrupt_signalled = true;
1✔
280
            }
1✔
281
        }};
1✔
282

283
        hpx::jthread t2{std::move(t)};    // should compile
1✔
284

285
        // NOLINTNEXTLINE(bugprone-use-after-move)
286
        auto ssource = t.get_stop_source();
1✔
287
        HPX_TEST(!ssource.stop_possible());
1✔
288
        HPX_TEST(!ssource.stop_requested());
1✔
289

290
        ssource = t2.get_stop_source();
1✔
291
        HPX_TEST(ssource != hpx::stop_source{});
1✔
292
        HPX_TEST(ssource.stop_possible());
1✔
293
        HPX_TEST(!ssource.stop_requested());
1✔
294

295
        HPX_TEST(!interrupt_signalled);
1✔
296
        t.request_stop();
1✔
297
        HPX_TEST(!interrupt_signalled);
1✔
298
        t2.request_stop();
1✔
299
        t2.join();
1✔
300
        HPX_TEST(interrupt_signalled);
1✔
301
    }
1✔
302
}
1✔
303

304
///////////////////////////////////////////////////////////////////////////////
305
// void testEnabledIfForCopyConstructor_CompileTimeOnly()
306
// {
307
//     {
308
//         hpx::jthread t;
309
//         //hpx::jthread t2{t};  // should not compile
310
//     }
311
// }
312

313
///////////////////////////////////////////////////////////////////////////////
314
int hpx_main()
1✔
315
{
316
    std::set_terminate([]() { HPX_TEST(false); });
2✔
317

318
    test_interrupt_by_destructor();
1✔
319
    test_interrupt_started_thread();
1✔
320
    test_interrupt_started_thread_with_subthread();
1✔
321
    test_basic_api_with_func();
1✔
322
    test_exchange_token();
1✔
323
    test_concurrent_interrupt();
1✔
324
    test_jthread_move();
1✔
325
    //     testEnabledIfForCopyConstructor_CompileTimeOnly();
326

327
    return hpx::local::finalize();
1✔
328
}
329

330
int main(int argc, char* argv[])
1✔
331
{
332
    HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0,
1✔
333
        "HPX main exited with non-zero status");
334

335
    return hpx::util::report_errors();
1✔
336
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc