// Coverage report: STEllAR-GROUP/hpx, build #882 (push), 31 Aug 2023 07:44PM UTC.
// Overall: 41.798% covered (19442 of 46514 relevant lines), down from 86.546%;
// 126375.38 hits per line.
// This file: /libs/core/threadmanager/src/threadmanager.cpp, 39.21% covered.

//  Copyright (c) 2007-2024 Hartmut Kaiser
//  Copyright (c)      2011 Bryce Lelbach, Katelyn Kufahl
//  Copyright (c) 2008-2009 Chirag Dekate, Anshul Tandon
//  Copyright (c) 2015 Patricia Grubel
//  Copyright (c) 2017 Shoshana Jakobovits
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/modules/async_combinators.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/execution.hpp>
#include <hpx/modules/execution_base.hpp>
#include <hpx/modules/futures.hpp>
#include <hpx/modules/io_service.hpp>
#include <hpx/modules/logging.hpp>
#include <hpx/modules/resource_partitioner.hpp>
#include <hpx/modules/runtime_configuration.hpp>
#include <hpx/modules/schedulers.hpp>
#include <hpx/modules/thread_pool_util.hpp>
#include <hpx/modules/thread_pools.hpp>
#include <hpx/modules/threading_base.hpp>
#include <hpx/modules/threadmanager.hpp>
#include <hpx/modules/timing.hpp>
#include <hpx/modules/topology.hpp>
#include <hpx/modules/type_support.hpp>
#include <hpx/modules/util.hpp>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <iosfwd>
#include <limits>
#include <memory>
#include <mutex>
#include <numeric>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

namespace hpx::threads {

    namespace detail {

        // reject configurations that ask for more high-priority queues than
        // there are worker threads
        static void check_num_high_priority_queues(
            std::size_t const num_threads,
            std::size_t const num_high_priority_queues)
        {
            if (num_high_priority_queues > num_threads)
            {
                throw hpx::detail::command_line_error(
                    "Invalid command line option: number of high priority "
                    "threads (--hpx:high-priority-threads) should not be "
                    "larger than number of threads (--hpx:threads)");
            }
        }
    }    // namespace detail

    ///////////////////////////////////////////////////////////////////////////
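    // The constructor wires the thread manager into the runtime: it registers
    // TSS init/deinit callbacks with the notifier and makes the resource
    // partitioner assign/unassign a processing unit whenever a worker thread
    // starts or stops.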
    threadmanager::threadmanager(hpx::util::runtime_configuration& rtcfg,
#ifdef HPX_HAVE_TIMER_POOL
        util::io_service_pool& timer_pool,
#endif
        notification_policy_type& notifier,
        detail::network_background_callback_type network_background_callback)
      : rtcfg_(rtcfg)
#ifdef HPX_HAVE_TIMER_POOL
      , timer_pool_(timer_pool)
#endif
      , notifier_(notifier)
      , network_background_callback_(HPX_MOVE(network_background_callback))
    {
        using placeholders::_1;
        using placeholders::_3;

        // Add callbacks local to threadmanager.
        notifier.add_on_start_thread_callback(
            hpx::bind(&threadmanager::init_tss, _1));
        notifier.add_on_stop_thread_callback(
            hpx::bind(&threadmanager::deinit_tss));

        auto& rp = hpx::resource::get_partitioner();
        notifier.add_on_start_thread_callback(hpx::bind(
            &resource::detail::partitioner::assign_pu, std::ref(rp), _3, _1));
        notifier.add_on_stop_thread_callback(hpx::bind(
            &resource::detail::partitioner::unassign_pu, std::ref(rp), _3, _1));
    }

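    // Reads the thread-queue tuning knobs from the runtime configuration;
    // every "hpx.thread_queue.*" entry falls back to its compile-time
    // HPX_THREAD_QUEUE_* default. As a sketch (assuming the standard HPX
    // command-line handling), an entry can be overridden at startup via
    //     --hpx:ini=hpx.thread_queue.max_thread_count=2000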
    policies::thread_queue_init_parameters threadmanager::get_init_parameters()
        const
    {
        std::int64_t const max_thread_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_thread_count",
                HPX_THREAD_QUEUE_MAX_THREAD_COUNT);
        std::int64_t const min_tasks_to_steal_pending =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_tasks_to_steal_pending",
                HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL_PENDING);
        std::int64_t const min_tasks_to_steal_staged =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_tasks_to_steal_staged",
                HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL_STAGED);
        std::int64_t const min_add_new_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_add_new_count",
                HPX_THREAD_QUEUE_MIN_ADD_NEW_COUNT);
        std::int64_t const max_add_new_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_add_new_count",
                HPX_THREAD_QUEUE_MAX_ADD_NEW_COUNT);
        std::int64_t const min_delete_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_delete_count",
                HPX_THREAD_QUEUE_MIN_DELETE_COUNT);
        std::int64_t const max_delete_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_delete_count",
                HPX_THREAD_QUEUE_MAX_DELETE_COUNT);
        std::int64_t const max_terminated_threads =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_terminated_threads",
                HPX_THREAD_QUEUE_MAX_TERMINATED_THREADS);
        std::int64_t const init_threads_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.init_threads_count",
                HPX_THREAD_QUEUE_INIT_THREADS_COUNT);
        double const max_idle_backoff_time = hpx::util::get_entry_as<double>(
            rtcfg_, "hpx.max_idle_backoff_time", HPX_IDLE_BACKOFF_TIME_MAX);

        std::ptrdiff_t const small_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::small_);
        std::ptrdiff_t const medium_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::medium);
        std::ptrdiff_t const large_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::large);
        std::ptrdiff_t const huge_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::huge);

        return policies::thread_queue_init_parameters(max_thread_count,
            min_tasks_to_steal_pending, min_tasks_to_steal_staged,
            min_add_new_count, max_add_new_count, min_delete_count,
            max_delete_count, max_terminated_threads, init_threads_count,
            max_idle_backoff_time, small_stacksize, medium_stacksize,
            large_stacksize, huge_stacksize);
    }

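    // user-defined: pool construction is delegated entirely to the
    // scheduler_function supplied through the resource partitioner.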
    void threadmanager::create_scheduler_user_defined(
        hpx::resource::scheduler_function const& pool_func,
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init)
    {
        std::unique_ptr<thread_pool_base> pool(
            pool_func(thread_pool_init, thread_queue_init));
        pools_.push_back(HPX_MOVE(pool));
    }

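    // local: one work queue per worker thread, with no separate
    // high-priority queues.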
    void threadmanager::create_scheduler_local(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t const numa_sensitive)
    {
        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_queue_scheduler<>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            thread_queue_init, "core-local_queue_scheduler");

        auto sched = std::make_unique<local_sched_type>(init);
        auto const full_mask =
            hpx::resource::get_partitioner().get_pool_pus_mask(
                thread_pool_init.name_);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive,
            full_mask);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

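    // local-priority-fifo: per-thread queues plus a configurable number of
    // high-priority queues ("hpx.thread_queue.high_priority_queues", one per
    // thread by default), with FIFO ordering via lockfree_fifo.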
    void threadmanager::create_scheduler_local_priority_fifo(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t const numa_sensitive)
    {
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t const num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);

        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_fifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-local_priority_queue_scheduler-fifo");

        auto sched = std::make_unique<local_sched_type>(init);
        auto const full_mask =
            hpx::resource::get_partitioner().get_pool_pus_mask(
                thread_pool_init.name_);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive,
            full_mask);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

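    // local-priority-lifo: the LIFO counterpart of the scheduler above,
    // backed by lockfree_lifo; only available with 128bit atomics.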
    void threadmanager::create_scheduler_local_priority_lifo(
        [[maybe_unused]] thread_pool_init_parameters const& thread_pool_init,
        [[maybe_unused]] policies::thread_queue_init_parameters const&
            thread_queue_init,
        [[maybe_unused]] std::size_t numa_sensitive)
    {
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t const num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_lifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-local_priority_queue_scheduler-lifo");

        auto sched = std::make_unique<local_sched_type>(init);
        auto const full_mask =
            hpx::resource::get_partitioner().get_pool_pus_mask(
                thread_pool_init.name_);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive,
            full_mask);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=local-priority-lifo "
            "is not configured in this build. Please make sure 128bit "
            "atomics are available.");
#endif
    }

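    // static: no work stealing. Pools marked do_background_work_only get a
    // background_scheduler, all others a static_queue_scheduler.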
    void threadmanager::create_scheduler_static(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t const numa_sensitive)
    {
        // instantiate the scheduler
        std::unique_ptr<thread_pool_base> pool;
        hpx::threads::policies::local_queue_scheduler<>::init_parameter_type
            init(thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
                thread_queue_init, "core-static_queue_scheduler");

        auto const full_mask =
            hpx::resource::get_partitioner().get_pool_pus_mask(
                thread_pool_init.name_);

        if (thread_pool_init.mode_ &
            policies::scheduler_mode::do_background_work_only)
        {
            using local_sched_type =
                hpx::threads::policies::background_scheduler<>;

            auto sched = std::make_unique<local_sched_type>(init);

            // set the default scheduler flags
            sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

            // instantiate the pool
            pool = std::make_unique<
                hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
                HPX_MOVE(sched), thread_pool_init);
        }
        else
        {
            using local_sched_type =
                hpx::threads::policies::static_queue_scheduler<>;

            auto sched = std::make_unique<local_sched_type>(init);

            // set the default scheduler flags
            sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

            // conditionally set/unset this flag
            sched->update_scheduler_mode(
                policies::scheduler_mode::enable_stealing_numa, !numa_sensitive,
                full_mask);

            // instantiate the pool
            pool = std::make_unique<
                hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
                HPX_MOVE(sched), thread_pool_init);
        }

        pools_.push_back(HPX_MOVE(pool));
    }

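    // static-priority: static scheduling with extra high-priority queues;
    // the stealing flags are removed unconditionally below.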
    void threadmanager::create_scheduler_static_priority(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t const)
    {
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t const num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::static_priority_queue_scheduler<>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-static_priority_queue_scheduler");

        auto sched = std::make_unique<local_sched_type>(init);
        auto const full_mask =
            hpx::resource::get_partitioner().get_pool_pus_mask(
                thread_pool_init.name_);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

        // static scheduling never steals work, unconditionally remove the
        // stealing flags
        sched->remove_scheduler_mode(policies::scheduler_mode::enable_stealing |
                policies::scheduler_mode::enable_stealing_numa,
            full_mask);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

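    // abp-priority-fifo: currently mapped onto the same
    // local_priority_queue_scheduler as local-priority-fifo (note the
    // template arguments below); requires 128bit atomics.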
    void threadmanager::create_scheduler_abp_priority_fifo(
        [[maybe_unused]] thread_pool_init_parameters const& thread_pool_init,
        [[maybe_unused]] policies::thread_queue_init_parameters const&
            thread_queue_init,
        [[maybe_unused]] std::size_t numa_sensitive)
    {
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t const num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_fifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-abp_priority_queue_scheduler-fifo");

        auto sched = std::make_unique<local_sched_type>(init);
        auto const full_mask =
            hpx::resource::get_partitioner().get_pool_pus_mask(
                thread_pool_init.name_);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive,
            full_mask);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=abp-priority-fifo "
            "is not configured in this build. Please make sure 128bit "
            "atomics are available.");
#endif
    }

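    // abp-priority-lifo: LIFO variant of the scheduler above, likewise gated
    // on 128bit atomics.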
    void threadmanager::create_scheduler_abp_priority_lifo(
        [[maybe_unused]] thread_pool_init_parameters const& thread_pool_init,
        [[maybe_unused]] policies::thread_queue_init_parameters const&
            thread_queue_init,
        [[maybe_unused]] std::size_t numa_sensitive)
    {
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t const num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_lifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-abp_priority_queue_scheduler-lifo");

        auto sched = std::make_unique<local_sched_type>(init);
        auto const full_mask =
            hpx::resource::get_partitioner().get_pool_pus_mask(
                thread_pool_init.name_);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive,
            full_mask);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=abp-priority-lifo "
            "is not configured in this build. Please make sure 128bit "
            "atomics are available.");
#endif
    }

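    // shared-priority: queues are shared between the pool's threads; the
    // {1, 1, 1} triple below sets its cores-per-queue ratios (presumably one
    // value per priority level).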
    void threadmanager::create_scheduler_shared_priority(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t const numa_sensitive)
    {
        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::shared_priority_queue_scheduler<>;
        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, {1, 1, 1},
            thread_pool_init.affinity_data_, thread_queue_init,
            "core-shared_priority_queue_scheduler");

        auto const full_mask =
            hpx::resource::get_partitioner().get_pool_pus_mask(
                thread_pool_init.name_);
        auto sched = std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive,
            full_mask);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

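    // local-workrequesting-fifo: idle cores request work from others instead
    // of stealing it directly; only built when
    // HPX_WITH_WORK_REQUESTING_SCHEDULERS is ON.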
    void threadmanager::create_scheduler_local_workrequesting_fifo(
        [[maybe_unused]] thread_pool_init_parameters const& thread_pool_init,
        [[maybe_unused]] policies::thread_queue_init_parameters const&
            thread_queue_init,
        [[maybe_unused]] std::size_t const numa_sensitive)
    {
#if defined(HPX_HAVE_WORK_REQUESTING_SCHEDULERS)
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t const num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_workrequesting_scheduler<>;

        local_sched_type::init_parameter_type const init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-local_workrequesting_scheduler-fifo");

        auto sched = std::make_unique<local_sched_type>(init);
        auto const full_mask =
            hpx::resource::get_partitioner().get_pool_pus_mask(
                thread_pool_init.name_);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive,
            full_mask);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=local-workrequesting-fifo "
            "is not configured in this build. Please make sure "
            "HPX_WITH_WORK_REQUESTING_SCHEDULERS is set to ON");
#endif
    }

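    // local-workrequesting-mc: the same work-requesting scheme, backed by
    // concurrentqueue_fifo.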
    void threadmanager::create_scheduler_local_workrequesting_mc(
        [[maybe_unused]] thread_pool_init_parameters const& thread_pool_init,
        [[maybe_unused]] policies::thread_queue_init_parameters const&
            thread_queue_init,
        [[maybe_unused]] std::size_t const numa_sensitive)
    {
#if defined(HPX_HAVE_WORK_REQUESTING_SCHEDULERS)
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t const num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_workrequesting_scheduler<std::mutex,
                hpx::threads::policies::concurrentqueue_fifo>;

        local_sched_type::init_parameter_type const init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-local_workrequesting_scheduler-mc");

        auto sched = std::make_unique<local_sched_type>(init);
        auto const full_mask =
            hpx::resource::get_partitioner().get_pool_pus_mask(
                thread_pool_init.name_);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive,
            full_mask);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=local-workrequesting-mc "
            "is not configured in this build. Please make sure "
            "HPX_WITH_WORK_REQUESTING_SCHEDULERS is set to ON");
#endif
    }

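    // local-workrequesting-lifo: needs both work-requesting schedulers and
    // 128bit atomics, hence the two nested #if guards below.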
    void threadmanager::create_scheduler_local_workrequesting_lifo(
        [[maybe_unused]] thread_pool_init_parameters const& thread_pool_init,
        [[maybe_unused]] policies::thread_queue_init_parameters const&
            thread_queue_init,
        [[maybe_unused]] std::size_t numa_sensitive)
    {
#if defined(HPX_HAVE_WORK_REQUESTING_SCHEDULERS)
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
        // set parameters for scheduler and pool instantiation and
        // perform compatibility checks
        std::size_t const num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_workrequesting_scheduler<std::mutex,
                hpx::threads::policies::lockfree_lifo>;

        local_sched_type::init_parameter_type const init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-local_workrequesting_scheduler-lifo");

        auto sched = std::make_unique<local_sched_type>(init);
        auto const full_mask =
            hpx::resource::get_partitioner().get_pool_pus_mask(
                thread_pool_init.name_);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_, full_mask);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive,
            full_mask);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=local-workrequesting-lifo "
            "is not configured in this build. Please make sure 128bit "
            "atomics are available.");
#endif
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=local-workrequesting-lifo "
            "is not configured in this build. Please make sure "
            "HPX_WITH_WORK_REQUESTING_SCHEDULERS is set to ON");
#endif
    }

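    // create_pools() materializes one scheduler/pool pair for every pool the
    // resource partitioner describes, dispatching on the configured
    // scheduling_policy; the pool named get_default_pool_name() has to come
    // first. A hypothetical sketch of how such pools get described (not part
    // of this file; assumes the resource partitioner API as exposed to an
    // init-time rp_callback):
    //
    //     void init_resources(hpx::resource::partitioner& rp)
    //     {
    //         rp.create_thread_pool("io",
    //             hpx::resource::scheduling_policy::local_priority_fifo);
    //     }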
    void threadmanager::create_pools()
    {
        auto& rp = hpx::resource::get_partitioner();
        std::size_t const num_pools = rp.get_num_pools();
        std::size_t thread_offset = 0;
        std::size_t const max_idle_loop_count =
            hpx::util::get_entry_as<std::int64_t>(
                rtcfg_, "hpx.max_idle_loop_count", HPX_IDLE_LOOP_COUNT_MAX);
        std::size_t const max_busy_loop_count =
            hpx::util::get_entry_as<std::int64_t>(
                rtcfg_, "hpx.max_busy_loop_count", HPX_BUSY_LOOP_COUNT_MAX);

        std::size_t const numa_sensitive = hpx::util::get_entry_as<std::size_t>(
            rtcfg_, "hpx.numa_sensitive", 0);

        policies::thread_queue_init_parameters const thread_queue_init =
            get_init_parameters();

        std::size_t max_background_threads =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.max_background_threads",
                (std::numeric_limits<std::size_t>::max)());

        if (!rtcfg_.enable_networking())
        {
            max_background_threads = 0;
        }

        // instantiate the pools
        for (std::size_t i = 0; i != num_pools; i++)
        {
            std::string name = rp.get_pool_name(i);
            resource::scheduling_policy const sched_type =
                rp.which_scheduler(name);
            std::size_t num_threads_in_pool = rp.get_num_threads(i);
            policies::scheduler_mode const scheduler_mode =
                rp.get_scheduler_mode(i);
            resource::background_work_function background_work =
                rp.get_background_work(i);

            // make sure the first thread-pool that gets instantiated is the
            // default one
            if (i == 0)
            {
                if (name != rp.get_default_pool_name())
                {
                    throw std::invalid_argument("Trying to instantiate pool " +
                        name +
                        " as first thread pool, but first thread pool must "
                        "be named " +
                        rp.get_default_pool_name());
                }
            }

            threads::detail::network_background_callback_type
                overall_background_work;
            if (!background_work.empty())
            {
                if (!network_background_callback_.empty())
                {
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
                    overall_background_work =
                        [this, background_work](std::size_t num_thread,
                            std::int64_t& t1, std::int64_t& t2) -> bool {
                        bool result = background_work(num_thread);
                        return network_background_callback_(
                                   num_thread, t1, t2) ||
                            result;
                    };
#else
                    overall_background_work =
                        [this, background_work](
                            std::size_t const num_thread) -> bool {
                        bool const result = background_work(num_thread);
                        return network_background_callback_(num_thread) ||
                            result;
                    };
#endif
                }
                else
                {
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
                    overall_background_work =
                        [background_work](std::size_t num_thread, std::int64_t&,
                            std::int64_t&) -> bool {
                        return background_work(num_thread);
                    };
#else
                    overall_background_work = background_work;
#endif
                }

                max_background_threads =
                    (std::max) (num_threads_in_pool, max_background_threads);
            }
            else
            {
                overall_background_work = network_background_callback_;
            }

            thread_pool_init_parameters thread_pool_init(name, i,
                scheduler_mode, num_threads_in_pool, thread_offset, notifier_,
                rp.get_affinity_data(), overall_background_work,
                max_background_threads, max_idle_loop_count,
                max_busy_loop_count);

            switch (sched_type)
            {
            case resource::scheduling_policy::user_defined:
                create_scheduler_user_defined(rp.get_pool_creator(i),
                    thread_pool_init, thread_queue_init);
                break;

            case resource::scheduling_policy::local:
                create_scheduler_local(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::local_priority_fifo:
                create_scheduler_local_priority_fifo(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::local_priority_lifo:
                create_scheduler_local_priority_lifo(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::static_:
                create_scheduler_static(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::static_priority:
                create_scheduler_static_priority(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::local_workrequesting_fifo:
                create_scheduler_local_workrequesting_fifo(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::local_workrequesting_lifo:
                create_scheduler_local_workrequesting_lifo(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::local_workrequesting_mc:
                create_scheduler_local_workrequesting_mc(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::abp_priority_fifo:
                create_scheduler_abp_priority_fifo(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::abp_priority_lifo:
                create_scheduler_abp_priority_lifo(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::shared_priority:
                create_scheduler_shared_priority(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::unspecified:
                throw std::invalid_argument(
                    "cannot instantiate a thread-manager if the thread-pool " +
                    name + " has an unspecified scheduler type");
            }

            // update the thread_offset for the next pool
            thread_offset += num_threads_in_pool;
        }

        // fill the thread-lookup table
        for (auto const& pool_iter : pools_)
        {
            std::size_t const nt =
                rp.get_num_threads(pool_iter->get_pool_index());
            for (std::size_t i = 0; i < nt; i++)
            {
                threads_lookup_.emplace_back(pool_iter->get_pool_id());
            }
        }
    }

    threadmanager::~threadmanager() = default;

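    // init() hands every pool its thread count and the offset of its first
    // worker within the global thread numbering set up by create_pools().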
    void threadmanager::init() const
    {
        auto const& rp = hpx::resource::get_partitioner();
        std::size_t threads_offset = 0;

        // initialize all pools
        for (auto&& pool_iter : pools_)
        {
            std::size_t const num_threads_in_pool =
                rp.get_num_threads(pool_iter->get_pool_index());
            pool_iter->init(num_threads_in_pool, threads_offset);
            threads_offset += num_threads_in_pool;
        }
    }

    void threadmanager::print_pools(std::ostream& os) const
    {
        os << "The thread-manager owns " << pools_.size()    //-V128
           << " pool(s) : \n";

        for (auto&& pool_iter : pools_)
        {
            pool_iter->print_pool(os);
        }
    }

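    // Pool lookup: pool 0 is always the default pool; lookup by pool id or
    // by global thread index funnels into the name-based overload.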
    thread_pool_base& threadmanager::default_pool() const
    {
        HPX_ASSERT(!pools_.empty());
        return *pools_[0];
    }

    thread_pool_base& threadmanager::get_pool(
        std::string const& pool_name) const
    {
        // if the given pool_name is default, we don't need to look for it,
        // we must always return pool 0
        if (pool_name == "default" ||
            pool_name == resource::get_partitioner().get_default_pool_name())
        {
            return default_pool();
        }

        // now check the other pools - no need to check pool 0 again, so ++begin
        auto const pool = std::find_if(++pools_.begin(), pools_.end(),
            [&pool_name](pool_type const& itp) -> bool {
                return (itp->get_pool_name() == pool_name);
            });

        if (pool != pools_.end())
        {
            return **pool;
        }

        //! FIXME Add names of available pools?
        HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
            "threadmanager::get_pool",
            "the resource partitioner does not own a thread pool named '{}'.\n",
            pool_name);
    }

    thread_pool_base& threadmanager::get_pool(pool_id_type const& pool_id) const
    {
        return get_pool(pool_id.name());
    }

    thread_pool_base& threadmanager::get_pool(
        std::size_t const thread_index) const
    {
        return get_pool(threads_lookup_[thread_index]);
    }

    bool threadmanager::pool_exists(std::string const& pool_name) const
    {
        // if the given pool_name is default, we don't need to look for it,
        // we must always return pool 0
        if (pool_name == "default" ||
            pool_name == resource::get_partitioner().get_default_pool_name())
        {
            return true;
        }

        // now check the other pools - no need to check pool 0 again, so ++begin
        auto const pool = std::find_if(++pools_.begin(), pools_.end(),
            [&pool_name](pool_type const& itp) -> bool {
                return (itp->get_pool_name() == pool_name);
            });

        if (pool != pools_.end())
        {
            return true;
        }

        return false;
    }

    bool threadmanager::pool_exists(std::size_t const pool_index) const
    {
        return pool_index < pools_.size();
    }

    ///////////////////////////////////////////////////////////////////////////
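    // The aggregating queries below follow one pattern: combine the per-pool
    // values (summing counts, OR-ing masks), usually while holding the
    // thread manager's lock.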
    std::int64_t threadmanager::get_thread_count(
        thread_schedule_state const state, thread_priority const priority,
        std::size_t const num_thread, bool const reset) const
    {
        std::int64_t total_count = 0;
        std::lock_guard<mutex_type> lk(mtx_);

        for (auto const& pool_iter : pools_)
        {
            total_count +=
                pool_iter->get_thread_count(state, priority, num_thread, reset);
        }

        return total_count;
    }

    std::int64_t threadmanager::get_idle_core_count() const
    {
        std::int64_t total_count = 0;
        std::lock_guard<mutex_type> lk(mtx_);

        for (auto const& pool_iter : pools_)
        {
            total_count += pool_iter->get_idle_core_count();
        }

        return total_count;
    }

    mask_type threadmanager::get_idle_core_mask() const
    {
        mask_type mask = mask_type();
        resize(mask, hardware_concurrency());

        std::lock_guard<mutex_type> lk(mtx_);

        for (auto const& pool_iter : pools_)
        {
            pool_iter->get_idle_core_mask(mask);
        }

        return mask;
    }

    std::int64_t threadmanager::get_background_thread_count() const
    {
        std::int64_t total_count = 0;
        std::lock_guard<mutex_type> lk(mtx_);

        for (auto& pool_iter : pools_)
        {
            total_count += pool_iter->get_background_thread_count();
        }

        return total_count;
    }

    ///////////////////////////////////////////////////////////////////////////
    // Enumerate all matching threads
    bool threadmanager::enumerate_threads(
        hpx::function<bool(thread_id_type)> const& f,
        thread_schedule_state const state) const
    {
        std::lock_guard<mutex_type> lk(mtx_);
        bool result = true;

        for (auto& pool_iter : pools_)
        {
            result = result && pool_iter->enumerate_threads(f, state);
        }

        return result;
    }

    ///////////////////////////////////////////////////////////////////////////
    // Abort all threads which are in suspended state. This will set
    // the state of all suspended threads to \a pending while
    // supplying the wait_abort extended state flag
    void threadmanager::abort_all_suspended_threads() const
    {
        std::lock_guard<mutex_type> lk(mtx_);
        for (auto const& pool_iter : pools_)
        {
            pool_iter->abort_all_suspended_threads();
        }
    }

    ///////////////////////////////////////////////////////////////////////////
    // Clean up terminated threads. This deletes all threads which
    // have been terminated but which are still held in the queue
    // of terminated threads. Some schedulers might not do anything
    // here.
    bool threadmanager::cleanup_terminated(bool const delete_all) const
    {
        std::lock_guard<mutex_type> lk(mtx_);
        bool result = true;

        for (auto const& pool_iter : pools_)
        {
            result = pool_iter->cleanup_terminated(delete_all) && result;
        }

        return result;
    }

    std::size_t threadmanager::get_os_thread_count() const
    {
        std::lock_guard<mutex_type> lk(mtx_);
        std::size_t total = 0;
        for (auto& pool_iter : pools_)
        {
            total += pool_iter->get_os_thread_count();
        }
        return total;
    }

    std::thread& threadmanager::get_os_thread_handle(
        std::size_t const num_thread) const
    {
        std::lock_guard<mutex_type> lk(mtx_);
        pool_id_type const id = threads_lookup_[num_thread];
        thread_pool_base& pool = get_pool(id);
        return pool.get_os_thread_handle(num_thread);
    }

    void threadmanager::report_error(
        std::size_t const num_thread, std::exception_ptr const& e) const
    {
        // propagate the error reporting to all pools, which in turn
        // will propagate to schedulers
        for (auto& pool_iter : pools_)
        {
            pool_iter->report_error(num_thread, e);
        }
    }

    mask_type threadmanager::get_used_processing_units() const
    {
        auto total_used_processing_punits = mask_type();
        threads::resize(total_used_processing_punits,
            static_cast<std::size_t>(hardware_concurrency()));

        for (auto& pool_iter : pools_)
        {
            total_used_processing_punits |=
                pool_iter->get_used_processing_units();
        }

        return total_used_processing_punits;
    }

    hwloc_bitmap_ptr threadmanager::get_pool_numa_bitmap(
        std::string const& pool_name) const
    {
        return get_pool(pool_name).get_numa_domain_bitmap();
    }

    void threadmanager::set_scheduler_mode(
        threads::policies::scheduler_mode const mode,
        hpx::threads::mask_cref_type pu_mask) const noexcept
    {
        for (auto const& pool_iter : pools_)
        {
            pool_iter->get_scheduler()->set_scheduler_mode(mode, pu_mask);
        }
    }

    void threadmanager::add_scheduler_mode(
        threads::policies::scheduler_mode const mode,
        hpx::threads::mask_cref_type pu_mask) const noexcept
    {
        for (auto const& pool_iter : pools_)
        {
            pool_iter->get_scheduler()->add_scheduler_mode(mode, pu_mask);
        }
    }

    void threadmanager::add_remove_scheduler_mode(
        threads::policies::scheduler_mode const to_add_mode,
        threads::policies::scheduler_mode const to_remove_mode,
        hpx::threads::mask_cref_type pu_mask) const noexcept
    {
        for (auto const& pool_iter : pools_)
        {
            pool_iter->get_scheduler()->add_remove_scheduler_mode(
                to_add_mode, to_remove_mode, pu_mask);
        }
    }

    void threadmanager::remove_scheduler_mode(
        threads::policies::scheduler_mode const mode,
        hpx::threads::mask_cref_type pu_mask) const noexcept
    {
        for (auto const& pool_iter : pools_)
        {
            pool_iter->get_scheduler()->remove_scheduler_mode(mode, pu_mask);
        }
    }

    void threadmanager::reset_thread_distribution() const noexcept
    {
        for (auto const& pool_iter : pools_)
        {
            pool_iter->reset_thread_distribution();
        }
    }

    ///////////////////////////////////////////////////////////////////////////
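    // Thread registration: new HPX threads are created on the pool owned by
    // the scheduler currently executing the caller, falling back to the
    // default pool when invoked from outside an HPX worker thread.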
    void threadmanager::register_thread(
        thread_init_data& data, thread_id_ref_type& id, error_code& ec) const
    {
        thread_pool_base* pool;
        if (auto const* thrd_data = get_self_id_data())
        {
            pool = thrd_data->get_scheduler_base()->get_parent_pool();
        }
        else
        {
            pool = &default_pool();
        }
        pool->create_thread(data, id, ec);
    }

    ///////////////////////////////////////////////////////////////////////////
    thread_id_ref_type threadmanager::register_work(
        thread_init_data& data, error_code& ec) const
    {
        thread_pool_base* pool;
        if (auto const* thrd_data = get_self_id_data())
        {
            pool = thrd_data->get_scheduler_base()->get_parent_pool();
        }
        else
        {
            pool = &default_pool();
        }
        return pool->create_work(data, ec);
    }

    void threadmanager::init_tss(std::size_t const global_thread_num)
    {
        detail::set_global_thread_num_tss(global_thread_num);
    }

    void threadmanager::deinit_tss()
    {
        detail::set_global_thread_num_tss(static_cast<std::size_t>(-1));
    }

    ///////////////////////////////////////////////////////////////////////////
    inline constexpr std::size_t all_threads = static_cast<std::size_t>(-1);

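    // Performance-counter getters: each sums the corresponding per-pool
    // value over all pools; all_threads selects every worker of a pool.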
    std::int64_t threadmanager::get_queue_length(bool const reset) const
1236
    {
1237
        std::int64_t result = 0;
1238
        for (auto const& pool_iter : pools_)
1239
            result += pool_iter->get_queue_length(all_threads, reset);
1240
        return result;
1241
    }
1242

1243
#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
1244
    std::int64_t threadmanager::get_average_thread_wait_time(bool reset) const
1245
    {
1246
        std::int64_t result = 0;
1247
        for (auto const& pool_iter : pools_)
1248
            result +=
1249
                pool_iter->get_average_thread_wait_time(all_threads, reset);
1250
        return result;
1251
    }
1252

1253
    std::int64_t threadmanager::get_average_task_wait_time(bool reset) const
1254
    {
1255
        std::int64_t result = 0;
1256
        for (auto const& pool_iter : pools_)
64✔
1257
            result += pool_iter->get_average_task_wait_time(all_threads, reset);
1258
        return result;
64✔
1259
    }
1260
#endif
1261

1262
    std::int64_t threadmanager::get_cumulative_duration(bool const reset) const
64✔
1263
    {
64✔
1264
        std::int64_t result = 0;
1265
        for (auto const& pool_iter : pools_)
1266
            result += pool_iter->get_cumulative_duration(all_threads, reset);
64✔
1267
        return result;
64✔
1268
    }

    std::int64_t threadmanager::get_thread_count_unknown(bool const reset) const
    {
        return get_thread_count(thread_schedule_state::unknown,
            thread_priority::default_, static_cast<std::size_t>(-1), reset);
    }

    std::int64_t threadmanager::get_thread_count_active(bool const reset) const
    {
        return get_thread_count(thread_schedule_state::active,
            thread_priority::default_, static_cast<std::size_t>(-1), reset);
    }

    std::int64_t threadmanager::get_thread_count_pending(bool const reset) const
    {
        return get_thread_count(thread_schedule_state::pending,
            thread_priority::default_, static_cast<std::size_t>(-1), reset);
    }

    std::int64_t threadmanager::get_thread_count_suspended(
        bool const reset) const
    {
        return get_thread_count(thread_schedule_state::suspended,
            thread_priority::default_, static_cast<std::size_t>(-1), reset);
    }

    std::int64_t threadmanager::get_thread_count_terminated(
        bool const reset) const
    {
        return get_thread_count(thread_schedule_state::terminated,
            thread_priority::default_, static_cast<std::size_t>(-1), reset);
    }

    std::int64_t threadmanager::get_thread_count_staged(bool const reset) const
    {
        return get_thread_count(thread_schedule_state::staged,
            thread_priority::default_, static_cast<std::size_t>(-1), reset);
    }
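
    // Illustrative usage (a sketch): the per-state counts can be combined
    // into a coarse picture of scheduler load, e.g.
    //
    //     std::int64_t const runnable = tm.get_thread_count_active(false) +
    //         tm.get_thread_count_pending(false);
    //
    // where `tm` is again assumed to be a reference to the threadmanager.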

#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
    std::int64_t threadmanager::get_background_work_duration(bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_work_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_overhead(bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_background_overhead(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_send_duration(bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_send_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_send_overhead(bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_send_overhead(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_receive_duration(
        bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_receive_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_receive_overhead(
        bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_receive_overhead(all_threads, reset);
        return result;
    }
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS
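
    // The background-work counters above exist only when both
    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS and HPX_HAVE_THREAD_IDLE_RATES were
    // defined at configure time (typically via the corresponding
    // HPX_WITH_... CMake options); callers must guard their uses the same way.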

#ifdef HPX_HAVE_THREAD_IDLE_RATES
    std::int64_t threadmanager::avg_idle_rate(bool reset) const noexcept
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->avg_idle_rate(all_threads, reset);
        return result;
    }

#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
    std::int64_t threadmanager::avg_creation_idle_rate(
        bool reset) const noexcept
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->avg_creation_idle_rate(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::avg_cleanup_idle_rate(bool reset) const noexcept
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->avg_cleanup_idle_rate(all_threads, reset);
        return result;
    }
#endif
#endif
1394

1395
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
1396
    std::int64_t threadmanager::get_executed_threads(bool reset) const noexcept
1397
    {
1398
        std::int64_t result = 0;
1399
        for (auto const& pool_iter : pools_)
1400
            result += pool_iter->get_executed_threads(all_threads, reset);
1401
        return result;
1402
    }
1403

1404
    std::int64_t threadmanager::get_executed_thread_phases(
1405
        bool reset) const noexcept
1406
    {
1407
        std::int64_t result = 0;
1408
        for (auto const& pool_iter : pools_)
1409
            result += pool_iter->get_executed_thread_phases(all_threads, reset);
1410
        return result;
1411
    }
1412

1413
#ifdef HPX_HAVE_THREAD_IDLE_RATES
1414
    std::int64_t threadmanager::get_thread_duration(bool reset) const
1415
    {
1416
        std::int64_t result = 0;
1417
        for (auto const& pool_iter : pools_)
1418
            result += pool_iter->get_thread_duration(all_threads, reset);
1419
        return result;
1420
    }
1421

1422
    std::int64_t threadmanager::get_thread_phase_duration(bool reset) const
1423
    {
1424
        std::int64_t result = 0;
1425
        for (auto const& pool_iter : pools_)
1426
            result += pool_iter->get_thread_phase_duration(all_threads, reset);
1427
        return result;
1428
    }
1429

1430
    std::int64_t threadmanager::get_thread_overhead(bool reset) const
1431
    {
1432
        std::int64_t result = 0;
1433
        for (auto const& pool_iter : pools_)
1434
            result += pool_iter->get_thread_overhead(all_threads, reset);
1435
        return result;
1436
    }
1437

1438
    std::int64_t threadmanager::get_thread_phase_overhead(bool reset) const
1439
    {
1440
        std::int64_t result = 0;
1441
        for (auto const& pool_iter : pools_)
1442
            result += pool_iter->get_thread_phase_overhead(all_threads, reset);
1443
        return result;
1444
    }
1445

1446
    std::int64_t threadmanager::get_cumulative_thread_duration(bool reset) const
1447
    {
1448
        std::int64_t result = 0;
1449
        for (auto const& pool_iter : pools_)
1450
            result +=
1451
                pool_iter->get_cumulative_thread_duration(all_threads, reset);
1452
        return result;
1453
    }
1454

1455
    std::int64_t threadmanager::get_cumulative_thread_overhead(bool reset) const
1456
    {
1457
        std::int64_t result = 0;
1458
        for (auto const& pool_iter : pools_)
1459
            result +=
1460
                pool_iter->get_cumulative_thread_overhead(all_threads, reset);
1461
        return result;
1462
    }
1463
#endif
1464
#endif
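
    // Illustrative usage (a sketch): the executed-thread counts are
    // cumulative, so a monitoring loop could derive a rate from two readings:
    //
    //     std::int64_t const before = tm.get_executed_threads(false);
    //     // ... run a workload ...
    //     std::int64_t const executed =
    //         tm.get_executed_threads(false) - before;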

#ifdef HPX_HAVE_THREAD_STEALING_COUNTS
    std::int64_t threadmanager::get_num_pending_misses(bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_pending_misses(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_pending_accesses(bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_pending_accesses(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_from_pending(bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_num_stolen_from_pending(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_from_staged(bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_stolen_from_staged(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_to_pending(bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_stolen_to_pending(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_to_staged(bool reset) const
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_stolen_to_staged(all_threads, reset);
        return result;
    }
#endif
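
    // Illustrative usage (a sketch): a rough stealing ratio can be derived
    // from the pending-queue counters, sampled without reset; the formula is
    // an assumption made for monitoring purposes, not an HPX-defined metric:
    //
    //     double const steal_ratio =
    //         static_cast<double>(tm.get_num_stolen_from_pending(false)) /
    //         static_cast<double>((std::max) (
    //             std::int64_t(1), tm.get_num_pending_accesses(false)));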

    ///////////////////////////////////////////////////////////////////////////
    bool threadmanager::run() const
    {
        std::unique_lock<mutex_type> lk(mtx_);

        // The main thread needs to have a unique thread_num; worker threads
        // are numbered 0 to N-1, so we can use N for this thread.
        auto const& rp = hpx::resource::get_partitioner();
        init_tss(rp.get_num_threads());

#ifdef HPX_HAVE_TIMER_POOL
        LTM_(info).format("run: running timer pool");
        timer_pool_.run(false);
#endif

        for (auto const& pool_iter : pools_)
        {
            std::size_t const num_threads_in_pool =
                rp.get_num_threads(pool_iter->get_pool_name());

            if (pool_iter->get_os_thread_count() != 0 ||
                pool_iter->has_reached_state(hpx::state::running))
            {
                return true;    // do nothing if already running
            }

            if (!pool_iter->run(lk, num_threads_in_pool))
            {
#ifdef HPX_HAVE_TIMER_POOL
                timer_pool_.stop();
#endif
                return false;
            }

            // set all states of all schedulers to "running"
            if (policies::scheduler_base* sched = pool_iter->get_scheduler())
                sched->set_all_states(hpx::state::running);
        }

        LTM_(info).format("run: running");
        return true;
    }
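
    // Note: run() returns true early as soon as it encounters a pool that is
    // already running; otherwise the pools are started one by one, and a
    // failure to start any of them stops the timer pool again (if enabled)
    // and reports false to the caller.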

    void threadmanager::stop(bool const blocking) const
    {
        LTM_(info).format("stop: blocking({})", blocking ? "true" : "false");

        std::unique_lock<mutex_type> lk(mtx_);
        for (auto const& pool_iter : pools_)
        {
            pool_iter->stop(lk, blocking);
        }
        deinit_tss();
    }

    bool threadmanager::is_busy() const
    {
        bool busy = false;
        for (auto const& pool_iter : pools_)
        {
            busy = busy || pool_iter->is_busy();
        }
        return busy;
    }

    bool threadmanager::is_idle() const
    {
        bool idle = true;
        for (auto const& pool_iter : pools_)
        {
            idle = idle && pool_iter->is_idle();
        }
        return idle;
    }

    void threadmanager::wait() const
    {
        auto const shutdown_check_count = util::get_entry_as<std::size_t>(
            rtcfg_, "hpx.shutdown_check_count", 10);
        hpx::util::detail::yield_while_count(
            [this]() { return is_busy(); }, shutdown_check_count);
    }

    bool threadmanager::wait_for(
        hpx::chrono::steady_duration const& rel_time) const
    {
        auto const shutdown_check_count = util::get_entry_as<std::size_t>(
            rtcfg_, "hpx.shutdown_check_count", 10);
        return hpx::util::detail::yield_while_count_timeout(
            [this]() { return is_busy(); }, shutdown_check_count, rel_time);
    }
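
    // Illustrative usage (a sketch): a caller can bound how long it waits for
    // the pools to drain; the boolean result distinguishes a clean drain from
    // hitting the timeout (see yield_while_count_timeout for the exact
    // convention):
    //
    //     bool const drained = tm.wait_for(std::chrono::seconds(5));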

    void threadmanager::suspend() const
    {
        wait();

        if (threads::get_self_ptr())
        {
            std::vector<hpx::future<void>> fs;

            for (auto& pool_iter : pools_)
            {
                fs.emplace_back(suspend_pool(*pool_iter));
            }

            hpx::wait_all(fs);
        }
        else
        {
            for (auto const& pool_iter : pools_)
            {
                pool_iter->suspend_direct();
            }
        }
    }
1632

1633
    void threadmanager::resume() const
1634
    {
1635
        if (threads::get_self_ptr())
1636
        {
1637
            std::vector<hpx::future<void>> fs;
1638

1639
            for (auto& pool_iter : pools_)
1640
            {
1641
                fs.emplace_back(resume_pool(*pool_iter));
1642
            }
1643
            hpx::wait_all(fs);
1644
        }
1645
        else
1646
        {
1647
            for (auto const& pool_iter : pools_)
1648
            {
1649
                pool_iter->resume_direct();
1650
            }
1651
        }
1652
    }
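
    // Note: suspend() and resume() choose their strategy based on
    // threads::get_self_ptr(): when called from an HPX thread all pools are
    // suspended/resumed concurrently through futures joined by hpx::wait_all,
    // otherwise the direct, synchronous per-pool calls are used.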

    hpx::state threadmanager::status() const
    {
        hpx::state result(hpx::state::last_valid_runtime_state);

        for (auto& pool_iter : pools_)
        {
            hpx::state s = pool_iter->get_state();
            result = (std::min) (result, s);
        }

        return result;
    }
}    // namespace hpx::threads