• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #858

31 Dec 2022 07:46PM UTC coverage: 86.496% (-0.05%) from 86.543%
#858

push

StellarBot
Merge #6120

6120: Use index_queue for parallel executors bulk_async_execute r=hkaiser a=hkaiser

This should significantly improve the performance of many parallel algorithms

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

347 of 347 new or added lines in 15 files covered. (100.0%)

174559 of 201811 relevant lines covered (86.5%)

1846742.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.13
/libs/core/resource_partitioner/src/detail_partitioner.cpp
1
//  Copyright (c) 2017 Shoshana Jakobovits
2
//  Copyright (c) 2017-2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/functional/function.hpp>
#include <hpx/ini/ini.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/format.hpp>
#include <hpx/resource_partitioner/detail/partitioner.hpp>
#include <hpx/resource_partitioner/partitioner.hpp>
#include <hpx/thread_pools/scheduled_thread_pool.hpp>
#include <hpx/threading_base/scheduler_mode.hpp>
#include <hpx/threading_base/thread_pool_base.hpp>
#include <hpx/topology/topology.hpp>
#include <hpx/type_support/static.hpp>
#include <hpx/util/from_string.hpp>
#include <hpx/util/get_entry_as.hpp>

#include <atomic>
#include <cstddef>
#include <iosfwd>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
33

34
namespace hpx::resource::detail {
35

36
    ///////////////////////////////////////////////////////////////////////////
37
    // Abort with an hpx::error::invalid_status exception.
    // func:    name of the function reporting the error
    // message: human-readable description of the failure
    [[noreturn]] void throw_runtime_error(
        std::string const& func, std::string const& message)
    {
        HPX_THROW_EXCEPTION(hpx::error::invalid_status, func, message);
    }
42

43
    // Abort with an hpx::error::bad_parameter exception.
    // func:    name of the function reporting the error
    // message: human-readable description of the invalid argument
    [[noreturn]] void throw_invalid_argument(
        std::string const& func, std::string const& message)
    {
        HPX_THROW_EXCEPTION(hpx::error::bad_parameter, func, message);
    }
48

49
    ///////////////////////////////////////////////////////////////////////////
50
    // Total number of OS-threads requested across all pools (incremented in
    // init_pool_data::add_resource, reset in partitioner::~partitioner).
    std::size_t init_pool_data::num_threads_overall = 0;
51

52
    // Construct metadata for a pool that will use one of the predefined
    // schedulers. Throws (via throw_invalid_argument) if the pool name is
    // empty; resources are added later through add_resource().
    init_pool_data::init_pool_data(std::string const& name,
        scheduling_policy sched, hpx::threads::policies::scheduler_mode mode)
      : pool_name_(name)
      , scheduling_policy_(sched)
      , num_threads_(0)
      , mode_(mode)
    {
        if (name.empty())
        {
            throw_invalid_argument("init_pool_data::init_pool_data",
                "cannot instantiate a thread_pool with empty string as a "
                "name.");
        }
    }
66

67
    // Construct metadata for a pool backed by a user-supplied scheduler
    // creation function; the policy is recorded as 'user_defined'. Throws
    // (via throw_invalid_argument) if the pool name is empty.
    init_pool_data::init_pool_data(std::string const& name,
        scheduler_function create_func,
        hpx::threads::policies::scheduler_mode mode)
      : pool_name_(name)
      , scheduling_policy_(scheduling_policy::user_defined)
      , num_threads_(0)
      , mode_(mode)
      , create_function_(HPX_MOVE(create_func))
    {
        if (name.empty())
        {
            throw_invalid_argument("init_pool_data::init_pool_data",
                "cannot instantiate a thread_pool with empty string "
                "as a name.");
        }
    }
83

84
    // mechanism for adding resources
    // num threads = number of threads desired on a PU. defaults to 1.
    // note: if num_threads > 1 => oversubscription
    //
    // Records num_threads OS-threads bound to the given processing unit:
    // one affinity mask per thread in assigned_pus_ and one
    // (pu_index, exclusive, assigned) tuple per thread in assigned_pu_nums_
    // (the 'assigned' flag starts false and is flipped by assign_pu()).
    // Throws (via throw_invalid_argument) if pu_index is out of range.
    void init_pool_data::add_resource(
        std::size_t pu_index, bool exclusive, std::size_t num_threads)
    {
        if (pu_index >= hpx::threads::hardware_concurrency())
        {
            throw_invalid_argument("init_pool_data::add_resource",
                "init_pool_data::add_resource: processing unit index "
                "out of bounds. The total available number of "
                "processing units on this machine is " +
                    std::to_string(hpx::threads::hardware_concurrency()));
        }

        // Increment thread_num count (for pool-count and global count)
        num_threads_ += num_threads;
        num_threads_overall += num_threads;

        // Add pu mask to internal data structure: a mask with only the bit
        // for pu_index set
        threads::mask_type pu_mask = threads::mask_type();
        threads::resize(pu_mask, threads::hardware_concurrency());
        threads::set(pu_mask, pu_index);

        // Add one mask for each OS-thread (identical masks when
        // oversubscribing the same PU)
        for (std::size_t i = 0; i != num_threads; i++)
        {
            assigned_pus_.push_back(pu_mask);
            assigned_pu_nums_.push_back(
                hpx::make_tuple(pu_index, exclusive, false));
        }
    }
116

117
    void init_pool_data::print_pool(std::ostream& os) const
4✔
118
    {
119
        os << "[pool \"" << pool_name_ << "\"] with scheduler ";
4✔
120

121
        std::string sched;
4✔
122
        switch (scheduling_policy_)
4✔
123
        {
124
        case resource::scheduling_policy::unspecified:
125
            sched = "unspecified";
×
126
            break;
×
127
        case resource::scheduling_policy::user_defined:
128
            sched = "user supplied";
×
129
            break;
×
130
        case resource::scheduling_policy::local:
131
            sched = "local";
×
132
            break;
×
133
        case resource::scheduling_policy::local_priority_fifo:
134
            sched = "local_priority_fifo";
2✔
135
            break;
2✔
136
        case resource::scheduling_policy::local_priority_lifo:
137
            sched = "local_priority_lifo";
×
138
            break;
×
139
        case resource::scheduling_policy::static_:
140
            sched = "static";
×
141
            break;
×
142
        case resource::scheduling_policy::static_priority:
143
            sched = "static_priority";
×
144
            break;
×
145
        case resource::scheduling_policy::abp_priority_fifo:
146
            sched = "abp_priority_fifo";
×
147
            break;
×
148
        case resource::scheduling_policy::abp_priority_lifo:
149
            sched = "abp_priority_lifo";
×
150
            break;
×
151
        case resource::scheduling_policy::shared_priority:
152
            sched = "shared_priority";
2✔
153
            break;
2✔
154
        }
155

156
        os << "\"" << sched << "\" is running on PUs : \n";
4✔
157

158
        for (threads::mask_cref_type assigned_pu : assigned_pus_)
12✔
159
        {
160
            os << hpx::threads::to_string(assigned_pu) << '\n';
8✔
161
        }
162
    }
4✔
163

164
    void init_pool_data::assign_pu(std::size_t virt_core)
4,367✔
165
    {
166
        HPX_ASSERT(virt_core <= assigned_pu_nums_.size());
4,367✔
167
        HPX_ASSERT(!hpx::get<2>(assigned_pu_nums_[virt_core]));
4,367✔
168

169
        hpx::get<2>(assigned_pu_nums_[virt_core]) = true;    // -V601
4,367✔
170
    }
4,367✔
171

172
    void init_pool_data::unassign_pu(std::size_t virt_core)
4,365✔
173
    {
174
        HPX_ASSERT(virt_core <= assigned_pu_nums_.size());
4,365✔
175
        HPX_ASSERT(hpx::get<2>(assigned_pu_nums_[virt_core]));
4,365✔
176

177
        hpx::get<2>(assigned_pu_nums_[virt_core]) = false;    // -V601
4,365✔
178
    }
4,365✔
179

180
    bool init_pool_data::pu_is_exclusive(std::size_t virt_core) const
×
181
    {
182
        HPX_ASSERT(virt_core <= assigned_pu_nums_.size());
×
183
        HPX_ASSERT(hpx::get<2>(assigned_pu_nums_[virt_core]));
×
184

185
        return hpx::get<1>(assigned_pu_nums_[virt_core]);
×
186
    }
187

188
    bool init_pool_data::pu_is_assigned(std::size_t virt_core) const
×
189
    {
190
        HPX_ASSERT(virt_core <= assigned_pu_nums_.size());
×
191
        HPX_ASSERT(hpx::get<2>(assigned_pu_nums_[virt_core]));
×
192

193
        return hpx::get<2>(assigned_pu_nums_[virt_core]);
×
194
    }
195

196
    // 'shift' all thread assignments up by the first_core offset
    //
    // Rotates every recorded PU number by first_core (modulo the number of
    // hardware PUs) and rebuilds the corresponding affinity mask so it has
    // exactly the shifted PU bit set.
    void init_pool_data::assign_first_core(std::size_t first_core)
    {
        for (std::size_t i = 0; i != num_threads_; ++i)
        {
            // wrap around at the number of hardware PUs
            std::size_t& pu_num = hpx::get<0>(assigned_pu_nums_[i]);
            pu_num = (pu_num + first_core) % threads::hardware_concurrency();

            threads::reset(assigned_pus_[i]);
            threads::set(assigned_pus_[i], pu_num);
        }
    }
208

209
    ////////////////////////////////////////////////////////////////////////
210
    ////////////////////////////////////////////////////////////////////////
    // Construct the (singleton) resource partitioner: enforce single
    // instantiation, validate the static CPU-count limit, read the default
    // scheduler mode from the configuration, and create the default pool.
    partitioner::partitioner()
      : rtcfg_()
      , first_core_(std::size_t(-1))    // -1: no first core assigned yet
      , mode_(partitioner_mode::default_)
      , topo_(threads::create_topology())
      , default_scheduler_mode_(threads::policies::scheduler_mode::default_)
    {
        // allow only one partitioner instance
        if (++instance_number_counter_ > 1)
        {
            throw_runtime_error("partitioner::partitioner",
                "Cannot instantiate more than one resource partitioner");
        }

#if defined(HPX_HAVE_MAX_CPU_COUNT)
        if (HPX_HAVE_MAX_CPU_COUNT < topo_.get_number_of_pus())
        {
            // fixed typo in the reported function name (was "partioner")
            throw_runtime_error("partitioner::partitioner",
                hpx::util::format(
                    "Currently, HPX_HAVE_MAX_CPU_COUNT is set to {1} "
                    "while your system has {2} processing units. Please "
                    "reconfigure HPX with -DHPX_WITH_MAX_CPU_COUNT={2} (or "
                    "higher) to increase the maximal CPU count supported by "
                    "HPX.",
                    HPX_HAVE_MAX_CPU_COUNT, topo_.get_number_of_pus()));
        }
#endif

        // the configuration entry is an integer bit mask of scheduler_mode
        // flags
        std::string default_scheduler_mode_str =
            rtcfg_.get_entry("hpx.default_scheduler_mode", std::string());
        if (!default_scheduler_mode_str.empty())
        {
            default_scheduler_mode_ = threads::policies::scheduler_mode(
                hpx::util::from_string<std::size_t>(
                    default_scheduler_mode_str));
            HPX_ASSERT_MSG(
                (default_scheduler_mode_ &
                    ~threads::policies::scheduler_mode::all_flags) == 0,
                "hpx.default_scheduler_mode contains unknown scheduler "
                "modes");
        }

        // Create the default pool (its scheduler is chosen later in
        // setup_schedulers())
        initial_thread_pools_.push_back(init_pool_data("default",
            scheduling_policy::unspecified, default_scheduler_mode_));
    }
256

1,222✔
257
    partitioner::~partitioner()
    {
        // allow a subsequent partitioner instance to be created and reset
        // the global OS-thread counter maintained by add_resource()
        --instance_number_counter_;
        detail::init_pool_data::num_threads_overall = 0;
    }
262

263
    bool partitioner::pu_exposed(std::size_t pu_num)
87,912✔
264
    {
265
        threads::mask_type pu_mask = threads::mask_type();
87,912✔
266
        threads::resize(pu_mask, threads::hardware_concurrency());
87,912✔
267
        threads::set(pu_mask, pu_num);
87,912✔
268
        threads::topology& topo = get_topology();
87,912✔
269

270
        threads::mask_type comp =
271
            affinity_data_.get_used_pus_mask(topo, pu_num);
87,912✔
272
        return threads::any(comp & pu_mask);
87,912✔
273
    }
87,912✔
274

275
    // Build the numa_domains_ / cores / PUs hierarchy from the machine
    // topology, keeping only PUs exposed by the current affinity bindings.
    // Cores without any exposed PU and domains without any exposed core are
    // removed again (pop_back) after having been tentatively added. 'pid'
    // enumerates PUs globally across all domains and cores.
    void partitioner::fill_topology_vectors()
    {
        threads::topology& topo = get_topology();

        std::size_t pid = 0;
        std::size_t num_numa_nodes = topo.get_number_of_numa_nodes();
        if (num_numa_nodes == 0)
            num_numa_nodes = topo.get_number_of_sockets();
        numa_domains_.reserve(num_numa_nodes);

        // loop on the numa-domains
        for (std::size_t i = 0; i != num_numa_nodes; ++i)
        {
            numa_domains_.emplace_back(i);             // add a numa domain
            numa_domain& nd = numa_domains_.back();    // numa-domain just added

            std::size_t numa_node_cores = topo.get_number_of_numa_node_cores(i);
            nd.cores_.reserve(numa_node_cores);

            bool numa_domain_contains_exposed_cores = false;

            // loop on the cores
            for (std::size_t j = 0; j != numa_node_cores; ++j)
            {
                nd.cores_.emplace_back(j, &nd);
                core& c = nd.cores_.back();

                std::size_t core_pus = topo.get_number_of_core_pus(j);
                c.pus_.reserve(core_pus);

                bool core_contains_exposed_pus = false;

                // loop on the processing units
                for (std::size_t k = 0; k != core_pus; ++k)
                {
                    if (pu_exposed(pid))
                    {
                        c.pus_.emplace_back(pid, &c,
                            affinity_data_.get_thread_occupancy(topo, pid));
                        pu& p = c.pus_.back();

                        // an exposed PU must be able to run at least one
                        // thread
                        if (p.thread_occupancy_ == 0)
                        {
                            throw_runtime_error(
                                "partitioner::fill_topology_vectors",
                                "PU #" + std::to_string(pid) +
                                    " has thread occupancy 0");
                        }
                        core_contains_exposed_pus = true;
                    }
                    // advance the global PU id whether or not it is exposed
                    ++pid;
                }

                if (core_contains_exposed_pus)
                {
                    numa_domain_contains_exposed_cores = true;
                }
                else
                {
                    // no exposed PU on this core: drop it again
                    nd.cores_.pop_back();
                }
            }

            if (!numa_domain_contains_exposed_cores)
            {
                // no exposed core in this domain: drop it again
                numa_domains_.pop_back();
            }
        }
    }
344

345
    // Shift all pool thread assignments so they start at 'first_core' (in
    // units of PUs per core) and rebuild the affinity data accordingly.
    // Returns the number of PUs needed. Idempotent: does nothing if
    // first_core is unchanged.
    std::size_t partitioner::assign_cores(std::size_t first_core)
    {
        std::lock_guard<mutex_type> l(mtx_);

        // adjust first_core, if needed
        if (first_core_ != first_core)
        {
            std::size_t offset = first_core;
            std::size_t num_pus_core =
                get_topology().get_number_of_core_pus(offset);

            // if a first core was assigned before, shift only by the delta
            if (first_core_ != std::size_t(-1))
            {
                offset -= first_core_;
            }

            if (offset != 0)
            {
                // convert the core offset into a PU offset
                offset *= num_pus_core;
                for (auto& d : initial_thread_pools_)
                {
                    d.assign_first_core(offset);
                }
            }
            first_core_ = first_core;
            reconfigure_affinities_locked();
        }

        HPX_ASSERT(affinity_data_.get_num_pus_needed() != std::size_t(-1));
        return affinity_data_.get_num_pus_needed();
    }
376

377
    // This function is called in hpx_init, before the instantiation of the
378
    // runtime It takes care of configuring some internal parameters of the
379
    // resource partitioner related to the pools
380
    // -1 assigns all free resources to the default pool
381
    // -2 checks whether there are empty pools
382
    // Assign every PU not yet claimed by a user-defined pool to the default
    // pool, then validate that neither the default pool nor any other pool
    // ended up without resources (empty pools are currently fatal).
    void partitioner::setup_pools()
    {
        // Assign all free resources to the default pool
        bool first = true;
        for (hpx::resource::numa_domain& d : numa_domains_)
        {
            for (hpx::resource::core& c : d.cores_)
            {
                for (hpx::resource::pu& p : c.pus_)
                {
                    // occupancy count 0 means no pool has claimed this PU yet
                    if (p.thread_occupancy_count_ == 0)
                    {
                        // The default pool resources are assigned non-
                        // exclusively if dynamic pools are enabled. Also, by
                        // default, the first PU is always exclusive (to avoid
                        // deadlocks).
                        add_resource(p, get_default_pool_name(),
                            first ||
                                !as_bool(mode_ &
                                    partitioner_mode::allow_dynamic_pools));
                        first = false;
                    }
                }
            }
        }

        std::unique_lock<mutex_type> l(mtx_);

        // @TODO allow empty pools
        if (get_pool_data(l, get_default_pool_name()).num_threads_ == 0)
        {
            // unlock before throwing so the error path cannot deadlock
            l.unlock();
            throw_runtime_error("partitioner::setup_pools",
                "Default pool " + get_default_pool_name() +
                    " has no threads assigned. Please rerun with "
                    "--hpx:threads=X and check the pool thread assignment");
        }

        // Check whether any of the pools defined up to now are empty
        if (check_empty_pools())
        {
            l.unlock();
            print_init_pool_data(std::cout);
            throw_runtime_error("partitioner::setup_pools",
                "Pools empty of resources are not allowed. Please re-run this "
                "application with allow-empty-pool-policy (not implemented "
                "yet)");
        }
        //! FIXME add allow-empty-pools policy. Wait, does this even make sense??
    }
432

433
    // This function is called in hpx_init, before the instantiation of the
434
    // runtime It takes care of configuring some internal parameters of the
435
    // resource partitioner related to the pools' schedulers
436
    void partitioner::setup_schedulers()
1,218✔
437
    {
438
        // select the default scheduler
439
        scheduling_policy default_scheduler;
440

441
        std::string default_scheduler_str =
442
            rtcfg_.get_entry("hpx.scheduler", std::string());
1,218✔
443

444
        if (0 == std::string("local").find(default_scheduler_str))
1,218✔
445
        {
446
            default_scheduler = scheduling_policy::local;
1✔
447
        }
1✔
448
        else if (0 ==
1,217✔
449
            std::string("local-priority-fifo").find(default_scheduler_str))
1,217✔
450
        {
451
            default_scheduler = scheduling_policy::local_priority_fifo;
1,211✔
452
        }
1,211✔
453
        else if (0 ==
6✔
454
            std::string("local-priority-lifo").find(default_scheduler_str))
6✔
455
        {
456
            default_scheduler = scheduling_policy::local_priority_lifo;
1✔
457
        }
1✔
458
        else if (0 == std::string("static").find(default_scheduler_str))
5✔
459
        {
460
            default_scheduler = scheduling_policy::static_;
1✔
461
        }
1✔
462
        else if (0 ==
4✔
463
            std::string("static-priority").find(default_scheduler_str))
4✔
464
        {
465
            default_scheduler = scheduling_policy::static_priority;
1✔
466
        }
1✔
467
        else if (0 ==
3✔
468
            std::string("abp-priority-fifo").find(default_scheduler_str))
3✔
469
        {
470
            default_scheduler = scheduling_policy::abp_priority_fifo;
1✔
471
        }
1✔
472
        else if (0 ==
2✔
473
            std::string("abp-priority-lifo").find(default_scheduler_str))
2✔
474
        {
475
            default_scheduler = scheduling_policy::abp_priority_lifo;
1✔
476
        }
1✔
477
        else if (0 ==
1✔
478
            std::string("shared-priority").find(default_scheduler_str))
1✔
479
        {
480
            default_scheduler = scheduling_policy::shared_priority;
1✔
481
        }
1✔
482
        else
483
        {
484
            throw hpx::detail::command_line_error(
×
485
                "Bad value for command line option --hpx:queuing");
486
        }
487

488
        // set this scheduler on the pools that do not have a specified scheduler yet
489
        std::lock_guard<mutex_type> l(mtx_);
1,218✔
490
        std::size_t npools = initial_thread_pools_.size();
1,218✔
491
        for (std::size_t i = 0; i != npools; ++i)
2,799✔
492
        {
493
            if (initial_thread_pools_[i].scheduling_policy_ ==
1,581✔
494
                scheduling_policy::unspecified)
495
            {
496
                initial_thread_pools_[i].scheduling_policy_ = default_scheduler;
1,173✔
497
            }
1,173✔
498
        }
1,581✔
499
    }
1,218✔
500

501
    // This function is called in hpx_init, before the instantiation of the
502
    // runtime. It takes care of configuring some internal parameters of the
503
    // resource partitioner related to the affinity bindings
504
    //
505
    // If we use the resource partitioner, OS-thread numbering gets slightly
506
    // complicated: The affinity_masks_ data member of affinity_data considers
507
    // OS-threads to be numbered in order of occupation of the consecutive
508
    // processing units, while the thread manager will consider them to be
509
    // ordered according to their assignment to pools (first all threads
510
    // belonging to the default pool, then all threads belonging to the first
511
    // pool created, etc.) and instantiate them according to this system. We
512
    // need to re-write affinity_data_ with the masks in the correct order at
513
    // this stage.
514
    void partitioner::reconfigure_affinities()
    {
        // acquire the partitioner lock and delegate to the locked variant
        std::lock_guard<mutex_type> l(mtx_);
        reconfigure_affinities_locked();
    }
519

520
    void partitioner::reconfigure_affinities_locked()
1,954✔
521
    {
522
        std::vector<std::size_t> new_pu_nums;
1,954✔
523
        std::vector<threads::mask_type> new_affinity_masks;
1,954✔
524

525
        new_pu_nums.reserve(initial_thread_pools_.size());
1,954✔
526
        new_affinity_masks.reserve(initial_thread_pools_.size());
1,954✔
527

528
        {
529
            for (auto& itp : initial_thread_pools_)
4,271✔
530
            {
531
                for (auto const& mask : itp.assigned_pus_)
8,649✔
532
                {
533
                    new_affinity_masks.push_back(mask);
6,332✔
534
                }
535
                for (auto const& pu_num : itp.assigned_pu_nums_)
8,649✔
536
                {
537
                    new_pu_nums.push_back(hpx::get<0>(pu_num));
6,332✔
538
                }
539
            }
540
        }
541

542
        affinity_data_.set_num_threads(new_pu_nums.size());
1,954✔
543
        affinity_data_.set_pu_nums(HPX_MOVE(new_pu_nums));
1,954✔
544
        affinity_data_.set_affinity_masks(HPX_MOVE(new_affinity_masks));
1,954✔
545
    }
1,954✔
546

547
    // Returns true if any of the pools defined by the user is empty of
548
    // resources called in set_default_pool()
549
    bool partitioner::check_empty_pools() const
1,218✔
550
    {
551
        std::size_t num_thread_pools = initial_thread_pools_.size();
1,218✔
552

553
        for (std::size_t i = 0; i != num_thread_pools; i++)
2,799✔
554
        {
555
            if (initial_thread_pools_[i].assigned_pus_.empty())
1,581✔
556
            {
557
                return true;
×
558
            }
559
            for (auto assigned_pus : initial_thread_pools_[i].assigned_pus_)
5,948✔
560
            {
561
                if (!threads::any(assigned_pus))
4,367✔
562
                {
563
                    return true;
×
564
                }
565
            }
4,367✔
566
        }
1,581✔
567

568
        return false;
1,218✔
569
    }
1,218✔
570

571
    // create a new thread_pool
572
    void partitioner::create_thread_pool(std::string const& pool_name,
405✔
573
        scheduling_policy sched, hpx::threads::policies::scheduler_mode mode)
574
    {
575
        if (pool_name.empty())
405✔
576
        {
577
            throw std::invalid_argument(
×
578
                "partitioner::create_thread_pool: "
579
                "cannot instantiate a initial_thread_pool with empty string "
580
                "as a name.");
581
        }
582

583
        std::unique_lock<mutex_type> l(mtx_);
405✔
584

585
        if (pool_name == get_default_pool_name())
405✔
586
        {
587
            initial_thread_pools_[0] =
43✔
588
                detail::init_pool_data(get_default_pool_name(), sched, mode);
43✔
589
            return;
43✔
590
        }
591

592
        //! if there already exists a pool with this name
593
        std::size_t num_thread_pools = initial_thread_pools_.size();
362✔
594
        for (std::size_t i = 1; i < num_thread_pools; i++)
8,485✔
595
        {
596
            if (pool_name == initial_thread_pools_[i].pool_name_)
8,123✔
597
            {
598
                l.unlock();
×
599
                throw std::invalid_argument(
×
600
                    "partitioner::create_thread_pool: "
601
                    "there already exists a pool named '" +
×
602
                    pool_name + "'.\n");
×
603
            }
604
        }
8,123✔
605

606
        initial_thread_pools_.push_back(
362✔
607
            detail::init_pool_data(pool_name, sched, mode));
362✔
608
    }
405✔
609

610
    // create a new thread_pool
611
    void partitioner::create_thread_pool(
6✔
612
        std::string const& pool_name, scheduler_function scheduler_creation)
613
    {
614
        if (pool_name.empty())
6✔
615
        {
616
            throw std::invalid_argument(
×
617
                "partitioner::create_thread_pool: "
618
                "cannot instantiate a initial_thread_pool with empty string "
619
                "as a name.");
620
        }
621

622
        std::unique_lock<mutex_type> l(mtx_);
6✔
623

624
        if (pool_name == get_default_pool_name())
6✔
625
        {
626
            initial_thread_pools_[0] =
5✔
627
                detail::init_pool_data(get_default_pool_name(),
10✔
628
                    HPX_MOVE(scheduler_creation), default_scheduler_mode_);
5✔
629
            return;
5✔
630
        }
631

632
        //! if there already exists a pool with this name
633
        std::size_t num_thread_pools = initial_thread_pools_.size();
1✔
634
        for (std::size_t i = 1; i != num_thread_pools; ++i)
1✔
635
        {
636
            if (pool_name == initial_thread_pools_[i].pool_name_)
×
637
            {
638
                l.unlock();
×
639
                throw std::invalid_argument(
×
640
                    "partitioner::create_thread_pool: "
641
                    "there already exists a pool named '" +
×
642
                    pool_name + "'.\n");
×
643
            }
644
        }
×
645

646
        initial_thread_pools_.push_back(detail::init_pool_data(
2✔
647
            pool_name, HPX_MOVE(scheduler_creation), default_scheduler_mode_));
1✔
648
    }
6✔
649

650
    // ----------------------------------------------------------------------
651
    // Add processing units to pools via pu/core/domain api
652
    // ----------------------------------------------------------------------
653
    void partitioner::add_resource(pu const& p, std::string const& pool_name,
4,367✔
654
        bool exclusive, std::size_t num_threads)
655
    {
656
        std::unique_lock<mutex_type> l(mtx_);
4,367✔
657

658
        if (!exclusive &&
4,367✔
659
            !as_bool(mode_ & partitioner_mode::allow_dynamic_pools))
×
660
        {
661
            l.unlock();
×
662
            throw std::invalid_argument(
×
663
                "partitioner::add_resource: dynamic pools have not been "
664
                "enabled for this partitioner");
665
        }
666

667
        if (as_bool(mode_ & partitioner_mode::allow_oversubscription))
4,367✔
668
        {
669
            // increment occupancy counter
670
            get_pool_data(l, pool_name)
4✔
671
                .add_resource(p.id_, exclusive, num_threads);
4✔
672
            ++p.thread_occupancy_count_;
4✔
673
            return;
4✔
674
        }
675

676
        // check occupancy counter and increment it
677
        if (p.thread_occupancy_count_ == 0)
4,363✔
678
        {
679
            get_pool_data(l, pool_name)
4,363✔
680
                .add_resource(p.id_, exclusive, num_threads);
4,363✔
681
            ++p.thread_occupancy_count_;
4,363✔
682

683
            // Make sure the total number of requested threads does not exceed
684
            // the number of threads requested on the command line
685
            std::size_t num_threads =
4,363✔
686
                util::get_entry_as<std::size_t>(rtcfg_, "hpx.os_threads", 0);
4,363✔
687
            HPX_ASSERT(num_threads != 0);
4,363✔
688

689
            if (detail::init_pool_data::num_threads_overall > num_threads)
4,363✔
690
            {
691
                l.unlock();
×
692
                throw std::runtime_error("partitioner::add_resource: "
×
693
                                         "Creation of " +
×
694
                    std::to_string(
×
695
                        detail::init_pool_data::num_threads_overall) +
×
696
                    " threads requested by the resource partitioner, but "
697
                    "only " +
×
698
                    std::to_string(num_threads) +
×
699
                    " provided on the command-line.");
700
            }
701
        }
4,363✔
702
        else
703
        {
704
            l.unlock();
×
705
            throw std::runtime_error("partitioner::add_resource: "
×
706
                                     "PU #" +
×
707
                std::to_string(p.id_) + " can be assigned only " +
×
708
                std::to_string(p.thread_occupancy_) +
×
709
                " threads according to affinity bindings.");
710
        }
711
    }
4,367✔
712

713
    // Convenience overload: add every PU in the vector to the named pool.
    void partitioner::add_resource(
        std::vector<pu> const& pv, std::string const& pool_name, bool exclusive)
    {
        for (pu const& p : pv)
        {
            add_resource(p, pool_name, exclusive);
        }
    }
721

722
    // Convenience overload: add all PUs of a core to the named pool.
    void partitioner::add_resource(
        core const& c, std::string const& pool_name, bool exclusive)
    {
        add_resource(c.pus_, pool_name, exclusive);
    }
727

728
    // Convenience overload: add all PUs of every core in the vector.
    void partitioner::add_resource(std::vector<core> const& cv,
        std::string const& pool_name, bool exclusive)
    {
        for (core const& c : cv)
        {
            add_resource(c.pus_, pool_name, exclusive);
        }
    }
736

737
    // Convenience overload: add all cores (and their PUs) of a NUMA domain.
    void partitioner::add_resource(
        numa_domain const& nd, std::string const& pool_name, bool exclusive)
    {
        add_resource(nd.cores_, pool_name, exclusive);
    }
742

743
    // Convenience overload: add every NUMA domain in the vector.
    void partitioner::add_resource(std::vector<numa_domain> const& ndv,
        std::string const& pool_name, bool exclusive)
    {
        for (numa_domain const& d : ndv)
        {
            add_resource(d, pool_name, exclusive);
        }
    }
751

752
    void partitioner::set_scheduler(
×
753
        scheduling_policy sched, std::string const& pool_name)
754
    {
755
        std::unique_lock<mutex_type> l(mtx_);
×
756
        get_pool_data(l, pool_name).scheduling_policy_ = sched;
×
757
    }
×
758

759
    void partitioner::configure_pools()
1,218✔
760
    {
761
        setup_pools();
1,218✔
762
        setup_schedulers();
1,218✔
763
        reconfigure_affinities();
1,218✔
764

765
        is_initialized_ = true;
1,218✔
766
    }
1,218✔
767

768
    ////////////////////////////////////////////////////////////////////////
769
    // this function is called in the constructor of thread_pool
770
    // returns a scheduler (moved) that thread pool should have as a data member
771
    scheduling_policy partitioner::which_scheduler(std::string const& pool_name)
    {
        std::unique_lock<mutex_type> l(mtx_);

        // look up which scheduler was configured for this pool
        scheduling_policy const sched_type =
            get_pool_data(l, pool_name).scheduling_policy_;

        if (sched_type != scheduling_policy::unspecified)
        {
            return sched_type;
        }

        // a thread pool cannot be created without a concrete scheduler type
        l.unlock();
        throw std::invalid_argument(
            "partitioner::which_scheduler: Thread pool " + pool_name +
            " cannot be instantiated with unspecified scheduler type.");
    }
1,581✔
787

788
    threads::topology& partitioner::get_topology() const noexcept
    {
        // expose the topology object held by the partitioner
        return topo_;
    }
792

793
    std::size_t partitioner::get_num_threads() const
1,220✔
794
    {
795
        std::size_t num_threads = 0;
1,220✔
796

797
        {
798
            std::unique_lock<mutex_type> l(mtx_);
1,220✔
799
            std::size_t num_thread_pools = initial_thread_pools_.size();
1,220✔
800
            for (size_t i = 0; i != num_thread_pools; ++i)
2,806✔
801
            {
802
                num_threads += get_pool_data(l, i).num_threads_;
1,586✔
803
            }
1,586✔
804
        }
1,220✔
805

806
        // the number of allocated threads should be the same as the number of
807
        // threads to create (if no over-subscription is allowed)
808
        HPX_ASSERT(as_bool(mode_ & partitioner_mode::allow_oversubscription) ||
1,220✔
809
            num_threads ==
810
                util::get_entry_as<std::size_t>(
811
                    rtcfg_, "hpx.os_threads", std::size_t(-1)));
812

813
        return num_threads;
1,220✔
814
    }
×
815

816
    std::size_t partitioner::get_num_pools() const noexcept
1,248✔
817
    {
818
        std::lock_guard<mutex_type> l(mtx_);
1,248✔
819
        return initial_thread_pools_.size();
1,248✔
820
    }
1,248✔
821

822
    std::size_t partitioner::get_num_threads(std::size_t pool_index) const
4,749✔
823
    {
824
        std::unique_lock<mutex_type> l(mtx_);
4,749✔
825
        return get_pool_data(l, pool_index).num_threads_;
4,749✔
826
    }
4,749✔
827

828
    std::size_t partitioner::get_num_threads(std::string const& pool_name) const
112,715✔
829
    {
830
        std::unique_lock<mutex_type> l(mtx_);
112,715✔
831
        return get_pool_data(l, pool_name).num_threads_;
112,715✔
832
    }
112,715✔
833

834
    hpx::threads::policies::scheduler_mode partitioner::get_scheduler_mode(
        std::size_t pool_index) const
    {
        // scheduler mode flags configured for the given pool
        std::unique_lock<mutex_type> l(mtx_);
        detail::init_pool_data const& data = get_pool_data(l, pool_index);
        return data.mode_;
    }
1,581✔
840

841
    detail::init_pool_data const& partitioner::get_pool_data(
        std::unique_lock<mutex_type>& l, std::size_t pool_index) const
    {
        // valid indices refer to the initially configured pools
        if (pool_index < initial_thread_pools_.size())
        {
            return initial_thread_pools_[pool_index];
        }

        // release the lock before raising the error
        l.unlock();
        throw_invalid_argument("partitioner::get_pool_data",
            "pool index " + std::to_string(pool_index) +
                " too large: the resource partitioner owns only " +
                std::to_string(initial_thread_pools_.size()) +
                " thread pools.");
    }
×
855

856
    std::string const& partitioner::get_pool_name(std::size_t index) const
1,691✔
857
    {
858
        if (index >= initial_thread_pools_.size())
1,691✔
859
        {
860
            throw_invalid_argument("partitioner::get_pool_name: ",
×
861
                "pool " + std::to_string(index) +
×
862
                    " (zero-based index) requested out of bounds. The "
863
                    "partitioner owns only " +
×
864
                    std::to_string(initial_thread_pools_.size()) + " pools");
×
865
        }
866
        return initial_thread_pools_[index].pool_name_;
1,691✔
867
    }
×
868

869
    std::size_t partitioner::get_pu_num(std::size_t global_thread_num) const
389,055✔
870
    {
871
        // protect against stand-alone use of schedulers
872
        if (is_initialized_)
389,054✔
873
        {
874
            return affinity_data_.get_pu_num(global_thread_num);
389,030✔
875
        }
876
        return global_thread_num;
24✔
877
    }
389,054✔
878

879
    std::size_t partitioner::get_thread_occupancy(std::size_t pu_num) const
×
880
    {
881
        return affinity_data_.get_thread_occupancy(topo_, pu_num);
×
882
    }
883

884
    threads::mask_type partitioner::get_used_pus_mask(std::size_t pu_num) const
    {
        // once initialized, defer to the affinity data
        if (is_initialized_)
        {
            return affinity_data_.get_used_pus_mask(topo_, pu_num);
        }

        // not initialized yet: report a mask with just this pu set
        auto result = hpx::threads::mask_type();
        hpx::threads::resize(result, hpx::threads::hardware_concurrency());
        threads::set(result, pu_num);
        return result;
    }
6✔
896

897
    threads::mask_type partitioner::get_pu_mask(
        std::size_t global_thread_num) const
    {
        // once initialized, defer to the affinity data
        if (is_initialized_)
        {
            return affinity_data_.get_pu_mask(topo_, global_thread_num);
        }

        // not initialized yet: report a mask with just this thread's pu set
        auto result = hpx::threads::mask_type();
        hpx::threads::resize(result, hpx::threads::hardware_concurrency());
        threads::set(result, global_thread_num);
        return result;
    }
202,586✔
910

911
    void partitioner::init(resource::partitioner_mode rpmode,
1,221✔
912
        hpx::util::section const& rtcfg,
913
        hpx::threads::policies::detail::affinity_data affinity_data)
914
    {
915
        mode_ = rpmode;
1,221✔
916
        rtcfg_ = rtcfg;
1,221✔
917
        affinity_data_ = affinity_data;
1,221✔
918

919
        fill_topology_vectors();
1,221✔
920
    }
1,221✔
921

922
    scheduler_function partitioner::get_pool_creator(std::size_t index) const
    {
        std::unique_lock<mutex_type> l(mtx_);

        // validate the index while still holding the lock
        if (index >= initial_thread_pools_.size())
        {
            l.unlock();
            throw std::invalid_argument(
                "partitioner::get_pool_creator: pool requested out of bounds.");
        }

        return get_pool_data(l, index).create_function_;
    }
6✔
933

934
    ///////////////////////////////////////////////////////////////////////////
935
    void partitioner::assign_pu(
4,367✔
936
        std::string const& pool_name, std::size_t virt_core)
937
    {
938
        std::unique_lock<mutex_type> l(mtx_);
4,367✔
939
        detail::init_pool_data& data = get_pool_data(l, pool_name);
4,367✔
940
        data.assign_pu(virt_core);
4,367✔
941
    }
4,367✔
942

943
    void partitioner::unassign_pu(
4,305✔
944
        std::string const& pool_name, std::size_t virt_core)
945
    {
946
        std::unique_lock<mutex_type> l(mtx_);
4,365✔
947
        detail::init_pool_data& data = get_pool_data(l, pool_name);
4,305✔
948
        data.unassign_pu(virt_core);
4,365✔
949
    }
4,365✔
950

951
    std::size_t partitioner::shrink_pool(std::string const& pool_name,
×
952
        hpx::function<void(std::size_t)> const& remove_pu)
953
    {
954
        if (!as_bool(mode_ & partitioner_mode::allow_dynamic_pools))
×
955
        {
956
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
957
                "partitioner::shrink_pool",
958
                "dynamic pools have not been enabled for the partitioner");
959
        }
960

961
        std::vector<std::size_t> pu_nums_to_remove;
×
962
        bool has_non_exclusive_pus = false;
×
963

964
        {
965
            std::unique_lock<mutex_type> l(mtx_);
×
966
            detail::init_pool_data const& data = get_pool_data(l, pool_name);
×
967

968
            pu_nums_to_remove.reserve(data.num_threads_);
×
969

970
            for (std::size_t i = 0; i != data.num_threads_; ++i)
×
971
            {
972
                if (!data.pu_is_exclusive(i))
×
973
                {
974
                    has_non_exclusive_pus = true;
×
975
                    if (data.pu_is_assigned(i))
×
976
                    {
977
                        pu_nums_to_remove.push_back(i);
×
978
                    }
×
979
                }
×
980
            }
×
981
        }
×
982

983
        if (!has_non_exclusive_pus)
×
984
        {
985
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
986
                "partitioner::shrink_pool",
987
                "pool '{}' has no non-exclusive pus associated", pool_name);
988
        }
989

990
        for (std::size_t pu_num : pu_nums_to_remove)
×
991
        {
992
            remove_pu(pu_num);
×
993
        }
994

995
        return pu_nums_to_remove.size();
×
996
    }
×
997

998
    std::size_t partitioner::expand_pool(std::string const& pool_name,
×
999
        hpx::function<void(std::size_t)> const& add_pu)
1000
    {
1001
        if (!as_bool(mode_ & partitioner_mode::allow_dynamic_pools))
×
1002
        {
1003
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
1004
                "partitioner::expand_pool",
1005
                "dynamic pools have not been enabled for the partitioner");
1006
        }
1007

1008
        std::vector<std::size_t> pu_nums_to_add;
×
1009
        bool has_non_exclusive_pus = false;
×
1010

1011
        {
1012
            std::unique_lock<mutex_type> l(mtx_);
×
1013
            detail::init_pool_data const& data = get_pool_data(l, pool_name);
×
1014

1015
            pu_nums_to_add.reserve(data.num_threads_);
×
1016

1017
            for (std::size_t i = 0; i != data.num_threads_; ++i)
×
1018
            {
1019
                if (!data.pu_is_exclusive(i))
×
1020
                {
1021
                    has_non_exclusive_pus = true;
×
1022
                    if (!data.pu_is_assigned(i))
×
1023
                    {
1024
                        pu_nums_to_add.push_back(i);
×
1025
                    }
×
1026
                }
×
1027
            }
×
1028
        }
×
1029

1030
        if (!has_non_exclusive_pus)
×
1031
        {
1032
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
1033
                "partitioner::expand_pool",
1034
                "pool '{}' has no non-exclusive pus associated", pool_name);
1035
        }
1036

1037
        for (std::size_t pu_num : pu_nums_to_add)
×
1038
        {
1039
            add_pu(pu_num);
×
1040
        }
1041

1042
        return pu_nums_to_add.size();
×
1043
    }
×
1044

1045
    ////////////////////////////////////////////////////////////////////////
1046
    std::size_t partitioner::get_pool_index(std::string const& pool_name) const
147✔
1047
    {
1048
        // the default pool is always index 0, it may be renamed but the user
1049
        // can always ask for "default"
1050
        if (pool_name == "default")
147✔
1051
        {
1052
            return 0;
139✔
1053
        }
1054

1055
        {
1056
            std::lock_guard<mutex_type> l(mtx_);
8✔
1057
            std::size_t num_pools = initial_thread_pools_.size();
8✔
1058
            for (std::size_t i = 0; i < num_pools; i++)
8✔
1059
            {
1060
                if (initial_thread_pools_[i].pool_name_ == pool_name)
8✔
1061
                {
1062
                    return i;
8✔
1063
                }
1064
            }
×
1065
        }
8✔
1066

1067
        throw_invalid_argument("partitioner::get_pool_index",
×
1068
            "the resource partitioner does not own a thread pool named '" +
×
1069
                pool_name + "'");
×
1070
    }
147✔
1071

1072
    // has to be private bc pointers become invalid after data member
1073
    // thread_pools_ is resized we don't want to allow the user to use it
1074
    detail::init_pool_data const& partitioner::get_pool_data(
        std::unique_lock<mutex_type>& l, std::string const& pool_name) const
    {
        // linear search; the number of configured pools is small
        for (detail::init_pool_data const& itp : initial_thread_pools_)
        {
            if (itp.pool_name_ == pool_name)
            {
                return itp;
            }
        }

        // no such pool: release the lock before raising the error
        l.unlock();
        throw_invalid_argument("partitioner::get_pool_data",
            "the resource partitioner does not own a thread pool named '" +
                pool_name + "'");
    }
×
1093

1094
    detail::init_pool_data& partitioner::get_pool_data(
        std::unique_lock<mutex_type>& l, std::string const& pool_name)
    {
        // linear search; the number of configured pools is small
        for (detail::init_pool_data& itp : initial_thread_pools_)
        {
            if (itp.pool_name_ == pool_name)
            {
                return itp;
            }
        }

        // no such pool: release the lock before raising the error
        l.unlock();
        throw_invalid_argument("partitioner::get_pool_data",
            "the resource partitioner does not own a thread pool named '" +
                pool_name + "'");
    }
×
1113

1114
    // Dump a human-readable description of every configured pool to os.
    void partitioner::print_init_pool_data(std::ostream& os) const
    {
        std::lock_guard<mutex_type> l(mtx_);

        //! make this prettier
        os << "the resource partitioner owns " << initial_thread_pools_.size()
           << " pool(s) : \n";    // -V128
        // NOTE(review): iterating by value copies each init_pool_data per
        // iteration; consider 'auto const&' if print_pool() is const -- confirm
        for (auto itp : initial_thread_pools_)
        {
            itp.print_pool(os);
        }
    }
2✔
1126

1127
    ////////////////////////////////////////////////////////////////////////
1128
    // Counter used to number partitioner instances; starts at -1, i.e. no
    // instance has been created yet (presumably incremented per instance in
    // the constructor, which is not visible here).
    std::atomic<int> partitioner::instance_number_counter_(-1);
1129
}    // namespace hpx::resource::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc