• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #857

28 Dec 2022 11:12PM UTC coverage: 86.543% (-0.06%) from 86.602%
#857

push

StellarBot
Merge #6118

6118: Modernize modules from level 17, 18, 19, and 20 r=hkaiser a=hkaiser

working towards https://github.com/STEllAR-GROUP/hpx/issues/5497

Modules:
- core/threading_base
- full/command_line_handling
- core/io_service
- core/schedulers
- core/synchronization
- core/futures
- core/thread_pools
- core/lcos_local
- core/pack_traversal
- core/resource_partitioner
- core/threading
- full/naming_base


Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

849 of 849 new or added lines in 98 files covered. (100.0%)

174389 of 201505 relevant lines covered (86.54%)

1916353.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.82
/libs/core/threading_base/include/hpx/threading_base/thread_pool_base.hpp
1
//  Copyright (c)      2018 Mikael Simberg
2
//  Copyright (c) 2007-2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#pragma once
9

10
#include <hpx/config.hpp>
11
#include <hpx/affinity/affinity_data.hpp>
12
#include <hpx/concurrency/barrier.hpp>
13
#include <hpx/functional/function.hpp>
14
#include <hpx/modules/errors.hpp>
15
#include <hpx/threading_base/callback_notifier.hpp>
16
#include <hpx/threading_base/network_background_callback.hpp>
17
#include <hpx/threading_base/scheduler_mode.hpp>
18
#include <hpx/threading_base/scheduler_state.hpp>
19
#include <hpx/threading_base/thread_init_data.hpp>
20
#include <hpx/timing/steady_clock.hpp>
21
#include <hpx/topology/cpu_mask.hpp>
22
#include <hpx/topology/topology.hpp>
23

24
#include <cstddef>
25
#include <cstdint>
26
#include <exception>
27
#include <functional>
28
#include <iosfwd>
29
#include <memory>
30
#include <mutex>
31
#include <string>
32
#include <thread>
33
#include <vector>
34

35
#include <hpx/config/warnings_prefix.hpp>
36

37
namespace hpx::threads {
38

39
    ///////////////////////////////////////////////////////////////////////////
40
    /// \cond NOINTERNAL
41
    // Identifies a thread pool by its numeric index and its name. Both values
    // are fixed at construction time: the members are const and the struct
    // only exposes read accessors.
    struct pool_id_type
63,663,498✔
42
    {
43
        // Stores the index and takes over the name, which is passed by value
        // and handed to the member through HPX_MOVE.
        pool_id_type(std::size_t index, std::string name) noexcept
1,580✔
44
          : index_(index)
1,580✔
45
          , name_(HPX_MOVE(name))
1,580✔
46
        {
47
        }
1,580✔
48

49
        // Returns the pool index by value.
        std::size_t index() const noexcept
31,819,663✔
50
        {
51
            return index_;
31,819,748✔
52
        }
53

54
        // Returns a reference to the stored name; the reference is only valid
        // for as long as this pool_id_type object exists.
        std::string const& name() const noexcept
32,109,549✔
55
        {
56
            return name_;
32,109,683✔
57
        }
58

59
    private:
60
        std::size_t const index_;
61
        std::string const name_;
62
    };
63
    /// \endcond
64

65
    // Bundle of the arguments used to construct a thread pool (see the
    // thread_pool_base constructor, which takes this struct by const&).
    //
    // NOTE(review): name_, notifier_, affinity_data_ and
    // network_background_callback_ are stored as references, not copies, so
    // the objects passed to the constructor must outlive this struct. In
    // particular, if network_background_callback is left at its default, the
    // member is bound to a temporary that is destroyed at the end of the
    // full-expression containing the constructor call -- confirm that no
    // caller relies on the default and reads the member afterwards.
    struct thread_pool_init_parameters
66
    {
67
        std::string const& name_;
68
        std::size_t index_;
69
        policies::scheduler_mode mode_;
70
        std::size_t num_threads_;
71
        std::size_t thread_offset_;
72
        hpx::threads::policies::callback_notifier& notifier_;
73
        hpx::threads::policies::detail::affinity_data const& affinity_data_;
74
        hpx::threads::detail::network_background_callback_type const&
75
            network_background_callback_;
76
        std::size_t max_background_threads_;
77
        std::size_t max_idle_loop_count_;
78
        std::size_t max_busy_loop_count_;
79
        std::size_t shutdown_check_count_;
80

81
        // Copies the scalar arguments and binds the reference members to the
        // objects passed in. Defaults: an empty background callback,
        // std::size_t(-1) background threads, HPX_IDLE_LOOP_COUNT_MAX,
        // HPX_BUSY_LOOP_COUNT_MAX and a shutdown check count of 10.
        thread_pool_init_parameters(std::string const& name, std::size_t index,
1,580✔
82
            policies::scheduler_mode mode, std::size_t num_threads,
83
            std::size_t thread_offset,
84
            hpx::threads::policies::callback_notifier& notifier,
85
            hpx::threads::policies::detail::affinity_data const& affinity_data,
86
            hpx::threads::detail::network_background_callback_type const&
87
                network_background_callback =
88
                    hpx::threads::detail::network_background_callback_type(),
89
            std::size_t max_background_threads = std::size_t(-1),
90
            std::size_t max_idle_loop_count = HPX_IDLE_LOOP_COUNT_MAX,
91
            std::size_t max_busy_loop_count = HPX_BUSY_LOOP_COUNT_MAX,
92
            std::size_t shutdown_check_count = 10)
93
          : name_(name)
1,580✔
94
          , index_(index)
1,580✔
95
          , mode_(mode)
1,580✔
96
          , num_threads_(num_threads)
1,580✔
97
          , thread_offset_(thread_offset)
1,580✔
98
          , notifier_(notifier)
1,580✔
99
          , affinity_data_(affinity_data)
1,580✔
100
          , network_background_callback_(network_background_callback)
1,580✔
101
          , max_background_threads_(max_background_threads)
1,580✔
102
          , max_idle_loop_count_(max_idle_loop_count)
1,580✔
103
          , max_busy_loop_count_(max_busy_loop_count)
1,580✔
104
          , shutdown_check_count_(shutdown_check_count)
1,580✔
105
        {
106
        }
1,580✔
107
    };
108

109
    ///////////////////////////////////////////////////////////////////////////
110
    // note: this data structure has to be protected from races from the outside
111

112
    /// \brief The base class used to manage a pool of OS threads.
113
    class HPX_CORE_EXPORT thread_pool_base
114
    {
115
    public:
116
        /// \cond NOINTERNAL
117
        // Constructs the pool from the given initialization parameters
        // (declaration only; defined out of line).
        explicit thread_pool_base(thread_pool_init_parameters const& init);
118

119
        virtual ~thread_pool_base() = default;
1,577✔
120

121
        virtual void init(std::size_t num_threads, std::size_t threads_offset);
122

123
        // Pure virtual lifecycle interface to be provided by concrete pools.
        virtual bool run(
124
            std::unique_lock<std::mutex>& l, std::size_t num_threads) = 0;
125

126
        virtual void stop(
127
            std::unique_lock<std::mutex>& l, bool blocking = true) = 0;
128

129
        virtual void wait() = 0;
130
        virtual bool is_busy() = 0;
131
        virtual bool is_idle() = 0;
132

133
        virtual void print_pool(std::ostream&) const = 0;
134

135
        // Returns a copy of this pool's id (index and name). Note that the
        // return is by value, so every call copies the stored name string.
        pool_id_type get_pool_id() const
31,813,288✔
136
        {
137
            return id_;
31,813,688✔
138
        }
139
        /// \endcond
140

141
        /// Suspends the given processing unit. Blocks until the processing unit
142
        /// has been suspended.
143
        ///
144
        /// \param virt_core [in] The processing unit on the pool to be
145
        ///                  suspended. The processing units are indexed
146
        ///                  starting from 0.
147
        virtual void suspend_processing_unit_direct(
148
            std::size_t virt_core, error_code& ec = throws) = 0;
149

150
        /// Resumes the given processing unit. Blocks until the processing unit
151
        /// has been resumed.
152
        ///
153
        /// \param virt_core [in] The processing unit on the pool to be resumed.
154
        ///                  The processing units are indexed starting from 0.
155
        virtual void resume_processing_unit_direct(
156
            std::size_t virt_core, error_code& ec = throws) = 0;
157

158
        /// Resumes the thread pool. Blocks until all OS threads on the thread pool
159
        /// have been resumed.
160
        ///
161
        /// \param ec [in,out] this represents the error status on exit, if this
162
        ///           is pre-initialized to \a hpx#throws the function will
163
        ///           throw on error instead.
164
        virtual void resume_direct(error_code& ec = throws) = 0;
165

166
        /// Suspends the thread pool. Blocks until all OS threads on the thread pool
167
        /// have been suspended.
168
        ///
169
        /// \note A thread pool cannot be suspended from an HPX thread running
170
        ///       on the pool itself.
171
        ///
172
        /// \param ec [in,out] this represents the error status on exit, if this
173
        ///           is pre-initialized to \a hpx#throws the function will
174
        ///           throw on error instead.
175
        ///
176
        /// \throws hpx::exception if called from an HPX thread which is running
177
        ///         on the pool itself.
178
        virtual void suspend_direct(error_code& ec = throws) = 0;
179

180
    public:
181
        /// \cond NOINTERNAL
182
        virtual std::size_t get_os_thread_count() const = 0;
183

184
        virtual std::thread& get_os_thread_handle(std::size_t num_thread) = 0;
185

186
        virtual std::size_t get_active_os_thread_count() const;
187

188
        virtual void create_thread(
189
            thread_init_data& data, thread_id_ref_type& id, error_code& ec) = 0;
190
        virtual thread_id_ref_type create_work(
191
            thread_init_data& data, error_code& ec) = 0;
192

193
        virtual thread_state set_state(thread_id_type const& id,
194
            thread_schedule_state new_state, thread_restart_state new_state_ex,
195
            thread_priority priority, error_code& ec) = 0;
196

197
        virtual thread_id_ref_type set_state(
198
            hpx::chrono::steady_time_point const& abs_time,
199
            thread_id_type const& id, thread_schedule_state newstate,
200
            thread_restart_state newstate_ex, thread_priority priority,
201
            error_code& ec) = 0;
202

203
        // Index part of this pool's id.
        std::size_t get_pool_index() const noexcept
3,171✔
204
        {
205
            return id_.index();
3,171✔
206
        }
207

208
        // Name part of this pool's id; returned by reference, no copy is made.
        std::string const& get_pool_name() const noexcept
284,756✔
209
        {
210
            return id_.name();
284,756✔
211
        }
212

213
        // Returns thread_offset_ (see the member's description below).
        std::size_t get_thread_offset() const noexcept
117,560✔
214
        {
215
            return thread_offset_;
117,564✔
216
        }
217

218
        // The base implementation has no scheduler and returns nullptr.
        virtual policies::scheduler_base* get_scheduler() const
×
219
        {
220
            return nullptr;
×
221
        }
222

223
        mask_type get_used_processing_units(bool full_cores = false) const;
224
        mask_type get_used_processing_units(
225
            std::size_t num_cores, bool full_cores = false) const;
226
        mask_type get_used_processing_unit(
227
            std::size_t thread_num, bool full_cores = false) const;
228

229
        hwloc_bitmap_ptr get_numa_domain_bitmap() const;
230

231
        // performance counters
        //
        // All base implementations in this section ignore their arguments and
        // return 0. Most of them are only declared when the corresponding
        // HPX_HAVE_* configuration macro is defined.
232
#if defined(HPX_HAVE_THREAD_CUMULATIVE_COUNTS)
233
        virtual std::int64_t get_executed_threads(
×
234
            std::size_t /*thread_num*/, bool /*reset*/)
235
        {
236
            return 0;
×
237
        }
238
        virtual std::int64_t get_executed_thread_phases(
×
239
            std::size_t /*thread_num*/, bool /*reset*/)
240
        {
241
            return 0;
×
242
        }
243
#if defined(HPX_HAVE_THREAD_IDLE_RATES)
244
        virtual std::int64_t get_thread_phase_duration(
245
            std::size_t /*thread_num*/, bool /*reset*/)
246
        {
247
            return 0;
248
        }
249
        virtual std::int64_t get_thread_duration(
250
            std::size_t /*thread_num*/, bool /*reset*/)
251
        {
252
            return 0;
253
        }
254
        virtual std::int64_t get_thread_phase_overhead(
255
            std::size_t /*thread_num*/, bool /*reset*/)
256
        {
257
            return 0;
258
        }
259
        virtual std::int64_t get_thread_overhead(
260
            std::size_t /*thread_num*/, bool /*reset*/)
261
        {
262
            return 0;
263
        }
264
        virtual std::int64_t get_cumulative_thread_duration(
265
            std::size_t /*thread_num*/, bool /*reset*/)
266
        {
267
            return 0;
268
        }
269
        virtual std::int64_t get_cumulative_thread_overhead(
270
            std::size_t /*thread_num*/, bool /*reset*/)
271
        {
272
            return 0;
273
        }
274
#endif    // HPX_HAVE_THREAD_IDLE_RATES
275
#endif    // HPX_HAVE_THREAD_CUMULATIVE_COUNTS
276
        virtual std::int64_t get_cumulative_duration(
×
277
            std::size_t /*thread_num*/, bool /*reset*/)
278
        {
279
            return 0;
×
280
        }
281

282
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
283
    defined(HPX_HAVE_THREAD_IDLE_RATES)
284
        virtual std::int64_t get_background_work_duration(
285
            std::size_t /*thread_num*/, bool /*reset*/)
286
        {
287
            return 0;
288
        }
289
        virtual std::int64_t get_background_overhead(
290
            std::size_t /*thread_num*/, bool /*reset*/)
291
        {
292
            return 0;
293
        }
294

295
        virtual std::int64_t get_background_send_duration(
296
            std::size_t /*thread_num*/, bool /*reset*/)
297
        {
298
            return 0;
299
        }
300
        virtual std::int64_t get_background_send_overhead(
301
            std::size_t /*thread_num*/, bool /*reset*/)
302
        {
303
            return 0;
304
        }
305

306
        virtual std::int64_t get_background_receive_duration(
307
            std::size_t /*thread_num*/, bool /*reset*/)
308
        {
309
            return 0;
310
        }
311
        virtual std::int64_t get_background_receive_overhead(
312
            std::size_t /*thread_num*/, bool /*reset*/)
313
        {
314
            return 0;
315
        }
316
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS
317

318
#if defined(HPX_HAVE_THREAD_IDLE_RATES)
319
        virtual std::int64_t avg_idle_rate_all(bool /*reset*/)
320
        {
321
            return 0;
322
        }
323
        virtual std::int64_t avg_idle_rate(std::size_t, bool)
324
        {
325
            return 0;
326
        }
327

328
#if defined(HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES)
329
        virtual std::int64_t avg_creation_idle_rate(
330
            std::size_t /*thread_num*/, bool /*reset*/)
331
        {
332
            return 0;
333
        }
334
        virtual std::int64_t avg_cleanup_idle_rate(
335
            std::size_t /*thread_num*/, bool /*reset*/)
336
        {
337
            return 0;
338
        }
339
#endif    // HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
340
#endif    // HPX_HAVE_THREAD_IDLE_RATES
341
        virtual std::int64_t get_queue_length(std::size_t, bool)
×
342
        {
343
            return 0;
×
344
        }
345

346
#if defined(HPX_HAVE_THREAD_QUEUE_WAITTIME)
347
        virtual std::int64_t get_average_thread_wait_time(
348
            std::size_t /*thread_num*/, bool /*reset*/)
349
        {
350
            return 0;
351
        }
352
        virtual std::int64_t get_average_task_wait_time(
353
            std::size_t /*thread_num*/, bool /*reset*/)
354
        {
355
            return 0;
356
        }
357
#endif    // HPX_HAVE_THREAD_QUEUE_WAITTIME
358

359
#if defined(HPX_HAVE_THREAD_STEALING_COUNTS)
360
        virtual std::int64_t get_num_pending_misses(
361
            std::size_t /*thread_num*/, bool /*reset*/)
362
        {
363
            return 0;
364
        }
365
        virtual std::int64_t get_num_pending_accesses(
366
            std::size_t /*thread_num*/, bool /*reset*/)
367
        {
368
            return 0;
369
        }
370

371
        virtual std::int64_t get_num_stolen_from_pending(
372
            std::size_t /*thread_num*/, bool /*reset*/)
373
        {
374
            return 0;
375
        }
376
        virtual std::int64_t get_num_stolen_to_pending(
377
            std::size_t /*thread_num*/, bool /*reset*/)
378
        {
379
            return 0;
380
        }
381
        virtual std::int64_t get_num_stolen_from_staged(
382
            std::size_t /*thread_num*/, bool /*reset*/)
383
        {
384
            return 0;
385
        }
386
        virtual std::int64_t get_num_stolen_to_staged(
387
            std::size_t /*thread_num*/, bool /*reset*/)
388
        {
389
            return 0;
390
        }
391
#endif    // HPX_HAVE_THREAD_STEALING_COUNTS
392
        // Base implementation ignores all arguments and returns 0. The
        // get_thread_count_*() helpers below dispatch to this virtual.
        virtual std::int64_t get_thread_count(thread_schedule_state /*state*/,
×
393
            thread_priority /*priority*/, std::size_t /*num_thread*/,
394
            bool /*reset*/)
395
        {
396
            return 0;
×
397
        }
398

399
        virtual std::int64_t get_idle_core_count() const
×
400
        {
401
            return 0;
×
402
        }
403

404
        // Base implementation leaves the passed mask untouched.
        virtual void get_idle_core_mask(mask_type&) const {}
×
405

406
        virtual std::int64_t get_background_thread_count() const
×
407
        {
408
            return 0;
×
409
        }
410

411
        // Convenience wrappers around get_thread_count(): one per
        // thread_schedule_state value, always using thread_priority::default_
        // and forwarding num_thread and reset unchanged.
        std::int64_t get_thread_count_unknown(
1,111,083✔
412
            std::size_t num_thread, bool reset)
413
        {
414
            return get_thread_count(thread_schedule_state::unknown,
1,111,083✔
415
                thread_priority::default_, num_thread, reset);
1,111,083✔
416
        }
417
        std::int64_t get_thread_count_active(std::size_t num_thread, bool reset)
3✔
418
        {
419
            return get_thread_count(thread_schedule_state::active,
3✔
420
                thread_priority::default_, num_thread, reset);
3✔
421
        }
422
        std::int64_t get_thread_count_pending(
3✔
423
            std::size_t num_thread, bool reset)
424
        {
425
            return get_thread_count(thread_schedule_state::pending,
3✔
426
                thread_priority::default_, num_thread, reset);
3✔
427
        }
428
        std::int64_t get_thread_count_suspended(
3✔
429
            std::size_t num_thread, bool reset)
430
        {
431
            return get_thread_count(thread_schedule_state::suspended,
3✔
432
                thread_priority::default_, num_thread, reset);
3✔
433
        }
434
        std::int64_t get_thread_count_terminated(
3✔
435
            std::size_t num_thread, bool reset)
436
        {
437
            return get_thread_count(thread_schedule_state::terminated,
3✔
438
                thread_priority::default_, num_thread, reset);
3✔
439
        }
440
        std::int64_t get_thread_count_staged(std::size_t num_thread, bool reset)
3✔
441
        {
442
            return get_thread_count(thread_schedule_state::staged,
3✔
443
                thread_priority::default_, num_thread, reset);
3✔
444
        }
445

446
        virtual std::int64_t get_scheduler_utilization() const = 0;
447

448
        virtual std::int64_t get_idle_loop_count(
449
            std::size_t num, bool reset) = 0;
450
        virtual std::int64_t get_busy_loop_count(
451
            std::size_t num, bool reset) = 0;
452

453
        ///////////////////////////////////////////////////////////////////////
454
        // Base implementation never invokes the callback and returns false.
        virtual bool enumerate_threads(
×
455
            hpx::function<bool(thread_id_type)> const& /*f*/,
456
            thread_schedule_state /*state*/ =
457
                thread_schedule_state::unknown) const
458
        {
459
            return false;
×
460
        }
461

462
        // The following hooks are no-ops in the base class.
        virtual void reset_thread_distribution() {}
×
463

464
        virtual void abort_all_suspended_threads() {}
×
465
        virtual bool cleanup_terminated(bool /*delete_all*/)
×
466
        {
467
            return false;
×
468
        }
469

470
        virtual hpx::state get_state() const = 0;
471
        virtual hpx::state get_state(std::size_t num_thread) const = 0;
472

473
        virtual bool has_reached_state(hpx::state s) const = 0;
474

475
        virtual void do_some_work(std::size_t /*num_thread*/) {}
×
476

477
        // Forwards the error, together with the global thread number, to the
        // notifier's on_error() callback.
        virtual void report_error(
×
478
            std::size_t global_thread_num, std::exception_ptr const& e)
479
        {
480
            notifier_.on_error(global_thread_num, e);
×
481
        }
×
482

483
        // Returns timestamp_scale_ (see the member's description below).
        double timestamp_scale() const noexcept
56✔
484
        {
485
            return timestamp_scale_;
56✔
486
        }
487
        /// \endcond
488

489
    protected:
490
        /// \cond NOINTERNAL
491
        void init_pool_time_scale();
492
        /// \endcond
493

494
    protected:
495
        /// \cond NOINTERNAL
496
        pool_id_type id_;
497

498
        // The thread_offset is equal to the accumulated number of
499
        // threads in all pools preceding this pool
500
        // in the thread indexation. That means, that in order to know
501
        // the global index of a thread it owns, the pool has to compute:
502
        // global index = thread_offset_ + local index.
503
        std::size_t thread_offset_;
504

505
        // Held by reference: the referenced affinity data must outlive the
        // pool.
        policies::detail::affinity_data const& affinity_data_;
506

507
        // scale timestamps to nanoseconds
508
        double timestamp_scale_;
509

510
        // callback functions to invoke at start, stop, and error
        // (held by reference: the notifier must outlive the pool)
511
        threads::policies::callback_notifier& notifier_;
512
        /// \endcond
513
    };
514

515
    // Stream insertion for a thread pool (declaration only; the definition is
    // not part of this header).
    // NOTE(review): presumably forwards to thread_pool_base::print_pool() --
    // confirm against the implementation file.
    HPX_CORE_EXPORT std::ostream& operator<<(
516
        std::ostream& os, thread_pool_base const& thread_pool);
517
}    // namespace hpx::threads
518

519
#include <hpx/config/warnings_suffix.hpp>
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc