• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #877

28 Jan 2023 02:50AM UTC coverage: 86.595% (+0.2%) from 86.427%
#877

push

StellarBot
Merge #6057

6057: Create a dedicated thread pool to run LCI_progress. r=hkaiser a=JiakunYan

The `LCI_progress` function is not thread-safe and must be called frequently. Previously, we created a dedicated progress thread using pthread when initializing LCI. However, HPX doesn't know this progress thread and thus cannot bind threads to cores correctly.

This PR uses the new resource partitioner hook in the parcelport to create a thread pool "lci-progress-pool" (which only contains one core) to run LCI_progress.

Fixed:
- The thread pool was previously allocated even when users were not using the LCI parcelport.

Co-authored-by: Jiakun Yan <jiakunyan1998@gmail.com>

174895 of 201970 relevant lines covered (86.59%)

1858228.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.91
/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex1.cpp
1
// (C) Copyright 2006-7 Anthony Williams
2
//  Copyright (c) 2015-2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
// Distributed under the Boost Software License, Version 1.0. (See
6
// accompanying file LICENSE_1_0.txt or copy at
7
// http://www.boost.org/LICENSE_1_0.txt)
8

9
#include <hpx/async_local/post.hpp>
10
#include <hpx/local/future.hpp>
11
#include <hpx/local/init.hpp>
12
#include <hpx/local/shared_mutex.hpp>
13
#include <hpx/local/thread.hpp>
14

15
#include <hpx/modules/testing.hpp>
16

17
#include <chrono>
18
#include <mutex>
19
#include <shared_mutex>
20
#include <string>
21
#include <vector>
22

23
#include "shared_mutex_locking_thread.hpp"
24
#include "thread_group.hpp"
25

26
// Assert that 'value' equals 'expected_value' while holding 'mutex_name', so
// the read of 'value' is synchronized with the worker threads that write it.
#define CHECK_LOCKED_VALUE_EQUAL(mutex_name, value, expected_value)            \
    {                                                                          \
        std::unique_lock<hpx::mutex> lock(mutex_name);                         \
        HPX_TEST_EQ(value, expected_value);                                    \
    }
31

32
void test_multiple_readers()
1✔
33
{
34
    typedef hpx::shared_mutex shared_mutex_type;
35
    typedef hpx::mutex mutex_type;
36

37
    unsigned const number_of_threads = 10;
1✔
38

39
    test::thread_group pool;
1✔
40

41
    hpx::shared_mutex rw_mutex;
1✔
42
    unsigned unblocked_count = 0;
1✔
43
    unsigned simultaneous_running_count = 0;
1✔
44
    unsigned max_simultaneous_running = 0;
1✔
45
    mutex_type unblocked_count_mutex;
1✔
46
    hpx::condition_variable unblocked_condition;
1✔
47
    mutex_type finish_mutex;
1✔
48
    std::unique_lock<mutex_type> finish_lock(finish_mutex);
1✔
49

50
    try
51
    {
52
        for (unsigned i = 0; i != number_of_threads; ++i)
11✔
53
        {
54
            pool.create_thread(
10✔
55
                test::locking_thread<std::shared_lock<shared_mutex_type>>(
10✔
56
                    rw_mutex, unblocked_count, unblocked_count_mutex,
57
                    unblocked_condition, finish_mutex,
58
                    simultaneous_running_count, max_simultaneous_running));
59
        }
10✔
60

61
        {
62
            std::unique_lock<mutex_type> lk(unblocked_count_mutex);
1✔
63
            // NOLINTNEXTLINE(bugprone-infinite-loop)
64
            while (unblocked_count < number_of_threads)
1✔
65
            {
66
                unblocked_condition.wait(lk);
×
67
            }
68
        }
1✔
69

70
        CHECK_LOCKED_VALUE_EQUAL(
1✔
71
            unblocked_count_mutex, unblocked_count, number_of_threads);
72

73
        finish_lock.unlock();
1✔
74
        pool.join_all();
1✔
75
    }
1✔
76
    catch (...)
77
    {
78
        pool.interrupt_all();
×
79
        pool.join_all();
×
80
        HPX_TEST(false);
×
81
    }
×
82

83
    CHECK_LOCKED_VALUE_EQUAL(
1✔
84
        unblocked_count_mutex, max_simultaneous_running, number_of_threads);
85
}
1✔
86

