STEllAR-GROUP / hpx / build #871 (push) by StellarBot

22 Jan 2023 11:22PM UTC, coverage: 86.624% (+0.7%) from 85.97%

Merge #6144: General improvements to scheduling and related fixes (r=hkaiser a=hkaiser)

This is a collection of unrelated improvements applied to different parts of the code.

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

152 of 152 new or added lines in 23 files covered (100.0%).
174953 of 201969 relevant lines covered (86.62%).
1838882.76 hits per line.

Source file: /libs/core/threadmanager/src/threadmanager.cpp (85.74% covered)
//  Copyright (c) 2007-2023 Hartmut Kaiser
//  Copyright (c)      2011 Bryce Lelbach, Katelyn Kufahl
//  Copyright (c) 2008-2009 Chirag Dekate, Anshul Tandon
//  Copyright (c) 2015 Patricia Grubel
//  Copyright (c) 2017 Shoshana Jakobovits
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/async_combinators/wait_all.hpp>
#include <hpx/execution_base/this_thread.hpp>
#include <hpx/futures/future.hpp>
#include <hpx/hardware/timestamp.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/logging.hpp>
#include <hpx/modules/schedulers.hpp>
#include <hpx/modules/threadmanager.hpp>
#include <hpx/resource_partitioner/detail/partitioner.hpp>
#include <hpx/runtime_configuration/runtime_configuration.hpp>
#include <hpx/thread_pool_util/thread_pool_suspension_helpers.hpp>
#include <hpx/thread_pools/scheduled_thread_pool.hpp>
#include <hpx/threading_base/set_thread_state.hpp>
#include <hpx/threading_base/thread_data.hpp>
#include <hpx/threading_base/thread_helpers.hpp>
#include <hpx/threading_base/thread_init_data.hpp>
#include <hpx/threading_base/thread_queue_init_parameters.hpp>
#include <hpx/topology/topology.hpp>
#include <hpx/type_support/unused.hpp>
#include <hpx/util/get_entry_as.hpp>

#include <cstddef>
#include <cstdint>
#include <functional>
#include <iosfwd>
#include <limits>
#include <memory>
#include <mutex>
#include <numeric>
#include <string>
#include <utility>
#include <vector>

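// The thread manager is the runtime's central owner of thread pools: it
// creates the pools described by the resource partitioner, dispatches
// thread-creation requests to them, aggregates their counters, and
// coordinates running, suspending, resuming, and stopping them.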
namespace hpx { namespace threads {

    namespace detail {
        void check_num_high_priority_queues(
            std::size_t num_threads, std::size_t num_high_priority_queues)
        {
            if (num_high_priority_queues > num_threads)
            {
                throw hpx::detail::command_line_error(
                    "Invalid command line option: "
                    "number of high priority threads ("
                    "--hpx:high-priority-threads), should not be larger "
                    "than number of threads (--hpx:threads)");
            }
        }
    }    // namespace detail

    ///////////////////////////////////////////////////////////////////////////
    threadmanager::threadmanager(hpx::util::runtime_configuration& rtcfg,
#ifdef HPX_HAVE_TIMER_POOL
        util::io_service_pool& timer_pool,
#endif
        notification_policy_type& notifier,
        detail::network_background_callback_type network_background_callback)
      : rtcfg_(rtcfg)
#ifdef HPX_HAVE_TIMER_POOL
      , timer_pool_(timer_pool)
#endif
      , notifier_(notifier)
      , network_background_callback_(network_background_callback)
    {
        using placeholders::_1;
        using placeholders::_3;

        // Add callbacks local to threadmanager.
        notifier.add_on_start_thread_callback(
            hpx::bind(&threadmanager::init_tss, this, _1));
        notifier.add_on_stop_thread_callback(
            hpx::bind(&threadmanager::deinit_tss, this));

        auto& rp = hpx::resource::get_partitioner();
        notifier.add_on_start_thread_callback(hpx::bind(
            &resource::detail::partitioner::assign_pu, std::ref(rp), _3, _1));
        notifier.add_on_stop_thread_callback(hpx::bind(
            &resource::detail::partitioner::unassign_pu, std::ref(rp), _3, _1));
    }

    policies::thread_queue_init_parameters threadmanager::get_init_parameters()
        const
    {
        std::int64_t const max_thread_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_thread_count",
                HPX_THREAD_QUEUE_MAX_THREAD_COUNT);
        std::int64_t const min_tasks_to_steal_pending =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_tasks_to_steal_pending",
                HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL_PENDING);
        std::int64_t const min_tasks_to_steal_staged =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_tasks_to_steal_staged",
                HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL_STAGED);
        std::int64_t const min_add_new_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_add_new_count",
                HPX_THREAD_QUEUE_MIN_ADD_NEW_COUNT);
        std::int64_t const max_add_new_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_add_new_count",
                HPX_THREAD_QUEUE_MAX_ADD_NEW_COUNT);
        std::int64_t const min_delete_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_delete_count",
                HPX_THREAD_QUEUE_MIN_DELETE_COUNT);
        std::int64_t const max_delete_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_delete_count",
                HPX_THREAD_QUEUE_MAX_DELETE_COUNT);
        std::int64_t const max_terminated_threads =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_terminated_threads",
                HPX_THREAD_QUEUE_MAX_TERMINATED_THREADS);
        std::int64_t const init_threads_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.init_threads_count",
                HPX_THREAD_QUEUE_INIT_THREADS_COUNT);
        double const max_idle_backoff_time = hpx::util::get_entry_as<double>(
            rtcfg_, "hpx.max_idle_backoff_time", HPX_IDLE_BACKOFF_TIME_MAX);

        std::ptrdiff_t small_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::small_);
        std::ptrdiff_t medium_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::medium);
        std::ptrdiff_t large_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::large);
        std::ptrdiff_t huge_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::huge);

        return policies::thread_queue_init_parameters(max_thread_count,
            min_tasks_to_steal_pending, min_tasks_to_steal_staged,
            min_add_new_count, max_add_new_count, min_delete_count,
            max_delete_count, max_terminated_threads, init_threads_count,
            max_idle_backoff_time, small_stacksize, medium_stacksize,
            large_stacksize, huge_stacksize);
    }

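    // Each create_scheduler_* helper below follows the same pattern: read any
    // scheduler-specific configuration entries, construct the scheduler from
    // its init_parameter_type, apply the pool's scheduler mode, wrap the
    // scheduler in a scheduled_thread_pool, and append the result to pools_.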
    void threadmanager::create_scheduler_user_defined(
        hpx::resource::scheduler_function const& pool_func,
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init)
    {
        std::unique_ptr<thread_pool_base> pool(
            pool_func(thread_pool_init, thread_queue_init));
        pools_.push_back(HPX_MOVE(pool));
    }

    void threadmanager::create_scheduler_local(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_queue_scheduler<>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            thread_queue_init, "core-local_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

    void threadmanager::create_scheduler_local_priority_fifo(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);

        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_fifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-local_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

    void threadmanager::create_scheduler_local_priority_lifo(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_lifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-local_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=local-priority-lifo "
            "is not configured in this build. Please make sure 128bit "
            "atomics are available.");
#endif
    }

    void threadmanager::create_scheduler_static(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
        // instantiate the scheduler
        std::unique_ptr<thread_pool_base> pool;
        hpx::threads::policies::local_queue_scheduler<>::init_parameter_type
            init(thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
                thread_queue_init, "core-static_queue_scheduler");

        if (thread_pool_init.mode_ &
            policies::scheduler_mode::do_background_work_only)
        {
            using local_sched_type =
                hpx::threads::policies::background_scheduler<>;

            std::unique_ptr<local_sched_type> sched =
                std::make_unique<local_sched_type>(init);

            // set the default scheduler flags
            sched->set_scheduler_mode(thread_pool_init.mode_);

            // instantiate the pool
            pool = std::make_unique<
                hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
                HPX_MOVE(sched), thread_pool_init);
        }
        else
        {
            using local_sched_type =
                hpx::threads::policies::static_queue_scheduler<>;

            std::unique_ptr<local_sched_type> sched =
                std::make_unique<local_sched_type>(init);

            // set the default scheduler flags
            sched->set_scheduler_mode(thread_pool_init.mode_);

            // conditionally set/unset this flag
            sched->update_scheduler_mode(
                policies::scheduler_mode::enable_stealing_numa,
                !numa_sensitive);

            // instantiate the pool
            pool = std::make_unique<
                hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
                HPX_MOVE(sched), thread_pool_init);
        }

        pools_.push_back(HPX_MOVE(pool));
    }

    void threadmanager::create_scheduler_static_priority(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::static_priority_queue_scheduler<>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-static_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

    void threadmanager::create_scheduler_abp_priority_fifo(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_abp_fifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-abp_fifo_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=abp-priority-fifo "
            "is not configured in this build. Please make sure 128bit "
            "atomics are available.");
#endif
    }

    void threadmanager::create_scheduler_abp_priority_lifo(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_abp_lifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-abp_lifo_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=abp-priority-lifo "
            "is not configured in this build. Please make sure 128bit "
            "atomics are available.");
#endif
    }

    void threadmanager::create_scheduler_shared_priority(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::shared_priority_queue_scheduler<>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, {1, 1, 1},
            thread_pool_init.affinity_data_, thread_queue_init,
            "core-shared_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

    void threadmanager::create_pools()
    {
        auto& rp = hpx::resource::get_partitioner();
        std::size_t num_pools = rp.get_num_pools();
        std::size_t thread_offset = 0;
        std::size_t const max_idle_loop_count =
            hpx::util::get_entry_as<std::int64_t>(
                rtcfg_, "hpx.max_idle_loop_count", HPX_IDLE_LOOP_COUNT_MAX);
        std::size_t const max_busy_loop_count =
            hpx::util::get_entry_as<std::int64_t>(
                rtcfg_, "hpx.max_busy_loop_count", HPX_BUSY_LOOP_COUNT_MAX);

        std::size_t numa_sensitive = hpx::util::get_entry_as<std::size_t>(
            rtcfg_, "hpx.numa_sensitive", 0);

        policies::thread_queue_init_parameters thread_queue_init =
            get_init_parameters();

        std::size_t max_background_threads =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.max_background_threads",
                (std::numeric_limits<std::size_t>::max)());

        if (!rtcfg_.enable_networking())
        {
            max_background_threads = 0;
        }

        // instantiate the pools
        for (std::size_t i = 0; i != num_pools; i++)
        {
            std::string name = rp.get_pool_name(i);
            resource::scheduling_policy sched_type = rp.which_scheduler(name);
            std::size_t num_threads_in_pool = rp.get_num_threads(i);
            policies::scheduler_mode scheduler_mode = rp.get_scheduler_mode(i);
            resource::background_work_function background_work =
                rp.get_background_work(i);

            // make sure the first thread-pool that gets instantiated is the
            // default one
            if (i == 0)
            {
                if (name != rp.get_default_pool_name())
                {
                    throw std::invalid_argument("Trying to instantiate pool " +
                        name +
                        " as first thread pool, but first thread pool must "
                        "be named " +
                        rp.get_default_pool_name());
                }
            }

553
                overall_background_work;
1,274✔
554
            if (!background_work.empty())
1,274✔
555
            {
556
                if (!network_background_callback_.empty())
1✔
557
                {
558
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
559
    defined(HPX_HAVE_THREAD_IDLE_RATES)
560
                    overall_background_work =
561
                        [this, background_work](std::size_t num_thread,
562
                            std::int64_t& t1, std::int64_t& t2) -> bool {
563
                        bool result = background_work(num_thread);
564
                        return network_background_callback_(
565
                                   num_thread, t1, t2) ||
566
                            result;
567
                    };
568
#else
569
                    overall_background_work =
×
570
                        [this, background_work](
×
571
                            std::size_t num_thread) -> bool {
572
                        bool result = background_work(num_thread);
×
573
                        return network_background_callback_(num_thread) ||
×
574
                            result;
×
575
                    };
576
#endif
577
                }
×
578
                else
579
                {
580
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
581
    defined(HPX_HAVE_THREAD_IDLE_RATES)
582
                    overall_background_work =
583
                        [background_work](std::size_t num_thread, std::int64_t&,
584
                            std::int64_t&) -> bool {
585
                        return background_work(num_thread);
586
                    };
587
#else
588
                    overall_background_work = background_work;
1✔
589
#endif
590
                }
591

592
                max_background_threads =
1✔
593
                    (std::max)(num_threads_in_pool, max_background_threads);
1✔
594
            }
1✔
595
            else
596
            {
597
                overall_background_work = network_background_callback_;
1,273✔
598
            }
599

600
            thread_pool_init_parameters thread_pool_init(name, i,
2,548✔
601
                scheduler_mode, num_threads_in_pool, thread_offset, notifier_,
1,274✔
602
                rp.get_affinity_data(), overall_background_work,
1,274✔
603
                max_background_threads, max_idle_loop_count,
1,274✔
604
                max_busy_loop_count);
1,274✔
605

606
            switch (sched_type)
1,274✔
607
            {
608
            case resource::scheduling_policy::user_defined:
609
                create_scheduler_user_defined(rp.get_pool_creator(i),
6✔
610
                    thread_pool_init, thread_queue_init);
611
                break;
6✔
612

613
            case resource::scheduling_policy::local:
614
                create_scheduler_local(
11✔
615
                    thread_pool_init, thread_queue_init, numa_sensitive);
11✔
616
                break;
11✔
617

618
            case resource::scheduling_policy::local_priority_fifo:
619
                create_scheduler_local_priority_fifo(
1,186✔
620
                    thread_pool_init, thread_queue_init, numa_sensitive);
1,186✔
621
                break;
1,186✔
622

623
            case resource::scheduling_policy::local_priority_lifo:
624
                create_scheduler_local_priority_lifo(
11✔
625
                    thread_pool_init, thread_queue_init, numa_sensitive);
11✔
626
                break;
11✔
627

628
            case resource::scheduling_policy::static_:
629
                create_scheduler_static(
13✔
630
                    thread_pool_init, thread_queue_init, numa_sensitive);
13✔
631
                break;
13✔
632

633
            case resource::scheduling_policy::static_priority:
634
                create_scheduler_static_priority(
13✔
635
                    thread_pool_init, thread_queue_init, numa_sensitive);
13✔
636
                break;
13✔
637

638
            case resource::scheduling_policy::abp_priority_fifo:
639
                create_scheduler_abp_priority_fifo(
12✔
640
                    thread_pool_init, thread_queue_init, numa_sensitive);
12✔
641
                break;
12✔
642

643
            case resource::scheduling_policy::abp_priority_lifo:
644
                create_scheduler_abp_priority_lifo(
13✔
645
                    thread_pool_init, thread_queue_init, numa_sensitive);
13✔
646
                break;
13✔
647

648
            case resource::scheduling_policy::shared_priority:
649
                create_scheduler_shared_priority(
9✔
650
                    thread_pool_init, thread_queue_init, numa_sensitive);
9✔
651
                break;
9✔
652

653
            default:
654
                [[fallthrough]];
655
            case resource::scheduling_policy::unspecified:
656
                throw std::invalid_argument(
×
657
                    "cannot instantiate a thread-manager if the thread-pool" +
×
658
                    name + " has an unspecified scheduler type");
×
659
                break;
660
            }
661

662
            // update the thread_offset for the next pool
663
            thread_offset += num_threads_in_pool;
1,274✔
664
        }
1,274✔
665

666
        // fill the thread-lookup table
667
        for (auto& pool_iter : pools_)
2,496✔
668
        {
669
            std::size_t nt = rp.get_num_threads(pool_iter->get_pool_index());
1,274✔
670
            for (std::size_t i = 0; i < nt; i++)
5,652✔
671
            {
672
                threads_lookup_.push_back(pool_iter->get_pool_id());
4,378✔
673
            }
4,378✔
674
        }
675
    }
1,222✔
676

677
    threadmanager::~threadmanager() = default;

    void threadmanager::init()
    {
        auto& rp = hpx::resource::get_partitioner();
        std::size_t threads_offset = 0;

        // initialize all pools
        for (auto&& pool_iter : pools_)
        {
            std::size_t num_threads_in_pool =
                rp.get_num_threads(pool_iter->get_pool_index());
            pool_iter->init(num_threads_in_pool, threads_offset);
            threads_offset += num_threads_in_pool;
        }
    }

    void threadmanager::print_pools(std::ostream& os)
    {
        os << "The thread-manager owns " << pools_.size()    //  -V128
           << " pool(s) : \n";

        for (auto&& pool_iter : pools_)
        {
            pool_iter->print_pool(os);
        }
    }

    thread_pool_base& threadmanager::default_pool() const
    {
        HPX_ASSERT(!pools_.empty());
        return *pools_[0];
    }

    thread_pool_base& threadmanager::get_pool(
        std::string const& pool_name) const
    {
        // if the given pool_name is default, we don't need to look for it;
        // we must always return pool 0
        if (pool_name == "default" ||
            pool_name == resource::get_partitioner().get_default_pool_name())
        {
            return default_pool();
        }

        // now check the other pools - no need to check pool 0 again, so ++begin
        auto pool = std::find_if(++pools_.begin(), pools_.end(),
            [&pool_name](pool_type const& itp) -> bool {
                return (itp->get_pool_name() == pool_name);
            });

        if (pool != pools_.end())
        {
            return **pool;
        }

        //! FIXME Add names of available pools?
        HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
            "threadmanager::get_pool",
            "the resource partitioner does not own a thread pool named '{}'.\n",
            pool_name);
    }

    thread_pool_base& threadmanager::get_pool(pool_id_type const& pool_id) const
    {
        return get_pool(pool_id.name());
    }

    thread_pool_base& threadmanager::get_pool(std::size_t thread_index) const
    {
        return get_pool(threads_lookup_[thread_index]);
    }

    bool threadmanager::pool_exists(std::string const& pool_name) const
    {
        // if the given pool_name is default, we don't need to look for it;
        // we must always return pool 0
        if (pool_name == "default" ||
            pool_name == resource::get_partitioner().get_default_pool_name())
        {
            return true;
        }

        // now check the other pools - no need to check pool 0 again, so ++begin
        auto pool = std::find_if(++pools_.begin(), pools_.end(),
            [&pool_name](pool_type const& itp) -> bool {
                return (itp->get_pool_name() == pool_name);
            });

        if (pool != pools_.end())
        {
            return true;
        }

        return false;
    }

    bool threadmanager::pool_exists(std::size_t pool_index) const
    {
        return pool_index < pools_.size();
    }

    ///////////////////////////////////////////////////////////////////////////
    std::int64_t threadmanager::get_thread_count(thread_schedule_state state,
        thread_priority priority, std::size_t num_thread, bool reset)
    {
        std::int64_t total_count = 0;
        std::lock_guard<mutex_type> lk(mtx_);

        for (auto& pool_iter : pools_)
        {
            total_count +=
                pool_iter->get_thread_count(state, priority, num_thread, reset);
        }

        return total_count;
    }

    std::int64_t threadmanager::get_idle_core_count()
    {
        std::int64_t total_count = 0;
        std::lock_guard<mutex_type> lk(mtx_);

        for (auto& pool_iter : pools_)
        {
            total_count += pool_iter->get_idle_core_count();
        }

        return total_count;
    }

    mask_type threadmanager::get_idle_core_mask()
    {
        mask_type mask = mask_type();
        resize(mask, hardware_concurrency());

        std::lock_guard<mutex_type> lk(mtx_);

        for (auto& pool_iter : pools_)
        {
            pool_iter->get_idle_core_mask(mask);
        }

        return mask;
    }

    std::int64_t threadmanager::get_background_thread_count() const
    {
        std::int64_t total_count = 0;
        std::lock_guard<mutex_type> lk(mtx_);

        for (auto& pool_iter : pools_)
        {
            total_count += pool_iter->get_background_thread_count();
        }

        return total_count;
    }

    ///////////////////////////////////////////////////////////////////////////
    // Enumerate all matching threads
    bool threadmanager::enumerate_threads(
        hpx::function<bool(thread_id_type)> const& f,
        thread_schedule_state state) const
    {
        std::lock_guard<mutex_type> lk(mtx_);
        bool result = true;

        for (auto& pool_iter : pools_)
        {
            result = result && pool_iter->enumerate_threads(f, state);
        }

        return result;
    }

    ///////////////////////////////////////////////////////////////////////////
    // Abort all threads which are in suspended state. This will set
    // the state of all suspended threads to \a pending while
    // supplying the wait_abort extended state flag
    void threadmanager::abort_all_suspended_threads()
    {
        std::lock_guard<mutex_type> lk(mtx_);
        for (auto& pool_iter : pools_)
        {
            pool_iter->abort_all_suspended_threads();
        }
    }

    ///////////////////////////////////////////////////////////////////////////
    // Clean up terminated threads. This deletes all threads which
    // have been terminated but which are still held in the queue
    // of terminated threads. Some schedulers might not do anything
    // here.
    bool threadmanager::cleanup_terminated(bool delete_all)
    {
        std::lock_guard<mutex_type> lk(mtx_);
        bool result = true;

        for (auto& pool_iter : pools_)
        {
            result = pool_iter->cleanup_terminated(delete_all) && result;
        }

        return result;
    }

    ///////////////////////////////////////////////////////////////////////////
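    // New HPX threads are created on the pool of the calling HPX thread, if
    // any; calls from outside the HPX runtime fall back to the default pool.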
    void threadmanager::register_thread(
        thread_init_data& data, thread_id_ref_type& id, error_code& ec)
    {
        thread_pool_base* pool = nullptr;
        auto thrd_data = get_self_id_data();
        if (thrd_data)
        {
            pool = thrd_data->get_scheduler_base()->get_parent_pool();
        }
        else
        {
            pool = &default_pool();
        }
        pool->create_thread(data, id, ec);
    }

    ///////////////////////////////////////////////////////////////////////////
    thread_id_ref_type threadmanager::register_work(
        thread_init_data& data, error_code& ec)
    {
        thread_pool_base* pool = nullptr;
        auto thrd_data = get_self_id_data();
        if (thrd_data)
        {
            pool = thrd_data->get_scheduler_base()->get_parent_pool();
        }
        else
        {
            pool = &default_pool();
        }
        return pool->create_work(data, ec);
    }

    ///////////////////////////////////////////////////////////////////////////
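    // Sentinel thread index passed to the per-pool counter accessors below;
    // it selects the aggregate over all worker threads of a pool, and the
    // surrounding loops then sum that aggregate over all pools owned by the
    // thread manager.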
    constexpr std::size_t all_threads = std::size_t(-1);

    std::int64_t threadmanager::get_queue_length(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_queue_length(all_threads, reset);
        return result;
    }

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
    std::int64_t threadmanager::get_average_thread_wait_time(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_average_thread_wait_time(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_average_task_wait_time(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_average_task_wait_time(all_threads, reset);
        return result;
    }
#endif

    std::int64_t threadmanager::get_cumulative_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_cumulative_duration(all_threads, reset);
        return result;
    }

#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
    std::int64_t threadmanager::get_background_work_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_work_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_background_overhead(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_send_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_send_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_send_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_send_overhead(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_receive_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_receive_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_receive_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_receive_overhead(all_threads, reset);
        return result;
    }
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS

#ifdef HPX_HAVE_THREAD_IDLE_RATES
    std::int64_t threadmanager::avg_idle_rate(bool reset) noexcept
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->avg_idle_rate(all_threads, reset);
        return result;
    }

#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
    std::int64_t threadmanager::avg_creation_idle_rate(bool reset) noexcept
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->avg_creation_idle_rate(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::avg_cleanup_idle_rate(bool reset) noexcept
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->avg_cleanup_idle_rate(all_threads, reset);
        return result;
    }
#endif
#endif

#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
    std::int64_t threadmanager::get_executed_threads(bool reset) noexcept
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_executed_threads(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_executed_thread_phases(bool reset) noexcept
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_executed_thread_phases(all_threads, reset);
        return result;
    }

#ifdef HPX_HAVE_THREAD_IDLE_RATES
    std::int64_t threadmanager::get_thread_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_thread_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_thread_phase_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_thread_phase_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_thread_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_thread_overhead(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_thread_phase_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_thread_phase_overhead(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_cumulative_thread_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_cumulative_thread_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_cumulative_thread_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_cumulative_thread_overhead(all_threads, reset);
        return result;
    }
#endif
#endif

#ifdef HPX_HAVE_THREAD_STEALING_COUNTS
    std::int64_t threadmanager::get_num_pending_misses(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_pending_misses(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_pending_accesses(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_pending_accesses(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_from_pending(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_num_stolen_from_pending(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_from_staged(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_stolen_from_staged(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_to_pending(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_stolen_to_pending(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_to_staged(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_stolen_to_staged(all_threads, reset);
        return result;
    }
#endif

    ///////////////////////////////////////////////////////////////////////////
    bool threadmanager::run()
    {
        std::unique_lock<mutex_type> lk(mtx_);

        // the main thread needs to have a unique thread_num
        // worker threads are numbered 0..N-1, so we can use N for this thread
        auto& rp = hpx::resource::get_partitioner();
        init_tss(rp.get_num_threads());

#ifdef HPX_HAVE_TIMER_POOL
        LTM_(info).format("run: running timer pool");
        timer_pool_.run(false);
#endif

        for (auto& pool_iter : pools_)
        {
            std::size_t num_threads_in_pool =
                rp.get_num_threads(pool_iter->get_pool_name());

            if (pool_iter->get_os_thread_count() != 0 ||
                pool_iter->has_reached_state(hpx::state::running))
            {
                return true;    // do nothing if already running
            }

            if (!pool_iter->run(lk, num_threads_in_pool))
            {
#ifdef HPX_HAVE_TIMER_POOL
                timer_pool_.stop();
#endif
                return false;
            }

            // set all states of all schedulers to "running"
            policies::scheduler_base* sched = pool_iter->get_scheduler();
            if (sched)
                sched->set_all_states(hpx::state::running);
        }

        LTM_(info).format("run: running");
        return true;
    }

    void threadmanager::stop(bool blocking)
    {
        LTM_(info).format("stop: blocking({})", blocking ? "true" : "false");

        std::unique_lock<mutex_type> lk(mtx_);
        for (auto& pool_iter : pools_)
        {
            pool_iter->stop(lk, blocking);
        }
        deinit_tss();
    }

    bool threadmanager::is_busy()
    {
        bool busy = false;
        for (auto& pool_iter : pools_)
        {
            busy = busy || pool_iter->is_busy();
        }
        return busy;
    }

    bool threadmanager::is_idle()
    {
        bool idle = true;
        for (auto& pool_iter : pools_)
        {
            idle = idle && pool_iter->is_idle();
        }
        return idle;
    }

    void threadmanager::wait()
    {
        std::size_t shutdown_check_count = util::get_entry_as<std::size_t>(
            rtcfg_, "hpx.shutdown_check_count", 10);
        hpx::util::detail::yield_while_count(
            [this]() { return is_busy(); }, shutdown_check_count);
    }

    void threadmanager::suspend()
    {
        wait();

        if (threads::get_self_ptr())
        {
            std::vector<hpx::future<void>> fs;

            for (auto& pool_iter : pools_)
            {
                fs.push_back(suspend_pool(*pool_iter));
            }

            hpx::wait_all(fs);
        }
        else
        {
            for (auto& pool_iter : pools_)
            {
                pool_iter->suspend_direct();
            }
        }
    }

    void threadmanager::resume()
    {
        if (threads::get_self_ptr())
        {
            std::vector<hpx::future<void>> fs;

            for (auto& pool_iter : pools_)
            {
                fs.push_back(resume_pool(*pool_iter));
            }
            hpx::wait_all(fs);
        }
        else
        {
            for (auto& pool_iter : pools_)
            {
                pool_iter->resume_direct();
            }
        }
    }
}}    // namespace hpx::threads