• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #862

10 Jan 2023 05:30PM UTC coverage: 86.582% (-0.05%) from 86.634%
#862

push

StellarBot
Merge #6130

6130: Remove the mutex lock in the critical path of get_partitioner. r=hkaiser a=JiakunYan

Remove the mutex lock in the critical path of hpx::resource::detail::get_partitioner.

The protected variable `partitioner_ref` is only set once during initialization.

Co-authored-by: Jiakun Yan <jiakunyan1998@gmail.com>

6 of 6 new or added lines in 1 file covered. (100.0%)

174767 of 201851 relevant lines covered (86.58%)

2069816.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.81
/libs/core/affinity/src/parse_affinity_options.cpp
1
//  Copyright (c) 2007-2017 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/affinity/parse_affinity_options.hpp>
8
#include <hpx/assert.hpp>
9
#include <hpx/datastructures/tuple.hpp>
10
#include <hpx/modules/errors.hpp>
11
#include <hpx/topology/topology.hpp>
12

13
#include <hpx/affinity/detail/partlit.hpp>
14

15
#include <hwloc.h>
16

17
#include <boost/fusion/include/adapt_struct.hpp>
18
#include <boost/fusion/include/std_pair.hpp>
19
#include <boost/spirit/home/x3/auxiliary.hpp>
20
#include <boost/spirit/home/x3/char.hpp>
21
#include <boost/spirit/home/x3/core.hpp>
22
#include <boost/spirit/home/x3/nonterminal.hpp>
23
#include <boost/spirit/home/x3/numeric.hpp>
24
#include <boost/spirit/home/x3/operator.hpp>
25
#include <boost/spirit/home/x3/string.hpp>
26

27
#include <algorithm>
28
#include <cstddef>
29
#include <cstdint>
30
#include <string>
31
#include <vector>
32

33
///////////////////////////////////////////////////////////////////////////////
34
// clang-format off
// Expose spec_type as a Boost.Fusion sequence so Spirit X3 can populate its
// members (type_, index_bounds_) directly from parser attributes.
BOOST_FUSION_ADAPT_STRUCT(
    hpx::threads::detail::spec_type,
    type_,
    index_bounds_
)
// clang-format on
41

42
namespace {

    ///////////////////////////////////////////////////////////////////////////
    //
    //    mappings:
    //        distribution
    //        mapping(;mapping)*
    //
    //    distribution:
    //        compact
    //        scatter
    //        balanced
    //        numa-balanced
    //
    //    mapping:
    //        thread-spec=pu-specs
    //
    //    thread-spec:
    //        t:int
    //        t:int-int
    //        t:all
    //
    //    pu-specs:
    //        pu-spec(.pu-spec)*
    //
    //    pu-spec:
    //        type:range-specs
    //        ~pu-spec
    //
    //    range-specs:
    //        range-spec(,range-spec)*
    //
    //    range-spec:
    //        int
    //        int-int
    //        all
    //
    //    type:
    //        socket | numanode
    //        core
    //        pu
    //
    namespace x3 = boost::spirit::x3;
    using namespace hpx::threads::detail;

    // Spirit X3 rule declarations; the *_def grammar expressions below are
    // bound to these rules by BOOST_SPIRIT_DEFINE at the end of this
    // namespace.
    x3::rule<class mappings, mappings_type> mappings = "mappings";
    x3::rule<class distribution, distribution_type> distribution =
        "distribution";
    x3::rule<class mapping, full_mapping_type> mapping = "mapping";
    x3::rule<class thread_spec, spec_type> thread_spec = "thread_spec";
    x3::rule<class pu_specs, mapping_type> pu_specs = "pu_specs";
    x3::rule<class socket_spec, spec_type> socket_spec = "socket_spec";
    x3::rule<class core_spec, spec_type> core_spec = "core_spec";
    x3::rule<class pu_spec, spec_type> pu_spec = "pu_spec";
    x3::rule<class specs, bounds_type> specs = "specs";
    x3::rule<class spec, bounds_type> spec = "spec";

    // Top level: either a predefined distribution keyword or a ';'-separated
    // list of explicit thread-to-pu mappings.
    auto mappings_def = distribution | (mapping % ';');

    auto mapping_def = thread_spec >> '=' >> pu_specs;

    // clang-format off
    auto distribution_def =
            partlit("compact", distribution_type::compact)
        |   partlit("scatter", distribution_type::scatter)
        |   partlit("balanced", distribution_type::balanced)
        |   partlit("numa-balanced", distribution_type::numa_balanced)
        ;

    auto thread_spec_def =
            partlit("thread", spec_type::type::thread) >> ':' >>  specs
        ;

    // Always yields the triple (socket, core, pu); missing components are
    // synthesized as 'unknown' by the x3::attr alternatives below.
    auto pu_specs_def =
            (socket_spec >> core_spec >> pu_spec)
//        |   ('~' >> pu_spec)
        ;

    auto socket_spec_def =
            (partlit("socket", spec_type::type::socket) >> ':' >> specs)
        |   (partlit("numanode", spec_type::type::numanode) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::type::unknown))
        ;

    auto core_spec_def =
           (-x3::lit('.')
                >> partlit("core", spec_type::type::core) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::type::unknown))
        ;

    auto pu_spec_def =
           (-x3::lit('.') >> partlit("pu", spec_type::type::pu) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::type::unknown))
        ;

    auto specs_def = spec % ',';

    // A single range: "N", "N-M" (the "-M" part parses as a negative int,
    // which extract_bounds later interprets as an inclusive upper limit),
    // or "all".
    auto spec_def =
            (x3::uint_ >> -(x3::int_))
        |   partlit("all", bounds_type{spec_type::all_entities()})
        ;
    // clang-format on

    BOOST_SPIRIT_DEFINE(mappings, distribution, mapping, thread_spec, pu_specs,
        socket_spec, core_spec, pu_spec, specs, spec)

}    // namespace
149

