• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #877

28 Jan 2023 02:50AM UTC coverage: 86.595% (+0.2%) from 86.427%
#877

push

StellarBot
Merge #6057

6057: Create a dedicated thread pool to run LCI_progress. r=hkaiser a=JiakunYan

The `LCI_progress` function is not thread-safe and must be called frequently. Previously, we created a dedicated progress thread using pthread when initializing LCI. However, HPX is not aware of this progress thread and thus cannot bind threads to cores correctly.

This PR uses the new resource partitioner hook in the parcelport to create a thread pool "lci-progress-pool" (which contains only one core) to run `LCI_progress`.

Fixed:
- The thread pool would be allocated even if users were not using the LCI parcelport.

Co-authored-by: Jiakun Yan <jiakunyan1998@gmail.com>

174895 of 201970 relevant lines covered (86.59%)

1858228.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.26
/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex2.cpp
1
// (C) Copyright 2006-7 Anthony Williams
2
//  Copyright (c) 2015-2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
// Distributed under the Boost Software License, Version 1.0. (See
6
// accompanying file LICENSE_1_0.txt or copy at
7
// http://www.boost.org/LICENSE_1_0.txt)
8

9
#include <hpx/local/future.hpp>
10
#include <hpx/local/init.hpp>
11
#include <hpx/local/shared_mutex.hpp>
12
#include <hpx/local/thread.hpp>
13

14
#include <hpx/modules/testing.hpp>
15

16
#include <chrono>
17
#include <mutex>
18
#include <string>
19
#include <vector>
20

21
#include "shared_mutex_locking_thread.hpp"
22
#include "thread_group.hpp"
23

24
// Asserts, while holding `mutex_name`, that `value` equals `expected_value`
// (the comparison is reported through HPX_TEST_EQ).
//
// The expansion is wrapped in `do { ... } while (false)` so that the macro
// behaves like a single statement: the caller's trailing semicolon completes
// it, and it can be used as the unbraced body of an `if` that has an `else`.
// The guard object carries a macro-specific name so that it cannot collide
// with a caller's mutex that happens to be called `lock`.
#define CHECK_LOCKED_VALUE_EQUAL(mutex_name, value, expected_value)            \
    do                                                                         \
    {                                                                          \
        std::unique_lock<hpx::mutex> check_locked_value_guard_(mutex_name);    \
        HPX_TEST_EQ(value, expected_value);                                    \
    } while (false)
29

30
void test_only_one_upgrade_lock_permitted()
1✔
31
{
32
    typedef hpx::shared_mutex shared_mutex_type;
33
    typedef hpx::mutex mutex_type;
34

35
    unsigned const number_of_threads = 2;
1✔
36

37
    test::thread_group pool;
1✔
38

39
    shared_mutex_type rw_mutex;
1✔
40
    unsigned unblocked_count = 0;
1✔
41
    unsigned simultaneous_running_count = 0;
1✔
42
    unsigned max_simultaneous_running = 0;
1✔
43
    mutex_type unblocked_count_mutex;
1✔
44
    hpx::condition_variable unblocked_condition;
1✔
45
    mutex_type finish_mutex;
1✔
46
    std::unique_lock<mutex_type> finish_lock(finish_mutex);
1✔
47

48
    try
49
    {
50
        for (unsigned i = 0; i != number_of_threads; ++i)
3✔
51
        {
52
            pool.create_thread(
2✔
53
                test::locking_thread<hpx::upgrade_lock<shared_mutex_type>>(
2✔
54
                    rw_mutex, unblocked_count, unblocked_count_mutex,
55
                    unblocked_condition, finish_mutex,
56
                    simultaneous_running_count, max_simultaneous_running));
57
        }
2✔
58

59
        hpx::this_thread::sleep_for(std::chrono::seconds(1));
1✔
60

61
        CHECK_LOCKED_VALUE_EQUAL(unblocked_count_mutex, unblocked_count, 1u);
1✔
62

63
        finish_lock.unlock();
1✔
64
        pool.join_all();
1✔
65
    }
1✔
66
    catch (...)
67
    {
68
        pool.interrupt_all();
×
69
        pool.join_all();
×
70
        HPX_TEST(false);
×
71
    }
×
72

73
    CHECK_LOCKED_VALUE_EQUAL(
1✔
74
        unblocked_count_mutex, unblocked_count, number_of_threads);
75
    CHECK_LOCKED_VALUE_EQUAL(
1✔
76
        unblocked_count_mutex, max_simultaneous_running, 1u);
77
}
1✔
78

79
void test_can_lock_upgrade_if_currently_locked_shared()
1✔
80
{
81
    typedef hpx::shared_mutex shared_mutex_type;
82
    typedef hpx::mutex mutex_type;
83

84
    test::thread_group pool;
1✔
85

86
    shared_mutex_type rw_mutex;
1✔
87
    unsigned unblocked_count = 0;
1✔
88
    unsigned simultaneous_running_count = 0;
1✔
89
    unsigned max_simultaneous_running = 0;
1✔
90
    mutex_type unblocked_count_mutex;
1✔
91
    hpx::condition_variable unblocked_condition;
1✔
92
    mutex_type finish_mutex;
1✔
93
    std::unique_lock<mutex_type> finish_lock(finish_mutex);
1✔
94

95
    unsigned const reader_count = 10;
1✔
96

97
    try
98
    {
99
        for (unsigned i = 0; i != reader_count; ++i)
11✔
100
        {
101
            pool.create_thread(
10✔
102
                test::locking_thread<std::shared_lock<shared_mutex_type>>(
10✔
103
                    rw_mutex, unblocked_count, unblocked_count_mutex,
104
                    unblocked_condition, finish_mutex,
105
                    simultaneous_running_count, max_simultaneous_running));
106
        }
10✔
107

108
        hpx::this_thread::sleep_for(std::chrono::seconds(1));
1✔
109

110
        pool.create_thread(
1✔
111
            test::locking_thread<hpx::upgrade_lock<shared_mutex_type>>(rw_mutex,
1✔
112
                unblocked_count, unblocked_count_mutex, unblocked_condition,
113
                finish_mutex, simultaneous_running_count,
114
                max_simultaneous_running));
115

116
        {
117
            std::unique_lock<mutex_type> lk(unblocked_count_mutex);
1✔
118
            // NOLINTNEXTLINE(bugprone-infinite-loop)
119
            while (unblocked_count < (reader_count + 1))
1✔
120
            {
121
                unblocked_condition.wait(lk);
×
122
            }
123
        }
1✔
124

125
        CHECK_LOCKED_VALUE_EQUAL(
1✔
126
            unblocked_count_mutex, unblocked_count, reader_count + 1);
127

128
        finish_lock.unlock();
1✔
129
        pool.join_all();
1✔
130
    }
1✔
131
    catch (...)
132
    {
133
        pool.interrupt_all();
×
134
        pool.join_all();
×
135
        HPX_TEST(false);
×
136
    }
×
137

138
    CHECK_LOCKED_VALUE_EQUAL(
1✔
139
        unblocked_count_mutex, unblocked_count, reader_count + 1);
140
    CHECK_LOCKED_VALUE_EQUAL(
1✔
141
        unblocked_count_mutex, max_simultaneous_running, reader_count + 1);
142
}
1✔
143

144
void test_can_lock_upgrade_to_unique_if_currently_locked_upgrade()
1✔
145
{
146
    typedef hpx::shared_mutex shared_mutex_type;
147

148
    shared_mutex_type mtx;
1✔
149
    hpx::upgrade_lock<shared_mutex_type> l(mtx);
1✔
150
    hpx::upgrade_to_unique_lock<shared_mutex_type> ul(l);
1✔
151
    HPX_TEST(ul.owns_lock());
1✔
152
}
1✔
153