87
void test_only_one_writer_permitted()
1✔
88
{
89
    typedef hpx::shared_mutex shared_mutex_type;
90
    typedef hpx::mutex mutex_type;
91

92
    unsigned const number_of_threads = 10;
1✔
93

94
    test::thread_group pool;
1✔
95

96
    hpx::shared_mutex rw_mutex;
1✔
97
    unsigned unblocked_count = 0;
1✔
98
    unsigned simultaneous_running_count = 0;
1✔
99
    unsigned max_simultaneous_running = 0;
1✔
100
    mutex_type unblocked_count_mutex;
1✔
101
    hpx::condition_variable unblocked_condition;
1✔
102
    mutex_type finish_mutex;
1✔
103
    std::unique_lock<mutex_type> finish_lock(finish_mutex);
1✔
104

105
    try
106
    {
107
        for (unsigned i = 0; i != number_of_threads; ++i)
11✔
108
        {
109
            pool.create_thread(
10✔
110
                test::locking_thread<std::unique_lock<shared_mutex_type>>(
10✔
111
                    rw_mutex, unblocked_count, unblocked_count_mutex,
112
                    unblocked_condition, finish_mutex,
113
                    simultaneous_running_count, max_simultaneous_running));
114
        }
10✔
115

116
        hpx::this_thread::sleep_for(std::chrono::seconds(1));
1✔
117

118
        CHECK_LOCKED_VALUE_EQUAL(unblocked_count_mutex, unblocked_count, 1u);
1✔
119

120
        finish_lock.unlock();
1✔
121
        pool.join_all();
1✔
122
    }
1✔
123
    catch (...)
124
    {
125
        pool.interrupt_all();
×
126
        pool.join_all();
×
127
        HPX_TEST(false);
×
128
    }
×
129

130
    CHECK_LOCKED_VALUE_EQUAL(
1✔
131
        unblocked_count_mutex, unblocked_count, number_of_threads);
132
    CHECK_LOCKED_VALUE_EQUAL(
1✔
133
        unblocked_count_mutex, max_simultaneous_running, 1u);
134
}
1✔
135

136
void test_reader_blocks_writer()
1✔
137
{
138
    typedef hpx::shared_mutex shared_mutex_type;
139
    typedef hpx::mutex mutex_type;
140

141
    test::thread_group pool;
1✔
142

143
    hpx::shared_mutex rw_mutex;
1✔
144
    unsigned unblocked_count = 0;
1✔
145
    unsigned simultaneous_running_count = 0;
1✔
146
    unsigned max_simultaneous_running = 0;
1✔
147
    mutex_type unblocked_count_mutex;
1✔
148
    hpx::condition_variable unblocked_condition;
1✔
149
    mutex_type finish_mutex;
1✔
150
    std::unique_lock<mutex_type> finish_lock(finish_mutex);
1✔
151

152
    try
153
    {
154
        pool.create_thread(
1✔
155
            test::locking_thread<std::shared_lock<shared_mutex_type>>(rw_mutex,
1✔
156
                unblocked_count, unblocked_count_mutex, unblocked_condition,
157
                finish_mutex, simultaneous_running_count,
158
                max_simultaneous_running));
159

160
        {
161
            std::unique_lock<mutex_type> lk(unblocked_count_mutex);
1✔
162
            // NOLINTNEXTLINE(bugprone-infinite-loop)
163
            while (unblocked_count < 1)
2✔
164
            {
165
                unblocked_condition.wait(lk);
1✔
166
            }
167
        }
1✔
168

169
        CHECK_LOCKED_VALUE_EQUAL(unblocked_count_mutex, unblocked_count, 1u);
1✔
170

171
        pool.create_thread(
1✔
172
            test::locking_thread<std::unique_lock<shared_mutex_type>>(rw_mutex,
1✔
173
                unblocked_count, unblocked_count_mutex, unblocked_condition,
174
                finish_mutex, simultaneous_running_count,
175
                max_simultaneous_running));
176

177
        hpx::this_thread::sleep_for(std::chrono::seconds(1));
1✔
178

179
        CHECK_LOCKED_VALUE_EQUAL(unblocked_count_mutex, unblocked_count, 1u);
1✔
180

181
        finish_lock.unlock();
1✔
182
        pool.join_all();
1✔
183
    }
1✔
184
    catch (...)
185
    {
186
        pool.interrupt_all();
×
187
        pool.join_all();
×
188
        HPX_TEST(false);
×
189
    }
×
190

191
    CHECK_LOCKED_VALUE_EQUAL(unblocked_count_mutex, unblocked_count, 2u);
1✔
192
    CHECK_LOCKED_VALUE_EQUAL(
1✔
193
        unblocked_count_mutex, max_simultaneous_running, 1u);
194
}
1✔
195

196
void test_unlocking_writer_unblocks_all_readers()
1✔
197
{
198
    typedef hpx::shared_mutex shared_mutex_type;
199
    typedef hpx::mutex mutex_type;
200

201
    test::thread_group pool;
1✔
202

203
    hpx::shared_mutex rw_mutex;
1✔
204
    std::unique_lock<hpx::shared_mutex> write_lock(rw_mutex);
1✔
205
    unsigned unblocked_count = 0;
1✔
206
    unsigned simultaneous_running_count = 0;
1✔
207
    unsigned max_simultaneous_running = 0;
1✔
208
    mutex_type unblocked_count_mutex;
1✔
209
    hpx::condition_variable unblocked_condition;
1✔
210
    mutex_type finish_mutex;
1✔
211
    std::unique_lock<mutex_type> finish_lock(finish_mutex);
1✔
212

213
    unsigned const reader_count = 10;
1✔
214

215
    try
216
    {
217
        for (unsigned i = 0; i != reader_count; ++i)
11✔
218
        {
219
            pool.create_thread(
10✔
220
                test::locking_thread<std::shared_lock<shared_mutex_type>>(
10✔
221
                    rw_mutex, unblocked_count, unblocked_count_mutex,
222
                    unblocked_condition, finish_mutex,
223
                    simultaneous_running_count, max_simultaneous_running));
224
        }
10✔
225

226
        hpx::this_thread::sleep_for(std::chrono::seconds(1));
1✔
227

228
        CHECK_LOCKED_VALUE_EQUAL(unblocked_count_mutex, unblocked_count, 0u);
1✔
229

230
        write_lock.unlock();
1✔
231

232
        {
233
            std::unique_lock<mutex_type> lk(unblocked_count_mutex);
1✔
234
            // NOLINTNEXTLINE(bugprone-infinite-loop)
235
            while (unblocked_count < reader_count)
3✔
236
            {
237
                unblocked_condition.wait(lk);
2✔
238
            }
239
        }
1✔
240

241
        CHECK_LOCKED_VALUE_EQUAL(
1✔
242
            unblocked_count_mutex, unblocked_count, reader_count);
243

244
        finish_lock.unlock();
1✔
245
        pool.join_all();
1✔
246
    }
1✔
247
    catch (...)
248
    {
249
        pool.interrupt_all();
×
250
        pool.join_all();
×
251
        HPX_TEST(false);
×
252
    }
×
253

254
    CHECK_LOCKED_VALUE_EQUAL(
1✔
255
        unblocked_count_mutex, max_simultaneous_running, reader_count);
256
}
1✔
257

