STEllAR-GROUP / hpx / build #864 (push, committed by web-flow)

12 Jan 2023 03:29PM UTC
Coverage: 86.354% (-0.2%), down from 86.533%

Merge pull request #6133 from STEllAR-GROUP/background_scheduler:
"Adding abridged static scheduler that supports running background threads only"

263 of 263 new or added lines in 4 files covered (100.0%)
174319 of 201865 relevant lines covered (86.35%)
1939452.8 hits per line
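The change under test adds a scheduler that runs background work only, selected in create_scheduler_static() below when a pool is started in scheduler_mode::do_background_work_only. As a minimal, hypothetical sketch of how such a pool might be requested, assuming the resource partitioner's create_thread_pool overload that takes a scheduling policy and a scheduler mode (the pool name and callback are illustrative only):

// Hypothetical usage sketch, not part of the file under test.
void setup_pools(hpx::resource::partitioner& rp)
{
    // Request a statically scheduled pool that runs background work only;
    // the thread manager then selects the background scheduler in
    // create_scheduler_static().
    rp.create_thread_pool("background-pool",
        hpx::resource::scheduling_policy::static_,
        hpx::threads::policies::scheduler_mode::do_background_work_only);
}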

Source file: /libs/core/threadmanager/src/threadmanager.cpp (85.51% of relevant lines covered)
//  Copyright (c) 2007-2023 Hartmut Kaiser
//  Copyright (c)      2011 Bryce Lelbach, Katelyn Kufahl
//  Copyright (c) 2008-2009 Chirag Dekate, Anshul Tandon
//  Copyright (c) 2015 Patricia Grubel
//  Copyright (c) 2017 Shoshana Jakobovits
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/async_combinators/wait_all.hpp>
#include <hpx/execution_base/this_thread.hpp>
#include <hpx/futures/future.hpp>
#include <hpx/hardware/timestamp.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/logging.hpp>
#include <hpx/modules/schedulers.hpp>
#include <hpx/modules/threadmanager.hpp>
#include <hpx/resource_partitioner/detail/partitioner.hpp>
#include <hpx/runtime_configuration/runtime_configuration.hpp>
#include <hpx/thread_pool_util/thread_pool_suspension_helpers.hpp>
#include <hpx/thread_pools/scheduled_thread_pool.hpp>
#include <hpx/threading_base/set_thread_state.hpp>
#include <hpx/threading_base/thread_data.hpp>
#include <hpx/threading_base/thread_helpers.hpp>
#include <hpx/threading_base/thread_init_data.hpp>
#include <hpx/threading_base/thread_queue_init_parameters.hpp>
#include <hpx/topology/topology.hpp>
#include <hpx/type_support/unused.hpp>
#include <hpx/util/get_entry_as.hpp>

#include <cstddef>
#include <cstdint>
#include <functional>
#include <iosfwd>
#include <memory>
#include <mutex>
#include <numeric>
#include <string>
#include <utility>
#include <vector>

namespace hpx { namespace threads {
    namespace detail {
        void check_num_high_priority_queues(
            std::size_t num_threads, std::size_t num_high_priority_queues)
        {
            if (num_high_priority_queues > num_threads)
            {
                throw hpx::detail::command_line_error(
                    "Invalid command line option: "
                    "number of high priority threads ("
                    "--hpx:high-priority-threads), should not be larger "
                    "than number of threads (--hpx:threads)");
            }
        }
    }    // namespace detail

    ///////////////////////////////////////////////////////////////////////////
    threadmanager::threadmanager(hpx::util::runtime_configuration& rtcfg,
#ifdef HPX_HAVE_TIMER_POOL
        util::io_service_pool& timer_pool,
#endif
        notification_policy_type& notifier,
        detail::network_background_callback_type network_background_callback)
      : rtcfg_(rtcfg)
#ifdef HPX_HAVE_TIMER_POOL
      , timer_pool_(timer_pool)
#endif
      , notifier_(notifier)
      , network_background_callback_(network_background_callback)
    {
        using placeholders::_1;
        using placeholders::_3;

        // Add callbacks local to threadmanager.
        notifier.add_on_start_thread_callback(
            hpx::bind(&threadmanager::init_tss, this, _1));
        notifier.add_on_stop_thread_callback(
            hpx::bind(&threadmanager::deinit_tss, this));

        auto& rp = hpx::resource::get_partitioner();
        notifier.add_on_start_thread_callback(hpx::bind(
            &resource::detail::partitioner::assign_pu, std::ref(rp), _3, _1));
        notifier.add_on_stop_thread_callback(hpx::bind(
            &resource::detail::partitioner::unassign_pu, std::ref(rp), _3, _1));
    }

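    // Collects the thread-queue tuning parameters (queue sizes, stealing
    // thresholds, idle back-off, stack sizes) from the runtime configuration,
    // falling back to the compiled-in HPX_THREAD_QUEUE_* defaults.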
    policies::thread_queue_init_parameters threadmanager::get_init_parameters()
        const
    {
        std::int64_t const max_thread_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_thread_count",
                HPX_THREAD_QUEUE_MAX_THREAD_COUNT);
        std::int64_t const min_tasks_to_steal_pending =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_tasks_to_steal_pending",
                HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL_PENDING);
        std::int64_t const min_tasks_to_steal_staged =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_tasks_to_steal_staged",
                HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL_STAGED);
        std::int64_t const min_add_new_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_add_new_count",
                HPX_THREAD_QUEUE_MIN_ADD_NEW_COUNT);
        std::int64_t const max_add_new_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_add_new_count",
                HPX_THREAD_QUEUE_MAX_ADD_NEW_COUNT);
        std::int64_t const min_delete_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.min_delete_count",
                HPX_THREAD_QUEUE_MIN_DELETE_COUNT);
        std::int64_t const max_delete_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_delete_count",
                HPX_THREAD_QUEUE_MAX_DELETE_COUNT);
        std::int64_t const max_terminated_threads =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.max_terminated_threads",
                HPX_THREAD_QUEUE_MAX_TERMINATED_THREADS);
        std::int64_t const init_threads_count =
            hpx::util::get_entry_as<std::int64_t>(rtcfg_,
                "hpx.thread_queue.init_threads_count",
                HPX_THREAD_QUEUE_INIT_THREADS_COUNT);
        double const max_idle_backoff_time = hpx::util::get_entry_as<double>(
            rtcfg_, "hpx.max_idle_backoff_time", HPX_IDLE_BACKOFF_TIME_MAX);

        std::ptrdiff_t small_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::small_);
        std::ptrdiff_t medium_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::medium);
        std::ptrdiff_t large_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::large);
        std::ptrdiff_t huge_stacksize =
            rtcfg_.get_stack_size(thread_stacksize::huge);

        return policies::thread_queue_init_parameters(max_thread_count,
            min_tasks_to_steal_pending, min_tasks_to_steal_staged,
            min_add_new_count, max_add_new_count, min_delete_count,
            max_delete_count, max_terminated_threads, init_threads_count,
            max_idle_backoff_time, small_stacksize, medium_stacksize,
            large_stacksize, huge_stacksize);
    }

    void threadmanager::create_scheduler_user_defined(
        hpx::resource::scheduler_function const& pool_func,
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init)
    {
        std::unique_ptr<thread_pool_base> pool(
            pool_func(thread_pool_init, thread_queue_init));
        pools_.push_back(HPX_MOVE(pool));
    }

    void threadmanager::create_scheduler_local(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_queue_scheduler<>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            thread_queue_init, "core-local_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

    void threadmanager::create_scheduler_local_priority_fifo(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);

        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_fifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-local_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

    void threadmanager::create_scheduler_local_priority_lifo(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_lifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-local_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=local-priority-lifo "
            "is not configured in this build. Please make sure 128bit "
            "atomics are available.");
#endif
    }

    void threadmanager::create_scheduler_static(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
        // instantiate the scheduler
        std::unique_ptr<thread_pool_base> pool;
        hpx::threads::policies::local_queue_scheduler<>::init_parameter_type
            init(thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
                thread_queue_init, "core-static_queue_scheduler");

        if (thread_pool_init.mode_ &
            policies::scheduler_mode::do_background_work_only)
        {
            using local_sched_type =
                hpx::threads::policies::background_scheduler<>;

            std::unique_ptr<local_sched_type> sched =
                std::make_unique<local_sched_type>(init);

            // set the default scheduler flags
            sched->set_scheduler_mode(thread_pool_init.mode_);

            // instantiate the pool
            pool = std::make_unique<
                hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
                HPX_MOVE(sched), thread_pool_init);
        }
        else
        {
            using local_sched_type =
                hpx::threads::policies::static_queue_scheduler<>;

            std::unique_ptr<local_sched_type> sched =
                std::make_unique<local_sched_type>(init);

            // set the default scheduler flags
            sched->set_scheduler_mode(thread_pool_init.mode_);

            // conditionally set/unset this flag
            sched->update_scheduler_mode(
                policies::scheduler_mode::enable_stealing_numa,
                !numa_sensitive);

            // instantiate the pool
            pool = std::make_unique<
                hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
                HPX_MOVE(sched), thread_pool_init);
        }

        pools_.push_back(HPX_MOVE(pool));
    }

    void threadmanager::create_scheduler_static_priority(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::static_priority_queue_scheduler<>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-static_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

    void threadmanager::create_scheduler_abp_priority_fifo(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_fifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-abp_fifo_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=abp-priority-fifo "
            "is not configured in this build. Please make sure 128bit "
            "atomics are available.");
#endif
    }

    void threadmanager::create_scheduler_abp_priority_lifo(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
        // set parameters for scheduler and pool instantiation and perform
        // compatibility checks
        std::size_t num_high_priority_queues =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.thread_queue.high_priority_queues",
                thread_pool_init.num_threads_);
        detail::check_num_high_priority_queues(
            thread_pool_init.num_threads_, num_high_priority_queues);

        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::local_priority_queue_scheduler<std::mutex,
                hpx::threads::policies::lockfree_lifo>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, thread_pool_init.affinity_data_,
            num_high_priority_queues, thread_queue_init,
            "core-abp_lifo_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
#else
        throw hpx::detail::command_line_error(
            "Command line option --hpx:queuing=abp-priority-lifo "
            "is not configured in this build. Please make sure 128bit "
            "atomics are available.");
#endif
    }

    void threadmanager::create_scheduler_shared_priority(
        thread_pool_init_parameters const& thread_pool_init,
        policies::thread_queue_init_parameters const& thread_queue_init,
        std::size_t numa_sensitive)
    {
        // instantiate the scheduler
        using local_sched_type =
            hpx::threads::policies::shared_priority_queue_scheduler<>;

        local_sched_type::init_parameter_type init(
            thread_pool_init.num_threads_, {1, 1, 1},
            thread_pool_init.affinity_data_, thread_queue_init,
            "core-shared_priority_queue_scheduler");

        std::unique_ptr<local_sched_type> sched =
            std::make_unique<local_sched_type>(init);

        // set the default scheduler flags
        sched->set_scheduler_mode(thread_pool_init.mode_);

        // conditionally set/unset this flag
        sched->update_scheduler_mode(
            policies::scheduler_mode::enable_stealing_numa, !numa_sensitive);

        // instantiate the pool
        std::unique_ptr<thread_pool_base> pool = std::make_unique<
            hpx::threads::detail::scheduled_thread_pool<local_sched_type>>(
            HPX_MOVE(sched), thread_pool_init);
        pools_.push_back(HPX_MOVE(pool));
    }

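    // Instantiates one thread pool per pool configured in the resource
    // partitioner, dispatching on the pool's scheduling policy, and then
    // fills the per-thread pool-id lookup table. The first pool created
    // must be the default pool.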
    void threadmanager::create_pools()
    {
        auto& rp = hpx::resource::get_partitioner();
        std::size_t num_pools = rp.get_num_pools();
        std::size_t thread_offset = 0;
        std::size_t const max_idle_loop_count =
            hpx::util::get_entry_as<std::int64_t>(
                rtcfg_, "hpx.max_idle_loop_count", HPX_IDLE_LOOP_COUNT_MAX);
        std::size_t const max_busy_loop_count =
            hpx::util::get_entry_as<std::int64_t>(
                rtcfg_, "hpx.max_busy_loop_count", HPX_BUSY_LOOP_COUNT_MAX);

        std::size_t numa_sensitive = hpx::util::get_entry_as<std::size_t>(
            rtcfg_, "hpx.numa_sensitive", 0);

        policies::thread_queue_init_parameters thread_queue_init =
            get_init_parameters();

        std::size_t max_background_threads =
            hpx::util::get_entry_as<std::size_t>(rtcfg_,
                "hpx.max_background_threads",
                (std::numeric_limits<std::size_t>::max)());

        if (!rtcfg_.enable_networking())
        {
            max_background_threads = 0;
        }

        // instantiate the pools
        for (std::size_t i = 0; i != num_pools; i++)
        {
            std::string name = rp.get_pool_name(i);
            resource::scheduling_policy sched_type = rp.which_scheduler(name);
            std::size_t num_threads_in_pool = rp.get_num_threads(i);
            policies::scheduler_mode scheduler_mode = rp.get_scheduler_mode(i);

            // make sure the first thread-pool that gets instantiated is the
            // default one
            if (i == 0)
            {
                if (name != rp.get_default_pool_name())
                {
                    throw std::invalid_argument("Trying to instantiate pool " +
                        name +
                        " as first thread pool, but first thread pool must "
                        "be named " +
                        rp.get_default_pool_name());
                }
            }

            thread_pool_init_parameters thread_pool_init(name, i,
                scheduler_mode, num_threads_in_pool, thread_offset, notifier_,
                rp.get_affinity_data(), network_background_callback_,
                max_background_threads, max_idle_loop_count,
                max_busy_loop_count);

            switch (sched_type)
            {
            case resource::scheduling_policy::user_defined:
                create_scheduler_user_defined(rp.get_pool_creator(i),
                    thread_pool_init, thread_queue_init);
                break;

            case resource::scheduling_policy::local:
                create_scheduler_local(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::local_priority_fifo:
                create_scheduler_local_priority_fifo(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::local_priority_lifo:
                create_scheduler_local_priority_lifo(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::static_:
                create_scheduler_static(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::static_priority:
                create_scheduler_static_priority(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::abp_priority_fifo:
                create_scheduler_abp_priority_fifo(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::abp_priority_lifo:
                create_scheduler_abp_priority_lifo(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            case resource::scheduling_policy::shared_priority:
                create_scheduler_shared_priority(
                    thread_pool_init, thread_queue_init, numa_sensitive);
                break;

            default:
                [[fallthrough]];
            case resource::scheduling_policy::unspecified:
                throw std::invalid_argument(
                    "cannot instantiate a thread-manager if the thread-pool " +
                    name + " has an unspecified scheduler type");
                break;
            }

            // update the thread_offset for the next pool
            thread_offset += num_threads_in_pool;
        }

        // fill the thread-lookup table
        for (auto& pool_iter : pools_)
        {
            std::size_t nt = rp.get_num_threads(pool_iter->get_pool_index());
            for (std::size_t i = 0; i < nt; i++)
            {
                threads_lookup_.push_back(pool_iter->get_pool_id());
            }
        }
    }

    threadmanager::~threadmanager() = default;

    void threadmanager::init()
    {
        auto& rp = hpx::resource::get_partitioner();
        std::size_t threads_offset = 0;

        // initialize all pools
        for (auto&& pool_iter : pools_)
        {
            std::size_t num_threads_in_pool =
                rp.get_num_threads(pool_iter->get_pool_index());
            pool_iter->init(num_threads_in_pool, threads_offset);
            threads_offset += num_threads_in_pool;
        }
    }

    void threadmanager::print_pools(std::ostream& os)
    {
        os << "The thread-manager owns " << pools_.size()    //  -V128
           << " pool(s) : \n";

        for (auto&& pool_iter : pools_)
        {
            pool_iter->print_pool(os);
        }
    }

    thread_pool_base& threadmanager::default_pool() const
    {
        HPX_ASSERT(!pools_.empty());
        return *pools_[0];
    }

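    // Returns the pool registered under the given name. The name "default"
    // (or the partitioner's configured default-pool name) always maps to
    // pool 0; any other unknown name throws hpx::error::bad_parameter.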
    thread_pool_base& threadmanager::get_pool(
        std::string const& pool_name) const
    {
        // if the given pool_name is default, we don't need to look for it
        // we must always return pool 0
        if (pool_name == "default" ||
            pool_name == resource::get_partitioner().get_default_pool_name())
        {
            return default_pool();
        }

        // now check the other pools - no need to check pool 0 again, so ++begin
        auto pool = std::find_if(++pools_.begin(), pools_.end(),
            [&pool_name](pool_type const& itp) -> bool {
                return (itp->get_pool_name() == pool_name);
            });

        if (pool != pools_.end())
        {
            return **pool;
        }

        //! FIXME Add names of available pools?
        HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
            "threadmanager::get_pool",
            "the resource partitioner does not own a thread pool named '{}'.\n",
            pool_name);
    }

    thread_pool_base& threadmanager::get_pool(pool_id_type const& pool_id) const
    {
        return get_pool(pool_id.name());
    }

    thread_pool_base& threadmanager::get_pool(std::size_t thread_index) const
    {
        return get_pool(threads_lookup_[thread_index]);
    }

    bool threadmanager::pool_exists(std::string const& pool_name) const
    {
        // if the given pool_name is default, we don't need to look for it
        // we must always return pool 0
        if (pool_name == "default" ||
            pool_name == resource::get_partitioner().get_default_pool_name())
        {
            return true;
        }

        // now check the other pools - no need to check pool 0 again, so ++begin
        auto pool = std::find_if(++pools_.begin(), pools_.end(),
            [&pool_name](pool_type const& itp) -> bool {
                return (itp->get_pool_name() == pool_name);
            });

        if (pool != pools_.end())
        {
            return true;
        }

        return false;
    }

    bool threadmanager::pool_exists(std::size_t pool_index) const
    {
        return pool_index < pools_.size();
    }

    ///////////////////////////////////////////////////////////////////////////
    std::int64_t threadmanager::get_thread_count(thread_schedule_state state,
        thread_priority priority, std::size_t num_thread, bool reset)
    {
        std::int64_t total_count = 0;
        std::lock_guard<mutex_type> lk(mtx_);

        for (auto& pool_iter : pools_)
        {
            total_count +=
                pool_iter->get_thread_count(state, priority, num_thread, reset);
        }

        return total_count;
    }

    std::int64_t threadmanager::get_idle_core_count()
    {
        std::int64_t total_count = 0;
        std::lock_guard<mutex_type> lk(mtx_);

        for (auto& pool_iter : pools_)
        {
            total_count += pool_iter->get_idle_core_count();
        }

        return total_count;
    }

    mask_type threadmanager::get_idle_core_mask()
    {
        mask_type mask = mask_type();
        resize(mask, hardware_concurrency());

        std::lock_guard<mutex_type> lk(mtx_);

        for (auto& pool_iter : pools_)
        {
            pool_iter->get_idle_core_mask(mask);
        }

        return mask;
    }

    std::int64_t threadmanager::get_background_thread_count() const
    {
        std::int64_t total_count = 0;
        std::lock_guard<mutex_type> lk(mtx_);

        for (auto& pool_iter : pools_)
        {
            total_count += pool_iter->get_background_thread_count();
        }

        return total_count;
    }

    ///////////////////////////////////////////////////////////////////////////
    // Enumerate all matching threads
    bool threadmanager::enumerate_threads(
        hpx::function<bool(thread_id_type)> const& f,
        thread_schedule_state state) const
    {
        std::lock_guard<mutex_type> lk(mtx_);
        bool result = true;

        for (auto& pool_iter : pools_)
        {
            result = result && pool_iter->enumerate_threads(f, state);
        }

        return result;
    }

    ///////////////////////////////////////////////////////////////////////////
    // Abort all threads which are in suspended state. This will set
    // the state of all suspended threads to \a pending while
    // supplying the wait_abort extended state flag
    void threadmanager::abort_all_suspended_threads()
    {
        std::lock_guard<mutex_type> lk(mtx_);
        for (auto& pool_iter : pools_)
        {
            pool_iter->abort_all_suspended_threads();
        }
    }

    ///////////////////////////////////////////////////////////////////////////
    // Clean up terminated threads. This deletes all threads which
    // have been terminated but which are still held in the queue
    // of terminated threads. Some schedulers might not do anything
    // here.
    bool threadmanager::cleanup_terminated(bool delete_all)
    {
        std::lock_guard<mutex_type> lk(mtx_);
        bool result = true;

        for (auto& pool_iter : pools_)
        {
            result = pool_iter->cleanup_terminated(delete_all) && result;
        }

        return result;
    }

    ///////////////////////////////////////////////////////////////////////////
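    // Creates a new HPX thread. When called from an HPX thread, the new
    // thread is placed in the calling thread's pool; otherwise it goes to
    // the default pool.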
    void threadmanager::register_thread(
        thread_init_data& data, thread_id_ref_type& id, error_code& ec)
    {
        thread_pool_base* pool = nullptr;
        auto thrd_data = get_self_id_data();
        if (thrd_data)
        {
            pool = thrd_data->get_scheduler_base()->get_parent_pool();
        }
        else
        {
            pool = &default_pool();
        }
        pool->create_thread(data, id, ec);
    }

    ///////////////////////////////////////////////////////////////////////////
    thread_id_ref_type threadmanager::register_work(
        thread_init_data& data, error_code& ec)
    {
        thread_pool_base* pool = nullptr;
        auto thrd_data = get_self_id_data();
        if (thrd_data)
        {
            pool = thrd_data->get_scheduler_base()->get_parent_pool();
        }
        else
        {
            pool = &default_pool();
        }
        return pool->create_work(data, ec);
    }

    ///////////////////////////////////////////////////////////////////////////
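    // Sentinel passed to the per-pool counter queries below to aggregate
    // over all worker threads of a pool.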
    constexpr std::size_t all_threads = std::size_t(-1);

    std::int64_t threadmanager::get_queue_length(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_queue_length(all_threads, reset);
        return result;
    }

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
    std::int64_t threadmanager::get_average_thread_wait_time(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_average_thread_wait_time(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_average_task_wait_time(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_average_task_wait_time(all_threads, reset);
        return result;
    }
#endif

    std::int64_t threadmanager::get_cumulative_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_cumulative_duration(all_threads, reset);
        return result;
    }

#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
    std::int64_t threadmanager::get_background_work_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_work_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_background_overhead(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_send_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_send_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_send_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_send_overhead(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_receive_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_receive_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_background_receive_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_background_receive_overhead(all_threads, reset);
        return result;
    }
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS

#ifdef HPX_HAVE_THREAD_IDLE_RATES
    std::int64_t threadmanager::avg_idle_rate(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->avg_idle_rate(all_threads, reset);
        return result;
    }

#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
    std::int64_t threadmanager::avg_creation_idle_rate(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->avg_creation_idle_rate(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::avg_cleanup_idle_rate(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->avg_cleanup_idle_rate(all_threads, reset);
        return result;
    }
#endif
#endif

#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
    std::int64_t threadmanager::get_executed_threads(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_executed_threads(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_executed_thread_phases(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_executed_thread_phases(all_threads, reset);
        return result;
    }

#ifdef HPX_HAVE_THREAD_IDLE_RATES
    std::int64_t threadmanager::get_thread_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_thread_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_thread_phase_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_thread_phase_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_thread_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_thread_overhead(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_thread_phase_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_thread_phase_overhead(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_cumulative_thread_duration(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_cumulative_thread_duration(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_cumulative_thread_overhead(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_cumulative_thread_overhead(all_threads, reset);
        return result;
    }
#endif
#endif

#ifdef HPX_HAVE_THREAD_STEALING_COUNTS
    std::int64_t threadmanager::get_num_pending_misses(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_pending_misses(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_pending_accesses(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_pending_accesses(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_from_pending(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result +=
                pool_iter->get_num_stolen_from_pending(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_from_staged(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_stolen_from_staged(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_to_pending(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_stolen_to_pending(all_threads, reset);
        return result;
    }

    std::int64_t threadmanager::get_num_stolen_to_staged(bool reset)
    {
        std::int64_t result = 0;
        for (auto const& pool_iter : pools_)
            result += pool_iter->get_num_stolen_to_staged(all_threads, reset);
        return result;
    }
#endif

    ///////////////////////////////////////////////////////////////////////////
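    // Starts all pools: gives the main thread a unique thread number,
    // starts the timer pool (if configured), and moves every scheduler to
    // the running state. Returns false if any pool fails to start.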
    bool threadmanager::run()
    {
        std::unique_lock<mutex_type> lk(mtx_);

        // the main thread needs to have a unique thread_num
        // worker threads are numbered 0..N-1, so we can use N for this thread
        auto& rp = hpx::resource::get_partitioner();
        init_tss(rp.get_num_threads());

#ifdef HPX_HAVE_TIMER_POOL
        LTM_(info).format("run: running timer pool");
        timer_pool_.run(false);
#endif

        for (auto& pool_iter : pools_)
        {
            std::size_t num_threads_in_pool =
                rp.get_num_threads(pool_iter->get_pool_name());

            if (pool_iter->get_os_thread_count() != 0 ||
                pool_iter->has_reached_state(hpx::state::running))
            {
                return true;    // do nothing if already running
            }

            if (!pool_iter->run(lk, num_threads_in_pool))
            {
#ifdef HPX_HAVE_TIMER_POOL
                timer_pool_.stop();
#endif
                return false;
            }

            // set all states of all schedulers to "running"
            policies::scheduler_base* sched = pool_iter->get_scheduler();
            if (sched)
                sched->set_all_states(hpx::state::running);
        }

        LTM_(info).format("run: running");
        return true;
    }

    void threadmanager::stop(bool blocking)
    {
        LTM_(info).format("stop: blocking({})", blocking ? "true" : "false");

        std::unique_lock<mutex_type> lk(mtx_);
        for (auto& pool_iter : pools_)
        {
            pool_iter->stop(lk, blocking);
        }
        deinit_tss();
    }

    bool threadmanager::is_busy()
    {
        bool busy = false;
        for (auto& pool_iter : pools_)
        {
            busy = busy || pool_iter->is_busy();
        }
        return busy;
    }

    bool threadmanager::is_idle()
    {
        bool idle = true;
        for (auto& pool_iter : pools_)
        {
            idle = idle && pool_iter->is_idle();
        }
        return idle;
    }

    void threadmanager::wait()
    {
        std::size_t shutdown_check_count = util::get_entry_as<std::size_t>(
            rtcfg_, "hpx.shutdown_check_count", 10);
        hpx::util::detail::yield_while_count(
            [this]() { return is_busy(); }, shutdown_check_count);
    }

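    // Suspends all pools after waiting for remaining work to drain. On an
    // HPX thread the pools are suspended concurrently via futures;
    // otherwise each pool is suspended directly.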
    void threadmanager::suspend()
    {
        wait();

        if (threads::get_self_ptr())
        {
            std::vector<hpx::future<void>> fs;

            for (auto& pool_iter : pools_)
            {
                fs.push_back(suspend_pool(*pool_iter));
            }

            hpx::wait_all(fs);
        }
        else
        {
            for (auto& pool_iter : pools_)
            {
                pool_iter->suspend_direct();
            }
        }
    }

    void threadmanager::resume()
    {
        if (threads::get_self_ptr())
        {
            std::vector<hpx::future<void>> fs;

            for (auto& pool_iter : pools_)
            {
                fs.push_back(resume_pool(*pool_iter));
            }
            hpx::wait_all(fs);
        }
        else
        {
            for (auto& pool_iter : pools_)
            {
                pool_iter->resume_direct();
            }
        }
    }
}}    // namespace hpx::threads
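
For context, a minimal usage sketch of the pool-lookup interface defined above, assuming hpx::threads::get_thread_manager() (declared in the threadmanager module) returns a reference to the running thread manager:

// Hypothetical usage sketch; pool_exists()/get_pool() are the member
// functions defined in the file above.
#include <hpx/modules/threadmanager.hpp>

#include <iostream>
#include <string>

void inspect_pool(std::string const& name)
{
    auto& tm = hpx::threads::get_thread_manager();
    if (tm.pool_exists(name))
    {
        hpx::threads::thread_pool_base& pool = tm.get_pool(name);
        std::cout << pool.get_pool_name() << " runs "
                  << pool.get_os_thread_count() << " OS threads\n";
    }
}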