154
void test_if_other_thread_has_write_lock_try_lock_shared_returns_false()
1✔
155
{
156
    typedef hpx::shared_mutex shared_mutex_type;
157
    typedef hpx::mutex mutex_type;
158

159
    shared_mutex_type rw_mutex;
1✔
160
    mutex_type finish_mutex;
1✔
161
    mutex_type unblocked_mutex;
1✔
162
    unsigned unblocked_count = 0;
1✔
163
    std::unique_lock<mutex_type> finish_lock(finish_mutex);
1✔
164
    hpx::thread writer(test::simple_writing_thread(
1✔
165
        rw_mutex, finish_mutex, unblocked_mutex, unblocked_count));
166

167
    hpx::this_thread::sleep_for(std::chrono::seconds(1));
1✔
168

169
    CHECK_LOCKED_VALUE_EQUAL(unblocked_mutex, unblocked_count, 1u);
1✔
170

171
    bool const try_succeeded = rw_mutex.try_lock_shared();
1✔
172
    HPX_TEST(!try_succeeded);
1✔
173
    if (try_succeeded)
1✔
174
    {
175
        rw_mutex.unlock_shared();
×
176
    }
×
177

178
    finish_lock.unlock();
1✔
179
    writer.join();
1✔
180
}
1✔
181

182
void test_if_other_thread_has_write_lock_try_lock_upgrade_returns_false()
1✔
183
{
184
    typedef hpx::shared_mutex shared_mutex_type;
185
    typedef hpx::mutex mutex_type;
186

187
    shared_mutex_type rw_mutex;
1✔
188
    mutex_type finish_mutex;
1✔
189
    mutex_type unblocked_mutex;
1✔
190
    unsigned unblocked_count = 0;
1✔
191
    std::unique_lock<mutex_type> finish_lock(finish_mutex);
1✔
192
    hpx::thread writer(test::simple_writing_thread(
1✔
193
        rw_mutex, finish_mutex, unblocked_mutex, unblocked_count));
194

195
    hpx::this_thread::sleep_for(std::chrono::seconds(1));
1✔
196

197
    CHECK_LOCKED_VALUE_EQUAL(unblocked_mutex, unblocked_count, 1u);
1✔
198

199
    bool const try_succeeded = rw_mutex.try_lock_upgrade();
1✔
200
    HPX_TEST(!try_succeeded);
1✔
201
    if (try_succeeded)
1✔
202
    {
203
        rw_mutex.unlock_upgrade();
×
204
    }
×
205

206
    finish_lock.unlock();
1✔
207
    writer.join();
1✔
208
}
1✔
209

210
void test_if_no_thread_has_lock_try_lock_shared_returns_true()
1✔
211
{
212
    typedef hpx::shared_mutex shared_mutex_type;
213

214
    shared_mutex_type rw_mutex;
1✔
215
    bool const try_succeeded = rw_mutex.try_lock_shared();
1✔
216
    HPX_TEST(try_succeeded);
1✔
217
    if (try_succeeded)
1✔
218
    {
219
        rw_mutex.unlock_shared();
1✔
220
    }
1✔
221
}
1✔
222

223
void test_if_no_thread_has_lock_try_lock_upgrade_returns_true()
1✔
224
{
225
    typedef hpx::shared_mutex shared_mutex_type;
226

227
    shared_mutex_type rw_mutex;
1✔
228
    bool const try_succeeded = rw_mutex.try_lock_upgrade();
1✔
229
    HPX_TEST(try_succeeded);
1✔
230
    if (try_succeeded)
1✔
231
    {
232
        rw_mutex.unlock_upgrade();
1✔
233
    }
1✔
234
}
1✔
235

