STEllAR-GROUP / hpx / build #871 (push, by StellarBot)

22 Jan 2023 11:22PM UTC, coverage: 86.624% (+0.7%) from 85.97%

Merge #6144: General improvements to scheduling and related fixes (r=hkaiser, a=hkaiser)

This is a collection of unrelated improvements applied to different parts of the code.

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

152 of 152 new or added lines in 23 files covered (100.0%).
174953 of 201969 relevant lines covered (86.62%).
1838882.76 hits per line.

Source File

/libs/core/threading_base/src/scheduler_base.cpp (76.1% covered)

//  Copyright (c) 2007-2022 Hartmut Kaiser
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/execution_base/this_thread.hpp>
#include <hpx/threading_base/scheduler_base.hpp>
#include <hpx/threading_base/scheduler_mode.hpp>
#include <hpx/threading_base/scheduler_state.hpp>
#include <hpx/threading_base/thread_init_data.hpp>
#include <hpx/threading_base/thread_pool_base.hpp>
#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
#include <hpx/coroutines/detail/tss.hpp>
#endif

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <limits>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
namespace hpx::threads::policies {

    scheduler_base::scheduler_base(std::size_t num_threads,
        char const* description,
        thread_queue_init_parameters const& thread_queue_init,
        scheduler_mode mode)
      : suspend_mtxs_(num_threads)
      , suspend_conds_(num_threads)
      , pu_mtxs_(num_threads)
      , states_(num_threads)
      , description_(description)
      , thread_queue_init_(thread_queue_init)
      , parent_pool_(nullptr)
      , background_thread_count_(0)
      , polling_function_mpi_(&null_polling_function)
      , polling_function_cuda_(&null_polling_function)
      , polling_work_count_function_mpi_(&null_polling_work_count_function)
      , polling_work_count_function_cuda_(&null_polling_work_count_function)
    {
        scheduler_base::set_scheduler_mode(mode);

#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        double max_time = thread_queue_init.max_idle_backoff_time_;

        wait_counts_.resize(num_threads);
        for (auto&& data : wait_counts_)
        {
            data.data_.wait_count_ = 0;
            data.data_.max_idle_backoff_time_ = max_time;
        }
#endif

        for (std::size_t i = 0; i != num_threads; ++i)
            states_[i].data_.store(hpx::state::initialized);
    }
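
Throughout this file, per-worker slots are accessed as states_[i].data_: each element is wrapped so that adjacent slots land on separate cache lines and worker threads do not false-share. A minimal sketch of such a padding wrapper, assuming a 64-byte cache line (HPX ships its own utility for this; the wrapper below is illustrative only):

#include <atomic>
#include <vector>

// Illustrative stand-in for a cache-line padding wrapper: alignas() rounds
// each element up to one cache line, so states[i] and states[i + 1] never
// share a line.
template <typename T>
struct alignas(64) padded_data
{
    T data_;
};

int main()
{
    std::vector<padded_data<std::atomic<int>>> states(4);
    for (auto& s : states)
        s.data_.store(0, std::memory_order_relaxed);

    static_assert(sizeof(padded_data<std::atomic<int>>) == 64,
        "one element per cache line");
}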

    void scheduler_base::idle_callback([[maybe_unused]] std::size_t num_thread)
    {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        if (mode_.data_.load(std::memory_order_relaxed) &
            policies::scheduler_mode::enable_idle_backoff)
        {
            // Put this thread to sleep for some time; additionally, it gets
            // woken up on new work.

            idle_backoff_data& data = wait_counts_[num_thread].data_;

            // Exponential back-off with a maximum sleep time.
            static constexpr std::int64_t const max_exponent =
                std::numeric_limits<double>::max_exponent;
            double exponent =
                (std::min)(double(data.wait_count_), double(max_exponent - 1));

            std::chrono::milliseconds period(std::lround((std::min)(
                data.max_idle_backoff_time_, std::pow(2.0, exponent))));

            ++data.wait_count_;

            std::unique_lock<pu_mutex_type> l(mtx_);
            if (cond_.wait_for(l, period) ==    //-V1089
                std::cv_status::no_timeout)
            {
                // Reset the counter if the thread was woken up early.
                data.wait_count_ = 0;
            }
        }
#endif
    }
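
The back-off comment above compresses a small amount of arithmetic: the sleep period is 2^wait_count_ milliseconds, clamped to max_idle_backoff_time_, so an idle worker sleeps 1, 2, 4, ... ms and then saturates at the configured maximum. A standalone sketch of that computation (names are illustrative, not HPX's):

#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <iostream>
#include <limits>

// The period doubles with each consecutive idle round, capped at
// max_backoff_ms.
std::chrono::milliseconds backoff_period(
    std::int64_t wait_count, double max_backoff_ms)
{
    // Clamp the exponent, as the real code does, to keep pow() finite.
    double const exponent = (std::min)(double(wait_count),
        double(std::numeric_limits<double>::max_exponent - 1));
    return std::chrono::milliseconds(
        std::lround((std::min)(max_backoff_ms, std::pow(2.0, exponent))));
}

int main()
{
    // With a 1000 ms cap: 1, 2, 4, ..., 512, 1000, 1000 ms.
    for (std::int64_t count = 0; count != 12; ++count)
        std::cout << backoff_period(count, 1000.0).count() << " ms\n";
}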

    /// This function gets called by the thread-manager whenever new work
    /// has been added, allowing the scheduler to reactivate one or more of
    /// the possibly idling OS threads.
    void scheduler_base::do_some_work(std::size_t)
    {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        if (mode_.data_.load(std::memory_order_relaxed) &
            policies::scheduler_mode::enable_idle_backoff)
        {
            cond_.notify_all();
        }
#endif
    }

    void scheduler_base::suspend(std::size_t num_thread)
    {
        HPX_ASSERT(num_thread < suspend_conds_.size());

        states_[num_thread].data_.store(hpx::state::sleeping);
        std::unique_lock<pu_mutex_type> l(suspend_mtxs_[num_thread]);
        suspend_conds_[num_thread].wait(l);    //-V1089

        // Only set the state to running if it is still hpx::state::sleeping.
        // Non-blocking/locking functions may have moved it to stopping or
        // terminating in the meantime, in which case it is left untouched.
        hpx::state expected = hpx::state::sleeping;
        states_[num_thread].data_.compare_exchange_strong(
            expected, hpx::state::running);

        HPX_ASSERT(expected == hpx::state::sleeping ||
            expected == hpx::state::stopping ||
            expected == hpx::state::terminating);
    }

    void scheduler_base::resume(std::size_t num_thread)
    {
        if (num_thread == std::size_t(-1))
        {
            for (std::condition_variable& c : suspend_conds_)
            {
                c.notify_one();
            }
        }
        else
        {
            HPX_ASSERT(num_thread < suspend_conds_.size());
            suspend_conds_[num_thread].notify_one();
        }
    }
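
suspend() and resume() together form a classic condition-variable handshake: the suspending thread publishes hpx::state::sleeping and blocks, the resuming thread notifies, and the wake-up path uses compare_exchange_strong so a concurrent transition to stopping/terminating is not overwritten. A reduced single-worker sketch of the same pattern (simplified and not HPX code; the predicate-based wait below also guards against spurious and lost wake-ups):

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

enum class state { running, sleeping, stopping };

std::mutex mtx;
std::condition_variable cond;
std::atomic<state> worker_state{state::running};

void suspend_worker()
{
    worker_state.store(state::sleeping);
    std::unique_lock<std::mutex> l(mtx);
    cond.wait(l, [] { return worker_state.load() != state::sleeping; });
}

void resume_worker()
{
    {
        // Change the state under the mutex so the waiter cannot miss it.
        std::lock_guard<std::mutex> l(mtx);
        // Mirror of the CAS in suspend(): only sleeping -> running; a
        // concurrent stopping state would be left untouched.
        state expected = state::sleeping;
        worker_state.compare_exchange_strong(expected, state::running);
    }
    cond.notify_one();
}

int main()
{
    std::thread t(suspend_worker);

    // Wait until the worker has published its sleeping state.
    while (worker_state.load() != state::sleeping)
        std::this_thread::yield();

    resume_worker();
    t.join();
    std::cout << "worker resumed\n";
}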

    std::size_t scheduler_base::select_active_pu(
        std::size_t num_thread, bool allow_fallback)
    {
        if (mode_.data_.load(std::memory_order_relaxed) &
            threads::policies::scheduler_mode::enable_elasticity)
        {
            std::size_t states_size = states_.size();

            if (!allow_fallback)
            {
                // Try indefinitely as long as at least one thread is available
                // for scheduling. Increase the allowed state if no threads are
                // available for scheduling.
                auto max_allowed_state = hpx::state::suspended;

                hpx::util::yield_while([this, states_size, &num_thread,
                                           &max_allowed_state]() {
                    std::size_t num_allowed_threads = 0;

                    for (std::size_t offset = 0; offset < states_size; ++offset)
                    {
                        std::size_t num_thread_local =
                            (num_thread + offset) % states_size;

                        {
                            std::unique_lock<pu_mutex_type> l(
                                pu_mtxs_[num_thread_local], std::try_to_lock);

                            if (l.owns_lock())
                            {
                                if (states_[num_thread_local].data_.load(
                                        std::memory_order_relaxed) <=
                                    max_allowed_state)
                                {
                                    num_thread = num_thread_local;
                                    return false;
                                }
                            }
                        }

                        if (states_[num_thread_local].data_.load(
                                std::memory_order_relaxed) <= max_allowed_state)
                        {
                            ++num_allowed_threads;
                        }
                    }

                    if (0 == num_allowed_threads)
                    {
                        if (max_allowed_state <= hpx::state::suspended)
                        {
                            max_allowed_state = hpx::state::sleeping;
                        }
                        else if (max_allowed_state <= hpx::state::sleeping)
                        {
                            max_allowed_state = hpx::state::stopping;
                        }
                        else
                        {
                            // All threads are terminating or stopped. Just
                            // return num_thread to avoid an infinite loop.
                            return false;
                        }
                    }

                    // Yield after trying all PUs, then try again.
                    return true;
                });

                return num_thread;
            }

            // Try all PUs only once if fallback is allowed.
            HPX_ASSERT(num_thread != std::size_t(-1));
            for (std::size_t offset = 0; offset < states_size; ++offset)
            {
                std::size_t num_thread_local =
                    (num_thread + offset) % states_size;

                std::unique_lock<pu_mutex_type> l(
                    pu_mtxs_[num_thread_local], std::try_to_lock);

                if (l.owns_lock() &&
                    states_[num_thread_local].data_.load(
                        std::memory_order_relaxed) <= hpx::state::suspended)
                {
                    return num_thread_local;
                }
            }
        }

        return num_thread;
    }
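
Stripped of the elasticity state machine, the probing loop above is a round-robin scan: starting at the preferred worker, it settles on the first processing unit whose mutex can be taken without blocking and whose state still admits scheduling. A condensed sketch of that core loop (hypothetical names; the real code also escalates max_allowed_state and yields between full scans):

#include <array>
#include <cstddef>
#include <iostream>
#include <mutex>

constexpr std::size_t num_pus = 4;
std::array<std::mutex, num_pus> pu_mtxs;
std::array<bool, num_pus> schedulable{true, false, true, true};

std::size_t select_pu(std::size_t preferred)
{
    for (std::size_t offset = 0; offset != num_pus; ++offset)
    {
        std::size_t const candidate = (preferred + offset) % num_pus;

        // try_to_lock: never block on a PU that is busy changing state.
        std::unique_lock<std::mutex> l(pu_mtxs[candidate], std::try_to_lock);
        if (l.owns_lock() && schedulable[candidate])
            return candidate;
    }
    return preferred;    // fall back to the preferred PU
}

int main()
{
    std::cout << select_pu(1) << '\n';    // PU 1 is not schedulable: prints 2
}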

    // Access and manipulate the per-thread states
    std::atomic<hpx::state>& scheduler_base::get_state(std::size_t num_thread)
    {
        HPX_ASSERT(num_thread < states_.size());
        return states_[num_thread].data_;
    }

    std::atomic<hpx::state> const& scheduler_base::get_state(
        std::size_t num_thread) const
    {
        HPX_ASSERT(num_thread < states_.size());
        return states_[num_thread].data_;
    }

    void scheduler_base::set_all_states(hpx::state s)
    {
        for (auto& state : states_)
        {
            state.data_.store(s);
        }
    }

    void scheduler_base::set_all_states_at_least(hpx::state s)
    {
        for (auto& state : states_)
        {
            if (state.data_.load(std::memory_order_relaxed) < s)
            {
                state.data_.store(s, std::memory_order_release);
            }
        }
    }

    // Return whether all states are at least at the given one
    bool scheduler_base::has_reached_state(hpx::state s) const
    {
        for (auto const& state : states_)
        {
            if (state.data_.load(std::memory_order_relaxed) < s)
                return false;
        }
        return true;
    }

    bool scheduler_base::is_state(hpx::state s) const
    {
        for (auto const& state : states_)
        {
            if (state.data_.load(std::memory_order_relaxed) != s)
                return false;
        }
        return true;
    }

    std::pair<hpx::state, hpx::state> scheduler_base::get_minmax_state() const
    {
        std::pair<hpx::state, hpx::state> result(
            hpx::state::last_valid_runtime_state,
            hpx::state::first_valid_runtime_state);

        for (auto const& state_iter : states_)
        {
            hpx::state s = state_iter.data_.load(std::memory_order_relaxed);
            result.first = (std::min)(result.first, s);
            result.second = (std::max)(result.second, s);
        }

        return result;
    }
320
    // get/set scheduler mode
321
    void scheduler_base::set_scheduler_mode(scheduler_mode mode) noexcept
3,836✔
322
    {
323
        // distribute the same value across all cores
324
        mode_.data_.store(mode, std::memory_order_release);
3,836✔
325
        do_some_work(std::size_t(-1));
3,836✔
326
    }
3,836✔
327

328
    void scheduler_base::add_scheduler_mode(scheduler_mode mode) noexcept
1,266✔
329
    {
330
        // distribute the same value across all cores
331
        mode = scheduler_mode(get_scheduler_mode() | mode);
1,266✔
332
        set_scheduler_mode(mode);
1,266✔
333
    }
1,266✔
334

335
    void scheduler_base::remove_scheduler_mode(scheduler_mode mode) noexcept
16✔
336
    {
337
        mode = scheduler_mode(get_scheduler_mode() & ~mode);
16✔
338
        set_scheduler_mode(mode);
16✔
339
    }
16✔
340

341
    void scheduler_base::add_remove_scheduler_mode(
×
342
        scheduler_mode to_add_mode, scheduler_mode to_remove_mode) noexcept
343
    {
344
        scheduler_mode mode = scheduler_mode(
×
345
            (get_scheduler_mode() | to_add_mode) & ~to_remove_mode);
×
346
        set_scheduler_mode(mode);
×
347
    }
×
348

349
    void scheduler_base::update_scheduler_mode(
1,267✔
350
        scheduler_mode mode, bool set) noexcept
351
    {
352
        if (set)
1,267✔
353
        {
354
            add_scheduler_mode(mode);
1,265✔
355
        }
1,265✔
356
        else
357
        {
358
            remove_scheduler_mode(mode);
2✔
359
        }
360
    }
1,267✔
361
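
The mutators above are plain bit-mask arithmetic over the scheduler_mode flags: OR adds a policy bit, AND-NOT clears one, and the combined add/remove does both in a single store. A tiny sketch with made-up flag values:

#include <cstdint>
#include <iostream>

// Illustrative flag values only; HPX defines its own scheduler_mode bits.
enum scheduler_mode : std::uint32_t
{
    enable_idle_backoff = 0x1,
    enable_elasticity = 0x2,
};

int main()
{
    std::uint32_t mode = 0;
    mode |= enable_idle_backoff;                     // add_scheduler_mode
    mode |= enable_elasticity;                       // add_scheduler_mode
    mode &= ~std::uint32_t(enable_idle_backoff);     // remove_scheduler_mode
    std::cout << std::hex << "0x" << mode << '\n';   // prints 0x2
}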

    ///////////////////////////////////////////////////////////////////////////
    std::int64_t scheduler_base::get_background_thread_count() const noexcept
    {
        return background_thread_count_;
    }

    void scheduler_base::increment_background_thread_count() noexcept
    {
        ++background_thread_count_;
    }

    void scheduler_base::decrement_background_thread_count() noexcept
    {
        --background_thread_count_;
    }

#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
    coroutines::detail::tss_data_node* scheduler_base::find_tss_data(
        void const* key)
    {
        if (!thread_data_)
            return nullptr;
        return thread_data_->find(key);
    }

    void scheduler_base::add_new_tss_node(void const* key,
        std::shared_ptr<coroutines::detail::tss_cleanup_function> const& func,
        void* tss_data)
    {
        if (!thread_data_)
        {
            thread_data_ = std::make_shared<coroutines::detail::tss_storage>();
        }
        thread_data_->insert(key, func, tss_data);
    }

    void scheduler_base::erase_tss_node(void const* key, bool cleanup_existing)
    {
        if (thread_data_)
            thread_data_->erase(key, cleanup_existing);
    }

    void* scheduler_base::get_tss_data(void const* key)
    {
        if (coroutines::detail::tss_data_node* const current_node =
                find_tss_data(key))
        {
            return current_node->get_value();
        }
        return nullptr;
    }

    void scheduler_base::set_tss_data(void const* key,
        std::shared_ptr<coroutines::detail::tss_cleanup_function> const& func,
        void* tss_data, bool cleanup_existing)
    {
        if (coroutines::detail::tss_data_node* const current_node =
                find_tss_data(key))
        {
            if (func || (tss_data != nullptr))
                current_node->reinit(func, tss_data, cleanup_existing);
            else
                erase_tss_node(key, cleanup_existing);
        }
        else if (func || (tss_data != nullptr))
        {
            add_new_tss_node(key, func, tss_data);
        }
    }
#endif
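
The helpers above manage a lazily created map from an opaque key (typically the address of some tag object) to a user value plus a cleanup callback. A toy version of the storage they delegate to (illustrative only; the real tss_storage in hpx/coroutines also runs cleanup functions):

#include <iostream>
#include <map>

struct toy_tss_storage
{
    std::map<void const*, void*> nodes_;

    void insert(void const* key, void* data) { nodes_[key] = data; }
    void* find(void const* key) const
    {
        auto it = nodes_.find(key);
        return it != nodes_.end() ? it->second : nullptr;
    }
    void erase(void const* key) { nodes_.erase(key); }
};

int main()
{
    static int key;    // the address serves as an opaque key
    int value = 42;

    toy_tss_storage storage;
    storage.insert(&key, &value);
    std::cout << *static_cast<int*>(storage.find(&key)) << '\n';    // 42
    storage.erase(&key);
    std::cout << (storage.find(&key) == nullptr) << '\n';           // 1
}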

    void scheduler_base::set_mpi_polling_functions(
        polling_function_ptr mpi_func,
        polling_work_count_function_ptr mpi_work_count_func)
    {
        polling_function_mpi_.store(mpi_func, std::memory_order_relaxed);
        polling_work_count_function_mpi_.store(
            mpi_work_count_func, std::memory_order_relaxed);
    }

    void scheduler_base::clear_mpi_polling_function()
    {
        polling_function_mpi_.store(
            &null_polling_function, std::memory_order_relaxed);
        polling_work_count_function_mpi_.store(
            &null_polling_work_count_function, std::memory_order_relaxed);
    }

    void scheduler_base::set_cuda_polling_functions(
        polling_function_ptr cuda_func,
        polling_work_count_function_ptr cuda_work_count_func)
    {
        polling_function_cuda_.store(cuda_func, std::memory_order_relaxed);
        polling_work_count_function_cuda_.store(
            cuda_work_count_func, std::memory_order_relaxed);
    }

    void scheduler_base::clear_cuda_polling_function()
    {
        polling_function_cuda_.store(
            &null_polling_function, std::memory_order_relaxed);
        polling_work_count_function_cuda_.store(
            &null_polling_work_count_function, std::memory_order_relaxed);
    }

    detail::polling_status scheduler_base::custom_polling_function() const
    {
        detail::polling_status status = detail::polling_status::idle;
#if defined(HPX_HAVE_MODULE_ASYNC_MPI)
        if ((*polling_function_mpi_.load(std::memory_order_relaxed))() ==
            detail::polling_status::busy)
        {
            status = detail::polling_status::busy;
        }
#endif
#if defined(HPX_HAVE_MODULE_ASYNC_CUDA)
        if ((*polling_function_cuda_.load(std::memory_order_relaxed))() ==
            detail::polling_status::busy)
        {
            status = detail::polling_status::busy;
        }
#endif
        return status;
    }

    std::size_t scheduler_base::get_polling_work_count() const
    {
        std::size_t work_count = 0;
#if defined(HPX_HAVE_MODULE_ASYNC_MPI)
        work_count +=
            polling_work_count_function_mpi_.load(std::memory_order_relaxed)();
#endif
#if defined(HPX_HAVE_MODULE_ASYNC_CUDA)
        work_count +=
            polling_work_count_function_cuda_.load(std::memory_order_relaxed)();
#endif
        return work_count;
    }
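
The MPI/CUDA hooks are stored as atomic function pointers that default to null implementations, so the scheduler can call through them unconditionally and registration is a single relaxed store. A self-contained sketch of that pattern (names illustrative):

#include <atomic>
#include <iostream>

enum class polling_status { idle, busy };

polling_status null_polling() { return polling_status::idle; }
polling_status my_polling() { return polling_status::busy; }

// Atomically swappable hook, pre-loaded with the harmless no-op.
std::atomic<polling_status (*)()> polling_hook{&null_polling};

int main()
{
    // Before registration the hook reports idle.
    std::cout << (polling_hook.load(std::memory_order_relaxed)() ==
                     polling_status::busy)
              << '\n';    // 0

    // Registering a real poller is a single relaxed store, as in
    // set_mpi_polling_functions above.
    polling_hook.store(&my_polling, std::memory_order_relaxed);
    std::cout << (polling_hook.load(std::memory_order_relaxed)() ==
                     polling_status::busy)
              << '\n';    // 1
}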

    std::ostream& operator<<(std::ostream& os, scheduler_base const& scheduler)
    {
        os << scheduler.get_description() << "(" << &scheduler << ")";

        return os;
    }
}    // namespace hpx::threads::policies