STEllAR-GROUP / hpx / #852

17 Dec 2022 04:43PM UTC coverage: 85.912% (-0.7%) from 86.568%
Build #852 (push) by StellarBot
Merge #6106

6106: Modernizing modules of levels 0 to 5 r=hkaiser a=hkaiser

- flyby: HPX_FORWARD/HPX_MOVE now expand to std::forward and std::move if those are implemented as builtin functions

working towards #5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

87 of 87 new or added lines in 24 files covered. (100.0%)

173152 of 201546 relevant lines covered (85.91%)

1910264.28 hits per line
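For reference, the overall figure above is simply the ratio of covered to relevant lines: 173152 / 201546 ≈ 0.85912, i.e. the 85.91% shown here and the 85.912% reported in the build header.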

Source File

/libs/core/threading_base/src/scheduler_base.cpp (82.49% covered)
//  Copyright (c) 2007-2019 Hartmut Kaiser
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/execution_base/this_thread.hpp>
#include <hpx/threading_base/scheduler_base.hpp>
#include <hpx/threading_base/scheduler_mode.hpp>
#include <hpx/threading_base/scheduler_state.hpp>
#include <hpx/threading_base/thread_init_data.hpp>
#include <hpx/threading_base/thread_pool_base.hpp>
#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
#include <hpx/coroutines/detail/tss.hpp>
#endif

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <limits>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace threads { namespace policies {
    scheduler_base::scheduler_base(std::size_t num_threads,
        char const* description, thread_queue_init_parameters thread_queue_init,
        scheduler_mode mode)
      : suspend_mtxs_(num_threads)
      , suspend_conds_(num_threads)
      , pu_mtxs_(num_threads)
      , states_(num_threads)
      , description_(description)
      , thread_queue_init_(thread_queue_init)
      , parent_pool_(nullptr)
      , background_thread_count_(0)
      , polling_function_mpi_(&null_polling_function)
      , polling_function_cuda_(&null_polling_function)
      , polling_work_count_function_mpi_(&null_polling_work_count_function)
      , polling_work_count_function_cuda_(&null_polling_work_count_function)
    {
        set_scheduler_mode(mode);

#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        double max_time = thread_queue_init.max_idle_backoff_time_;

        wait_counts_.resize(num_threads);
        for (auto&& data : wait_counts_)
        {
            data.data_.wait_count_ = 0;
            data.data_.max_idle_backoff_time_ = max_time;
        }
#endif

        for (std::size_t i = 0; i != num_threads; ++i)
            states_[i].store(hpx::state::initialized);
    }

    void scheduler_base::idle_callback(std::size_t num_thread)
    {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        if (mode_.data_.load(std::memory_order_relaxed) &
            policies::scheduler_mode::enable_idle_backoff)
        {
            // Put this thread to sleep for some time, additionally it gets
            // woken up on new work.

            idle_backoff_data& data = wait_counts_[num_thread].data_;

            // Exponential back-off with a maximum sleep time.
            double exponent = (std::min)(double(data.wait_count_),
                double(std::numeric_limits<double>::max_exponent - 1));

            std::chrono::milliseconds period(std::lround((std::min)(
                data.max_idle_backoff_time_, std::pow(2.0, exponent))));

            ++data.wait_count_;

            std::unique_lock<pu_mutex_type> l(mtx_);
            if (cond_.wait_for(l, period) == std::cv_status::no_timeout)
            {
                // reset counter if thread was woken up
                data.wait_count_ = 0;
            }
        }
#else
        (void) num_thread;
#endif
    }

    /// This function gets called by the thread-manager whenever new work
    /// has been added, allowing the scheduler to reactivate one or more of
    /// possibly idling OS threads
    void scheduler_base::do_some_work(std::size_t)
    {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        if (mode_.data_.load(std::memory_order_relaxed) &
            policies::scheduler_mode::enable_idle_backoff)
        {
            cond_.notify_all();
        }
#endif
    }

    void scheduler_base::suspend(std::size_t num_thread)
    {
        HPX_ASSERT(num_thread < suspend_conds_.size());

        states_[num_thread].store(hpx::state::sleeping);
        std::unique_lock<pu_mutex_type> l(suspend_mtxs_[num_thread]);
        suspend_conds_[num_thread].wait(l);

        // Only set running if still in hpx::state::sleeping. Can be set with
        // non-blocking/locking functions to stopping or terminating, in which
        // case the state is left untouched.
        hpx::state expected = hpx::state::sleeping;
        states_[num_thread].compare_exchange_strong(
            expected, hpx::state::running);

        HPX_ASSERT(expected == hpx::state::sleeping ||
            expected == hpx::state::stopping ||
            expected == hpx::state::terminating);
    }

    void scheduler_base::resume(std::size_t num_thread)
    {
        if (num_thread == std::size_t(-1))
        {
            for (std::condition_variable& c : suspend_conds_)
            {
                c.notify_one();
            }
        }
        else
        {
            HPX_ASSERT(num_thread < suspend_conds_.size());
            suspend_conds_[num_thread].notify_one();
        }
    }

    std::size_t scheduler_base::select_active_pu(
        std::unique_lock<pu_mutex_type>& l, std::size_t num_thread,
        bool allow_fallback)
    {
        if (mode_.data_.load(std::memory_order_relaxed) &
            threads::policies::scheduler_mode::enable_elasticity)
        {
            std::size_t states_size = states_.size();

            if (!allow_fallback)
            {
                // Try indefinitely as long as at least one thread is
                // available for scheduling. Increase allowed state if no
                // threads are available for scheduling.
                auto max_allowed_state = hpx::state::suspended;

                hpx::util::yield_while([this, states_size, &l, &num_thread,
                                           &max_allowed_state]() {
                    std::size_t num_allowed_threads = 0;

                    for (std::size_t offset = 0; offset < states_size; ++offset)
                    {
                        std::size_t num_thread_local =
                            (num_thread + offset) % states_size;

                        l = std::unique_lock<pu_mutex_type>(
                            pu_mtxs_[num_thread_local], std::try_to_lock);

                        if (l.owns_lock())
                        {
                            if (states_[num_thread_local] <= max_allowed_state)
                            {
                                num_thread = num_thread_local;
                                return false;
                            }

                            l.unlock();
                        }

                        if (states_[num_thread_local] <= max_allowed_state)
                        {
                            ++num_allowed_threads;
                        }
                    }

                    if (0 == num_allowed_threads)
                    {
                        if (max_allowed_state <= hpx::state::suspended)
                        {
                            max_allowed_state = hpx::state::sleeping;
                        }
                        else if (max_allowed_state <= hpx::state::sleeping)
                        {
                            max_allowed_state = hpx::state::stopping;
                        }
                        else
                        {
                            // All threads are terminating or stopped.
                            // Just return num_thread to avoid infinite
                            // loop.
                            return false;
                        }
                    }

                    // Yield after trying all pus, then try again
                    return true;
                });

                return num_thread;
            }

            // Try all pus only once if fallback is allowed
            HPX_ASSERT(num_thread != std::size_t(-1));
            for (std::size_t offset = 0; offset < states_size; ++offset)
            {
                std::size_t num_thread_local =
                    (num_thread + offset) % states_size;

                l = std::unique_lock<pu_mutex_type>(
                    pu_mtxs_[num_thread_local], std::try_to_lock);

                if (l.owns_lock() &&
                    states_[num_thread_local] <= hpx::state::suspended)
                {
                    return num_thread_local;
                }
            }
        }

        return num_thread;
    }

    // allow to access/manipulate states
    std::atomic<hpx::state>& scheduler_base::get_state(std::size_t num_thread)
    {
        HPX_ASSERT(num_thread < states_.size());
        return states_[num_thread];
    }
    std::atomic<hpx::state> const& scheduler_base::get_state(
        std::size_t num_thread) const
    {
        HPX_ASSERT(num_thread < states_.size());
        return states_[num_thread];
    }

    void scheduler_base::set_all_states(hpx::state s)
    {
        typedef std::atomic<hpx::state> state_type;
        for (state_type& state : states_)
        {
            state.store(s);
        }
    }

    void scheduler_base::set_all_states_at_least(hpx::state s)
    {
        typedef std::atomic<hpx::state> state_type;
        for (state_type& state : states_)
        {
            if (state < s)
            {
                state.store(s);
            }
        }
    }

    // return whether all states are at least at the given one
    bool scheduler_base::has_reached_state(hpx::state s) const
    {
        typedef std::atomic<hpx::state> state_type;
        for (state_type const& state : states_)
        {
            if (state.load(std::memory_order_relaxed) < s)
                return false;
        }
        return true;
    }

    bool scheduler_base::is_state(hpx::state s) const
    {
        typedef std::atomic<hpx::state> state_type;
        for (state_type const& state : states_)
        {
            if (state.load(std::memory_order_relaxed) != s)
                return false;
        }
        return true;
    }

    std::pair<hpx::state, hpx::state> scheduler_base::get_minmax_state() const
    {
        std::pair<hpx::state, hpx::state> result(
            hpx::state::last_valid_runtime_state,
            hpx::state::first_valid_runtime_state);

        typedef std::atomic<hpx::state> state_type;
        for (state_type const& state_iter : states_)
        {
            hpx::state s = state_iter.load();
            result.first = (std::min)(result.first, s);
            result.second = (std::max)(result.second, s);
        }

        return result;
    }

    // get/set scheduler mode
    void scheduler_base::set_scheduler_mode(scheduler_mode mode)
    {
        // distribute the same value across all cores
        mode_.data_.store(mode, std::memory_order_release);
        do_some_work(std::size_t(-1));
    }

    void scheduler_base::add_scheduler_mode(scheduler_mode mode)
    {
        // distribute the same value across all cores
        mode = scheduler_mode(get_scheduler_mode() | mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::remove_scheduler_mode(scheduler_mode mode)
    {
        mode = scheduler_mode(get_scheduler_mode() & ~mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::add_remove_scheduler_mode(
        scheduler_mode to_add_mode, scheduler_mode to_remove_mode)
    {
        scheduler_mode mode = scheduler_mode(
            (get_scheduler_mode() | to_add_mode) & ~to_remove_mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::update_scheduler_mode(scheduler_mode mode, bool set)
    {
        if (set)
        {
            add_scheduler_mode(mode);
        }
        else
        {
            remove_scheduler_mode(mode);
        }
    }

    ///////////////////////////////////////////////////////////////////////////
    std::int64_t scheduler_base::get_background_thread_count()
    {
        return background_thread_count_;
    }

    void scheduler_base::increment_background_thread_count()
    {
        ++background_thread_count_;
    }

    void scheduler_base::decrement_background_thread_count()
    {
        --background_thread_count_;
    }

#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
    coroutines::detail::tss_data_node* scheduler_base::find_tss_data(
        void const* key)
    {
        if (!thread_data_)
            return nullptr;
        return thread_data_->find(key);
    }

    void scheduler_base::add_new_tss_node(void const* key,
        std::shared_ptr<coroutines::detail::tss_cleanup_function> const& func,
        void* tss_data)
    {
        if (!thread_data_)
        {
            thread_data_ = std::make_shared<coroutines::detail::tss_storage>();
        }
        thread_data_->insert(key, func, tss_data);
    }

    void scheduler_base::erase_tss_node(void const* key, bool cleanup_existing)
    {
        if (thread_data_)
            thread_data_->erase(key, cleanup_existing);
    }

    void* scheduler_base::get_tss_data(void const* key)
    {
        if (coroutines::detail::tss_data_node* const current_node =
                find_tss_data(key))
        {
            return current_node->get_value();
        }
        return nullptr;
    }

    void scheduler_base::set_tss_data(void const* key,
        std::shared_ptr<coroutines::detail::tss_cleanup_function> const& func,
        void* tss_data, bool cleanup_existing)
    {
        if (coroutines::detail::tss_data_node* const current_node =
                find_tss_data(key))
        {
            if (func || (tss_data != 0))
                current_node->reinit(func, tss_data, cleanup_existing);
            else
                erase_tss_node(key, cleanup_existing);
        }
        else if (func || (tss_data != 0))
        {
            add_new_tss_node(key, func, tss_data);
        }
    }
#endif

    std::ostream& operator<<(std::ostream& os, scheduler_base const& scheduler)
    {
        os << scheduler.get_description() << "(" << &scheduler << ")";

        return os;
    }
}}}    // namespace hpx::threads::policies
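One detail worth spelling out from the file above: scheduler_base::idle_callback puts an idle worker to sleep for min(max_idle_backoff_time_, 2^wait_count) milliseconds, doubling the period on every consecutive idle round and resetting the counter when the thread is woken by new work. The following is a minimal standalone sketch of that period calculation only; the function name backoff_period and the 250 ms cap are illustrative choices, not values taken from HPX.

#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <iostream>
#include <limits>

// Sketch of the back-off period computed in scheduler_base::idle_callback:
// sleep for min(cap, 2^wait_count) milliseconds.
std::chrono::milliseconds backoff_period(
    std::int64_t wait_count, double max_backoff_ms)
{
    double exponent = (std::min)(double(wait_count),
        double(std::numeric_limits<double>::max_exponent - 1));
    return std::chrono::milliseconds(std::lround(
        (std::min)(max_backoff_ms, std::pow(2.0, exponent))));
}

int main()
{
    // 250 ms is a hypothetical cap, not HPX's configured default.
    for (std::int64_t count = 0; count != 12; ++count)
    {
        std::cout << "wait_count " << count << " -> "
                  << backoff_period(count, 250.0).count() << " ms\n";
    }
}

With this cap the period grows 1, 2, 4, ... ms and saturates at 250 ms once 2^wait_count exceeds the cap.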