STEllAR-GROUP / hpx, build #862 (push)

10 Jan 2023 05:30PM UTC. Coverage: 86.582% (-0.05%) from 86.634%

StellarBot: Merge #6130

6130: Remove the mutex lock in the critical path of get_partitioner. r=hkaiser a=JiakunYan

Remove the mutex lock in the critical path of hpx::resource::detail::get_partitioner.

The protected variable `partitioner_ref` is only set once during initialization.

Co-authored-by: Jiakun Yan <jiakunyan1998@gmail.com>
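
The change follows a common pattern: when a guarded variable is written exactly once during single-threaded initialization and only read afterwards, the reader does not need to take the mutex. A minimal, self-contained sketch of that pattern (the names partitioner_holder, init and get below are illustrative only, not the actual hpx::resource::detail interface):

#include <cassert>

// Illustrative stand-ins for the real hpx::resource::detail types.
struct partitioner {};

struct partitioner_holder
{
    partitioner* ref_ = nullptr;    // written exactly once, during startup

    // Runs in the single-threaded initialization phase, before any reader.
    void init(partitioner& p)
    {
        ref_ = &p;
    }

    // Hot path: a plain read suffices, because ref_ never changes after
    // init() has completed.
    partitioner& get()
    {
        assert(ref_ != nullptr);
        return *ref_;
    }
};

int main()
{
    partitioner p;
    partitioner_holder holder;
    holder.init(p);
    partitioner& r = holder.get();    // no mutex in the critical path
    (void) r;
}

The removal is only safe because, as noted above, the variable is set once during initialization; if writers could still race with readers, an std::atomic pointer or std::call_once would be needed instead.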

6 of 6 new or added lines in 1 file covered. (100.0%)

174767 of 201851 relevant lines covered (86.58%)

2069816.07 hits per line
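
(As a quick cross-check of the figures above: 174767 / 201851 ≈ 0.86582, which matches the reported 86.58% line coverage.)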

Source File: /libs/core/threading_base/src/scheduler_base.cpp (75.62% covered)

//  Copyright (c) 2007-2022 Hartmut Kaiser
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/execution_base/this_thread.hpp>
#include <hpx/threading_base/scheduler_base.hpp>
#include <hpx/threading_base/scheduler_mode.hpp>
#include <hpx/threading_base/scheduler_state.hpp>
#include <hpx/threading_base/thread_init_data.hpp>
#include <hpx/threading_base/thread_pool_base.hpp>
#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
#include <hpx/coroutines/detail/tss.hpp>
#endif

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <limits>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
namespace hpx::threads::policies {

    scheduler_base::scheduler_base(std::size_t num_threads,
        char const* description, thread_queue_init_parameters thread_queue_init,
        scheduler_mode mode)
      : suspend_mtxs_(num_threads)
      , suspend_conds_(num_threads)
      , pu_mtxs_(num_threads)
      , states_(num_threads)
      , description_(description)
      , thread_queue_init_(thread_queue_init)
      , parent_pool_(nullptr)
      , background_thread_count_(0)
      , polling_function_mpi_(&null_polling_function)
      , polling_function_cuda_(&null_polling_function)
      , polling_work_count_function_mpi_(&null_polling_work_count_function)
      , polling_work_count_function_cuda_(&null_polling_work_count_function)
    {
        scheduler_base::set_scheduler_mode(mode);

#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        double max_time = thread_queue_init.max_idle_backoff_time_;

        wait_counts_.resize(num_threads);
        for (auto&& data : wait_counts_)
        {
            data.data_.wait_count_ = 0;
            data.data_.max_idle_backoff_time_ = max_time;
        }
#endif

        for (std::size_t i = 0; i != num_threads; ++i)
            states_[i].store(hpx::state::initialized);
    }

    void scheduler_base::idle_callback([[maybe_unused]] std::size_t num_thread)
    {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        if (mode_.data_.load(std::memory_order_relaxed) &
            policies::scheduler_mode::enable_idle_backoff)
        {
            // Put this thread to sleep for some time, additionally it gets
            // woken up on new work.

            idle_backoff_data& data = wait_counts_[num_thread].data_;

            // Exponential back-off with a maximum sleep time.
            double exponent = (std::min)(double(data.wait_count_),
                double(std::numeric_limits<double>::max_exponent - 1));

            std::chrono::milliseconds period(std::lround((std::min)(
                data.max_idle_backoff_time_, std::pow(2.0, exponent))));

            ++data.wait_count_;

            std::unique_lock<pu_mutex_type> l(mtx_);
            if (cond_.wait_for(l, period) ==    //-V1089
                std::cv_status::no_timeout)
            {
                // reset counter if thread was woken up
                data.wait_count_ = 0;
            }
        }
#endif
    }

    /// This function gets called by the thread-manager whenever new work
    /// has been added, allowing the scheduler to reactivate one or more of
    /// possibly idling OS threads
    void scheduler_base::do_some_work(std::size_t)
    {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        if (mode_.data_.load(std::memory_order_relaxed) &
            policies::scheduler_mode::enable_idle_backoff)
        {
            cond_.notify_all();
        }
#endif
    }

    void scheduler_base::suspend(std::size_t num_thread)
    {
        HPX_ASSERT(num_thread < suspend_conds_.size());

        states_[num_thread].store(hpx::state::sleeping);
        std::unique_lock<pu_mutex_type> l(suspend_mtxs_[num_thread]);
        suspend_conds_[num_thread].wait(l);    //-V1089

        // Only set running if still in hpx::state::sleeping. Can be set with
        // non-blocking/locking functions to stopping or terminating, in which
        // case the state is left untouched.
        hpx::state expected = hpx::state::sleeping;
        states_[num_thread].compare_exchange_strong(
            expected, hpx::state::running);

        HPX_ASSERT(expected == hpx::state::sleeping ||
            expected == hpx::state::stopping ||
            expected == hpx::state::terminating);
    }

    void scheduler_base::resume(std::size_t num_thread)
    {
        if (num_thread == std::size_t(-1))
        {
            for (std::condition_variable& c : suspend_conds_)
            {
                c.notify_one();
            }
        }
        else
        {
            HPX_ASSERT(num_thread < suspend_conds_.size());
            suspend_conds_[num_thread].notify_one();
        }
    }

    std::size_t scheduler_base::select_active_pu(
        std::unique_lock<pu_mutex_type>& l, std::size_t num_thread,
        bool allow_fallback)
    {
        if (mode_.data_.load(std::memory_order_relaxed) &
            threads::policies::scheduler_mode::enable_elasticity)
        {
            std::size_t states_size = states_.size();

            if (!allow_fallback)
            {
                // Try indefinitely as long as at least one thread is available
                // for scheduling. Increase allowed state if no threads are
                // available for scheduling.
                auto max_allowed_state = hpx::state::suspended;

                hpx::util::yield_while([this, states_size, &l, &num_thread,
                                           &max_allowed_state]() {
                    std::size_t num_allowed_threads = 0;

                    for (std::size_t offset = 0; offset < states_size; ++offset)
                    {
                        std::size_t num_thread_local =
                            (num_thread + offset) % states_size;

                        l = std::unique_lock<pu_mutex_type>(
                            pu_mtxs_[num_thread_local], std::try_to_lock);

                        if (l.owns_lock())
                        {
                            if (states_[num_thread_local] <= max_allowed_state)
                            {
                                num_thread = num_thread_local;
                                return false;
                            }

                            l.unlock();
                        }

                        if (states_[num_thread_local] <= max_allowed_state)
                        {
                            ++num_allowed_threads;
                        }
                    }

                    if (0 == num_allowed_threads)
                    {
                        if (max_allowed_state <= hpx::state::suspended)
                        {
                            max_allowed_state = hpx::state::sleeping;
                        }
                        else if (max_allowed_state <= hpx::state::sleeping)
                        {
                            max_allowed_state = hpx::state::stopping;
                        }
                        else
                        {
                            // All threads are terminating or stopped. Just
                            // return num_thread to avoid infinite loop.
                            return false;
                        }
                    }

                    // Yield after trying all pus, then try again
                    return true;
                });

                return num_thread;
            }

            // Try all pus only once if fallback is allowed
            HPX_ASSERT(num_thread != std::size_t(-1));
            for (std::size_t offset = 0; offset < states_size; ++offset)
            {
                std::size_t num_thread_local =
                    (num_thread + offset) % states_size;

                l = std::unique_lock<pu_mutex_type>(
                    pu_mtxs_[num_thread_local], std::try_to_lock);

                if (l.owns_lock() &&
                    states_[num_thread_local] <= hpx::state::suspended)
                {
                    return num_thread_local;
                }
            }
        }

        return num_thread;
    }

    // allow to access/manipulate states
    std::atomic<hpx::state>& scheduler_base::get_state(std::size_t num_thread)
    {
        HPX_ASSERT(num_thread < states_.size());
        return states_[num_thread];
    }

    std::atomic<hpx::state> const& scheduler_base::get_state(
        std::size_t num_thread) const
    {
        HPX_ASSERT(num_thread < states_.size());
        return states_[num_thread];
    }

    void scheduler_base::set_all_states(hpx::state s)
    {
        for (auto& state : states_)
        {
            state.store(s);
        }
    }

    void scheduler_base::set_all_states_at_least(hpx::state s)
    {
        for (auto& state : states_)
        {
            if (state < s)
            {
                state.store(s);
            }
        }
    }

    // return whether all states are at least at the given one
    bool scheduler_base::has_reached_state(hpx::state s) const
    {
        for (auto const& state : states_)
        {
            if (state.load(std::memory_order_relaxed) < s)
                return false;
        }
        return true;
    }

    bool scheduler_base::is_state(hpx::state s) const
    {
        for (auto const& state : states_)
        {
            if (state.load(std::memory_order_relaxed) != s)
                return false;
        }
        return true;
    }

    std::pair<hpx::state, hpx::state> scheduler_base::get_minmax_state() const
    {
        std::pair<hpx::state, hpx::state> result(
            hpx::state::last_valid_runtime_state,
            hpx::state::first_valid_runtime_state);

        for (auto const& state_iter : states_)
        {
            hpx::state s = state_iter.load();
            result.first = (std::min)(result.first, s);
            result.second = (std::max)(result.second, s);
        }

        return result;
    }

    // get/set scheduler mode
    void scheduler_base::set_scheduler_mode(scheduler_mode mode)
    {
        // distribute the same value across all cores
        mode_.data_.store(mode, std::memory_order_release);
        do_some_work(std::size_t(-1));
    }

    void scheduler_base::add_scheduler_mode(scheduler_mode mode)
    {
        // distribute the same value across all cores
        mode = scheduler_mode(get_scheduler_mode() | mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::remove_scheduler_mode(scheduler_mode mode)
    {
        mode = scheduler_mode(get_scheduler_mode() & ~mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::add_remove_scheduler_mode(
        scheduler_mode to_add_mode, scheduler_mode to_remove_mode)
    {
        scheduler_mode mode = scheduler_mode(
            (get_scheduler_mode() | to_add_mode) & ~to_remove_mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::update_scheduler_mode(scheduler_mode mode, bool set)
    {
        if (set)
        {
            add_scheduler_mode(mode);
        }
        else
        {
            remove_scheduler_mode(mode);
        }
    }

    ///////////////////////////////////////////////////////////////////////////
    std::int64_t scheduler_base::get_background_thread_count() const noexcept
    {
        return background_thread_count_;
    }

    void scheduler_base::increment_background_thread_count() noexcept
    {
        ++background_thread_count_;
    }

    void scheduler_base::decrement_background_thread_count() noexcept
    {
        --background_thread_count_;
    }

#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
    coroutines::detail::tss_data_node* scheduler_base::find_tss_data(
        void const* key)
    {
        if (!thread_data_)
            return nullptr;
        return thread_data_->find(key);
    }

    void scheduler_base::add_new_tss_node(void const* key,
        std::shared_ptr<coroutines::detail::tss_cleanup_function> const& func,
        void* tss_data)
    {
        if (!thread_data_)
        {
            thread_data_ = std::make_shared<coroutines::detail::tss_storage>();
        }
        thread_data_->insert(key, func, tss_data);
    }

    void scheduler_base::erase_tss_node(void const* key, bool cleanup_existing)
    {
        if (thread_data_)
            thread_data_->erase(key, cleanup_existing);
    }

    void* scheduler_base::get_tss_data(void const* key)
    {
        if (coroutines::detail::tss_data_node* const current_node =
                find_tss_data(key))
        {
            return current_node->get_value();
        }
        return nullptr;
    }

    void scheduler_base::set_tss_data(void const* key,
        std::shared_ptr<coroutines::detail::tss_cleanup_function> const& func,
        void* tss_data, bool cleanup_existing)
    {
        if (coroutines::detail::tss_data_node* const current_node =
                find_tss_data(key))
        {
            if (func || (tss_data != 0))
                current_node->reinit(func, tss_data, cleanup_existing);
            else
                erase_tss_node(key, cleanup_existing);
        }
        else if (func || (tss_data != 0))
        {
            add_new_tss_node(key, func, tss_data);
        }
    }
#endif

    void scheduler_base::set_mpi_polling_functions(
        polling_function_ptr mpi_func,
        polling_work_count_function_ptr mpi_work_count_func)
    {
        polling_function_mpi_.store(mpi_func, std::memory_order_relaxed);
        polling_work_count_function_mpi_.store(
            mpi_work_count_func, std::memory_order_relaxed);
    }

    void scheduler_base::clear_mpi_polling_function()
    {
        polling_function_mpi_.store(
            &null_polling_function, std::memory_order_relaxed);
        polling_work_count_function_mpi_.store(
            &null_polling_work_count_function, std::memory_order_relaxed);
    }

    void scheduler_base::set_cuda_polling_functions(
        polling_function_ptr cuda_func,
        polling_work_count_function_ptr cuda_work_count_func)
    {
        polling_function_cuda_.store(cuda_func, std::memory_order_relaxed);
        polling_work_count_function_cuda_.store(
            cuda_work_count_func, std::memory_order_relaxed);
    }

    void scheduler_base::clear_cuda_polling_function()
    {
        polling_function_cuda_.store(
            &null_polling_function, std::memory_order_relaxed);
        polling_work_count_function_cuda_.store(
            &null_polling_work_count_function, std::memory_order_relaxed);
    }

    detail::polling_status scheduler_base::custom_polling_function() const
    {
        detail::polling_status status = detail::polling_status::idle;
#if defined(HPX_HAVE_MODULE_ASYNC_MPI)
        if ((*polling_function_mpi_.load(std::memory_order_relaxed))() ==
            detail::polling_status::busy)
        {
            status = detail::polling_status::busy;
        }
#endif
#if defined(HPX_HAVE_MODULE_ASYNC_CUDA)
        if ((*polling_function_cuda_.load(std::memory_order_relaxed))() ==
            detail::polling_status::busy)
        {
            status = detail::polling_status::busy;
        }
#endif
        return status;
    }

    std::size_t scheduler_base::get_polling_work_count() const
    {
        std::size_t work_count = 0;
#if defined(HPX_HAVE_MODULE_ASYNC_MPI)
        work_count +=
            polling_work_count_function_mpi_.load(std::memory_order_relaxed)();
#endif
#if defined(HPX_HAVE_MODULE_ASYNC_CUDA)
        work_count +=
            polling_work_count_function_cuda_.load(std::memory_order_relaxed)();
#endif
        return work_count;
    }

    std::ostream& operator<<(std::ostream& os, scheduler_base const& scheduler)
    {
        os << scheduler.get_description() << "(" << &scheduler << ")";

        return os;
    }
}    // namespace hpx::threads::policies