150
namespace hpx::threads::detail {
151

152
    inline constexpr char const* const type_names[] = {
153
        "unknown", "thread", "socket", "numanode", "core", "pu"};
154

155
    char const* spec_type::type_name(spec_type::type t) noexcept
×
156
    {
157
        if (t < spec_type::type::unknown || t > spec_type::type::pu)
×
158
            return type_names[0];
×
159
        return type_names[static_cast<int>(t)];
×
160
    }
×
161

162
    // Run the Spirit X3 'mappings' grammar over [begin, end). On success
    // 'begin' is advanced past the consumed input; callers must additionally
    // check that the whole input was consumed.
    template <typename Iterator>
    inline bool parse(Iterator& begin, Iterator end, mappings_type& m)
    {
        return x3::parse(begin, end, mappings, m);
    }
167

168
    ///////////////////////////////////////////////////////////////////////////
169
    void parse_mappings(
1,372✔
170
        std::string const& spec, mappings_type& mappings, error_code& ec)
171
    {
172
        std::string::const_iterator begin = spec.begin();
1,372✔
173
        if (!detail::parse(begin, spec.end(), mappings) || begin != spec.end())
1,372✔
174
        {
175
            HPX_THROWS_IF(ec, hpx::error::bad_parameter,
20✔
176
                "parse_affinity_options",
177
                "failed to parse affinity specification: " + spec);
178
            return;
20✔
179
        }
180

181
        if (&ec != &throws)
1,352✔
182
            ec = make_success_code();
128✔
183
    }
1,372✔
184

185
    ///////////////////////////////////////////////////////////////////////////
    // Expand the index bounds stored in spec 'm' into an explicit index list.
    //
    // The parser encodes ranges as follows (see spec_def in the grammar):
    //   * all_entities()           -> every index in [0, default_last)
    //   * a single value v         -> just v
    //   * a pair (first, -second)  -> the inclusive range [first, second];
    //                                 "N-M" parses as N followed by -M
    //   * a pair (first, second>0) -> both values emitted as-is
    //
    // 'default_last' is the number of existing resources of the requested
    // kind; any index at or beyond it raises hpx::error::bad_parameter (via
    // 'ec') and returns an empty result.
    bounds_type extract_bounds(
        spec_type const& m, std::size_t default_last, error_code& ec)
    {
        bounds_type result;

        if (m.index_bounds_.empty())
            return result;

        bounds_type::const_iterator first = m.index_bounds_.begin();
        bounds_type::const_iterator last = m.index_bounds_.end();

        while (first != last)
        {
            if (*first == spec_type::all_entities())
            {
                // bind all entities
                result.clear();
                for (std::size_t i = 0; i != default_last; ++i)
                    result.push_back(i);
                break;    // we will not get more than 'all'
            }

            bounds_type::const_iterator second = first;
            if (++second != last)
            {
                if (*second == 0 || *second == spec_type::all_entities())
                {
                    // one element only
                    if (default_last <= std::size_t(*first))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the resource id given is larger than the "
                            "number of existing resources");
                        return result;
                    }
                    result.push_back(*first);
                }
                else if (*second < 0)
                {
                    // all elements between min and -max
                    if (default_last <= std::size_t(-*second))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the upper limit given is larger than the "
                            "number of existing resources");
                        return result;
                    }
                    for (std::int64_t i = *first; i <= -*second; ++i)
                        result.push_back(i);
                }
                else
                {
                    // just min and max
                    if (default_last <= std::size_t(*second))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the upper limit given is larger than the "
                            "number of existing resources");
                        return result;
                    }
                    result.push_back(*first);
                    result.push_back(*second);
                }
                // consumed two elements; loop's ++first advances past both
                first = second;
            }
            else
            {
                // one element only
                if (default_last <= std::size_t(*first))
                {
                    result.clear();
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "extract_bounds",
                        "the resource id given is larger than the number "
                        "of existing resources");
                    return result;
                }
                result.push_back(*first);
            }
            ++first;
        }

        if (&ec != &throws)
            ec = make_success_code();

        return result;
    }
×
279

280
    ///////////////////////////////////////////////////////////////////////////
281
    //                  index,       mask
282
    typedef hpx::tuple<std::size_t, mask_type> mask_info;
283

284
    inline std::size_t get_index(mask_info const& smi)
×
285
    {
286
        return hpx::get<0>(smi);
×
287
    }
288
    inline mask_cref_type get_mask(mask_info const& smi)
×
289
    {
290
        return hpx::get<1>(smi);
×
291
    }
292

293
    ///////////////////////////////////////////////////////////////////////////
294
    std::vector<mask_info> extract_socket_masks(
×
295
        topology const& t, bounds_type const& b)
296
    {
297
        std::vector<mask_info> masks;
×
298
        for (std::int64_t index : b)
×
299
        {
300
            masks.emplace_back(static_cast<std::size_t>(index),
×
301
                t.init_socket_affinity_mask_from_socket(
×
302
                    static_cast<std::size_t>(index)));
×
303
        }
304
        return masks;
×
305
    }
×
306

307
    std::vector<mask_info> extract_numanode_masks(
×
308
        topology const& t, bounds_type const& b)
309
    {
310
        std::vector<mask_info> masks;
×
311
        for (std::int64_t index : b)
×
312
        {
313
            masks.emplace_back(static_cast<std::size_t>(index),
×
314
                t.init_numa_node_affinity_mask_from_numa_node(
×
315
                    static_cast<std::size_t>(index)));
×
316
        }
317
        return masks;
×
318
    }
×
319

320
    // Affinity mask covering the whole machine; used as the fallback whenever
    // a spec component is 'unknown' (i.e. not given by the user).
    mask_cref_type extract_machine_mask(topology const& t, error_code& ec)
    {
        return t.get_machine_affinity_mask(ec);
    }
324

325
    std::vector<mask_info> extract_socket_or_numanode_masks(
×
326
        topology const& t, spec_type const& s, error_code& ec)
327
    {
328
        switch (s.type_)
×
329
        {
330
        case spec_type::type::socket:
331
            // requested top level is a socket
332
            {
333
                std::size_t num_sockets = t.get_number_of_sockets();
×
334
                return extract_socket_masks(
×
335
                    t, extract_bounds(s, num_sockets, ec));
×
336
            }
337

338
        case spec_type::type::numanode:
339
            // requested top level is a NUMA node
340
            {
341
                std::size_t num_numanodes = t.get_number_of_numa_nodes();
×
342
                return extract_numanode_masks(
×
343
                    t, extract_bounds(s, num_numanodes, ec));
×
344
            }
345

346
        case spec_type::type::unknown:
347
        {
348
            std::vector<mask_info> masks;
×
349
            masks.emplace_back(std::size_t(-1), extract_machine_mask(t, ec));
×
350
            return masks;
×
351
        }
×
352

353
        default:
354
            HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
355
                "extract_socket_or_numanode_mask",
356
                "unexpected specification type {}",
357
                spec_type::type_name(s.type_));
358
            break;
×
359
        }
360

361
        return std::vector<mask_info>();
×
362
    }
×
363

364
    // Compute the masks of the cores selected by spec 's', each intersected
    // with 'socket_mask'. 'socket' is the index of the enclosing socket/NUMA
    // node, or std::size_t(-1) when the core numbering is machine-wide.
    std::vector<mask_info> extract_core_masks(topology const& t,
        spec_type const& s, std::size_t socket, mask_cref_type socket_mask,
        error_code& ec)
    {
        std::vector<mask_info> masks;

        switch (s.type_)
        {
        case spec_type::type::core:
        {
            // 'base' is the machine-wide index of the first core of 'socket';
            // core indices parsed from the spec are relative to the socket.
            std::size_t base = 0;
            std::size_t num_cores = 0;

            if (socket != std::size_t(-1))
            {
                for (std::size_t i = 0; i != socket; ++i)
                {
                    // The number of NUMA nodes might be zero if hwloc
                    // doesn't detect a NUMA domain. This might be the case when
                    // there is no NUMA support configured, or when there are
                    // just sockets, but no direct numa domains. The bind
                    // description might relate to sockets, not NUMA domains.
                    if (t.get_number_of_numa_nodes() == 0)
                        base += t.get_number_of_socket_cores(i);
                    else
                        base += t.get_number_of_numa_node_cores(i);
                }
                if (t.get_number_of_numa_nodes() == 0)
                    num_cores = t.get_number_of_socket_cores(socket);
                else
                    num_cores = t.get_number_of_numa_node_cores(socket);
            }
            else
            {
                num_cores = t.get_number_of_cores();
            }

            bounds_type bounds = extract_bounds(s, num_cores, ec);
            if (ec)
                break;

            for (std::int64_t index : bounds)
            {
                // offset the socket-relative index by 'base' to obtain the
                // machine-wide core number
                mask_type mask = t.init_core_affinity_mask_from_core(
                    static_cast<std::size_t>(index + base));
                masks.emplace_back(
                    static_cast<std::size_t>(index), mask & socket_mask);
            }
        }
        break;

        case spec_type::type::unknown:
        {
            // no core spec given: fall back to the machine mask restricted
            // to the socket
            mask_type mask = extract_machine_mask(t, ec);
            masks.emplace_back(std::size_t(-1), mask & socket_mask);
        }
        break;

        default:
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "extract_core_mask",
                "unexpected specification type {}",
                spec_type::type_name(s.type_));
            break;
        }

        return masks;
    }
×
431