258
void test_unlocking_last_reader_only_unblocks_one_writer()
1✔
259
{
260
    typedef hpx::shared_mutex shared_mutex_type;
261
    typedef hpx::mutex mutex_type;
262

263
    test::thread_group pool;
1✔
264

265
    hpx::shared_mutex rw_mutex;
1✔
266
    unsigned unblocked_count = 0;
1✔
267
    unsigned simultaneous_running_readers = 0;
1✔
268
    unsigned max_simultaneous_readers = 0;
1✔
269
    unsigned simultaneous_running_writers = 0;
1✔
270
    unsigned max_simultaneous_writers = 0;
1✔
271
    mutex_type unblocked_count_mutex;
1✔
272
    hpx::condition_variable unblocked_condition;
1✔
273
    mutex_type finish_reading_mutex;
1✔
274
    std::unique_lock<mutex_type> finish_reading_lock(finish_reading_mutex);
1✔
275
    mutex_type finish_writing_mutex;
1✔
276
    std::unique_lock<mutex_type> finish_writing_lock(finish_writing_mutex);
1✔
277

278
    unsigned const reader_count = 10;
1✔
279
    unsigned const writer_count = 10;
1✔
280

281
    try
282
    {
283
        for (unsigned i = 0; i != reader_count; ++i)
11✔
284
        {
285
            pool.create_thread(
10✔
286
                test::locking_thread<std::shared_lock<shared_mutex_type>>(
10✔
287
                    rw_mutex, unblocked_count, unblocked_count_mutex,
288
                    unblocked_condition, finish_reading_mutex,
289
                    simultaneous_running_readers, max_simultaneous_readers));
290
        }
10✔
291

292
        hpx::this_thread::sleep_for(std::chrono::seconds(1));
1✔
293

294
        for (unsigned i = 0; i != writer_count; ++i)
11✔
295
        {
296
            pool.create_thread(
10✔
297
                test::locking_thread<std::unique_lock<shared_mutex_type>>(
10✔
298
                    rw_mutex, unblocked_count, unblocked_count_mutex,
299
                    unblocked_condition, finish_writing_mutex,
300
                    simultaneous_running_writers, max_simultaneous_writers));
301
        }
10✔
302

303
        {
304
            std::unique_lock<mutex_type> lk(unblocked_count_mutex);
1✔
305
            // NOLINTNEXTLINE(bugprone-infinite-loop)
306
            while (unblocked_count < reader_count)
1✔
307
            {
308
                unblocked_condition.wait(lk);
×
309
            }
310
        }
1✔
311

312
        hpx::this_thread::sleep_for(std::chrono::seconds(1));
1✔
313

314
        CHECK_LOCKED_VALUE_EQUAL(
1✔
315
            unblocked_count_mutex, unblocked_count, reader_count);
316

317
        finish_reading_lock.unlock();
1✔
318

319
        {
320
            std::unique_lock<mutex_type> lk(unblocked_count_mutex);
1✔
321
            // NOLINTNEXTLINE(bugprone-infinite-loop)
322
            while (unblocked_count < (reader_count + 1))
2✔
323
            {
324
                unblocked_condition.wait(lk);
1✔
325
            }
326
        }
1✔
327

328
        CHECK_LOCKED_VALUE_EQUAL(
1✔
329
            unblocked_count_mutex, unblocked_count, reader_count + 1);
330

331
        finish_writing_lock.unlock();
1✔
332
        pool.join_all();
1✔
333
    }
1✔
334
    catch (...)
335
    {
336
        pool.interrupt_all();
×
337
        pool.join_all();
×
338
        HPX_TEST(false);
×
339
    }
×
340

341
    CHECK_LOCKED_VALUE_EQUAL(
1✔
342
        unblocked_count_mutex, unblocked_count, reader_count + writer_count);
343
    CHECK_LOCKED_VALUE_EQUAL(
1✔
344
        unblocked_count_mutex, max_simultaneous_readers, reader_count);
345
    CHECK_LOCKED_VALUE_EQUAL(
1✔
346
        unblocked_count_mutex, max_simultaneous_writers, 1u);
347
}
1✔
348

349
///////////////////////////////////////////////////////////////////////////////
350
// HPX-thread entry point: runs every shared_mutex unit test in sequence and
// then shuts the local runtime down. Returns the finalize status to the
// runtime (propagated to hpx::local::init in main).
int hpx_main()
{
    test_multiple_readers();
    test_only_one_writer_permitted();
    test_reader_blocks_writer();
    test_unlocking_writer_unblocks_all_readers();
    test_unlocking_last_reader_only_unblocks_one_writer();

    return hpx::local::finalize();
}
360

361
int main(int argc, char* argv[])
1✔
362
{
363
    // By default this test should run on all available cores
364
    std::vector<std::string> const cfg = {"hpx.os_threads=all"};
1✔
365

366
    // Initialize and run HPX
367
    hpx::local::init_params init_args;
1✔
368
    init_args.cfg = cfg;
1✔
369
    HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0,
1✔
370
        "HPX main exited with non-zero status");
371

372
    return hpx::util::report_errors();
1✔
373
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc