• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #845

05 Dec 2022 08:25PM UTC coverage: 85.664% (-0.1%) from 85.8%
#845

push

StellarBot
Merge #6091

6091: Replace artificial sequencing with fold expressions r=hkaiser a=hkaiser

- flyby: fix left-over problems from namespace change

working towards resolving #5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

99 of 99 new or added lines in 15 files covered. (100.0%)

171125 of 199764 relevant lines covered (85.66%)

1865254.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.86
/libs/core/threading/tests/unit/jthread2.cpp
1
//  Copyright (c) 2020 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
//  Parts of this code were inspired by https://github.com/josuttis/jthread. The
8
//  original code was published by Nicolai Josuttis and Lewis Baker under the
9
//  Creative Commons Attribution 4.0 International License
10
//  (http://creativecommons.org/licenses/by/4.0/).
11

12
#include <hpx/local/init.hpp>
13
#include <hpx/modules/testing.hpp>
14
#include <hpx/modules/threading.hpp>
15

16
#include <atomic>
17
#include <chrono>
18
#include <utility>
19
#include <vector>
20

21
////////////////////////////////////////////////////////////////////////////////
22
void test_interrupt_by_destructor()
1✔
23
{
24
    auto interval = std::chrono::milliseconds(200);
1✔
25
    bool was_interrupted = false;
1✔
26

27
    {
28
        hpx::jthread t([interval, &was_interrupted](hpx::stop_token stoken) {
2✔
29
            HPX_TEST(!stoken.stop_requested());
1✔
30
            try
31
            {
32
                // loop until interrupted (at most 40 times the interval)
33
                for (int i = 0; i < 40; ++i)
5✔
34
                {
35
                    if (stoken.stop_requested())
5✔
36
                    {
37
                        throw "interrupted";
1✔
38
                    }
39

40
                    hpx::this_thread::sleep_for(interval);
4✔
41
                }
4✔
42
                HPX_TEST(false);
×
43
            }
1✔
44
            catch (std::exception&)
45
            {
46
                // "interrupted" not derived from std::exception
47
                HPX_TEST(false);
×
48
            }
×
49
            catch (const char*)
50
            {
51
                HPX_TEST(stoken.stop_requested());
1✔
52
                was_interrupted = true;
1✔
53
            }
1✔
54
            catch (...)
55
            {
56
                HPX_TEST(false);
×
57
            }
1✔
58
        });
2✔
59

60
        HPX_TEST(!t.get_stop_source().stop_requested());
1✔
61

62
        // call destructor after 4 times the interval (should signal the interrupt)
63
        hpx::this_thread::sleep_for(4 * interval);
1✔
64
        HPX_TEST(!t.get_stop_source().stop_requested());
1✔
65
    }
1✔
66

67
    // key HPX_TESTion: signaled interrupt was processed
68
    HPX_TEST(was_interrupted);
1✔
69
}
1✔
70

71
///////////////////////////////////////////////////////////////////////////////
72
void test_interrupt_started_thread()
1✔
73
{
74
    auto interval = std::chrono::milliseconds(200);
1✔
75

76
    {
77
        bool interrupted = false;
1✔
78
        hpx::jthread t([interval, &interrupted](hpx::stop_token stoken) {
2✔
79
            try
80
            {
81
                // loop until interrupted (at most 40 times the interval)
82
                for (int i = 0; i < 40; ++i)
5✔
83
                {
84
                    if (stoken.stop_requested())
5✔
85
                    {
86
                        throw "interrupted";
1✔
87
                    }
88
                    hpx::this_thread::sleep_for(interval);
4✔
89
                }
4✔
90
                HPX_TEST(false);
×
91
            }
1✔
92
            catch (...)
93
            {
94
                interrupted = true;
1✔
95
            }
1✔
96
        });
2✔
97

98
        hpx::this_thread::sleep_for(4 * interval);
1✔
99
        t.request_stop();
1✔
100
        HPX_TEST(t.get_stop_source().stop_requested());
1✔
101
        t.join();
1✔
102
        HPX_TEST(interrupted);
1✔
103
    }
1✔
104
}
1✔
105

106
///////////////////////////////////////////////////////////////////////////////
107
void test_interrupt_started_thread_with_subthread()
1✔
108
{
109
    auto interval = std::chrono::milliseconds(200);
1✔
110

111
    {
112
        hpx::jthread t([interval](hpx::stop_token stoken) {
2✔
113
            hpx::jthread t2([interval, stoken] {
7✔
114
                while (!stoken.stop_requested())
5✔
115
                {
116
                    hpx::this_thread::sleep_for(interval);
4✔
117
                }
118
            });
1✔
119

120
            while (!stoken.stop_requested())
5✔
121
            {
122
                hpx::this_thread::sleep_for(interval);
4✔
123
            }
124
        });
1✔
125

126
        hpx::this_thread::sleep_for(4 * interval);
1✔
127
        t.request_stop();
1✔
128
        HPX_TEST(t.get_stop_source().stop_requested());
1✔
129
        t.join();
1✔
130
    }
1✔
131
}
1✔
132

133
////////////////////////////////////////////////////////////////////////////////
134
void test_basic_api_with_func()
1✔
135
{
136
    hpx::stop_source ssource;
1✔
137
    HPX_TEST(ssource.stop_possible());
1✔
138
    HPX_TEST(!ssource.stop_requested());
1✔
139

140
    {
141
        hpx::jthread t([]() {});
2✔
142
        ssource = t.get_stop_source();
1✔
143
        HPX_TEST(ssource.stop_possible());
1✔
144
        HPX_TEST(!ssource.stop_requested());
1✔
145
        hpx::this_thread::sleep_for(std::chrono::milliseconds(500));
1✔
146
    }
1✔
147

148
    HPX_TEST(ssource.stop_possible());
1✔
149
    HPX_TEST(ssource.stop_requested());
1✔
150
}
1✔
151

152
////////////////////////////////////////////////////////////////////////////////
153
void test_exchange_token()
1✔
154
{
155
    auto interval = std::chrono::milliseconds(500);
1✔
156

157
    {
158
        std::atomic<hpx::stop_token*> pstoken(nullptr);
1✔
159
        hpx::jthread t([&pstoken](hpx::stop_token sstoken) {
2✔
160
            auto act_token = sstoken;
1✔
161
            int num_interrupts = 0;
1✔
162
            try
163
            {
164
                for (int i = 0; num_interrupts < 2 && i < 500; ++i)
501✔
165
                {
166
                    // if we get a new interrupt token from the caller, take it
167
                    if (pstoken.load() != nullptr)
500✔
168
                    {
169
                        act_token = *pstoken;
×
170
                        if (act_token.stop_requested())
×
171
                        {
172
                            ++num_interrupts;
×
173
                        }
×
174
                        pstoken.store(nullptr);
×
175
                    }
×
176
                    hpx::this_thread::sleep_for(std::chrono::microseconds(100));
500✔
177
                }
500✔
178
            }
1✔
179
            catch (...)
180
            {
181
                HPX_TEST(false);
×
182
            }
×
183
        });
1✔
184

185
        hpx::this_thread::sleep_for(interval);
1✔
186
        t.request_stop();
1✔
187

188
        hpx::this_thread::sleep_for(interval);
1✔
189
        hpx::stop_token it;
1✔
190
        pstoken.store(&it);
1✔
191

192
        hpx::this_thread::sleep_for(interval);
1✔
193
        auto ssource2 = hpx::stop_source{};
1✔
194
        it = hpx::stop_token{ssource2.get_token()};
1✔
195
        pstoken.store(&it);
1✔
196

197
        hpx::this_thread::sleep_for(interval);
1✔
198
        ssource2.request_stop();
1✔
199

200
        hpx::this_thread::sleep_for(interval);
1✔
201
    }
1✔
202
}
1✔
203

204
///////////////////////////////////////////////////////////////////////////////
205
void test_concurrent_interrupt()
1✔
206
{
207
    int num_threads = 30;
1✔
208
    hpx::stop_source is;
1✔
209

210
    {
211
        hpx::jthread t1([it = is.get_token()](hpx::stop_token stoken) {
7✔
212
            try
213
            {
214
                bool stop_requested = false;
1✔
215
                for (int i = 0; !it.stop_requested(); ++i)
505✔
216
                {
217
                    // should never switch back once requested
218
                    if (stoken.stop_requested())
504✔
219
                    {
220
                        stop_requested = true;
43✔
221
                    }
43✔
222
                    else
223
                    {
224
                        HPX_TEST(!stop_requested);
461✔
225
                    }
226
                    hpx::this_thread::sleep_for(std::chrono::microseconds(100));
504✔
227
                }
504✔
228
                HPX_TEST(stop_requested);
1✔
229
            }
1✔
230
            catch (...)
231
            {
232
                HPX_TEST(false);
×
233
            }
×
234
        });
1✔
235

236
        hpx::this_thread::sleep_for(std::chrono::milliseconds(500));
1✔
237

238
        // starts thread concurrently calling request_stop() for the same token
239
        std::vector<hpx::jthread> tv;
1✔
240
        int num_requested_stops = 0;
1✔
241
        for (int i = 0; i < num_threads; ++i)
31✔
242
        {
243
            hpx::this_thread::sleep_for(std::chrono::microseconds(100));
30✔
244
            hpx::jthread t([&t1, &num_requested_stops] {
60✔
245
                for (int i = 0; i < 13; ++i)
420✔
246
                {
247
                    // only first call to request_stop should return true
248
                    num_requested_stops += (t1.request_stop() ? 1 : 0);
390✔
249
                    HPX_TEST(!t1.request_stop());
390✔
250
                    hpx::this_thread::sleep_for(std::chrono::microseconds(10));
390✔
251
                }
390✔
252
            });
30✔
253
            tv.push_back(std::move(t));
30✔
254
        }
30✔
255

256
        for (auto& t : tv)
31✔
257
        {
258
            t.join();
30✔
259
        }
260

261
        // only one request to request_stop() should have returned true
262
        HPX_TEST_EQ(num_requested_stops, 1);
1✔
263
        is.request_stop();
1✔
264
    }
1✔
265
}
1✔
266

267
///////////////////////////////////////////////////////////////////////////////
268
void test_jthread_move()
1✔
269
{
270
    {
271
        bool interrupt_signalled = false;
1✔
272
        hpx::jthread t{[&interrupt_signalled](hpx::stop_token st) {
2✔
273
            while (!st.stop_requested())
1✔
274
            {
275
                hpx::this_thread::sleep_for(std::chrono::milliseconds(100));
×
276
            }
277
            if (st.stop_requested())
1✔
278
            {
279
                interrupt_signalled = true;
1✔
280
            }
1✔
281
        }};
1✔
282

283
        hpx::jthread t2{std::move(t)};    // should compile
1✔
284

285
        // NOLINTNEXTLINE(bugprone-use-after-move)
286
        auto ssource = t.get_stop_source();
1✔
287
        HPX_TEST(!ssource.stop_possible());
1✔
288
        HPX_TEST(!ssource.stop_requested());
1✔
289

290
        ssource = t2.get_stop_source();
1✔
291
        HPX_TEST(ssource != hpx::stop_source{});
1✔
292
        HPX_TEST(ssource.stop_possible());
1✔
293
        HPX_TEST(!ssource.stop_requested());
1✔
294

295
        HPX_TEST(!interrupt_signalled);
1✔
296
        t.request_stop();
1✔
297
        HPX_TEST(!interrupt_signalled);
1✔
298
        t2.request_stop();
1✔
299
        t2.join();
1✔
300
        HPX_TEST(interrupt_signalled);
1✔
301
    }
1✔
302
}
1✔
303

304
///////////////////////////////////////////////////////////////////////////////
305
// void testEnabledIfForCopyConstructor_CompileTimeOnly()
306
// {
307
//     {
308
//         hpx::jthread t;
309
//         //hpx::jthread t2{t};  // should not compile
310
//     }
311
// }
312

313
///////////////////////////////////////////////////////////////////////////////
314
int hpx_main()
1✔
315
{
316
    std::set_terminate([]() { HPX_TEST(false); });
2✔
317

318
    test_interrupt_by_destructor();
1✔
319
    test_interrupt_started_thread();
1✔
320
    test_interrupt_started_thread_with_subthread();
1✔
321
    test_basic_api_with_func();
1✔
322
    test_exchange_token();
1✔
323
    test_concurrent_interrupt();
1✔
324
    test_jthread_move();
1✔
325
    //     testEnabledIfForCopyConstructor_CompileTimeOnly();
326

327
    return hpx::local::finalize();
1✔
328
}
329

330
int main(int argc, char* argv[])
1✔
331
{
332
    HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0,
1✔
333
        "HPX main exited with non-zero status");
334

335
    return hpx::util::report_errors();
1✔
336
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc