• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.87
/libs/core/affinity/src/parse_affinity_options.cpp
1
//  Copyright (c) 2007-2025 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/affinity/detail/parse_mappings.hpp>
8
#include <hpx/affinity/detail/partlit.hpp>
9
#include <hpx/affinity/parse_affinity_options.hpp>
10
#include <hpx/assert.hpp>
11
#include <hpx/modules/datastructures.hpp>
12
#include <hpx/modules/errors.hpp>
13
#include <hpx/modules/topology.hpp>
14

15
#include <hwloc.h>
16

17
#include <boost/fusion/include/adapt_struct.hpp>
18
#include <boost/fusion/include/std_pair.hpp>
19
#include <boost/spirit/home/x3/auxiliary.hpp>
20
#include <boost/spirit/home/x3/char.hpp>
21
#include <boost/spirit/home/x3/core.hpp>
22
#include <boost/spirit/home/x3/nonterminal.hpp>
23
#include <boost/spirit/home/x3/numeric.hpp>
24
#include <boost/spirit/home/x3/operator.hpp>
25
#include <boost/spirit/home/x3/string.hpp>
26

27
#include <algorithm>
28
#include <cstddef>
29
#include <cstdint>
30
#include <string>
31
#include <vector>
32

33
///////////////////////////////////////////////////////////////////////////////
34
// Expose spec_type to Boost.Fusion as the sequence (type_, index_bounds_) so
// that Spirit X3 rules with attribute spec_type can populate both members
// directly during parsing.
// clang-format off
BOOST_FUSION_ADAPT_STRUCT(
    hpx::threads::detail::spec_type,
    type_,
    index_bounds_
)
// clang-format on
41

42
namespace {

    ///////////////////////////////////////////////////////////////////////////
    // Grammar accepted by the affinity-option parser:
    //
    //    mappings:
    //        distribution
    //        mapping(;mapping)*
    //
    //    distribution:
    //        compact
    //        scatter
    //        balanced
    //        numa-balanced
    //
    //    mapping:
    //        thread-spec=pu-specs
    //
    //    thread-spec:
    //        t:int
    //        t:int-int
    //        t:all
    //
    //    pu-specs:
    //        pu-spec(.pu-spec)*
    //
    //    pu-spec:
    //        type:range-specs
    //        ~pu-spec
    //
    //    range-specs:
    //        range-spec(,range-spec)*
    //
    //    range-spec:
    //        int
    //        int-int
    //        all
    //
    //    type:
    //        socket | numanode
    //        core
    //        pu
    //
    namespace x3 = boost::spirit::x3;
    using namespace hpx::threads::detail;

    // Rule declarations; each pairs a unique tag type with the attribute the
    // rule synthesizes.
    x3::rule<class mappings, mappings_type> mappings = "mappings";
    x3::rule<class distribution, distribution_type> distribution =
        "distribution";
    x3::rule<class mapping, full_mapping_type> mapping = "mapping";
    x3::rule<class thread_spec, spec_type> thread_spec = "thread_spec";
    x3::rule<class pu_specs, mapping_type> pu_specs = "pu_specs";
    x3::rule<class socket_spec, spec_type> socket_spec = "socket_spec";
    x3::rule<class core_spec, spec_type> core_spec = "core_spec";
    x3::rule<class pu_spec, spec_type> pu_spec = "pu_spec";
    x3::rule<class specs, bounds_type> specs = "specs";
    x3::rule<class spec, bounds_type> spec = "spec";

    auto mappings_def = distribution | (mapping % ';');

    auto mapping_def = thread_spec >> '=' >> pu_specs;

    // partlit matches a (possibly abbreviated) keyword and synthesizes the
    // given constant as the attribute.
    // clang-format off
    auto distribution_def =
            partlit("compact", distribution_type::compact)
        |   partlit("scatter", distribution_type::scatter)
        |   partlit("balanced", distribution_type::balanced)
        |   partlit("numa-balanced", distribution_type::numa_balanced)
        ;

    auto thread_spec_def =
            partlit("thread", spec_type::type::thread) >> ':' >>  specs
        ;

    auto pu_specs_def =
            (socket_spec >> core_spec >> pu_spec)
//        |   ('~' >> pu_spec)
        ;

    // Each level is optional: when absent, an attr() branch injects a
    // spec_type of type 'unknown' so pu_specs always yields three entries.
    auto socket_spec_def =
            (partlit("socket", spec_type::type::socket) >> ':' >> specs)
        |   (partlit("numanode", spec_type::type::numanode) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::type::unknown))
        ;

    auto core_spec_def =
           (-x3::lit('.')
                >> partlit("core", spec_type::type::core) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::type::unknown))
        ;

    auto pu_spec_def =
           (-x3::lit('.') >> partlit("pu", spec_type::type::pu) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::type::unknown))
        ;

    auto specs_def = spec % ',';

    // NOTE: a range like "1-3" is parsed as uint 1 followed by int -3; the
    // negative second value is later interpreted by extract_bounds() as the
    // (negated) upper bound of the range.
    auto spec_def =
            (x3::uint_ >> -(x3::int_))
        |   partlit("all", bounds_type{spec_type::all_entities()})
        ;
    // clang-format on

    BOOST_SPIRIT_DEFINE(mappings, distribution, mapping, thread_spec, pu_specs,
        socket_spec, core_spec, pu_spec, specs, spec)

}    // namespace
149

150
namespace hpx::threads::detail {
151

152
    inline constexpr char const* const type_names[] = {
153
        "unknown", "thread", "socket", "numanode", "core", "pu"};
154

×
155
    char const* spec_type::type_name(spec_type::type t) noexcept
156
    {
×
157
        if (t < spec_type::type::unknown || t > spec_type::type::pu)
158
            return type_names[0];
×
159
        return type_names[static_cast<int>(t)];
160
    }
161

162
    template <typename Iterator>
163
    inline bool parse(Iterator& begin, Iterator end, mappings_type& m)
164
    {
165
        return x3::parse(begin, end, mappings, m);
166
    }
167

168
    ///////////////////////////////////////////////////////////////////////////
6✔
169
    void parse_mappings(
170
        std::string const& spec, mappings_type& mappings, error_code& ec)
171
    {
6✔
172
        if (auto begin = spec.begin();
6✔
173
            !detail::parse(begin, spec.end(), mappings) || begin != spec.end())
174
        {
×
175
            HPX_THROWS_IF(ec, hpx::error::bad_parameter,
176
                "parse_affinity_options",
177
                "failed to parse affinity specification: " + spec);
×
178
            return;
179
        }
180

6✔
181
        if (&ec != &throws)
×
182
            ec = make_success_code();
183
    }
184

185
    ///////////////////////////////////////////////////////////////////////////
×
186
    // Expand the raw index bounds stored in a spec_type into a concrete list
    // of resource indices.
    //
    // The parser stores ranges as value pairs: a range "a-b" arrives as
    // (a, -b), i.e. the upper bound is negated. 'all' is stored as the
    // all_entities() sentinel and expands to [0, default_last).
    //
    // default_last: number of resources of this kind that actually exist;
    //               used both for expanding 'all' and for bounds checking.
    // ec:           receives hpx::error::bad_parameter when an index or upper
    //               bound exceeds default_last (result is returned empty).
    //
    // NOTE(review): in the "just min and max" branch below both values are
    // pushed as-is rather than expanded — presumably downstream consumers
    // accept that encoding; grammar-produced ranges always take the negative
    // branch, so the positive-second case may be unreachable from the parser.
    bounds_type extract_bounds(
        spec_type const& m, std::size_t default_last, error_code& ec)
    {
        bounds_type result;

        if (m.index_bounds_.empty())
            return result;

        auto first = m.index_bounds_.begin();
        auto const last = m.index_bounds_.end();

        while (first != last)
        {
            if (*first == spec_type::all_entities())
            {
                // bind all entities: replace anything collected so far with
                // the full index range [0, default_last)
                result.clear();
                for (std::size_t i = 0; i != default_last; ++i)
                {
                    result.push_back(static_cast<std::int64_t>(i));
                }
                break;    // we will not get more than 'all'
            }

            // look ahead one element: a following value qualifies *first
            if (auto second = first; ++second != last)
            {
                if (*second == 0 || *second == spec_type::all_entities())
                {
                    // one element only
                    if (default_last <= static_cast<std::size_t>(*first))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the resource id given is larger than the "
                            "number of existing resources");
                        return result;
                    }
                    result.push_back(*first);
                }
                else if (*second < 0)
                {
                    // all elements between min and -max (range "a-b" is
                    // encoded by the parser as a followed by -b)
                    if (default_last <= static_cast<std::size_t>(-*second))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the upper limit given is larger than the "
                            "number of existing resources");
                        return result;
                    }

                    for (std::int64_t i = *first; i <= -*second; ++i)
                    {
                        result.push_back(i);
                    }
                }
                else
                {
                    // just min and max
                    if (default_last <= static_cast<std::size_t>(*second))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the upper limit given is larger than the "
                            "number of existing resources");
                        return result;
                    }
                    result.push_back(*first);
                    result.push_back(*second);
                }
                // consumed two elements; advance past the pair (the ++first
                // below moves off 'second')
                first = second;
            }
            else
            {
                // one element only (no look-ahead value available)
                if (default_last <= static_cast<std::size_t>(*first))
                {
                    result.clear();
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "extract_bounds",
                        "the resource id given is larger than the number "
                        "of existing resources");
                    return result;
                }
                result.push_back(*first);
            }
            ++first;
        }

        if (&ec != &throws)
            ec = make_success_code();

        return result;
    }
283

284
    ///////////////////////////////////////////////////////////////////////////
    // A resource index paired with the affinity mask covering that resource.
    // An index of std::size_t(-1) is used below as a sentinel meaning
    // "all resources / whole machine".
    //                                 index,      mask
    using mask_info = hpx::tuple<std::size_t, mask_type>;

    // accessor: resource index of a mask_info entry
    constexpr std::size_t get_index(mask_info const& smi) noexcept
    {
        return hpx::get<0>(smi);
    }
    // accessor: affinity mask of a mask_info entry
    constexpr mask_cref_type get_mask(mask_info const& smi) noexcept
    {
        return hpx::get<1>(smi);
    }
296

297
    ///////////////////////////////////////////////////////////////////////////
×
298
    // Build one (socket index, socket affinity mask) entry for every socket
    // index listed in 'b'.
    std::vector<mask_info> extract_socket_masks(
        topology const& t, bounds_type const& b)
    {
        std::vector<mask_info> result;
        for (std::int64_t const idx : b)
        {
            auto const socket = static_cast<std::size_t>(idx);
            result.emplace_back(
                socket, t.init_socket_affinity_mask_from_socket(socket));
        }
        return result;
    }
310

×
311
    // Build one (NUMA-node index, NUMA-node affinity mask) entry for every
    // node index listed in 'b'.
    std::vector<mask_info> extract_numanode_masks(
        topology const& t, bounds_type const& b)
    {
        std::vector<mask_info> result;
        for (std::int64_t const idx : b)
        {
            auto const node = static_cast<std::size_t>(idx);
            result.emplace_back(
                node, t.init_numa_node_affinity_mask_from_numa_node(node));
        }
        return result;
    }
323

×
324
    // Affinity mask covering the whole machine, as reported by the topology.
    mask_cref_type extract_machine_mask(topology const& t, error_code& ec)
    {
        return t.get_machine_affinity_mask(ec);
    }
328

×
329
    // Produce the top-level masks for a mapping: per-socket masks, per-NUMA-
    // node masks, or — when no top level was specified (type 'unknown') — a
    // single machine-wide mask tagged with the sentinel index -1. Any other
    // specification type reports bad_parameter and yields an empty vector.
    std::vector<mask_info> extract_socket_or_numanode_masks(
        topology const& t, spec_type const& s, error_code& ec)
    {
        if (s.type_ == spec_type::type::socket)
        {
            // requested top level is a socket
            return extract_socket_masks(
                t, extract_bounds(s, t.get_number_of_sockets(), ec));
        }

        if (s.type_ == spec_type::type::numanode)
        {
            // requested top level is a NUMA node
            return extract_numanode_masks(
                t, extract_bounds(s, t.get_number_of_numa_nodes(), ec));
        }

        if (s.type_ == spec_type::type::unknown)
        {
            // no top level given: fall back to the machine-wide mask
            std::vector<mask_info> result;
            result.emplace_back(
                static_cast<std::size_t>(-1), extract_machine_mask(t, ec));
            return result;
        }

        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
            "extract_socket_or_numanode_mask",
            "unexpected specification type {}",
            spec_type::type_name(s.type_));
        return {};
    }
367

×
368
    // Produce per-core masks (restricted to 'socket_mask') for the core level
    // of a mapping.
    //
    // socket == std::size_t(-1) means "no socket given": core indices then
    // refer to the whole machine. Otherwise core indices are relative to the
    // given socket/NUMA node and 'base' shifts them to machine-global core
    // numbers. A spec of type 'unknown' yields a single machine-mask entry
    // (index -1) intersected with socket_mask; anything else is an error.
    std::vector<mask_info> extract_core_masks(topology const& t,
        spec_type const& s, std::size_t socket, mask_cref_type socket_mask,
        error_code& ec)
    {
        std::vector<mask_info> masks;

        if (s.type_ == spec_type::type::core)
        {
            std::size_t base = 0;    // first machine-global core of 'socket'
            std::size_t num_cores;

            if (socket != static_cast<std::size_t>(-1))
            {
                // accumulate the core counts of all preceding sockets/nodes
                for (std::size_t i = 0; i != socket; ++i)
                {
                    // The number of NUMA nodes might be zero if there hwloc
                    // doesn't detect a NUMA domain. This might be the case when
                    // there is no NUMA support configured, or when there are
                    // just sockets, but no direct numa domains. The bind
                    // description might relate to sockets, not NUMA domains.
                    if (t.get_number_of_numa_nodes() == 0)
                    {
                        base += t.get_number_of_socket_cores(i);
                    }
                    else
                    {
                        base += t.get_number_of_numa_node_cores(i);
                    }
                }

                // core count of the selected socket/node (bounds the spec)
                if (t.get_number_of_numa_nodes() == 0)
                {
                    num_cores = t.get_number_of_socket_cores(socket);
                }
                else
                {
                    num_cores = t.get_number_of_numa_node_cores(socket);
                }
            }
            else
            {
                num_cores = t.get_number_of_cores();
            }

            bounds_type const bounds = extract_bounds(s, num_cores, ec);
            if (!ec)
            {
                for (std::int64_t const index : bounds)
                {
                    // index is socket-relative; base makes it machine-global.
                    // The resulting mask is clipped to the enclosing socket.
                    mask_type mask = t.init_core_affinity_mask_from_core(
                        static_cast<std::size_t>(index + base));
                    masks.emplace_back(
                        static_cast<std::size_t>(index), mask & socket_mask);
                }
            }
        }
        else if (s.type_ == spec_type::type::unknown)
        {
            // no core level given: pass the socket mask through, tagged with
            // the sentinel index -1 ("all cores")
            mask_type const mask = extract_machine_mask(t, ec);
            masks.emplace_back(
                static_cast<std::size_t>(-1), mask & socket_mask);
        }
        else
        {
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "extract_core_mask",
                "unexpected specification type {}",
                spec_type::type_name(s.type_));
        }
        return masks;
    }
438

439
    // Produce per-PU masks (restricted to 'core_mask') for the PU level of a
    // mapping.
    //
    // socket/core may be std::size_t(-1) ("not given"): with a core given, PU
    // indices are relative to that core; without one, PU indices are machine-
    // global and the owning core has to be searched for. A spec of type
    // 'unknown' yields a single machine-mask entry (index -1) intersected
    // with core_mask; anything else is an error.
    std::vector<mask_info> extract_pu_masks(topology const& t,
        spec_type const& s, std::size_t socket, std::size_t core,
        mask_cref_type core_mask, error_code& ec)
    {
        std::vector<mask_info> masks;

        if (s.type_ == spec_type::type::pu)
        {
            std::size_t socket_base = 0;
            if (static_cast<std::size_t>(-1) != socket)
            {
                // core number is relative to socket
                for (std::size_t i = 0; i != socket; ++i)
                {
                    // as in extract_core_masks: fall back to socket cores when
                    // hwloc reports no NUMA domains
                    if (t.get_number_of_numa_nodes() == 0)
                    {
                        socket_base += t.get_number_of_socket_cores(i);
                    }
                    else
                    {
                        socket_base += t.get_number_of_numa_node_cores(i);
                    }
                }
            }

            // bound for the PU spec: PUs of the given core, or all PUs
            std::size_t num_pus;
            if (static_cast<std::size_t>(-1) != core)
            {
                num_pus = t.get_number_of_core_pus(core);
            }
            else
            {
                num_pus = t.get_number_of_pus();
            }

            bounds_type const bounds = extract_bounds(s, num_pus, ec);
            if (!ec)
            {
                std::size_t const num_cores = t.get_number_of_cores();
                for (std::int64_t const index : bounds)
                {
                    std::size_t base_core = socket_base;
                    if (static_cast<std::size_t>(-1) != core)    //-V1051
                    {
                        base_core += core;
                    }
                    else
                    {
                        // find core the given pu belongs to: walk the cores,
                        // accumulating their PU counts, until 'index' falls
                        // inside the current core's PU range
                        std::size_t base = 0;
                        for (/**/; base_core < num_cores; ++base_core)
                        {
                            std::size_t const num_core_pus =
                                t.get_number_of_core_pus(base_core);
                            if (base + num_core_pus >
                                static_cast<std::size_t>(index))
                            {
                                break;
                            }
                            base += num_core_pus;
                        }
                    }

                    // the resulting thread mask is clipped to the enclosing
                    // core's mask
                    mask_type mask = t.init_thread_affinity_mask(
                        base_core, static_cast<std::size_t>(index));
                    masks.emplace_back(
                        static_cast<std::size_t>(index), mask & core_mask);
                }
            }
        }
        else if (s.type_ == spec_type::type::unknown)
        {
            // no PU level given: pass the core mask through, tagged with the
            // sentinel index -1 ("all PUs")
            mask_type const mask = extract_machine_mask(t, ec);
            masks.emplace_back(static_cast<std::size_t>(-1), mask & core_mask);
        }
        else
        {
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "extract_pu_mask",
                "unexpected specification type {}",
                spec_type::type_name(s.type_));
        }

        return masks;
    }
×
523

524
    ///////////////////////////////////////////////////////////////////////////
×
525
    // sanity checks
526
    void mappings_sanity_checks(full_mapping_type const& fmt,
527
        std::size_t /* size */, bounds_type const& b, error_code& ec)
×
528
    {
529
        mapping_type const& m = fmt.second;
530
        if (m.size() != 3)
×
531
        {
532
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "decode_mapping",
533
                "bad size of mappings specification array");
×
534
            return;
×
535
        }
536

537
        if (b.begin() == b.end())
538
        {
×
539
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "decode_mapping",
540
                "no {1} mapping bounds are specified",
541
                spec_type::type_name(fmt.first.type_));
542
            return;
×
543
        }
544

×
545
        if (&ec != &throws)
546
            ec = make_success_code();
×
547
    }
548

549
    // for each given core-mask extract all required pu-masks
×
550
    // For each core mask, append the PU-level affinity masks selected by
    // specs[2] to 'affinities'.
    //
    // A core entry with the sentinel index -1 ("all cores") is handled once
    // and terminates the loop (it already covers everything); the 'break'
    // statements below implement exactly that. Errors from extract_pu_masks
    // abort the loop with 'ec' set.
    void extract_pu_affinities(topology const& t,
        std::vector<spec_type> const& specs, std::size_t socket,
        std::vector<mask_info> const& core_masks,
        std::vector<mask_type>& affinities, error_code& ec)
    {
        // get the core masks for each of the sockets
        for (mask_info const& cmi : core_masks)
        {
            if (get_index(cmi) == static_cast<std::size_t>(-1))
            {
                // all cores
                if (specs[2].type_ == spec_type::type::unknown)
                {
                    // no pu information: the core mask itself is the affinity
                    affinities.push_back(get_mask(cmi));
                }
                else
                {
                    // handle pu information in the absence of core information
                    std::vector<mask_info> const pu_masks =
                        extract_pu_masks(t, specs[2], socket,
                            static_cast<std::size_t>(-1), get_mask(cmi), ec);
                    if (ec)
                        break;

                    for (mask_info const& pmi : pu_masks)
                    {
                        affinities.push_back(get_mask(pmi));
                    }
                }
                break;
            }
            else
            {
                // just this core
                std::vector<mask_info> pu_masks = extract_pu_masks(
                    t, specs[2], socket, get_index(cmi), get_mask(cmi), ec);
                if (ec)
                    break;

                for (mask_info const& pmi : pu_masks)
                {
                    affinities.push_back(get_mask(pmi));
                }
            }
        }
    }
597

598
    // for each given socket-mask extract all required pu-masks
×
599
    // For each top-level (socket/NUMA-node) mask, expand the core level
    // (specs[1]) and then the PU level (specs[2]), appending the resulting
    // affinity masks to 'affinities'.
    //
    // A socket entry with the sentinel index -1 ("whole machine") is handled
    // once and terminates the loop; in that case core numbers are taken to be
    // machine-global. Errors from the extract_* helpers abort the loop with
    // 'ec' set.
    void extract_core_affinities(topology const& t,
        std::vector<spec_type> const& specs,
        std::vector<mask_info> const& socket_masks,
        std::vector<mask_type>& affinities, error_code& ec)
    {
        // get the core masks for each of the sockets
        for (mask_info const& smi : socket_masks)
        {
            if (get_index(smi) == static_cast<std::size_t>(-1))
            {
                // all NUMA domains
                if (specs[1].type_ == spec_type::type::unknown)
                {
                    // no core information
                    if (specs[2].type_ == spec_type::type::unknown)
                    {
                        // no pu information: the socket mask is the affinity
                        affinities.push_back(get_mask(smi));
                    }
                    else
                    {
                        // handle pu information in the absence of core/socket
                        std::vector<mask_info> const pu_masks =
                            extract_pu_masks(t, specs[2],
                                static_cast<std::size_t>(-1),
                                static_cast<std::size_t>(-1), get_mask(smi),
                                ec);
                        if (ec)
                            break;

                        for (mask_info const& pmi : pu_masks)
                        {
                            affinities.push_back(get_mask(pmi));
                        }
                    }
                }
                else
                {
                    // no socket given, assume cores are numbered for whole
                    // machine
                    if (specs[2].type_ == spec_type::type::unknown)
                    {
                        // no pu information: the core masks are the affinities
                        std::vector<mask_info> const core_masks =
                            extract_core_masks(t, specs[1],
                                static_cast<std::size_t>(-1), get_mask(smi),
                                ec);
                        if (ec)
                            break;

                        for (mask_info const& cmi : core_masks)
                        {
                            affinities.push_back(get_mask(cmi));
                        }
                    }
                    else
                    {
                        std::vector<mask_info> const core_masks =
                            extract_core_masks(t, specs[1],
                                static_cast<std::size_t>(-1), get_mask(smi),
                                ec);
                        if (ec)
                            break;

                        // get the pu masks (i.e. overall affinity masks) for
                        // all the core masks
                        extract_pu_affinities(t, specs,
                            static_cast<std::size_t>(-1), core_masks,
                            affinities, ec);
                        if (ec)
                            break;
                    }
                }
                // the sentinel entry already covered everything
                break;
            }
            else
            {
                // a specific socket: expand cores relative to it
                std::vector<mask_info> core_masks = extract_core_masks(
                    t, specs[1], get_index(smi), get_mask(smi), ec);
                if (ec)
                    break;

                // get the pu masks (i.e. overall affinity masks) for
                // all the core masks
                extract_pu_affinities(
                    t, specs, get_index(smi), core_masks, affinities, ec);
                if (ec)
                    break;
            }
        }
    }
×
690

×
691
    void decode_mappings(topology const& t, full_mapping_type const& m,
692
        std::vector<mask_type>& affinities, std::size_t num_threads,
693
        error_code& ec)
694
    {
695
        // The core numbers are interpreted differently depending on whether a
×
696
        // socket/numanode is given or not. If no socket(s) is(are) given, then
697
        // the core numbering covers the whole locality, otherwise the core
×
698
        // numbering is relative to the given socket.
699

×
700
        // generate overall masks for each of the given sockets
701
        std::vector<mask_info> const socket_masks =
×
702
            extract_socket_or_numanode_masks(t, m.second[0], ec);
703

×
704
        HPX_ASSERT(!socket_masks.empty());
705

706
        extract_core_affinities(t, m.second, socket_masks, affinities, ec);
707

708
        // special case, all threads share the same options
709
        if (affinities.size() == 1 && num_threads > 1)
710
        {
711
            affinities.resize(num_threads, affinities[0]);
712
        }
713
    }
714

×
715
    ///////////////////////////////////////////////////////////////////////////
716
    bool pu_in_process_mask(bool use_process_mask, topology const& t,
717
        std::size_t num_core, std::size_t num_pu)
718
    {
×
719
        if (!use_process_mask)
720
        {
721
            return true;
×
722
        }
723

×
724
        threads::mask_type const proc_mask = t.get_cpubind_mask();
725
        threads::mask_type const pu_mask =
×
726
            t.init_thread_affinity_mask(num_core, num_pu);
727

728
        return threads::bit_and(proc_mask, pu_mask);
6✔
729
    }
730

731
    void check_num_threads(bool use_process_mask, topology const& t,
6✔
732
        std::size_t num_threads, error_code& ec)
733
    {
734
        if (use_process_mask)
735
        {
736
            threads::mask_type const proc_mask = t.get_cpubind_mask();
×
737
            std::size_t const num_pus_proc_mask = threads::count(proc_mask);
738

×
739
            if (num_threads > num_pus_proc_mask)
740
            {
×
741
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
742
                    "check_num_threads",
743
                    "specified number of threads ({1}) is larger than number "
6✔
744
                    "of processing units available in process mask ({2})",
745
                    num_threads, num_pus_proc_mask);
746
            }
6✔
747
        }
748
        else
×
749
        {
×
750
            auto const num_threads_available =
751
                static_cast<std::size_t>(threads::hardware_concurrency());
×
752

753
            if (num_threads > num_threads_available)
×
754
            {
755
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
756
                    "check_num_threads",
757
                    "specified number of threads ({1}) is larger than number "
758
                    "of available processing units ({2})",
759
                    num_threads, num_threads_available);
760
            }
761
        }
762
    }
763

6✔
764
    ///////////////////////////////////////////////////////////////////////////
765
    ///////////////////////////////////////////////////////////////////////////
    // Assign threads to PUs in 'compact' order: fill every PU of a core
    // before moving to the next core.
    //
    // affinities: sized to the number of threads on entry; each entry
    //             receives that thread's affinity mask.
    // used_cores/max_cores: window of cores to draw from; ignored (reset to
    //             the whole machine) when use_process_mask is set.
    // num_pus:    resized to num_threads; receives each thread's PU number.
    //
    // NOTE(review): 'ec' from check_num_threads is not inspected before the
    // assignment loop runs — presumably the caller checks it; confirm.
    void decode_compact_distribution(topology const& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        std::size_t const num_threads = affinities.size();

        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            // the process mask defines availability; consider all cores
            used_cores = 0;
            max_cores = t.get_number_of_cores();
        }

        std::size_t const num_cores =
            (std::min) (max_cores, t.get_number_of_cores());
        num_pus.resize(num_threads);

        // outer loop restarts the core sweep until every thread got a PU
        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
        {
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
            {
                std::size_t const num_core_pus =
                    t.get_number_of_core_pus(num_core + used_cores);
                for (std::size_t num_pu = 0; num_pu < num_core_pus; ++num_pu)
                {
                    // skip PUs outside the process mask (if one is honored)
                    if (!pu_in_process_mask(
                            use_process_mask, t, num_core, num_pu))
                    {
                        continue;
                    }

                    // a thread's mask must be assigned exactly once
                    if (any(affinities[num_thread]))
                    {
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "decode_compact_distribution",
                            "affinity mask for thread {1} has already been set",
                            num_thread);
                        return;
                    }

                    num_pus[num_thread] =
                        t.get_pu_number(num_core + used_cores, num_pu);
                    affinities[num_thread] = t.init_thread_affinity_mask(
                        num_core + used_cores, num_pu);

                    // done once every thread has been assigned
                    if (++num_thread == num_threads)
                        return;
                }
            }
        }
    }
818

819
    // Assign worker threads to processing units using the "scatter" policy:
    // consecutive threads are spread across different cores (one PU per core
    // per round); additional PUs of a core are only used once every core has
    // received a thread in the current round.
    //
    // \param t                topology used to enumerate cores and PUs
    // \param affinities       [in,out] one mask per requested thread (sized by
    //                         the caller); filled in here. A mask that is
    //                         already non-empty is reported as an error.
    // \param used_cores       index of the first core available to this
    //                         runtime instance (reset to 0 if use_process_mask)
    // \param max_cores        upper bound on the number of cores to use
    // \param num_pus          [out] resized to hold one PU number per thread
    // \param use_process_mask restrict assignment to PUs contained in the
    //                         externally imposed process affinity mask
    // \param ec               error reporting (throws if bound to hpx::throws)
    void decode_scatter_distribution(topology const& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        std::size_t const num_threads = affinities.size();

        // diagnose requesting more threads than available processing units
        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            // the process mask is the only constraint: consider all cores
            used_cores = 0;
            max_cores = t.get_number_of_cores();
        }

        std::size_t const num_cores =
            (std::min) (max_cores, t.get_number_of_cores());

        // per-core cursor: next PU index to try on each core
        std::vector<std::size_t> next_pu_index(num_cores, 0);
        num_pus.resize(num_threads);

        // outer loop restarts the core scan until every thread is bound
        // (termination happens via the inner 'return')
        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
        {
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
            {
                if (any(affinities[num_thread]))
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "decode_scatter_distribution",
                        "affinity mask for thread {1} has already been set",
                        num_thread);
                    return;
                }

                std::size_t const num_core_pus =
                    t.get_number_of_core_pus(num_core);
                std::size_t pu_index = next_pu_index[num_core];
                bool use_pu = false;

                // Find the next PU on this core which is in the process mask
                while (pu_index < num_core_pus)
                {
                    use_pu = pu_in_process_mask(
                        use_process_mask, t, num_core, pu_index);
                    ++pu_index;

                    if (use_pu)
                    {
                        break;
                    }
                }

                // remember where to resume on this core next round
                next_pu_index[num_core] = pu_index;

                if (!use_pu)
                {
                    // no usable PU left on this core, try the next core
                    continue;
                }

                // pu_index was post-incremented, so the PU just accepted is
                // at next_pu_index[num_core] - 1
                num_pus[num_thread] = t.get_pu_number(
                    num_core + used_cores, next_pu_index[num_core] - 1);
                affinities[num_thread] = t.init_thread_affinity_mask(
                    num_core + used_cores, next_pu_index[num_core] - 1);

                if (++num_thread == num_threads)
                    return;
            }
        }
    }
×
888

889
    ///////////////////////////////////////////////////////////////////////////
890
    ///////////////////////////////////////////////////////////////////////////
    // Assign worker threads to processing units using the "balanced" policy:
    // PUs are first distributed across cores round-robin (like scatter), but a
    // second pass renumbers the threads core-by-core so that consecutive
    // worker thread numbers end up on the same core.
    //
    // \param t                topology used to enumerate cores and PUs
    // \param affinities       [in,out] one mask per requested thread (sized by
    //                         the caller); filled in here. A mask that is
    //                         already non-empty is reported as an error.
    // \param used_cores       index of the first core available to this
    //                         runtime instance (reset to 0 if use_process_mask)
    // \param max_cores        upper bound on the number of cores to use
    // \param num_pus          [out] resized to hold one PU number per thread
    // \param use_process_mask restrict assignment to PUs contained in the
    //                         externally imposed process affinity mask
    // \param ec               error reporting (throws if bound to hpx::throws)
    void decode_balanced_distribution(topology const& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        std::size_t const num_threads = affinities.size();

        // diagnose requesting more threads than available processing units
        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            // the process mask is the only constraint: consider all cores
            used_cores = 0;
            max_cores = t.get_number_of_cores();
        }

        std::size_t const num_cores =
            (std::min) (max_cores, t.get_number_of_cores());

        std::vector<std::size_t> num_pus_cores(num_cores, 0);    // PUs used per core
        std::vector<std::size_t> next_pu_index(num_cores, 0);    // per-core PU cursor
        std::vector<std::vector<std::size_t>> pu_indexes(num_cores);
        num_pus.resize(num_threads);

        // At first, calculate the number of used pus per core. This needs to be
        // done to make sure that we occupy all the available cores
        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
        {
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
            {
                std::size_t const num_core_pus =
                    t.get_number_of_core_pus(num_core);
                std::size_t pu_index = next_pu_index[num_core];
                bool use_pu = false;

                // Find the next PU on this core which is in the process mask
                while (pu_index < num_core_pus)
                {
                    use_pu = pu_in_process_mask(
                        use_process_mask, t, num_core, pu_index);
                    ++pu_index;

                    if (use_pu)
                    {
                        break;
                    }
                }

                next_pu_index[num_core] = pu_index;

                if (!use_pu)
                {
                    // no usable PU left on this core, try the next core
                    continue;
                }

                // record the accepted PU (cursor was post-incremented)
                pu_indexes[num_core].push_back(next_pu_index[num_core] - 1);

                num_pus_cores[num_core]++;
                if (++num_thread == num_threads)
                    break;
            }
        }

        // Iterate over the cores and assigned pus per core. this additional
        // loop is needed so that we have consecutive worker thread numbers
        std::size_t num_thread = 0;
        for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
        {
            for (std::size_t num_pu = 0; num_pu < num_pus_cores[num_core];
                ++num_pu)
            {
                if (any(affinities[num_thread]))
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "decode_balanced_distribution",
                        "affinity mask for thread {1} has already been set",
                        num_thread);
                    return;
                }

                num_pus[num_thread] = t.get_pu_number(
                    num_core + used_cores, pu_indexes[num_core][num_pu]);
                affinities[num_thread] = t.init_thread_affinity_mask(
                    num_core + used_cores, pu_indexes[num_core][num_pu]);
                ++num_thread;
            }
        }
    }
977

978
    ///////////////////////////////////////////////////////////////////////////
979
    ///////////////////////////////////////////////////////////////////////////
    // Assign worker threads to processing units using the "numa-balanced"
    // policy: the requested threads are first apportioned to NUMA domains in
    // proportion to each domain's share of usable PUs, then distributed
    // "balanced"-style across the cores of each domain.
    //
    // \param t                topology used to enumerate NUMA nodes/cores/PUs
    // \param affinities       [in,out] one mask per requested thread (sized by
    //                         the caller); filled in here. A mask that is
    //                         already non-empty is reported as an error.
    // \param used_cores       index of the first core available to this
    //                         runtime instance (reset to 0 if use_process_mask)
    // \param max_cores        unused by this policy (kept for a uniform
    //                         decoder signature)
    // \param num_pus          [out] resized to hold one PU number per thread
    // \param use_process_mask restrict assignment to PUs contained in the
    //                         externally imposed process affinity mask
    // \param ec               error reporting (throws if bound to hpx::throws)
    //
    // NOTE(review): this function calls std::round but no <cmath> include is
    // visible at the top of this file -- verify it is (transitively) included.
    void decode_numabalanced_distribution(topology const& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        [[maybe_unused]] std::size_t max_cores,
        std::vector<std::size_t>& num_pus, bool use_process_mask,
        error_code& ec)
    {
        std::size_t const num_threads = affinities.size();

        // diagnose requesting more threads than available processing units
        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            used_cores = 0;
        }

        num_pus.resize(num_threads);

        // numa nodes (treat a machine reporting none as one domain)
        std::size_t const num_numas = (std::max) (static_cast<std::size_t>(1),
            t.get_number_of_numa_nodes());
        std::vector<std::size_t> num_cores_numa(num_numas, 0);
        std::vector<std::size_t> num_pus_numa(num_numas, 0);
        std::vector<std::size_t> num_threads_numa(num_numas, 0);
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            num_cores_numa[n] = t.get_number_of_numa_node_cores(n);
        }

        // count the usable PUs per NUMA domain (pus_t = total usable PUs);
        // core_offset maps a domain-local core index to a global one
        std::size_t core_offset = 0;
        std::size_t pus_t = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                ++num_core)
            {
                std::size_t const pus =
                    t.get_number_of_core_pus(num_core + core_offset);
                for (std::size_t num_pu = 0; num_pu < pus; ++num_pu)
                {
                    if (pu_in_process_mask(use_process_mask, t,
                            num_core + core_offset, num_pu))
                    {
                        ++num_pus_numa[n];
                    }
                }
            }

            pus_t += num_pus_numa[n];
            core_offset += num_cores_numa[n];
        }

        // how many threads should go on each domain
        // (proportional to the domain's share of usable PUs)
        std::size_t pus_t2 = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            auto temp = static_cast<std::size_t>(
                std::round(static_cast<double>(num_threads * num_pus_numa[n]) /
                    static_cast<double>(pus_t)));

            // due to rounding up, we might have too many threads
            if ((pus_t2 + temp) > num_threads)
                temp = num_threads - pus_t2;
            pus_t2 += temp;
            num_threads_numa[n] = temp;

            // HPX_ASSERT(num_threads_numa[n] <= num_pus_numa[n]);
        }

        // HPX_ASSERT(num_threads <= pus_t2);

        // assign threads to cores on each numa domain
        std::size_t num_thread = 0;
        core_offset = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            // per-domain balanced distribution state (cf.
            // decode_balanced_distribution)
            std::vector<std::size_t> num_pus_cores(num_cores_numa[n], 0);
            std::vector<std::size_t> next_pu_index(num_cores_numa[n], 0);
            std::vector<std::vector<std::size_t>> pu_indexes(num_cores_numa[n]);

            // iterate once and count pus/core
            for (std::size_t num_thread_numa = 0;
                num_thread_numa < num_threads_numa[n];
                /**/)
            {
                for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                    ++num_core)
                {
                    std::size_t const num_core_pus =
                        t.get_number_of_core_pus(num_core);
                    std::size_t pu_index = next_pu_index[num_core];
                    bool use_pu = false;

                    // Find the next PU on this core which is in the process mask
                    while (pu_index < num_core_pus)
                    {
                        use_pu = pu_in_process_mask(use_process_mask, t,
                            num_core + core_offset, pu_index);
                        ++pu_index;

                        if (use_pu)
                        {
                            break;
                        }
                    }

                    next_pu_index[num_core] = pu_index;

                    if (!use_pu)
                    {
                        // no usable PU left on this core, try the next core
                        continue;
                    }

                    // record the accepted PU (cursor was post-incremented)
                    pu_indexes[num_core].push_back(next_pu_index[num_core] - 1);

                    num_pus_cores[num_core]++;
                    if (++num_thread_numa == num_threads_numa[n])
                        break;
                }
            }

            // Iterate over the cores and assigned pus per core. this additional
            // loop is needed so that we have consecutive worker thread numbers
            for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                ++num_core)
            {
                for (std::size_t num_pu = 0; num_pu < num_pus_cores[num_core];
                    ++num_pu)
                {
                    if (any(affinities[num_thread]))
                    {
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "decode_numabalanced_distribution",
                            "affinity mask for thread {1} has already been set",
                            num_thread);
                        return;
                    }
                    // NOTE(review): num_pus uses 'num_core + used_cores' while
                    // the affinity mask below adds core_offset as well --
                    // confirm this asymmetry is intentional
                    num_pus[num_thread] = t.get_pu_number(
                        num_core + used_cores, pu_indexes[num_core][num_pu]);
                    affinities[num_thread] = t.init_thread_affinity_mask(
                        num_core + used_cores + core_offset,
                        pu_indexes[num_core][num_pu]);
                    ++num_thread;
                }
            }
            core_offset += num_cores_numa[n];
        }
    }
1126

1127
    ///////////////////////////////////////////////////////////////////////////
×
1128
    void decode_distribution(distribution_type d, topology const& t,
1129
        std::vector<mask_type>& affinities, std::size_t used_cores,
×
1130
        std::size_t max_cores, std::size_t num_threads,
1131
        std::vector<std::size_t>& num_pus, bool use_process_mask,
×
1132
        error_code& ec)
×
1133
    {
1134
        affinities.resize(num_threads);
1135
        switch (d)
×
1136
        {
×
1137
        case distribution_type::compact:
1138
            decode_compact_distribution(t, affinities, used_cores, max_cores,
1139
                num_pus, use_process_mask, ec);
1140
            break;
6✔
1141

1142
        case distribution_type::scatter:
1143
            decode_scatter_distribution(t, affinities, used_cores, max_cores,
1144
                num_pus, use_process_mask, ec);
1145
            break;
1146

6✔
1147
        case distribution_type::balanced:
6✔
1148
            decode_balanced_distribution(t, affinities, used_cores, max_cores,
1149
                num_pus, use_process_mask, ec);
×
1150
            break;
×
1151

1152
        case distribution_type::numa_balanced:
×
1153
            decode_numabalanced_distribution(t, affinities, used_cores,
1154
                max_cores, num_pus, use_process_mask, ec);
×
1155
            break;
×
1156

1157
        default:
×
1158
            HPX_ASSERT(false);
1159
        }
6✔
1160
    }
6✔
1161
}    // namespace hpx::threads::detail
1162

6✔
1163
namespace hpx::threads {
1164

×
1165
    ///////////////////////////////////////////////////////////////////////////
×
1166
    ///////////////////////////////////////////////////////////////////////////
    // Parse an --hpx:bind style affinity specification and produce one
    // affinity mask (and PU number) per worker thread.
    //
    // The spec is either a predefined distribution keyword (compact, scatter,
    // balanced, numa-balanced) or a list of explicit per-thread mappings.
    //
    // \param spec             the affinity specification string to parse
    // \param affinities       [in,out] sized by the caller to num_threads for
    //                         distribution specs; rebuilt from scratch for
    //                         explicit mappings
    // \param used_cores       index of the first core usable by this runtime
    // \param max_cores        upper bound on the number of cores to use
    // \param num_threads      number of worker threads to bind
    // \param num_pus          [out] one PU number per thread (filled if empty)
    // \param use_process_mask honor the externally imposed process affinity
    //                         mask (incompatible with explicit mappings)
    // \param ec               error reporting (throws if bound to hpx::throws)
    void parse_affinity_options(std::string const& spec,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::size_t num_threads,
        std::vector<std::size_t>& num_pus, bool use_process_mask,
        error_code& ec)
    {
        // parse the spec into a variant: distribution keyword or mapping list
        detail::mappings_type mappings;
        detail::parse_mappings(spec, mappings, ec);
        if (ec)
            return;

        // We need to instantiate a new topology object as the runtime has not
        // been initialized yet
        threads::topology const& t = threads::create_topology();

        if (mappings.which() == 0)
        {
            // variant alternative 0: a predefined distribution policy
            detail::decode_distribution(
                boost::get<detail::distribution_type>(mappings), t, affinities,
                used_cores, max_cores, num_threads, num_pus, use_process_mask,
                ec);
            if (ec)
                return;
        }
        else if (mappings.which() == 1)
        {
            // variant alternative 1: explicit per-thread mappings
            if (use_process_mask)
            {
                // NOTE(review): no 'return' after this throw-if -- when ec is
                // a non-throwing error_code, processing continues below;
                // confirm this is intended
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                    "parse_affinity_options",
                    "can't use --hpx:use-process-mask with custom thread "
                    "bindings");
            }
            detail::mappings_spec_type mappings_specs(
                boost::get<detail::mappings_spec_type>(mappings));

            affinities.clear();
            for (detail::full_mapping_type& m : mappings_specs)
            {
                // each mapping must be keyed by a thread specifier
                if (m.first.type_ != detail::spec_type::type::thread)
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "parse_affinity_options",
                        "bind specification ({1}) is ill formatted", spec);
                    return;
                }

                // and carry exactly three location components
                // (socket/numanode, core, pu)
                if (m.second.size() != 3)
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "parse_affinity_options",
                        "bind specification ({1}) is ill formatted", spec);
                    return;
                }

                // at least one of the three components must be specified
                if (m.second[0].type_ == detail::spec_type::type::unknown &&
                    m.second[1].type_ == detail::spec_type::type::unknown &&
                    m.second[2].type_ == detail::spec_type::type::unknown)
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "parse_affinity_options",
                        "bind specification ({1}) is ill formatted", spec);
                    return;
                }

                // repeat for each of the threads in the affinity specification
                detail::bounds_type thread_bounds =
                    extract_bounds(m.first, num_threads, ec);
                if (ec)
                    return;

                mappings_sanity_checks(m, num_threads, thread_bounds, ec);
                if (ec)
                    return;

                detail::decode_mappings(
                    t, m, affinities, thread_bounds.size(), ec);
                if (ec)
                    return;
            }

            // derive PU numbers from the first set bit of each mask unless
            // the caller provided them already
            if (num_pus.empty())
            {
                num_pus.resize(affinities.size());
                for (std::size_t i = 0; i != affinities.size(); ++i)
                {
                    num_pus[i] = threads::find_first(affinities[i]);
                }
            }
        }
        else
        {
            HPX_ASSERT(false);    // parse_mappings yielded an unexpected variant
        }

        if (&ec != &throws)
            ec = make_success_code();
    }
×
1264
}    // namespace hpx::threads
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc