
STEllAR-GROUP / hpx / #847

07 Dec 2022 07:15PM UTC coverage: 85.835% (-0.6%) from 86.482%

Build #847 (push), committed by StellarBot: Merge #6095

6095: Replacing facilities from Boost.Range r=hkaiser a=hkaiser

Working towards #3440 


Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

43 of 43 new or added lines in 21 files covered. (100.0%)

171460 of 199755 relevant lines covered (85.84%)
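(That is, 171460 / 199755 ≈ 0.85835, i.e. the 85.835% coverage reported above, displayed rounded as 85.84%.)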

1820288.13 hits per line

Source File: /libs/core/threading_base/src/scheduler_base.cpp (82.49% covered)
//  Copyright (c) 2007-2019 Hartmut Kaiser
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/execution_base/this_thread.hpp>
#include <hpx/threading_base/scheduler_base.hpp>
#include <hpx/threading_base/scheduler_mode.hpp>
#include <hpx/threading_base/scheduler_state.hpp>
#include <hpx/threading_base/thread_init_data.hpp>
#include <hpx/threading_base/thread_pool_base.hpp>
#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
#include <hpx/coroutines/detail/tss.hpp>
#endif

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <limits>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace threads { namespace policies {
    scheduler_base::scheduler_base(std::size_t num_threads,
        char const* description, thread_queue_init_parameters thread_queue_init,
        scheduler_mode mode)
      : suspend_mtxs_(num_threads)
      , suspend_conds_(num_threads)
      , pu_mtxs_(num_threads)
      , states_(num_threads)
      , description_(description)
      , thread_queue_init_(thread_queue_init)
      , parent_pool_(nullptr)
      , background_thread_count_(0)
      , polling_function_mpi_(&null_polling_function)
      , polling_function_cuda_(&null_polling_function)
      , polling_work_count_function_mpi_(&null_polling_work_count_function)
      , polling_work_count_function_cuda_(&null_polling_work_count_function)
    {
        set_scheduler_mode(mode);

#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        double max_time = thread_queue_init.max_idle_backoff_time_;

        wait_counts_.resize(num_threads);
        for (auto&& data : wait_counts_)
        {
            data.data_.wait_count_ = 0;
            data.data_.max_idle_backoff_time_ = max_time;
        }
#endif

        for (std::size_t i = 0; i != num_threads; ++i)
            states_[i].store(hpx::state::initialized);
    }
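    // The constructor above sizes the per-PU synchronization primitives
    // (suspend_mtxs_, suspend_conds_, pu_mtxs_) and the state array to
    // num_threads, marks every processing unit as hpx::state::initialized,
    // and, when HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF is defined, seeds one
    // idle back-off counter per worker with the configured maximum sleep
    // time.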

    void scheduler_base::idle_callback(std::size_t num_thread)
    {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        if (mode_.data_.load(std::memory_order_relaxed) &
            policies::scheduler_mode::enable_idle_backoff)
        {
            // Put this thread to sleep for some time, additionally it gets
            // woken up on new work.

            idle_backoff_data& data = wait_counts_[num_thread].data_;

            // Exponential back-off with a maximum sleep time.
            double exponent = (std::min)(double(data.wait_count_),
                double(std::numeric_limits<double>::max_exponent - 1));

            std::chrono::milliseconds period(std::lround((std::min)(
                data.max_idle_backoff_time_, std::pow(2.0, exponent))));

            ++data.wait_count_;

            std::unique_lock<pu_mutex_type> l(mtx_);
            if (cond_.wait_for(l, period) == std::cv_status::no_timeout)
            {
                // reset counter if thread was woken up
                data.wait_count_ = 0;
            }
        }
#else
        (void) num_thread;
#endif
    }
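    // Worked example for the back-off above: assuming a configured
    // max_idle_backoff_time_ of 1000 (milliseconds), consecutive idle calls
    // sleep for min(1000, 2^wait_count_) ms, i.e. 1, 2, 4, 8, ... ms,
    // saturating at 1000 ms once wait_count_ reaches 10. A notification from
    // do_some_work() wakes the thread early and resets wait_count_ to zero,
    // restarting the sequence.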

    /// This function gets called by the thread-manager whenever new work
    /// has been added, allowing the scheduler to reactivate one or more of
    /// possibly idling OS threads
    void scheduler_base::do_some_work(std::size_t)
    {
#if defined(HPX_HAVE_THREAD_MANAGER_IDLE_BACKOFF)
        if (mode_.data_.load(std::memory_order_relaxed) &
            policies::scheduler_mode::enable_idle_backoff)
        {
            cond_.notify_all();
        }
#endif
    }

    void scheduler_base::suspend(std::size_t num_thread)
    {
        HPX_ASSERT(num_thread < suspend_conds_.size());

        states_[num_thread].store(hpx::state::sleeping);
        std::unique_lock<pu_mutex_type> l(suspend_mtxs_[num_thread]);
        suspend_conds_[num_thread].wait(l);

        // Only set running if still in hpx::state::sleeping. Can be set with
        // non-blocking/locking functions to stopping or terminating, in which
        // case the state is left untouched.
        hpx::state expected = hpx::state::sleeping;
        states_[num_thread].compare_exchange_strong(
            expected, hpx::state::running);

        HPX_ASSERT(expected == hpx::state::sleeping ||
            expected == hpx::state::stopping ||
            expected == hpx::state::terminating);
    }

    void scheduler_base::resume(std::size_t num_thread)
    {
        if (num_thread == std::size_t(-1))
        {
            for (std::condition_variable& c : suspend_conds_)
            {
                c.notify_one();
            }
        }
        else
        {
            HPX_ASSERT(num_thread < suspend_conds_.size());
            suspend_conds_[num_thread].notify_one();
        }
    }
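    // suspend() and resume() above form a pair: suspend() parks the calling
    // worker on suspend_conds_[num_thread] after marking it
    // hpx::state::sleeping, and resume() wakes either a single worker or,
    // when called with std::size_t(-1), every suspended worker. The
    // compare-exchange in suspend() restores hpx::state::running only if
    // nothing moved the PU to stopping or terminating while it slept.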

    std::size_t scheduler_base::select_active_pu(
        std::unique_lock<pu_mutex_type>& l, std::size_t num_thread,
        bool allow_fallback)
    {
        if (mode_.data_.load(std::memory_order_relaxed) &
            threads::policies::scheduler_mode::enable_elasticity)
        {
            std::size_t states_size = states_.size();

            if (!allow_fallback)
            {
                // Try indefinitely as long as at least one thread is
                // available for scheduling. Increase allowed state if no
                // threads are available for scheduling.
                auto max_allowed_state = hpx::state::suspended;

                hpx::util::yield_while([this, states_size, &l, &num_thread,
                                           &max_allowed_state]() {
                    std::size_t num_allowed_threads = 0;

                    for (std::size_t offset = 0; offset < states_size; ++offset)
                    {
                        std::size_t num_thread_local =
                            (num_thread + offset) % states_size;

                        l = std::unique_lock<pu_mutex_type>(
                            pu_mtxs_[num_thread_local], std::try_to_lock);

                        if (l.owns_lock())
                        {
                            if (states_[num_thread_local] <= max_allowed_state)
                            {
                                num_thread = num_thread_local;
                                return false;
                            }

                            l.unlock();
                        }

                        if (states_[num_thread_local] <= max_allowed_state)
                        {
                            ++num_allowed_threads;
                        }
                    }

                    if (0 == num_allowed_threads)
                    {
                        if (max_allowed_state <= hpx::state::suspended)
                        {
                            max_allowed_state = hpx::state::sleeping;
                        }
                        else if (max_allowed_state <= hpx::state::sleeping)
                        {
                            max_allowed_state = hpx::state::stopping;
                        }
                        else
                        {
                            // All threads are terminating or stopped.
                            // Just return num_thread to avoid infinite
                            // loop.
                            return false;
                        }
                    }

                    // Yield after trying all pus, then try again
                    return true;
                });

                return num_thread;
            }

            // Try all pus only once if fallback is allowed
            HPX_ASSERT(num_thread != std::size_t(-1));
            for (std::size_t offset = 0; offset < states_size; ++offset)
            {
                std::size_t num_thread_local =
                    (num_thread + offset) % states_size;

                l = std::unique_lock<pu_mutex_type>(
                    pu_mtxs_[num_thread_local], std::try_to_lock);

                if (l.owns_lock() &&
                    states_[num_thread_local] <= hpx::state::suspended)
                {
                    return num_thread_local;
                }
            }
        }

        return num_thread;
    }
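    // select_active_pu() above implements two search strategies over the PU
    // mutexes, both taken only while scheduler_mode::enable_elasticity is
    // active. Without fallback it spins (via hpx::util::yield_while) until it
    // can lock a PU whose state is at most max_allowed_state, gradually
    // relaxing that bound from suspended to sleeping to stopping when no PU
    // qualifies. With fallback allowed it makes a single pass and returns the
    // first lockable PU whose state is at most suspended, otherwise returning
    // the requested num_thread unchanged.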

    // allow to access/manipulate states
    std::atomic<hpx::state>& scheduler_base::get_state(std::size_t num_thread)
    {
        HPX_ASSERT(num_thread < states_.size());
        return states_[num_thread];
    }
    std::atomic<hpx::state> const& scheduler_base::get_state(
        std::size_t num_thread) const
    {
        HPX_ASSERT(num_thread < states_.size());
        return states_[num_thread];
    }

    void scheduler_base::set_all_states(hpx::state s)
    {
        typedef std::atomic<hpx::state> state_type;
        for (state_type& state : states_)
        {
            state.store(s);
        }
    }

    void scheduler_base::set_all_states_at_least(hpx::state s)
    {
        typedef std::atomic<hpx::state> state_type;
        for (state_type& state : states_)
        {
            if (state < s)
            {
                state.store(s);
            }
        }
    }

    // return whether all states are at least at the given one
    bool scheduler_base::has_reached_state(hpx::state s) const
    {
        typedef std::atomic<hpx::state> state_type;
        for (state_type const& state : states_)
        {
            if (state.load(std::memory_order_relaxed) < s)
                return false;
        }
        return true;
    }

    bool scheduler_base::is_state(hpx::state s) const
    {
        typedef std::atomic<hpx::state> state_type;
        for (state_type const& state : states_)
        {
            if (state.load(std::memory_order_relaxed) != s)
                return false;
        }
        return true;
    }

    std::pair<hpx::state, hpx::state> scheduler_base::get_minmax_state() const
    {
        std::pair<hpx::state, hpx::state> result(
            hpx::state::last_valid_runtime_state,
            hpx::state::first_valid_runtime_state);

        typedef std::atomic<hpx::state> state_type;
        for (state_type const& state_iter : states_)
        {
            hpx::state s = state_iter.load();
            result.first = (std::min)(result.first, s);
            result.second = (std::max)(result.second, s);
        }

        return result;
    }
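    // The state helpers above differ in strictness: has_reached_state()
    // checks that every PU is at least at state s, is_state() requires every
    // PU to be exactly at s, and get_minmax_state() reports the smallest and
    // largest state currently held by any PU in a single pass.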

    // get/set scheduler mode
    void scheduler_base::set_scheduler_mode(scheduler_mode mode)
    {
        // distribute the same value across all cores
        mode_.data_.store(mode, std::memory_order_release);
        do_some_work(std::size_t(-1));
    }

    void scheduler_base::add_scheduler_mode(scheduler_mode mode)
    {
        // distribute the same value across all cores
        mode = scheduler_mode(get_scheduler_mode() | mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::remove_scheduler_mode(scheduler_mode mode)
    {
        mode = scheduler_mode(get_scheduler_mode() & ~mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::add_remove_scheduler_mode(
        scheduler_mode to_add_mode, scheduler_mode to_remove_mode)
    {
        scheduler_mode mode = scheduler_mode(
            (get_scheduler_mode() | to_add_mode) & ~to_remove_mode);
        set_scheduler_mode(mode);
    }

    void scheduler_base::update_scheduler_mode(scheduler_mode mode, bool set)
    {
        if (set)
        {
            add_scheduler_mode(mode);
        }
        else
        {
            remove_scheduler_mode(mode);
        }
    }
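    // For illustration (hypothetical caller code), given some
    // scheduler_base& sched, the mode setters above compose with the flags
    // already used in this file like this:
    //
    //   sched.add_scheduler_mode(scheduler_mode::enable_idle_backoff);
    //   sched.remove_scheduler_mode(scheduler_mode::enable_elasticity);
    //   // ...or both changes with a single store:
    //   sched.add_remove_scheduler_mode(
    //       scheduler_mode::enable_idle_backoff,
    //       scheduler_mode::enable_elasticity);
    //
    // Every call funnels into set_scheduler_mode(), which publishes the new
    // mask with a release store and then wakes idle workers via
    // do_some_work().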

    ///////////////////////////////////////////////////////////////////////////
    std::int64_t scheduler_base::get_background_thread_count()
    {
        return background_thread_count_;
    }

    void scheduler_base::increment_background_thread_count()
    {
        ++background_thread_count_;
    }

    void scheduler_base::decrement_background_thread_count()
    {
        --background_thread_count_;
    }

#if defined(HPX_HAVE_SCHEDULER_LOCAL_STORAGE)
    coroutines::detail::tss_data_node* scheduler_base::find_tss_data(
        void const* key)
    {
        if (!thread_data_)
            return nullptr;
        return thread_data_->find(key);
    }

    void scheduler_base::add_new_tss_node(void const* key,
        std::shared_ptr<coroutines::detail::tss_cleanup_function> const& func,
        void* tss_data)
    {
        if (!thread_data_)
        {
            thread_data_ = std::make_shared<coroutines::detail::tss_storage>();
        }
        thread_data_->insert(key, func, tss_data);
    }

    void scheduler_base::erase_tss_node(void const* key, bool cleanup_existing)
    {
        if (thread_data_)
            thread_data_->erase(key, cleanup_existing);
    }

    void* scheduler_base::get_tss_data(void const* key)
    {
        if (coroutines::detail::tss_data_node* const current_node =
                find_tss_data(key))
        {
            return current_node->get_value();
        }
        return nullptr;
    }

    void scheduler_base::set_tss_data(void const* key,
        std::shared_ptr<coroutines::detail::tss_cleanup_function> const& func,
        void* tss_data, bool cleanup_existing)
    {
        if (coroutines::detail::tss_data_node* const current_node =
                find_tss_data(key))
        {
            if (func || (tss_data != 0))
                current_node->reinit(func, tss_data, cleanup_existing);
            else
                erase_tss_node(key, cleanup_existing);
        }
        else if (func || (tss_data != 0))
        {
            add_new_tss_node(key, func, tss_data);
        }
    }
#endif
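    // The scheduler-local storage helpers above behave like a small TSS map:
    // set_tss_data() reinitializes an existing node when given a non-null
    // cleanup function or value, erases the node when both are null, and
    // creates a new node when no entry exists for the key yet.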

    std::ostream& operator<<(std::ostream& os, scheduler_base const& scheduler)
    {
        os << scheduler.get_description() << "(" << &scheduler << ")";

        return os;
    }
}}}    // namespace hpx::threads::policies