236
void test_if_other_thread_has_shared_lock_try_lock_shared_returns_true()
1✔
237
{
238
    typedef hpx::shared_mutex shared_mutex_type;
239
    typedef hpx::mutex mutex_type;
240

241
    shared_mutex_type rw_mutex;
1✔
242
    mutex_type finish_mutex;
1✔
243
    mutex_type unblocked_mutex;
1✔
244
    unsigned unblocked_count = 0;
1✔
245
    std::unique_lock<mutex_type> finish_lock(finish_mutex);
1✔
246
    hpx::thread writer(test::simple_reading_thread(
1✔
247
        rw_mutex, finish_mutex, unblocked_mutex, unblocked_count));
248

249
    hpx::this_thread::sleep_for(std::chrono::seconds(1));
1✔
250

251
    CHECK_LOCKED_VALUE_EQUAL(unblocked_mutex, unblocked_count, 1u);
1✔
252

253
    bool const try_succeeded = rw_mutex.try_lock_shared();
1✔
254
    HPX_TEST(try_succeeded);
1✔
255
    if (try_succeeded)
1✔
256
    {
257
        rw_mutex.unlock_shared();
1✔
258
    }
1✔
259

260
    finish_lock.unlock();
1✔
261
    writer.join();
1✔
262
}
1✔
263

264
void test_if_other_thread_has_shared_lock_try_lock_upgrade_returns_true()
1✔
265
{
266
    typedef hpx::shared_mutex shared_mutex_type;
267
    typedef hpx::mutex mutex_type;
268

269
    shared_mutex_type rw_mutex;
1✔
270
    mutex_type finish_mutex;
1✔
271
    mutex_type unblocked_mutex;
1✔
272
    unsigned unblocked_count = 0;
1✔
273
    std::unique_lock<mutex_type> finish_lock(finish_mutex);
1✔
274
    hpx::thread writer(test::simple_reading_thread(
1✔
275
        rw_mutex, finish_mutex, unblocked_mutex, unblocked_count));
276

277
    hpx::this_thread::sleep_for(std::chrono::seconds(1));
1✔
278

279
    CHECK_LOCKED_VALUE_EQUAL(unblocked_mutex, unblocked_count, 1u);
1✔
280

281
    bool const try_succeeded = rw_mutex.try_lock_upgrade();
1✔
282
    HPX_TEST(try_succeeded);
1✔
283
    if (try_succeeded)
1✔
284
    {
285
        rw_mutex.unlock_upgrade();
1✔
286
    }
1✔
287

288
    finish_lock.unlock();
1✔
289
    writer.join();
1✔
290
}
1✔
291

292
///////////////////////////////////////////////////////////////////////////////
293
int hpx_main()
1✔
294
{
295
    test_only_one_upgrade_lock_permitted();
1✔
296
    test_can_lock_upgrade_if_currently_locked_shared();
1✔
297
    test_can_lock_upgrade_to_unique_if_currently_locked_upgrade();
1✔
298
    test_if_other_thread_has_write_lock_try_lock_shared_returns_false();
1✔
299
    test_if_other_thread_has_write_lock_try_lock_upgrade_returns_false();
1✔
300
    test_if_no_thread_has_lock_try_lock_shared_returns_true();
1✔
301
    test_if_no_thread_has_lock_try_lock_upgrade_returns_true();
1✔
302
    test_if_other_thread_has_shared_lock_try_lock_shared_returns_true();
1✔
303
    test_if_other_thread_has_shared_lock_try_lock_upgrade_returns_true();
1✔
304

305
    return hpx::local::finalize();
1✔
306
}
307

308
int main(int argc, char* argv[])
1✔
309
{
310
    // By default this test should run on all available cores
311
    std::vector<std::string> const cfg = {"hpx.os_threads=all"};
1✔
312

313
    // Initialize and run HPX
314
    hpx::local::init_params init_args;
1✔
315
    init_args.cfg = cfg;
1✔
316
    HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0,
1✔
317
        "HPX main exited with non-zero status");
318

319
    return hpx::util::report_errors();
1✔
320
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc