STEllAR-GROUP / hpx / build #861 (push)

09 Jan 2023 03:50PM UTC coverage: 86.634% (+0.7%) from 85.965%

StellarBot: Merge #6127

6127: Working around gccV9 problem that prevents us from storing enum classes in bit fields r=hkaiser a=hkaiser

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

40 of 40 new or added lines in 8 files covered. (100.0%)

174879 of 201859 relevant lines covered (86.63%)

1846822.82 hits per line
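
The summary figures above are internally consistent; working the arithmetic out from the reported line counts:

    174879 / 201859 ≈ 0.86634, i.e. 86.634% overall coverage
    86.634% - 85.965% ≈ +0.7% compared with the previous build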

Source File

/libs/core/threading_base/src/scheduler_base.cpp (76.12% of lines covered)
//  Copyright (c) 2007-2022 Hartmut Kaiser
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/execution_base/this_thread.hpp>
#include <hpx/threading_base/scheduler_base.hpp>
#include <hpx/threading_base/scheduler_mode.hpp>
#include <hpx/threading_base/scheduler_state.hpp>
#include <hpx/threading_base/thread_init_data.hpp>
#include <hpx/threading_base/thread_pool_base.hpp>
#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
#include <hpx/coroutines/detail/tss.hpp>
#endif

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <limits>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
namespace hpx::threads::policies {

    scheduler_base::scheduler_base(std::size_t num_threads,
        char const* description, thread_queue_init_parameters thread_queue_init,
        scheduler_mode mode)
      : suspend_mtxs_(num_threads)
      , suspend_conds_(num_threads)
      , pu_mtxs_(num_threads)
      , states_(num_threads)
      , description_(description)
      , thread_queue_init_(thread_queue_init)
      , parent_pool_(nullptr)
      , background_thread_count_(0)
      , polling_function_mpi_(&null_polling_function)
      , polling_function_cuda_(&null_polling_function)
      , polling_work_count_function_mpi_(&null_polling_work_count_function)
      , polling_work_count_function_cuda_(&null_polling_work_count_function)
    {
        set_scheduler_mode(mode);

#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        double max_time = thread_queue_init.max_idle_backoff_time_;

        wait_counts_.resize(num_threads);
        for (auto&& data : wait_counts_)
        {
            data.data_.wait_count_ = 0;
            data.data_.max_idle_backoff_time_ = max_time;
        }
#endif

        for (std::size_t i = 0; i != num_threads; ++i)
            states_[i].store(hpx::state::initialized);
    }

    void scheduler_base::idle_callback([[maybe_unused]] std::size_t num_thread)
    {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        if (mode_.data_.load(std::memory_order_relaxed) &
            policies::scheduler_mode::enable_idle_backoff)
        {
            // Put this thread to sleep for some time, additionally it gets
            // woken up on new work.

            idle_backoff_data& data = wait_counts_[num_thread].data_;

            // Exponential back-off with a maximum sleep time.
            double exponent = (std::min)(double(data.wait_count_),
                double(std::numeric_limits<double>::max_exponent - 1));

            std::chrono::milliseconds period(std::lround((std::min)(
                data.max_idle_backoff_time_, std::pow(2.0, exponent))));

            ++data.wait_count_;

            std::unique_lock<pu_mutex_type> l(mtx_);
            if (cond_.wait_for(l, period) == std::cv_status::no_timeout)
            {
                // reset counter if thread was woken up
                data.wait_count_ = 0;
            }
        }
#endif
    }

    /// This function gets called by the thread-manager whenever new work
    /// has been added, allowing the scheduler to reactivate one or more of
    /// possibly idling OS threads
    void scheduler_base::do_some_work(std::size_t)
    {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        if (mode_.data_.load(std::memory_order_relaxed) &
            policies::scheduler_mode::enable_idle_backoff)
        {
            cond_.notify_all();
        }
#endif
    }

    void scheduler_base::suspend(std::size_t num_thread)
    {
        HPX_ASSERT(num_thread < suspend_conds_.size());

        states_[num_thread].store(hpx::state::sleeping);
        std::unique_lock<pu_mutex_type> l(suspend_mtxs_[num_thread]);
        suspend_conds_[num_thread].wait(l);

        // Only set running if still in hpx::state::sleeping. Can be set with
        // non-blocking/locking functions to stopping or terminating, in which
        // case the state is left untouched.
        hpx::state expected = hpx::state::sleeping;
        states_[num_thread].compare_exchange_strong(
            expected, hpx::state::running);

        HPX_ASSERT(expected == hpx::state::sleeping ||
            expected == hpx::state::stopping ||
            expected == hpx::state::terminating);
    }

    void scheduler_base::resume(std::size_t num_thread)
    {
        if (num_thread == std::size_t(-1))
        {
            for (std::condition_variable& c : suspend_conds_)
            {
                c.notify_one();
            }
        }
        else
        {
            HPX_ASSERT(num_thread < suspend_conds_.size());
            suspend_conds_[num_thread].notify_one();
        }
    }

    std::size_t scheduler_base::select_active_pu(
        std::unique_lock<pu_mutex_type>& l, std::size_t num_thread,
        bool allow_fallback)
    {
        if (mode_.data_.load(std::memory_order_relaxed) &
            threads::policies::scheduler_mode::enable_elasticity)
        {
            std::size_t states_size = states_.size();

            if (!allow_fallback)
            {
                // Try indefinitely as long as at least one thread is available
                // for scheduling. Increase allowed state if no threads are
                // available for scheduling.
                auto max_allowed_state = hpx::state::suspended;

                hpx::util::yield_while([this, states_size, &l, &num_thread,
                                           &max_allowed_state]() {
                    std::size_t num_allowed_threads = 0;

                    for (std::size_t offset = 0; offset < states_size; ++offset)
                    {
                        std::size_t num_thread_local =
                            (num_thread + offset) % states_size;

                        l = std::unique_lock<pu_mutex_type>(
                            pu_mtxs_[num_thread_local], std::try_to_lock);

                        if (l.owns_lock())
                        {
                            if (states_[num_thread_local] <= max_allowed_state)
                            {
                                num_thread = num_thread_local;
                                return false;
                            }

                            l.unlock();
                        }

                        if (states_[num_thread_local] <= max_allowed_state)
                        {
                            ++num_allowed_threads;
                        }
                    }

                    if (0 == num_allowed_threads)
                    {
                        if (max_allowed_state <= hpx::state::suspended)
                        {
                            max_allowed_state = hpx::state::sleeping;
                        }
                        else if (max_allowed_state <= hpx::state::sleeping)
                        {
                            max_allowed_state = hpx::state::stopping;
                        }
                        else
                        {
                            // All threads are terminating or stopped. Just
                            // return num_thread to avoid infinite loop.
                            return false;
                        }
                    }

                    // Yield after trying all pus, then try again
                    return true;
                });

                return num_thread;
            }

            // Try all pus only once if fallback is allowed
            HPX_ASSERT(num_thread != std::size_t(-1));
            for (std::size_t offset = 0; offset < states_size; ++offset)
            {
                std::size_t num_thread_local =
                    (num_thread + offset) % states_size;

                l = std::unique_lock<pu_mutex_type>(
                    pu_mtxs_[num_thread_local], std::try_to_lock);

                if (l.owns_lock() &&
                    states_[num_thread_local] <= hpx::state::suspended)
                {
                    return num_thread_local;
                }
            }
        }

        return num_thread;
    }

    // allow to access/manipulate states
    std::atomic<hpx::state>& scheduler_base::get_state(std::size_t num_thread)
    {
        HPX_ASSERT(num_thread < states_.size());
        return states_[num_thread];
    }

    std::atomic<hpx::state> const& scheduler_base::get_state(
        std::size_t num_thread) const
    {
        HPX_ASSERT(num_thread < states_.size());
        return states_[num_thread];
    }

    void scheduler_base::set_all_states(hpx::state s)
    {
        for (auto& state : states_)
        {
            state.store(s);
        }
    }

    void scheduler_base::set_all_states_at_least(hpx::state s)
    {
        for (auto& state : states_)
        {
            if (state < s)
            {
                state.store(s);
            }
        }
    }

    // return whether all states are at least at the given one
    bool scheduler_base::has_reached_state(hpx::state s) const
    {
        for (auto const& state : states_)
        {
            if (state.load(std::memory_order_relaxed) < s)
                return false;
        }
        return true;
    }

    bool scheduler_base::is_state(hpx::state s) const
    {
        for (auto const& state : states_)
        {
            if (state.load(std::memory_order_relaxed) != s)
                return false;
        }
        return true;
    }

    std::pair<hpx::state, hpx::state> scheduler_base::get_minmax_state() const
    {
        std::pair<hpx::state, hpx::state> result(
            hpx::state::last_valid_runtime_state,
            hpx::state::first_valid_runtime_state);

        for (auto const& state_iter : states_)
        {
            hpx::state s = state_iter.load();
            result.first = (std::min)(result.first, s);
            result.second = (std::max)(result.second, s);
        }

        return result;
    }

    // get/set scheduler mode
    void scheduler_base::set_scheduler_mode(scheduler_mode mode)
    {
        // distribute the same value across all cores
        mode_.data_.store(mode, std::memory_order_release);
        do_some_work(std::size_t(-1));
    }

    void scheduler_base::add_scheduler_mode(scheduler_mode mode)
    {
        // distribute the same value across all cores
        mode = scheduler_mode(get_scheduler_mode() | mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::remove_scheduler_mode(scheduler_mode mode)
    {
        mode = scheduler_mode(get_scheduler_mode() & ~mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::add_remove_scheduler_mode(
        scheduler_mode to_add_mode, scheduler_mode to_remove_mode)
    {
        scheduler_mode mode = scheduler_mode(
            (get_scheduler_mode() | to_add_mode) & ~to_remove_mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::update_scheduler_mode(scheduler_mode mode, bool set)
    {
        if (set)
        {
            add_scheduler_mode(mode);
        }
        else
        {
            remove_scheduler_mode(mode);
        }
    }

    ///////////////////////////////////////////////////////////////////////////
    std::int64_t scheduler_base::get_background_thread_count() const noexcept
    {
        return background_thread_count_;
    }

    void scheduler_base::increment_background_thread_count() noexcept
    {
        ++background_thread_count_;
    }

    void scheduler_base::decrement_background_thread_count() noexcept
    {
        --background_thread_count_;
    }

#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
    coroutines::detail::tss_data_node* scheduler_base::find_tss_data(
        void const* key)
    {
        if (!thread_data_)
            return nullptr;
        return thread_data_->find(key);
    }

    void scheduler_base::add_new_tss_node(void const* key,
        std::shared_ptr<coroutines::detail::tss_cleanup_function> const& func,
        void* tss_data)
    {
        if (!thread_data_)
        {
            thread_data_ = std::make_shared<coroutines::detail::tss_storage>();
        }
        thread_data_->insert(key, func, tss_data);
    }

    void scheduler_base::erase_tss_node(void const* key, bool cleanup_existing)
    {
        if (thread_data_)
            thread_data_->erase(key, cleanup_existing);
    }

    void* scheduler_base::get_tss_data(void const* key)
    {
        if (coroutines::detail::tss_data_node* const current_node =
                find_tss_data(key))
        {
            return current_node->get_value();
        }
        return nullptr;
    }

    void scheduler_base::set_tss_data(void const* key,
        std::shared_ptr<coroutines::detail::tss_cleanup_function> const& func,
        void* tss_data, bool cleanup_existing)
    {
        if (coroutines::detail::tss_data_node* const current_node =
                find_tss_data(key))
        {
            if (func || (tss_data != 0))
                current_node->reinit(func, tss_data, cleanup_existing);
            else
                erase_tss_node(key, cleanup_existing);
        }
        else if (func || (tss_data != 0))
        {
            add_new_tss_node(key, func, tss_data);
        }
    }
#endif

    void scheduler_base::set_mpi_polling_functions(
        polling_function_ptr mpi_func,
        polling_work_count_function_ptr mpi_work_count_func)
    {
        polling_function_mpi_.store(mpi_func, std::memory_order_relaxed);
        polling_work_count_function_mpi_.store(
            mpi_work_count_func, std::memory_order_relaxed);
    }

    void scheduler_base::clear_mpi_polling_function()
    {
        polling_function_mpi_.store(
            &null_polling_function, std::memory_order_relaxed);
        polling_work_count_function_mpi_.store(
            &null_polling_work_count_function, std::memory_order_relaxed);
    }

    void scheduler_base::set_cuda_polling_functions(
        polling_function_ptr cuda_func,
        polling_work_count_function_ptr cuda_work_count_func)
    {
        polling_function_cuda_.store(cuda_func, std::memory_order_relaxed);
        polling_work_count_function_cuda_.store(
            cuda_work_count_func, std::memory_order_relaxed);
    }

    void scheduler_base::clear_cuda_polling_function()
    {
        polling_function_cuda_.store(
            &null_polling_function, std::memory_order_relaxed);
        polling_work_count_function_cuda_.store(
            &null_polling_work_count_function, std::memory_order_relaxed);
    }

    detail::polling_status scheduler_base::custom_polling_function() const
    {
        detail::polling_status status = detail::polling_status::idle;
#if defined(HPX_HAVE_MODULE_ASYNC_MPI)
        if ((*polling_function_mpi_.load(std::memory_order_relaxed))() ==
            detail::polling_status::busy)
        {
            status = detail::polling_status::busy;
        }
#endif
#if defined(HPX_HAVE_MODULE_ASYNC_CUDA)
        if ((*polling_function_cuda_.load(std::memory_order_relaxed))() ==
            detail::polling_status::busy)
        {
            status = detail::polling_status::busy;
        }
#endif
        return status;
    }

    std::size_t scheduler_base::get_polling_work_count() const
    {
        std::size_t work_count = 0;
#if defined(HPX_HAVE_MODULE_ASYNC_MPI)
        work_count +=
            polling_work_count_function_mpi_.load(std::memory_order_relaxed)();
#endif
#if defined(HPX_HAVE_MODULE_ASYNC_CUDA)
        work_count +=
            polling_work_count_function_cuda_.load(std::memory_order_relaxed)();
#endif
        return work_count;
    }

    std::ostream& operator<<(std::ostream& os, scheduler_base const& scheduler)
    {
        os << scheduler.get_description() << "(" << &scheduler << ")";

        return os;
    }
}    // namespace hpx::threads::policies