432
    // Compute the masks of the PUs selected by spec 's', each intersected
    // with 'core_mask'. 'socket' and 'core' are the indices of the enclosing
    // entities, or std::size_t(-1) when the respective level is unconstrained.
    std::vector<mask_info> extract_pu_masks(topology const& t,
        spec_type const& s, std::size_t socket, std::size_t core,
        mask_cref_type core_mask, error_code& ec)
    {
        std::vector<mask_info> masks;

        switch (s.type_)
        {
        case spec_type::type::pu:
        {
            std::size_t num_pus = 0;
            // machine-wide index of the socket's first core
            std::size_t socket_base = 0;
            if (std::size_t(-1) != socket)
            {
                // core number is relative to socket
                for (std::size_t i = 0; i != socket; ++i)
                {
                    if (t.get_number_of_numa_nodes() == 0)
                        socket_base += t.get_number_of_socket_cores(i);
                    else
                        socket_base += t.get_number_of_numa_node_cores(i);
                }
            }

            if (std::size_t(-1) != core)
            {
                num_pus = t.get_number_of_core_pus(core);
            }
            else
            {
                num_pus = t.get_number_of_pus();
            }
            bounds_type bounds = extract_bounds(s, num_pus, ec);
            if (ec)
                break;

            std::size_t num_cores = t.get_number_of_cores();
            for (std::int64_t index : bounds)
            {
                std::size_t base_core = socket_base;
                if (std::size_t(-1) != core)    //-V1051
                {
                    base_core += core;
                }
                else
                {
                    // find core the given pu belongs to: scan cores,
                    // accumulating their PU counts, until 'index' falls
                    // within the current core
                    std::size_t base = 0;
                    for (/**/; base_core < num_cores; ++base_core)
                    {
                        std::size_t num_core_pus =
                            t.get_number_of_core_pus(base_core);
                        if (base + num_core_pus > std::size_t(index))
                            break;
                        base += num_core_pus;
                    }
                }

                mask_type mask = t.init_thread_affinity_mask(
                    base_core, static_cast<std::size_t>(index));
                masks.emplace_back(
                    static_cast<std::size_t>(index), mask & core_mask);
            }
        }
        break;

        case spec_type::type::unknown:
        {
            // no pu spec given: fall back to the machine mask restricted to
            // the core
            mask_type mask = extract_machine_mask(t, ec);
            masks.emplace_back(std::size_t(-1), mask & core_mask);
        }
        break;

        default:
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "extract_pu_mask",
                "unexpected specification type {}",
                spec_type::type_name(s.type_));
            break;
        }

        return masks;
    }
×
514

515
    ///////////////////////////////////////////////////////////////////////////
516
    // sanity checks
517
    void mappings_sanity_checks(full_mapping_type& fmt, std::size_t /* size */,
×
518
        bounds_type const& b, error_code& ec)
519
    {
520
        mapping_type& m = fmt.second;
×
521
        if (m.size() != 3)
×
522
        {
523
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "decode_mapping",
×
524
                "bad size of mappings specification array");
525
            return;
×
526
        }
527

528
        if (b.begin() == b.end())
×
529
        {
530
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "decode_mapping",
×
531
                "no {1} mapping bounds are specified",
532
                spec_type::type_name(fmt.first.type_));
533
            return;
×
534
        }
535

536
        if (&ec != &throws)
×
537
            ec = make_success_code();
×
538
    }
×
539

540
    // for each given core-mask extract all required pu-masks
    // An index of std::size_t(-1) in 'core_masks' is the 'all cores' wildcard;
    // it is handled once and then terminates the loop. On error 'ec' is set
    // and processing stops early.
    void extract_pu_affinities(topology const& t,
        std::vector<spec_type> const& specs, std::size_t socket,
        std::vector<mask_info> const& core_masks,
        std::vector<mask_type>& affinities, error_code& ec)
    {
        // get the core masks for each of the sockets
        for (mask_info const& cmi : core_masks)
        {
            if (get_index(cmi) == std::size_t(-1))
            {
                // all cores
                if (specs[2].type_ == spec_type::type::unknown)
                {
                    // no pu information
                    affinities.push_back(get_mask(cmi));
                }
                else
                {
                    // handle pu information in the absence of core information
                    std::vector<mask_info> pu_masks = extract_pu_masks(t,
                        specs[2], socket, std::size_t(-1), get_mask(cmi), ec);
                    if (ec)
                        break;

                    for (mask_info const& pmi : pu_masks)
                    {
                        affinities.push_back(get_mask(pmi));
                    }
                }
                break;
            }
            else
            {
                // just this core
                std::vector<mask_info> pu_masks = extract_pu_masks(
                    t, specs[2], socket, get_index(cmi), get_mask(cmi), ec);
                if (ec)
                    break;

                for (mask_info const& pmi : pu_masks)
                {
                    affinities.push_back(get_mask(pmi));
                }
            }
        }
    }
×
587

588
    // for each given socket-mask extract all required pu-masks
    // An index of std::size_t(-1) in 'socket_masks' is the 'all sockets'
    // wildcard; it is handled once and then terminates the loop. Resulting
    // overall affinity masks are appended to 'affinities'.
    void extract_core_affinities(topology const& t,
        std::vector<spec_type> const& specs,
        std::vector<mask_info> const& socket_masks,
        std::vector<mask_type>& affinities, error_code& ec)
    {
        // get the core masks for each of the sockets
        for (mask_info const& smi : socket_masks)
        {
            if (get_index(smi) == std::size_t(-1))
            {
                // all NUMA domains
                if (specs[1].type_ == spec_type::type::unknown)
                {
                    // no core information
                    if (specs[2].type_ == spec_type::type::unknown)
                    {
                        // no pu information
                        affinities.push_back(get_mask(smi));
                    }
                    else
                    {
                        // handle pu information in the absence of core/socket
                        std::vector<mask_info> pu_masks =
                            extract_pu_masks(t, specs[2], std::size_t(-1),
                                std::size_t(-1), get_mask(smi), ec);
                        if (ec)
                            break;

                        for (mask_info const& pmi : pu_masks)
                        {
                            affinities.push_back(get_mask(pmi));
                        }
                    }
                }
                else
                {
                    // no socket given, assume cores are numbered for whole
                    // machine
                    if (specs[2].type_ == spec_type::type::unknown)
                    {
                        // no pu information
                        std::vector<mask_info> core_masks = extract_core_masks(
                            t, specs[1], std::size_t(-1), get_mask(smi), ec);
                        if (ec)
                            break;

                        for (mask_info const& cmi : core_masks)
                        {
                            affinities.push_back(get_mask(cmi));
                        }
                    }
                    else
                    {
                        std::vector<mask_info> core_masks = extract_core_masks(
                            t, specs[1], std::size_t(-1), get_mask(smi), ec);
                        if (ec)
                            break;

                        // get the pu masks (i.e. overall affinity masks) for
                        // all of the core masks
                        extract_pu_affinities(t, specs, std::size_t(-1),
                            core_masks, affinities, ec);
                        if (ec)
                            break;
                    }
                }
                break;
            }
            else
            {
                // a concrete socket: cores are numbered relative to it
                std::vector<mask_info> core_masks = extract_core_masks(
                    t, specs[1], get_index(smi), get_mask(smi), ec);
                if (ec)
                    break;

                // get the pu masks (i.e. overall affinity masks) for
                // all of the core masks
                extract_pu_affinities(
                    t, specs, get_index(smi), core_masks, affinities, ec);
                if (ec)
                    break;
            }
        }
    }
×
673

674
    void decode_mappings(topology const& t, full_mapping_type& m,
×
675
        std::vector<mask_type>& affinities, std::size_t num_threads,
676
        error_code& ec)
677
    {
678
        // The core numbers are interpreted differently depending on whether a
679
        // socket/numanode is given or not. If no socket(s) is(are) given, then
680
        // the core numbering covers the whole locality, otherwise the core
681
        // numbering is relative to the given socket.
682

683
        // generate overall masks for each of the given sockets
684
        std::vector<mask_info> socket_masks =
685
            extract_socket_or_numanode_masks(t, m.second[0], ec);
×
686

687
        HPX_ASSERT(!socket_masks.empty());
×
688

689
        extract_core_affinities(t, m.second, socket_masks, affinities, ec);
×
690

691
        // special case, all threads share the same options
692
        if (affinities.size() == 1 && num_threads > 1)
×
693
        {
694
            affinities.resize(num_threads, affinities[0]);
×
695
        }
×
696
    }
×
697

698
    ///////////////////////////////////////////////////////////////////////////
699
    bool pu_in_process_mask(bool use_process_mask, topology& t,
4,382✔
700
        std::size_t num_core, std::size_t num_pu)
701
    {
702
        if (!use_process_mask)
4,382✔
703
        {
704
            return true;
4,378✔
705
        }
706

707
        threads::mask_type proc_mask = t.get_cpubind_mask();
4✔
708
        threads::mask_type pu_mask =
709
            t.init_thread_affinity_mask(num_core, num_pu);
4✔
710

711
        return threads::bit_and(proc_mask, pu_mask);
4✔
712
    }
4,382✔
713

714
    void check_num_threads(bool use_process_mask, topology& t,
1,224✔
715
        std::size_t num_threads, error_code& ec)
716
    {
717
        if (use_process_mask)
1,224✔
718
        {
719
            threads::mask_type proc_mask = t.get_cpubind_mask();
1✔
720
            std::size_t num_pus_proc_mask = threads::count(proc_mask);
1✔
721

722
            if (num_threads > num_pus_proc_mask)
1✔
723
            {
724
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
725
                    "check_num_threads",
726
                    "specified number of threads ({1}) is larger than number "
727
                    "of processing units available in process mask ({2})",
728
                    num_threads, num_pus_proc_mask);
729
            }
×
730
        }
1✔
731
        else
732
        {
733
            std::size_t num_threads_available =
1,223✔
734
                static_cast<std::size_t>(threads::hardware_concurrency());
1,223✔
735

736
            if (num_threads > num_threads_available)
1,223✔
737
            {
738
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
739
                    "check_num_threads",
740
                    "specified number of threads ({1}) is larger than number "
741
                    "of available processing units ({2})",
742
                    num_threads, num_threads_available);
743
            }
×
744
        }
745
    }
1,224✔
746

747
    ///////////////////////////////////////////////////////////////////////////
    // Assign threads to PUs in 'compact' order: fill every PU of a core
    // before moving to the next core. 'used_cores' offsets the scan into the
    // machine's cores; with a process mask the offset is ignored and PUs
    // outside the mask are skipped. Fills 'num_pus' with the chosen PU number
    // per thread and sets each thread's mask in 'affinities'.
    void decode_compact_distribution(topology& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        std::size_t num_threads = affinities.size();

        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            used_cores = 0;
            max_cores = t.get_number_of_cores();
        }

        std::size_t num_cores = (std::min)(max_cores, t.get_number_of_cores());
        num_pus.resize(num_threads);

        // the outer loop restarts the core scan when threads remain after a
        // full pass over all cores/PUs
        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
        {
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
            {
                std::size_t num_core_pus =
                    t.get_number_of_core_pus(num_core + used_cores);
                for (std::size_t num_pu = 0; num_pu < num_core_pus; ++num_pu)
                {
                    if (!pu_in_process_mask(
                            use_process_mask, t, num_core, num_pu))
                    {
                        continue;
                    }

                    // a thread's mask must still be empty at this point
                    if (any(affinities[num_thread]))
                    {
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "decode_compact_distribution",
                            "affinity mask for thread {1} has already been set",
                            num_thread);
                        return;
                    }

                    num_pus[num_thread] =
                        t.get_pu_number(num_core + used_cores, num_pu);
                    affinities[num_thread] = t.init_thread_affinity_mask(
                        num_core + used_cores, num_pu);

                    if (++num_thread == num_threads)
                        return;
                }
            }
        }
    }
800

801
    void decode_scatter_distribution(topology& t,
×
802
        std::vector<mask_type>& affinities, std::size_t used_cores,
803
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
804
        bool use_process_mask, error_code& ec)
805
    {
806
        std::size_t num_threads = affinities.size();
×
807

808
        check_num_threads(use_process_mask, t, num_threads, ec);
×
809

810
        if (use_process_mask)
×
811
        {
812
            used_cores = 0;
×
813
            max_cores = t.get_number_of_cores();
×
814
        }
×
815

816
        std::size_t num_cores = (std::min)(max_cores, t.get_number_of_cores());
×
817

818
        std::vector<std::size_t> next_pu_index(num_cores, 0);
×
819
        num_pus.resize(num_threads);
×
820

821
        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
×
822
        {
823
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
×
824
            {
825
                if (any(affinities[num_thread]))
×
826
                {
827
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
828
                        "decode_scatter_distribution",
829
                        "affinity mask for thread {1} has already been set",
830
                        num_thread);
831
                    return;
×
832
                }
833

834
                std::size_t num_core_pus = t.get_number_of_core_pus(num_core);
×
835
                std::size_t pu_index = next_pu_index[num_core];
×
836
                bool use_pu = false;
×
837

838
                // Find the next PU on this core which is in the process mask
839
                while (pu_index < num_core_pus)
×
840
                {
841
                    use_pu = pu_in_process_mask(
×
842
                        use_process_mask, t, num_core, pu_index);
×
843
                    ++pu_index;
×
844

845
                    if (use_pu)
×
846
                    {
847
                        break;
×
848
                    }
849
                }
850

851
                next_pu_index[num_core] = pu_index;
×
852

853
                if (!use_pu)
×
854
                {
855
                    continue;
×
856
                }
857

858
                num_pus[num_thread] = t.get_pu_number(
×
859
                    num_core + used_cores, next_pu_index[num_core] - 1);
×
860
                affinities[num_thread] = t.init_thread_affinity_mask(
×
861
                    num_core + used_cores, next_pu_index[num_core] - 1);
×
862

863
                if (++num_thread == num_threads)
×
864
                    return;
×
865
            }
×
866
        }
867
    }
×
868

869
    ///////////////////////////////////////////////////////////////////////////
    // Assign worker threads to PUs in "balanced" order: like scatter, PUs are
    // taken round-robin over the cores, but the final thread numbering is
    // rearranged so that threads placed on the same core have consecutive
    // worker-thread numbers.
    //
    // t:                the machine topology used for all PU/core queries
    // affinities:       one mask per worker thread; filled in here (in/out)
    // used_cores:       index of the first core available to this locality
    // max_cores:        upper bound on the number of cores to use
    // num_pus:          receives the global PU number picked per thread (out)
    // use_process_mask: restrict the assignment to PUs in the process mask
    // ec:               error reporting; set if a mask was already assigned
    void decode_balanced_distribution(topology& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        std::size_t num_threads = affinities.size();

        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            // the process mask already constrains usable PUs, so consider
            // all cores of the machine
            used_cores = 0;
            max_cores = t.get_number_of_cores();
        }

        std::size_t num_cores = (std::min)(max_cores, t.get_number_of_cores());

        // num_pus_cores[c]: how many threads end up on core c
        std::vector<std::size_t> num_pus_cores(num_cores, 0);
        // next_pu_index[c]: scan cursor, next PU to try on core c
        std::vector<std::size_t> next_pu_index(num_cores, 0);
        // pu_indexes[c]: the PU indices (on core c) selected for core c
        std::vector<std::vector<std::size_t>> pu_indexes(num_cores);
        num_pus.resize(num_threads);

        // At first, calculate the number of used pus per core. This needs to be
        // done to make sure that we occupy all the available cores
        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
        {
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
            {
                // NOTE(review): this phase queries core num_core while the
                // assignment phase below uses num_core + used_cores — assumes
                // both refer to the same core when used_cores is non-zero;
                // TODO confirm (compact distribution above offsets here too)
                std::size_t num_core_pus = t.get_number_of_core_pus(num_core);
                std::size_t pu_index = next_pu_index[num_core];
                bool use_pu = false;

                // Find the next PU on this core which is in the process mask
                while (pu_index < num_core_pus)
                {
                    use_pu = pu_in_process_mask(
                        use_process_mask, t, num_core, pu_index);
                    ++pu_index;

                    if (use_pu)
                    {
                        break;
                    }
                }

                next_pu_index[num_core] = pu_index;

                // this core has no usable PU left, move on to the next one
                if (!use_pu)
                {
                    continue;
                }

                // record the PU just taken (cursor is one past it)
                pu_indexes[num_core].push_back(next_pu_index[num_core] - 1);

                num_pus_cores[num_core]++;
                if (++num_thread == num_threads)
                    break;
            }
        }

        // Iterate over the cores and assigned pus per core. this additional
        // loop is needed so that we have consecutive worker thread numbers
        std::size_t num_thread = 0;
        for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
        {
            for (std::size_t num_pu = 0; num_pu < num_pus_cores[num_core];
                 ++num_pu)
            {
                if (any(affinities[num_thread]))
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "decode_balanced_distribution",
                        "affinity mask for thread {1} has already been set",
                        num_thread);
                    return;
                }

                num_pus[num_thread] = t.get_pu_number(
                    num_core + used_cores, pu_indexes[num_core][num_pu]);
                affinities[num_thread] = t.init_thread_affinity_mask(
                    num_core + used_cores, pu_indexes[num_core][num_pu]);
                ++num_thread;
            }
        }
    }
955

956
    ///////////////////////////////////////////////////////////////////////////
    // Assign worker threads to PUs "numa-balanced": threads are first
    // apportioned to NUMA domains proportionally to the number of usable PUs
    // each domain has, then distributed balanced (one PU per core round-robin,
    // consecutive numbering per core) inside each domain.
    //
    // t:                the machine topology used for all PU/core queries
    // affinities:       one mask per worker thread; filled in here (in/out)
    // used_cores:       index of the first core available to this locality
    // max_cores:        unused by this distribution
    // num_pus:          receives the global PU number picked per thread (out)
    // use_process_mask: restrict the assignment to PUs in the process mask
    // ec:               error reporting; set if a mask was already assigned
    void decode_numabalanced_distribution(topology& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        HPX_UNUSED(max_cores);
        std::size_t num_threads = affinities.size();

        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            // the process mask already constrains usable PUs
            used_cores = 0;
        }

        num_pus.resize(num_threads);

        // numa nodes
        // treat a machine reporting zero NUMA nodes as a single domain
        std::size_t num_numas =
            (std::max)(std::size_t(1), t.get_number_of_numa_nodes());
        std::vector<std::size_t> num_cores_numa(num_numas, 0);
        std::vector<std::size_t> num_pus_numa(num_numas, 0);
        std::vector<std::size_t> num_threads_numa(num_numas, 0);
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            num_cores_numa[n] = t.get_number_of_numa_node_cores(n);
        }

        // count the usable (in-mask) PUs per NUMA domain; core_offset turns
        // a domain-local core index into a machine-wide one
        std::size_t core_offset = 0;
        std::size_t pus_t = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                 ++num_core)
            {
                std::size_t num_pus =
                    t.get_number_of_core_pus(num_core + core_offset);
                for (std::size_t num_pu = 0; num_pu < num_pus; ++num_pu)
                {
                    if (pu_in_process_mask(use_process_mask, t,
                            num_core + core_offset, num_pu))
                    {
                        ++num_pus_numa[n];
                    }
                }
            }

            pus_t += num_pus_numa[n];
            core_offset += num_cores_numa[n];
        }

        // how many threads should go on each domain
        // proportional share: num_threads * (domain PUs / total PUs), rounded
        std::size_t pus_t2 = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            std::size_t temp = static_cast<std::size_t>(
                std::round(static_cast<double>(num_threads * num_pus_numa[n]) /
                    static_cast<double>(pus_t)));

            // due to rounding up, we might have too many threads
            if ((pus_t2 + temp) > num_threads)
                temp = num_threads - pus_t2;
            pus_t2 += temp;
            num_threads_numa[n] = temp;

            // HPX_ASSERT(num_threads_numa[n] <= num_pus_numa[n]);
        }

        // HPX_ASSERT(num_threads <= pus_t2);

        // assign threads to cores on each numa domain
        std::size_t num_thread = 0;
        core_offset = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            // per-core bookkeeping, local to this NUMA domain
            std::vector<std::size_t> num_pus_cores(num_cores_numa[n], 0);
            std::vector<std::size_t> next_pu_index(num_cores_numa[n], 0);
            std::vector<std::vector<std::size_t>> pu_indexes(num_cores_numa[n]);

            // iterate once and count pus/core
            for (std::size_t num_thread_numa = 0;
                 num_thread_numa < num_threads_numa[n];
                /**/)
            {
                for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                     ++num_core)
                {
                    // NOTE(review): queried without + core_offset although
                    // the mask check below adds it — assumes PU counts are
                    // uniform across cores or that this resolves to the same
                    // core; TODO confirm against the counting loop above
                    std::size_t num_core_pus =
                        t.get_number_of_core_pus(num_core);
                    std::size_t pu_index = next_pu_index[num_core];
                    bool use_pu = false;

                    // Find the next PU on this core which is in the process mask
                    while (pu_index < num_core_pus)
                    {
                        use_pu = pu_in_process_mask(use_process_mask, t,
                            num_core + core_offset, pu_index);
                        ++pu_index;

                        if (use_pu)
                        {
                            break;
                        }
                    }

                    next_pu_index[num_core] = pu_index;

                    // this core has no usable PU left, try the next one
                    if (!use_pu)
                    {
                        continue;
                    }

                    // record the PU just taken (cursor is one past it)
                    pu_indexes[num_core].push_back(next_pu_index[num_core] - 1);

                    num_pus_cores[num_core]++;
                    if (++num_thread_numa == num_threads_numa[n])
                        break;
                }
            }

            // Iterate over the cores and assigned pus per core. this additional
            // loop is needed so that we have consecutive worker thread numbers
            for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                 ++num_core)
            {
                for (std::size_t num_pu = 0; num_pu < num_pus_cores[num_core];
                     ++num_pu)
                {
                    if (any(affinities[num_thread]))
                    {
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "decode_numabalanced_distribution",
                            "affinity mask for thread {1} has already been set",
                            num_thread);
                        return;
                    }
                    // NOTE(review): get_pu_number omits + core_offset while
                    // init_thread_affinity_mask includes it — looks
                    // asymmetric; verify whether the reported PU number and
                    // the mask can refer to different cores on multi-NUMA
                    // machines
                    num_pus[num_thread] = t.get_pu_number(
                        num_core + used_cores, pu_indexes[num_core][num_pu]);
                    affinities[num_thread] = t.init_thread_affinity_mask(
                        num_core + used_cores + core_offset,
                        pu_indexes[num_core][num_pu]);
                    ++num_thread;
                }
            }
            core_offset += num_cores_numa[n];
        }
    }
1104

1105
    ///////////////////////////////////////////////////////////////////////////
1106
    void decode_distribution(distribution_type d, topology& t,
1,224✔
1107
        std::vector<mask_type>& affinities, std::size_t used_cores,
1108
        std::size_t max_cores, std::size_t num_threads,
1109
        std::vector<std::size_t>& num_pus, bool use_process_mask,
1110
        error_code& ec)
1111
    {
1112
        affinities.resize(num_threads);
1,224✔
1113
        switch (d)
1,224✔
1114
        {
1115
        case distribution_type::compact:
1116
            decode_compact_distribution(t, affinities, used_cores, max_cores,
×
1117
                num_pus, use_process_mask, ec);
×
1118
            break;
×
1119

1120
        case distribution_type::scatter:
1121
            decode_scatter_distribution(t, affinities, used_cores, max_cores,
×
1122
                num_pus, use_process_mask, ec);
×
1123
            break;
×
1124

1125
        case distribution_type::balanced:
1126
            decode_balanced_distribution(t, affinities, used_cores, max_cores,
2,448✔
1127
                num_pus, use_process_mask, ec);
1,224✔
1128
            break;
1,224✔
1129

1130
        case distribution_type::numa_balanced:
1131
            decode_numabalanced_distribution(t, affinities, used_cores,
×
1132
                max_cores, num_pus, use_process_mask, ec);
×
1133
            break;
×
1134

1135
        default:
1136
            HPX_ASSERT(false);
×
1137
        }
×
1138
    }
1,224✔
1139
}    // namespace hpx::threads::detail
1140

1141
namespace hpx::threads {
1142

1143
    ///////////////////////////////////////////////////////////////////////////
    // Parse a --hpx:bind affinity specification and compute per-thread
    // affinity masks.
    //
    // spec:             the binding specification string (e.g. "balanced",
    //                   "compact", or an explicit "thread:..." mapping list)
    // affinities:       receives one affinity mask per worker thread (out)
    // used_cores:       index of the first core available to this locality
    // max_cores:        upper bound on the number of cores to use
    // num_threads:      number of worker threads to place
    // num_pus:          receives the PU number chosen per thread (out)
    // use_process_mask: restrict placement to the process affinity mask;
    //                   incompatible with explicit per-thread mappings
    // ec:               error reporting (ill-formed spec, conflicting masks)
    void parse_affinity_options(std::string const& spec,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::size_t num_threads,
        std::vector<std::size_t>& num_pus, bool use_process_mask,
        error_code& ec)
    {
        detail::mappings_type mappings;
        detail::parse_mappings(spec, mappings, ec);
        if (ec)
            return;

        // We need to instantiate a new topology object as the runtime has not
        // been initialized yet
        threads::topology& t = threads::create_topology();

        // mappings is a variant: alternative 0 is a predefined distribution
        // policy, alternative 1 is an explicit list of per-thread mappings
        switch (mappings.which())
        {
        case 0:
        {
            detail::decode_distribution(
                boost::get<detail::distribution_type>(mappings), t, affinities,
                used_cores, max_cores, num_threads, num_pus, use_process_mask,
                ec);
            if (ec)
                return;
        }
        break;

        case 1:
        {
            if (use_process_mask)
            {
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                    "parse_affinity_options",
                    "can't use --hpx:use-process-mask with custom thread "
                    "bindings");
            }
            detail::mappings_spec_type mappings_specs(
                boost::get<detail::mappings_spec_type>(mappings));

            affinities.clear();
            for (detail::full_mapping_type& m : mappings_specs)
            {
                // each mapping must be keyed on a thread specifier
                if (m.first.type_ != detail::spec_type::type::thread)
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "parse_affinity_options",
                        "bind specification ({1}) is ill formatted", spec);
                    return;
                }

                // and must carry exactly three location components
                // (socket/numanode, core, pu)
                if (m.second.size() != 3)
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "parse_affinity_options",
                        "bind specification ({1}) is ill formatted", spec);
                    return;
                }

                // at least one of the three components must be meaningful
                if (m.second[0].type_ == detail::spec_type::type::unknown &&
                    m.second[1].type_ == detail::spec_type::type::unknown &&
                    m.second[2].type_ == detail::spec_type::type::unknown)
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "parse_affinity_options",
                        "bind specification ({1}) is ill formatted", spec);
                    return;
                }

                // repeat for each of the threads in the affinity specification
                detail::bounds_type thread_bounds =
                    extract_bounds(m.first, num_threads, ec);
                if (ec)
                    return;

                mappings_sanity_checks(m, num_threads, thread_bounds, ec);
                if (ec)
                    return;

                detail::decode_mappings(
                    t, m, affinities, thread_bounds.size(), ec);
                if (ec)
                    return;
            }

            // derive the per-thread PU number from the first PU set in each
            // mask, unless the caller supplied them already
            if (num_pus.empty())
            {
                num_pus.resize(affinities.size());
                for (std::size_t i = 0; i != affinities.size(); ++i)
                {
                    num_pus[i] = threads::find_first(affinities[i]);
                }
            }
        }
        break;
        }
    }
1241
}    // namespace hpx::threads
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc