• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #871

22 Jan 2023 11:22PM UTC coverage: 86.624% (+0.7%) from 85.97%
#871

push

StellarBot
Merge #6144

6144: General improvements to scheduling and related fixes r=hkaiser a=hkaiser

This is a collection of unrelated improvements applied to different parts of the code

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

152 of 152 new or added lines in 23 files covered. (100.0%)

174953 of 201969 relevant lines covered (86.62%)

1838882.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.89
/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp
1
//  Copyright (c) 2007-2023 Hartmut Kaiser
2
//  Copyright (c) 2007-2009 Chirag Dekate, Anshul Tandon
3
//  Copyright (c)      2011 Bryce Lelbach, Katelyn Kufahl
4
//  Copyright (c)      2017 Shoshana Jakobovits
5
//  SPDX-License-Identifier: BSL-1.0
6
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
7
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8

9
#pragma once
10

11
#include <hpx/config.hpp>
12
#include <hpx/concurrency/barrier.hpp>
13
#include <hpx/concurrency/spinlock.hpp>
14
#include <hpx/io_service/io_service_pool.hpp>
15
#include <hpx/modules/errors.hpp>
16
#include <hpx/resource_partitioner/detail/partitioner.hpp>
17
#include <hpx/runtime_configuration/runtime_configuration.hpp>
18
#include <hpx/thread_pools/scheduled_thread_pool.hpp>
19
#include <hpx/threading_base/scheduler_mode.hpp>
20
#include <hpx/threading_base/scheduler_state.hpp>
21
#include <hpx/threading_base/thread_init_data.hpp>
22
#include <hpx/threading_base/thread_num_tss.hpp>
23
#include <hpx/threading_base/thread_pool_base.hpp>
24
#include <hpx/threadmanager/threadmanager_fwd.hpp>
25
#include <hpx/topology/cpu_mask.hpp>
26

27
#include <atomic>
28
#include <cstddef>
29
#include <cstdint>
30
#include <exception>
31
#include <iosfwd>
32
#include <memory>
33
#include <mutex>
34
#include <numeric>
35
#include <string>
36
#include <thread>
37
#include <type_traits>
38
#include <utility>
39
#include <vector>
40

41
#include <hpx/config/warnings_prefix.hpp>
42

43
namespace hpx { namespace threads {
44
    ///////////////////////////////////////////////////////////////////////////
45
    /// The \a thread-manager class is the central instance of management for
46
    /// all (non-depleted) threads
47
    class threadmanager
48
    {
49
    private:
50
        // we use a simple mutex to protect the data members of the
51
        // thread manager for now
52
        typedef std::mutex mutex_type;
53

54
    public:
55
        typedef threads::policies::callback_notifier notification_policy_type;
56
        typedef std::unique_ptr<thread_pool_base> pool_type;
57
        typedef threads::policies::scheduler_base scheduler_type;
58
        typedef std::vector<pool_type> pool_vector;
59

60
        threadmanager(hpx::util::runtime_configuration& rtcfg_,
61
#ifdef HPX_HAVE_TIMER_POOL
62
            util::io_service_pool& timer_pool,
63
#endif
64
            notification_policy_type& notifier,
65
            detail::network_background_callback_type
66
                network_background_callback =
67
                    detail::network_background_callback_type());
68
        ~threadmanager();
69

70
        void init();
71
        void create_pools();
72

73
        //! FIXME move to private and add --hpx:printpools cmd line option
74
        void print_pools(std::ostream&);
75

76
        // Get functions
77
        thread_pool_base& default_pool() const;
78

79
        scheduler_type& default_scheduler() const;
80

81
        thread_pool_base& get_pool(std::string const& pool_name) const;
82
        thread_pool_base& get_pool(pool_id_type const& pool_id) const;
83
        thread_pool_base& get_pool(std::size_t thread_index) const;
84

85
        bool pool_exists(std::string const& pool_name) const;
86
        bool pool_exists(std::size_t pool_index) const;
87

88
        /// The function \a register_work adds a new work item to the thread
89
        /// manager. It doesn't immediately create a new \a thread, it just adds
90
        /// the task parameters (function, initial state and description) to
91
        /// the internal management data structures. The thread itself will be
92
        /// created when the number of existing threads drops below the number
93
        /// of threads specified by the constructor's max_count parameter.
94
        ///
95
        /// \param func   [in] The function or function object to execute as
96
        ///               the thread's function. This must have a signature as
97
        ///               defined by \a thread_function_type.
98
        /// \param description [in] The value of this parameter allows to
99
        ///               specify a description of the thread to create. This
100
        ///               information is used for logging purposes mainly, but
101
        ///               might be useful for debugging as well. This parameter
102
        ///               is optional and defaults to an empty string.
103
        thread_id_ref_type register_work(
104
            thread_init_data& data, error_code& ec = throws);
105

106
        /// The function \a register_thread adds a new work item to the thread
107
        /// manager. It creates a new \a thread, adds it to the internal
108
        /// management data structures, and schedules the new thread, if
109
        /// appropriate.
110
        ///
111
        /// \param func   [in] The function or function object to execute as
112
        ///               the thread's function. This must have a signature as
113
        ///               defined by \a thread_function_type.
114
        /// \param id     [out] This parameter will hold the id of the created
115
        ///               thread. This id is guaranteed to be validly
116
        ///               initialized before the thread function is executed.
117
        /// \param description [in] The value of this parameter allows to
118
        ///               specify a description of the thread to create. This
119
        ///               information is used for logging purposes mainly, but
120
        ///               might be useful for debugging as well. This parameter
121
        ///               is optional and defaults to an empty string.
122
        void register_thread(thread_init_data& data, thread_id_ref_type& id,
123
            error_code& ec = throws);
124

125
        /// \brief  Run the thread manager's work queue. This function
126
        ///         instantiates the specified number of OS threads in each
127
        ///         pool. All OS threads are started to execute the function
128
        ///         \a tfunc.
129
        ///
130
        /// \returns      The function returns \a true if the thread manager
131
        ///               has been started successfully, otherwise it returns
132
        ///               \a false.
133
        bool run();
134

135
        /// \brief Forcefully stop the thread-manager
136
        ///
137
        /// \param blocking
138
        ///
139
        void stop(bool blocking = true);
140

141
        bool is_busy();
142
        bool is_idle();
143
        void wait();
144

145
        // \brief Suspend all thread pools.
146
        void suspend();
147

148
        // \brief Resume all thread pools.
149
        void resume();
150

151
        /// \brief Return whether the thread manager is still running
152
        //! This returns the "minimal state", i.e. the state of the
153
        //! least advanced thread pool
154
        state status() const
2,540,260✔
155
        {
156
            hpx::state result(hpx::state::last_valid_runtime_state);
2,540,203✔
157

158
            for (auto& pool_iter : pools_)
5,080,352✔
159
            {
160
                hpx::state s = pool_iter->get_state();
2,540,158✔
161
                result = (std::min)(result, s);
2,540,158✔
162
            }
163

164
            return result;
2,540,061✔
165
        }
166

167
        /// \brief return the number of HPX-threads with the given state
168
        ///
169
        /// \note This function locks the internal OS lock in the thread manager
170
        std::int64_t get_thread_count(
171
            thread_schedule_state state = thread_schedule_state::unknown,
172
            thread_priority priority = thread_priority::default_,
173
            std::size_t num_thread = std::size_t(-1), bool reset = false);
174

175
        std::int64_t get_idle_core_count();
176

177
        mask_type get_idle_core_mask();
178

179
        std::int64_t get_background_thread_count() const;
180

181
        // Enumerate all matching threads
182
        bool enumerate_threads(hpx::function<bool(thread_id_type)> const& f,
183
            thread_schedule_state state = thread_schedule_state::unknown) const;
184

185
        // \brief Abort all threads which are in suspended state. This will set
186
        //        the state of all suspended threads to \a pending while
187
        //        supplying the wait_abort extended state flag
188
        void abort_all_suspended_threads();
189

190
        // \brief Clean up terminated threads. This deletes all threads which
191
        //        have been terminated but which are still held in the queue
192
        //        of terminated threads. Some schedulers might not do anything
193
        //        here.
194
        bool cleanup_terminated(bool delete_all);
195

196
        /// \brief Return the number of OS threads running in this thread-manager
197
        ///
198
        /// This function will return correct results only if the thread-manager
199
        /// is running.
200
        std::size_t get_os_thread_count() const
1,129✔
201
        {
202
            std::lock_guard<mutex_type> lk(mtx_);
1,129✔
203
            std::size_t total = 0;
1,129✔
204
            for (auto& pool_iter : pools_)
2,261✔
205
            {
206
                total += pool_iter->get_os_thread_count();
1,132✔
207
            }
208
            return total;
1,129✔
209
        }
1,129✔
210

211
        std::thread& get_os_thread_handle(std::size_t num_thread) const
×
212
        {
213
            std::lock_guard<mutex_type> lk(mtx_);
×
214
            pool_id_type id = threads_lookup_[num_thread];
×
215
            thread_pool_base& pool = get_pool(id);
×
216
            return pool.get_os_thread_handle(num_thread);
×
217
        }
×
218

219
    public:
220
        /// API functions forwarding to notification policy
221

222
        /// This notifies the thread manager that the passed exception has been
223
        /// raised. The exception will be routed through the notifier and the
224
        /// scheduler (which will result in it being passed to the runtime
225
        /// object, which in turn will report it to the console, etc.).
226
        void report_error(std::size_t num_thread, std::exception_ptr const& e)
×
227
        {
228
            // propagate the error reporting to all pools, which in turn
229
            // will propagate to schedulers
230
            for (auto& pool_iter : pools_)
×
231
            {
232
                pool_iter->report_error(num_thread, e);
×
233
            }
234
        }
×
235

236
    public:
237
        /// Returns the mask identifying all processing units used by this
238
        /// thread manager.
239
        mask_type get_used_processing_units() const
4✔
240
        {
241
            mask_type total_used_processing_punits = mask_type();
4✔
242
            threads::resize(total_used_processing_punits,
4✔
243
                static_cast<std::size_t>(hardware_concurrency()));
4✔
244

245
            for (auto& pool_iter : pools_)
8✔
246
            {
247
                total_used_processing_punits |=
4✔
248
                    pool_iter->get_used_processing_units();
4✔
249
            }
250

251
            return total_used_processing_punits;
4✔
252
        }
4✔
253

254
        /// \brief Return the NUMA domain bitmap of the pool with the given
        ///        name
        ///
        /// \param pool_name [in] The name of the pool, resolved through
        ///               \a get_pool().
        hwloc_bitmap_ptr get_pool_numa_bitmap(
            std::string const& pool_name) const
        {
            thread_pool_base& pool = get_pool(pool_name);
            return pool.get_numa_domain_bitmap();
        }
259

260
        /// \brief Call \a set_scheduler_mode on the scheduler of every thread
        ///        pool
        ///
        /// \param mode   [in] The scheduler mode passed on to each scheduler.
        void set_scheduler_mode(threads::policies::scheduler_mode mode) noexcept
        {
            for (pool_type const& pool : pools_)
            {
                pool->get_scheduler()->set_scheduler_mode(mode);
            }
        }
×
267

268
        /// \brief Call \a add_scheduler_mode on the scheduler of every thread
        ///        pool
        ///
        /// \param mode   [in] The scheduler mode passed on to each scheduler.
        void add_scheduler_mode(threads::policies::scheduler_mode mode) noexcept
        {
            for (pool_type const& pool : pools_)
            {
                pool->get_scheduler()->add_scheduler_mode(mode);
            }
        }
1✔
275

276
        void add_remove_scheduler_mode(
×
277
            threads::policies::scheduler_mode to_add_mode,
278
            threads::policies::scheduler_mode to_remove_mode) noexcept
279
        {
280
            for (auto& pool_iter : pools_)
×
281
            {
282
                pool_iter->get_scheduler()->add_remove_scheduler_mode(
×
283
                    to_add_mode, to_remove_mode);
×
284
            }
285
        }
×
286

287
        void remove_scheduler_mode(
1✔
288
            threads::policies::scheduler_mode mode) noexcept
289
        {
290
            for (auto& pool_iter : pools_)
2✔
291
            {
292
                pool_iter->get_scheduler()->remove_scheduler_mode(mode);
1✔
293
            }
294
        }
1✔
295

296
        void reset_thread_distribution()
×
297
        {
298
            for (auto& pool_iter : pools_)
×
299
            {
300
                pool_iter->reset_thread_distribution();
×
301
            }
302
        }
×
303

304
        void init_tss(std::size_t global_thread_num)
5,600✔
305
        {
306
            detail::set_global_thread_num_tss(global_thread_num);
5,600✔
307
        }
5,600✔
308

309
        void deinit_tss()
8,018✔
310
        {
311
            detail::set_global_thread_num_tss(std::size_t(-1));
7,999✔
312
        }
7,999✔
313

314
    public:
315
        // performance counters
316
        std::int64_t get_queue_length(bool reset);
317
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
318
        std::int64_t get_average_thread_wait_time(bool reset);
319
        std::int64_t get_average_task_wait_time(bool reset);
320
#endif
321
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
322
    defined(HPX_HAVE_THREAD_IDLE_RATES)
323
        std::int64_t get_background_work_duration(bool reset);
324
        std::int64_t get_background_overhead(bool reset);
325

326
        std::int64_t get_background_send_duration(bool reset);
327
        std::int64_t get_background_send_overhead(bool reset);
328

329
        std::int64_t get_background_receive_duration(bool reset);
330
        std::int64_t get_background_receive_overhead(bool reset);
331
#endif    //HPX_HAVE_BACKGROUND_THREAD_COUNTERS
332

333
        std::int64_t get_cumulative_duration(bool reset);
334

335
        std::int64_t get_thread_count_unknown(bool reset)
1✔
336
        {
337
            return get_thread_count(thread_schedule_state::unknown,
1✔
338
                thread_priority::default_, std::size_t(-1), reset);
1✔
339
        }
340
        std::int64_t get_thread_count_active(bool reset)
1✔
341
        {
342
            return get_thread_count(thread_schedule_state::active,
1✔
343
                thread_priority::default_, std::size_t(-1), reset);
1✔
344
        }
345
        std::int64_t get_thread_count_pending(bool reset)
1✔
346
        {
347
            return get_thread_count(thread_schedule_state::pending,
1✔
348
                thread_priority::default_, std::size_t(-1), reset);
1✔
349
        }
350
        std::int64_t get_thread_count_suspended(bool reset)
1✔
351
        {
352
            return get_thread_count(thread_schedule_state::suspended,
1✔
353
                thread_priority::default_, std::size_t(-1), reset);
1✔
354
        }
355
        std::int64_t get_thread_count_terminated(bool reset)
1✔
356
        {
357
            return get_thread_count(thread_schedule_state::terminated,
1✔
358
                thread_priority::default_, std::size_t(-1), reset);
1✔
359
        }
360
        std::int64_t get_thread_count_staged(bool reset)
1✔
361
        {
362
            return get_thread_count(thread_schedule_state::staged,
1✔
363
                thread_priority::default_, std::size_t(-1), reset);
1✔
364
        }
365

366
#ifdef HPX_HAVE_THREAD_IDLE_RATES
367
        std::int64_t avg_idle_rate(bool reset) noexcept;
368
#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
369
        std::int64_t avg_creation_idle_rate(bool reset) noexcept;
370
        std::int64_t avg_cleanup_idle_rate(bool reset) noexcept;
371
#endif
372
#endif
373

374
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
375
        std::int64_t get_executed_threads(bool reset) noexcept;
376
        std::int64_t get_executed_thread_phases(bool reset) noexcept;
377
#ifdef HPX_HAVE_THREAD_IDLE_RATES
378
        std::int64_t get_thread_duration(bool reset);
379
        std::int64_t get_thread_phase_duration(bool reset);
380
        std::int64_t get_thread_overhead(bool reset);
381
        std::int64_t get_thread_phase_overhead(bool reset);
382
        std::int64_t get_cumulative_thread_duration(bool reset);
383
        std::int64_t get_cumulative_thread_overhead(bool reset);
384
#endif
385
#endif
386

387
#ifdef HPX_HAVE_THREAD_STEALING_COUNTS
388
        std::int64_t get_num_pending_misses(bool reset);
389
        std::int64_t get_num_pending_accesses(bool reset);
390
        std::int64_t get_num_stolen_from_pending(bool reset);
391
        std::int64_t get_num_stolen_from_staged(bool reset);
392
        std::int64_t get_num_stolen_to_pending(bool reset);
393
        std::int64_t get_num_stolen_to_staged(bool reset);
394
#endif
395

396
    private:
397
        policies::thread_queue_init_parameters get_init_parameters() const;
398
        void create_scheduler_user_defined(
399
            hpx::resource::scheduler_function const&,
400
            thread_pool_init_parameters const&,
401
            policies::thread_queue_init_parameters const&);
402
        void create_scheduler_local(thread_pool_init_parameters const&,
403
            policies::thread_queue_init_parameters const&, std::size_t);
404
        void create_scheduler_local_priority_fifo(
405
            thread_pool_init_parameters const&,
406
            policies::thread_queue_init_parameters const&, std::size_t);
407
        void create_scheduler_local_priority_lifo(
408
            thread_pool_init_parameters const&,
409
            policies::thread_queue_init_parameters const&, std::size_t);
410
        void create_scheduler_static(thread_pool_init_parameters const&,
411
            policies::thread_queue_init_parameters const&, std::size_t);
412
        void create_scheduler_static_priority(
413
            thread_pool_init_parameters const&,
414
            policies::thread_queue_init_parameters const&, std::size_t);
415
        void create_scheduler_abp_priority_fifo(
416
            thread_pool_init_parameters const&,
417
            policies::thread_queue_init_parameters const&, std::size_t);
418
        void create_scheduler_abp_priority_lifo(
419
            thread_pool_init_parameters const&,
420
            policies::thread_queue_init_parameters const&, std::size_t);
421
        void create_scheduler_shared_priority(
422
            thread_pool_init_parameters const&,
423
            policies::thread_queue_init_parameters const&, std::size_t);
424

425
        mutable mutex_type mtx_;    // mutex protecting the members
426

427
        hpx::util::runtime_configuration& rtcfg_;
428
        std::vector<pool_id_type> threads_lookup_;
429

430
#ifdef HPX_HAVE_TIMER_POOL
431
        util::io_service_pool& timer_pool_;    // used for timed set_state
432
#endif
433
        pool_vector pools_;
434

435
        notification_policy_type& notifier_;
436
        detail::network_background_callback_type network_background_callback_;
437
    };
438
}}    // namespace hpx::threads
439

440
#include <hpx/config/warnings_suffix.hpp>
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc