• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.25
/libs/core/threading_base/include/hpx/threading_base/thread_pool_base.hpp
1
//  Copyright (c)      2018 Mikael Simberg
2
//  Copyright (c) 2007-2024 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#pragma once
9

10
#include <hpx/config.hpp>
11
#include <hpx/modules/affinity.hpp>
12
#include <hpx/modules/errors.hpp>
13
#include <hpx/modules/functional.hpp>
14
#include <hpx/modules/timing.hpp>
15
#include <hpx/modules/topology.hpp>
16
#include <hpx/threading_base/callback_notifier.hpp>
17
#include <hpx/threading_base/detail/get_default_pool.hpp>
18
#include <hpx/threading_base/network_background_callback.hpp>
19
#include <hpx/threading_base/scheduler_mode.hpp>
20
#include <hpx/threading_base/scheduler_state.hpp>
21
#include <hpx/threading_base/thread_init_data.hpp>
22

23
#include <cstddef>
24
#include <cstdint>
25
#include <exception>
26
#include <iosfwd>
27
#include <memory>
28
#include <mutex>
29
#include <string>
30
#include <thread>
31

32
#include <hpx/config/warnings_prefix.hpp>
33

34
namespace hpx::threads {
35

36
    ///////////////////////////////////////////////////////////////////////////
37
    /// \cond NOINTERNAL
38
    /// Identifies a thread pool by a numeric index together with a name.
    ///
    /// Both values are fixed at construction. Because the members are
    /// \c const, an instance can be copy-constructed but not assigned to.
    HPX_CXX_EXPORT struct pool_id_type
    {
        /// \param index  numeric index of the pool
        /// \param name   name of the pool; taken by value and handed to the
        ///               stored member through HPX_MOVE
        pool_id_type(std::size_t index, std::string name) noexcept
          : index_(index)
          , name_(HPX_MOVE(name))
        {
        }

        /// \returns the index supplied at construction
        [[nodiscard]] std::size_t index() const noexcept
        {
            return index_;
        }

        /// \returns a reference to the stored name; it refers to a member
        ///          of this object and is valid only while this object lives
        [[nodiscard]] std::string const& name() const noexcept
        {
            return name_;
        }

    private:
        std::size_t const index_;
        std::string const name_;
    };
60
    /// \endcond
61

62
    /// Bundle of the settings handed to a thread pool on construction.
    ///
    /// \note \a name_, \a notifier_, \a affinity_data_ and
    ///       \a network_background_callback_ are stored as references, not
    ///       as copies. The objects passed to the constructor therefore
    ///       have to outlive this parameter object; in particular, do not
    ///       pass a temporary string as \a name if the parameter object is
    ///       kept beyond the constructing expression.
    HPX_CXX_EXPORT struct thread_pool_init_parameters
    {
        std::string const& name_;
        std::size_t index_;
        policies::scheduler_mode mode_;
        std::size_t num_threads_;
        std::size_t thread_offset_;
        hpx::threads::policies::callback_notifier& notifier_;
        hpx::threads::policies::detail::affinity_data const& affinity_data_;
        hpx::threads::detail::network_background_callback_type const&
            network_background_callback_;
        std::size_t max_background_threads_;
        std::size_t max_idle_loop_count_;
        std::size_t max_busy_loop_count_;
        std::size_t shutdown_check_count_;

        /// Stores all arguments in the equally named members.
        ///
        /// \param name          pool name (kept by reference)
        /// \param index         pool index
        /// \param mode          scheduler mode
        /// \param num_threads   number of threads of the pool
        /// \param thread_offset thread offset of the pool
        /// \param notifier      callback notifier (kept by reference)
        /// \param affinity_data affinity data (kept by reference)
        /// \param network_background_callback
        ///                      callback (kept by reference). If omitted, the
        ///                      member refers to a default-constructed
        ///                      callback with static storage duration, so the
        ///                      reference stays valid after the constructing
        ///                      expression has ended.
        /// \param max_background_threads
        ///                      defaults to \c static_cast<std::size_t>(-1)
        /// \param max_idle_loop_count
        ///                      defaults to \c HPX_IDLE_LOOP_COUNT_MAX
        /// \param max_busy_loop_count
        ///                      defaults to \c HPX_BUSY_LOOP_COUNT_MAX
        /// \param shutdown_check_count
        ///                      defaults to 10
        thread_pool_init_parameters(std::string const& name, std::size_t index,
            policies::scheduler_mode mode, std::size_t num_threads,
            std::size_t thread_offset,
            hpx::threads::policies::callback_notifier& notifier,
            hpx::threads::policies::detail::affinity_data const& affinity_data,
            hpx::threads::detail::network_background_callback_type const&
                network_background_callback =
                    default_network_background_callback(),
            std::size_t max_background_threads = static_cast<std::size_t>(-1),
            std::size_t max_idle_loop_count = HPX_IDLE_LOOP_COUNT_MAX,
            std::size_t max_busy_loop_count = HPX_BUSY_LOOP_COUNT_MAX,
            std::size_t shutdown_check_count = 10)
          : name_(name)
          , index_(index)
          , mode_(mode)
          , num_threads_(num_threads)
          , thread_offset_(thread_offset)
          , notifier_(notifier)
          , affinity_data_(affinity_data)
          , network_background_callback_(network_background_callback)
          , max_background_threads_(max_background_threads)
          , max_idle_loop_count_(max_idle_loop_count)
          , max_busy_loop_count_(max_busy_loop_count)
          , shutdown_check_count_(shutdown_check_count)
        {
        }

    private:
        // Target of the default argument above. A temporary used as the
        // default would be destroyed at the end of the full-expression that
        // calls the constructor, leaving the reference member dangling; a
        // function-local static does not have that problem.
        static hpx::threads::detail::network_background_callback_type const&
        default_network_background_callback()
        {
            static hpx::threads::detail::network_background_callback_type const
                empty_callback =
                    hpx::threads::detail::network_background_callback_type();
            return empty_callback;
        }
    };
105

106
    ///////////////////////////////////////////////////////////////////////////
107
    // note: this data structure has to be protected from races from the outside
108

109
    /// \brief The base class used to manage a pool of OS threads.
110
    HPX_CXX_EXPORT class thread_pool_base
111
    {
112
    public:
113
        /// \cond NOINTERNAL
114
        explicit thread_pool_base(thread_pool_init_parameters const& init);
115

116
        thread_pool_base(thread_pool_base const&) = delete;
117
        thread_pool_base(thread_pool_base&&) = delete;
118
        thread_pool_base& operator=(thread_pool_base const&) = delete;
119
        thread_pool_base& operator=(thread_pool_base&&) = delete;
120

121
        virtual ~thread_pool_base() = default;
122

64✔
123
        virtual void init(std::size_t num_threads, std::size_t threads_offset);
124

125
        virtual bool run(
126
            std::unique_lock<std::mutex>& l, std::size_t num_threads) = 0;
127

128
        virtual void stop(
129
            std::unique_lock<std::mutex>& l, bool blocking = true) = 0;
130

131
        virtual void wait() = 0;
132
        virtual bool is_busy() = 0;
133
        virtual bool is_idle() = 0;
134

135
        virtual void print_pool(std::ostream&) const = 0;
136

137
        pool_id_type get_pool_id() const
138
        {
139
            return id_;
140
        }
×
141
        /// \endcond
142

143
        /// Suspends the given processing unit. Blocks until the processing unit
144
        /// has been suspended.
145
        ///
146
        /// \param virt_core   [in] The processing unit on the pool to be
147
        ///                    suspended. The processing units are indexed
148
        ///                    starting from 0.
149
        /// \param ec [in,out] this represents the error status on exit, if this
150
        ///           is pre-initialized to \a hpx#throws the function will
151
        ///           throw on error instead.
152
        virtual void suspend_processing_unit_direct(
153
            std::size_t virt_core, error_code& ec = throws) = 0;
154

155
        /// Resumes the given processing unit. Blocks until the processing unit
156
        /// has been resumed.
157
        ///
158
        /// \param virt_core   [in] The processing unit on the pool to be resumed.
159
        ///                    The processing units are indexed starting from 0.
160
        /// \param ec [in,out] this represents the error status on exit, if this
161
        ///           is pre-initialized to \a hpx#throws the function will
162
        ///           throw on error instead.
163
        virtual void resume_processing_unit_direct(
164
            std::size_t virt_core, error_code& ec = throws) = 0;
165

166
        /// Resumes the thread pool. Blocks until all OS threads on the thread pool
167
        /// have been resumed.
168
        ///
169
        /// \param ec [in,out] this represents the error status on exit, if this
170
        ///           is pre-initialized to \a hpx#throws the function will
171
        ///           throw on error instead.
172
        virtual void resume_direct(error_code& ec = throws) = 0;
173

174
        /// Suspends the thread pool. Blocks until all OS threads on the thread pool
175
        /// have been suspended.
176
        ///
177
        /// \note A thread pool cannot be suspended from an HPX thread running
178
        ///       on the pool itself.
179
        ///
180
        /// \param ec [in,out] this represents the error status on exit, if this
181
        ///           is pre-initialized to \a hpx#throws the function will
182
        ///           throw on error instead.
183
        ///
184
        /// \throws hpx::exception if called from an HPX thread which is running
185
        ///         on the pool itself.
186
        virtual void suspend_direct(error_code& ec = throws) = 0;
187

188
    public:
189
        /// \cond NOINTERNAL
190
        virtual std::size_t get_os_thread_count() const = 0;
191

192
        virtual std::thread& get_os_thread_handle(std::size_t num_thread) = 0;
193

194
        virtual std::size_t get_active_os_thread_count() const;
195

196
        virtual void create_thread(
197
            thread_init_data& data, thread_id_ref_type& id, error_code& ec) = 0;
198
        virtual thread_id_ref_type create_work(
199
            thread_init_data& data, error_code& ec) = 0;
200

201
        virtual thread_state set_state(thread_id_type const& id,
202
            thread_schedule_state new_state, thread_restart_state new_state_ex,
203
            thread_priority priority, error_code& ec) = 0;
204

205
        virtual thread_id_ref_type set_state(
206
            hpx::chrono::steady_time_point const& abs_time,
207
            thread_id_type const& id, thread_schedule_state newstate,
208
            thread_restart_state newstate_ex, thread_priority priority,
209
            error_code& ec) = 0;
210

211
        [[nodiscard]] std::size_t get_pool_index() const noexcept
212
        {
213
            return id_.index();
214
        }
215

216
        [[nodiscard]] std::string const& get_pool_name() const noexcept
217
        {
218
            return id_.name();
219
        }
220

221
        [[nodiscard]] std::size_t get_thread_offset() const noexcept
222
        {
223
            return thread_offset_;
224
        }
1✔
225

226
        virtual policies::scheduler_base* get_scheduler() const
227
        {
×
228
            return nullptr;
229
        }
×
230

231
        mask_type get_used_processing_units(bool full_cores = false) const;
232
        mask_type get_used_processing_units(
233
            std::size_t num_cores, bool full_cores = false) const;
234
        mask_type get_used_processing_unit(
235
            std::size_t thread_num, bool full_cores = false) const;
236

237
        hwloc_bitmap_ptr get_numa_domain_bitmap() const;
238

239
        // performance counters
240
#if defined(HPX_HAVE_THREAD_CUMULATIVE_COUNTS)
241
        virtual std::int64_t get_executed_threads(
242
            std::size_t /*thread_num*/, bool /*reset*/)
×
243
        {
244
            return 0;
245
        }
×
246
        virtual std::int64_t get_executed_thread_phases(
247
            std::size_t /*thread_num*/, bool /*reset*/)
×
248
        {
249
            return 0;
250
        }
×
251
#if defined(HPX_HAVE_THREAD_IDLE_RATES)
252
        virtual std::int64_t get_thread_phase_duration(
253
            std::size_t /*thread_num*/, bool /*reset*/)
254
        {
255
            return 0;
256
        }
257
        virtual std::int64_t get_thread_duration(
258
            std::size_t /*thread_num*/, bool /*reset*/)
259
        {
260
            return 0;
261
        }
262
        virtual std::int64_t get_thread_phase_overhead(
263
            std::size_t /*thread_num*/, bool /*reset*/)
264
        {
265
            return 0;
266
        }
267
        virtual std::int64_t get_thread_overhead(
268
            std::size_t /*thread_num*/, bool /*reset*/)
269
        {
270
            return 0;
271
        }
272
        virtual std::int64_t get_cumulative_thread_duration(
273
            std::size_t /*thread_num*/, bool /*reset*/)
274
        {
275
            return 0;
276
        }
277
        virtual std::int64_t get_cumulative_thread_overhead(
278
            std::size_t /*thread_num*/, bool /*reset*/)
279
        {
280
            return 0;
281
        }
282
#endif
283
#endif
284
        virtual std::int64_t get_cumulative_duration(
285
            std::size_t /*thread_num*/, bool /*reset*/)
×
286
        {
287
            return 0;
288
        }
×
289

290
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
291
    defined(HPX_HAVE_THREAD_IDLE_RATES)
292
        virtual std::int64_t get_background_work_duration(
293
            std::size_t /*thread_num*/, bool /*reset*/)
294
        {
295
            return 0;
296
        }
297
        virtual std::int64_t get_background_overhead(
298
            std::size_t /*thread_num*/, bool /*reset*/)
299
        {
300
            return 0;
301
        }
302

303
        virtual std::int64_t get_background_send_duration(
304
            std::size_t /*thread_num*/, bool /*reset*/)
305
        {
306
            return 0;
307
        }
308
        virtual std::int64_t get_background_send_overhead(
309
            std::size_t /*thread_num*/, bool /*reset*/)
310
        {
311
            return 0;
312
        }
313

314
        virtual std::int64_t get_background_receive_duration(
315
            std::size_t /*thread_num*/, bool /*reset*/)
316
        {
317
            return 0;
318
        }
319
        virtual std::int64_t get_background_receive_overhead(
320
            std::size_t /*thread_num*/, bool /*reset*/)
321
        {
322
            return 0;
323
        }
324
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS
325

326
#if defined(HPX_HAVE_THREAD_IDLE_RATES)
327
        virtual std::int64_t avg_idle_rate_all(bool /*reset*/) noexcept
328
        {
329
            return 0;
330
        }
331
        virtual std::int64_t avg_idle_rate(std::size_t, bool) noexcept
332
        {
333
            return 0;
334
        }
335

336
#if defined(HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES)
337
        virtual std::int64_t avg_creation_idle_rate(
338
            std::size_t /*thread_num*/, bool /*reset*/) noexcept
339
        {
340
            return 0;
341
        }
342
        virtual std::int64_t avg_cleanup_idle_rate(
343
            std::size_t /*thread_num*/, bool /*reset*/) noexcept
344
        {
345
            return 0;
346
        }
347
#endif
348
#endif
349
        virtual std::int64_t get_queue_length(std::size_t, bool)
350
        {
×
351
            return 0;
352
        }
×
353

354
#if defined(HPX_HAVE_THREAD_QUEUE_WAITTIME)
355
        virtual std::int64_t get_average_thread_wait_time(
356
            std::size_t /*thread_num*/, bool /*reset*/)
357
        {
358
            return 0;
359
        }
360
        virtual std::int64_t get_average_task_wait_time(
361
            std::size_t /*thread_num*/, bool /*reset*/)
362
        {
363
            return 0;
364
        }
365
#endif
366

367
#if defined(HPX_HAVE_THREAD_STEALING_COUNTS)
368
        virtual std::int64_t get_num_pending_misses(
369
            std::size_t /*thread_num*/, bool /*reset*/)
370
        {
371
            return 0;
372
        }
373
        virtual std::int64_t get_num_pending_accesses(
374
            std::size_t /*thread_num*/, bool /*reset*/)
375
        {
376
            return 0;
377
        }
378

379
        virtual std::int64_t get_num_stolen_from_pending(
380
            std::size_t /*thread_num*/, bool /*reset*/)
381
        {
382
            return 0;
383
        }
384
        virtual std::int64_t get_num_stolen_to_pending(
385
            std::size_t /*thread_num*/, bool /*reset*/)
386
        {
387
            return 0;
388
        }
389
        virtual std::int64_t get_num_stolen_from_staged(
390
            std::size_t /*thread_num*/, bool /*reset*/)
391
        {
392
            return 0;
393
        }
394
        virtual std::int64_t get_num_stolen_to_staged(
395
            std::size_t /*thread_num*/, bool /*reset*/)
396
        {
397
            return 0;
398
        }
399
#endif
400
        virtual std::int64_t get_thread_count(thread_schedule_state /*state*/,
401
            thread_priority /*priority*/, std::size_t /*num_thread*/,
×
402
            bool /*reset*/)
403
        {
404
            return 0;
405
        }
×
406

407
        virtual std::int64_t get_idle_core_count() const
408
        {
×
409
            return 0;
410
        }
×
411

412
        virtual void get_idle_core_mask(mask_type&) const {}
413

×
414
        virtual std::int64_t get_background_thread_count() const
415
        {
×
416
            return 0;
417
        }
×
418

419
        std::int64_t get_thread_count_unknown(
420
            std::size_t num_thread, bool reset);
×
421
        std::int64_t get_thread_count_active(
422
            std::size_t num_thread, bool reset);
423
        std::int64_t get_thread_count_pending(
3,580✔
424
            std::size_t num_thread, bool reset);
×
425
        std::int64_t get_thread_count_suspended(
426
            std::size_t num_thread, bool reset);
×
427
        std::int64_t get_thread_count_terminated(
428
            std::size_t num_thread, bool reset);
×
429
        std::int64_t get_thread_count_staged(
×
430
            std::size_t num_thread, bool reset);
431

×
432
        virtual std::int64_t get_scheduler_utilization() const = 0;
433

434
        virtual std::int64_t get_idle_loop_count(
×
435
            std::size_t num, bool reset) = 0;
×
436
        virtual std::int64_t get_busy_loop_count(
437
            std::size_t num, bool reset) = 0;
×
438

439
        ///////////////////////////////////////////////////////////////////////
440
        virtual bool enumerate_threads(
×
441
            hpx::function<bool(thread_id_type)> const& /*f*/,
×
442
            thread_schedule_state /*state*/ =
443
                thread_schedule_state::unknown) const
×
444
        {
445
            return false;
446
        }
×
447

×
448
        virtual void reset_thread_distribution() noexcept {}
449

×
450
        virtual void abort_all_suspended_threads() {}
451
        virtual bool cleanup_terminated(bool /*delete_all*/)
×
452
        {
×
453
            return false;
454
        }
455

456
        virtual hpx::state get_state() const = 0;
457
        virtual hpx::state get_state(std::size_t num_thread) const = 0;
458

459
        virtual bool has_reached_state(hpx::state s) const = 0;
460

461
        virtual void do_some_work(std::size_t /*num_thread*/) {}
462

463
        virtual bool report_error(
×
464
            std::size_t global_thread_num, std::exception_ptr const& e)
465
        {
466
            return notifier_.on_error(global_thread_num, e);
467
        }
468

×
469
        [[nodiscard]] double timestamp_scale() const noexcept
470
        {
471
            return timestamp_scale_;
×
472
        }
473
        /// \endcond
×
474

×
475
    protected:
476
        /// \cond NOINTERNAL
×
477
        void init_pool_time_scale();
478
        /// \endcond
479

480
    protected:
481
        /// \cond NOINTERNAL
482
        pool_id_type id_;
483

484
        // The thread_offset is equal to the accumulated number of
×
485
        // threads in all pools preceding this pool
486
        // in the thread indexation. That means, that in order to know
×
487
        // the global index of a thread it owns, the pool has to compute:
488
        // global index = thread_offset_ + local index.
489
        std::size_t thread_offset_;
×
490

491
        policies::detail::affinity_data const& affinity_data_;
492

493
        // scale timestamps to nanoseconds
494
        double timestamp_scale_;
495

496
        // callback functions to invoke at start, stop, and error
497
        threads::policies::callback_notifier& notifier_;
498
        /// \endcond
499
    };
500

501
    /// Stream insertion operator for \a thread_pool_base. Only the
    /// declaration lives in this header.
    HPX_CXX_EXPORT HPX_CORE_EXPORT std::ostream&
    operator<<(std::ostream& os, thread_pool_base const& thread_pool);
503
}    // namespace hpx::threads
504

505
#include <hpx/config/warnings_suffix.hpp>
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc