• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #856

28 Dec 2022 02:00AM UTC coverage: 86.602% (+0.05%) from 86.55%
#856

push

StellarBot
Merge #6119

6119: Update CMakeLists.txt r=hkaiser a=khuck

updating the default APEX version


Co-authored-by: Kevin Huck <khuck@cs.uoregon.edu>

174566 of 201573 relevant lines covered (86.6%)

1876093.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.61
/libs/core/affinity/src/parse_affinity_options.cpp
1
//  Copyright (c) 2007-2017 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/affinity/parse_affinity_options.hpp>
8
#include <hpx/assert.hpp>
9
#include <hpx/datastructures/tuple.hpp>
10
#include <hpx/modules/errors.hpp>
11
#include <hpx/topology/topology.hpp>
12

13
#include <hpx/affinity/detail/partlit.hpp>
14

15
#include <hwloc.h>
16

17
#include <boost/fusion/include/adapt_struct.hpp>
18
#include <boost/fusion/include/std_pair.hpp>
19
#include <boost/spirit/home/x3/auxiliary.hpp>
20
#include <boost/spirit/home/x3/char.hpp>
21
#include <boost/spirit/home/x3/core.hpp>
22
#include <boost/spirit/home/x3/nonterminal.hpp>
23
#include <boost/spirit/home/x3/numeric.hpp>
24
#include <boost/spirit/home/x3/operator.hpp>
25
#include <boost/spirit/home/x3/string.hpp>
26

27
#include <algorithm>
28
#include <cstddef>
29
#include <cstdint>
30
#include <string>
31
#include <vector>
32

33
///////////////////////////////////////////////////////////////////////////////
34
// clang-format off
// Adapt spec_type as a Boost.Fusion sequence so that Spirit X3 can assign
// the parsed attributes directly into its members (type_, index_bounds_).
BOOST_FUSION_ADAPT_STRUCT(
    hpx::threads::detail::spec_type,
    type_,
    index_bounds_
)
// clang-format on
41

42
namespace {

    ///////////////////////////////////////////////////////////////////////////
    //
    //    mappings:
    //        distribution
    //        mapping(;mapping)*
    //
    //    distribution:
    //        compact
    //        scatter
    //        balanced
    //        numa-balanced
    //
    //    mapping:
    //        thread-spec=pu-specs
    //
    //    thread-spec:
    //        t:int
    //        t:int-int
    //        t:all
    //
    //    pu-specs:
    //        pu-spec(.pu-spec)*
    //
    //    pu-spec:
    //        type:range-specs
    //        ~pu-spec
    //
    //    range-specs:
    //        range-spec(,range-spec)*
    //
    //    range-spec:
    //        int
    //        int-int
    //        all
    //
    //    type:
    //        socket | numanode
    //        core
    //        pu
    //
    namespace x3 = boost::spirit::x3;
    using namespace hpx::threads::detail;

    // Rule declarations: each x3::rule is tagged with a unique class type and
    // declares the attribute type it synthesizes while parsing.
    x3::rule<class mappings, mappings_type> mappings = "mappings";
    x3::rule<class distribution, distribution_type> distribution =
        "distribution";
    x3::rule<class mapping, full_mapping_type> mapping = "mapping";
    x3::rule<class thread_spec, spec_type> thread_spec = "thread_spec";
    x3::rule<class pu_specs, mapping_type> pu_specs = "pu_specs";
    x3::rule<class socket_spec, spec_type> socket_spec = "socket_spec";
    x3::rule<class core_spec, spec_type> core_spec = "core_spec";
    x3::rule<class pu_spec, spec_type> pu_spec = "pu_spec";
    x3::rule<class specs, bounds_type> specs = "specs";
    x3::rule<class spec, bounds_type> spec = "spec";

    // Rule definitions: X3 ties each 'xxx_def' to the rule 'xxx' by name via
    // the BOOST_SPIRIT_DEFINE macro at the end of this namespace.
    auto mappings_def = distribution | (mapping % ';');

    auto mapping_def = thread_spec >> '=' >> pu_specs;

    // clang-format off
    auto distribution_def =
            partlit("compact", distribution_type::compact)
        |   partlit("scatter", distribution_type::scatter)
        |   partlit("balanced", distribution_type::balanced)
        |   partlit("numa-balanced", distribution_type::numa_balanced)
        ;

    auto thread_spec_def =
            partlit("thread", spec_type::type::thread) >> ':' >>  specs
        ;

    auto pu_specs_def =
            (socket_spec >> core_spec >> pu_spec)
//        |   ('~' >> pu_spec)
        ;

    // socket and core/pu parts are all optional; a missing part yields a
    // spec_type of type 'unknown' via x3::attr.
    auto socket_spec_def =
            (partlit("socket", spec_type::type::socket) >> ':' >> specs)
        |   (partlit("numanode", spec_type::type::numanode) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::type::unknown))
        ;

    auto core_spec_def =
           (-x3::lit('.')
                >> partlit("core", spec_type::type::core) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::type::unknown))
        ;

    auto pu_spec_def =
           (-x3::lit('.') >> partlit("pu", spec_type::type::pu) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::type::unknown))
        ;

    auto specs_def = spec % ',';

    // NOTE: a range "a-b" is parsed as uint_ followed by an optional int_, so
    // the upper bound "-b" is stored as the negative value -b; extract_bounds
    // below relies on this encoding.
    auto spec_def =
            (x3::uint_ >> -(x3::int_))
        |   partlit("all", bounds_type{spec_type::all_entities()})
        ;
    // clang-format on

    BOOST_SPIRIT_DEFINE(mappings, distribution, mapping, thread_spec, pu_specs,
        socket_spec, core_spec, pu_spec, specs, spec)

}    // namespace
149

150
namespace hpx::threads::detail {
151

152
    inline constexpr char const* const type_names[] = {
153
        "unknown", "thread", "socket", "numanode", "core", "pu"};
154

155
    char const* spec_type::type_name(spec_type::type t) noexcept
×
156
    {
157
        if (t < spec_type::type::unknown || t > spec_type::type::pu)
×
158
            return type_names[0];
×
159
        return type_names[static_cast<int>(t)];
×
160
    }
×
161

162
    template <typename Iterator>
163
    inline bool parse(Iterator& begin, Iterator end, mappings_type& m)
1,370✔
164
    {
165
        return x3::parse(begin, end, mappings, m);
1,370✔
166
    }
167

168
    ///////////////////////////////////////////////////////////////////////////
169
    void parse_mappings(
1,370✔
170
        std::string const& spec, mappings_type& mappings, error_code& ec)
171
    {
172
        std::string::const_iterator begin = spec.begin();
1,370✔
173
        if (!detail::parse(begin, spec.end(), mappings) || begin != spec.end())
1,370✔
174
        {
175
            HPX_THROWS_IF(ec, hpx::error::bad_parameter,
20✔
176
                "parse_affinity_options",
177
                "failed to parse affinity specification: " + spec);
178
            return;
20✔
179
        }
180

181
        if (&ec != &throws)
1,350✔
182
            ec = make_success_code();
128✔
183
    }
1,370✔
184

185
    ///////////////////////////////////////////////////////////////////////////
    // Expand the index bounds stored in 'm' into an explicit list of indices.
    //
    // The grammar encodes a range "a-b" as the value pair (a, -b) (see the
    // spec_def rule), and 'all' as the single sentinel
    // spec_type::all_entities(). 'default_last' is the number of available
    // resources; indices or upper bounds that do not fit raise
    // hpx::error::bad_parameter via 'ec'.
    //
    // NOTE(review): a pair of two non-negative values takes the
    // "just min and max" branch and emits both values as-is - presumably
    // interpreted pairwise by the consumer; confirm against callers.
    bounds_type extract_bounds(
        spec_type const& m, std::size_t default_last, error_code& ec)
    {
        bounds_type result;

        if (m.index_bounds_.empty())
            return result;

        bounds_type::const_iterator first = m.index_bounds_.begin();
        bounds_type::const_iterator last = m.index_bounds_.end();

        while (first != last)
        {
            if (*first == spec_type::all_entities())
            {
                // bind all entities
                result.clear();
                for (std::size_t i = 0; i != default_last; ++i)
                    result.push_back(i);
                break;    // we will not get more than 'all'
            }

            // look ahead: a range occupies two consecutive slots
            bounds_type::const_iterator second = first;
            if (++second != last)
            {
                if (*second == 0 || *second == spec_type::all_entities())
                {
                    // one element only
                    if (default_last <= std::size_t(*first))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the resource id given is larger than the "
                            "number of existing resources");
                        return result;
                    }
                    result.push_back(*first);
                }
                else if (*second < 0)
                {
                    // all elements between min and -max (the grammar stores
                    // the upper bound of "a-b" as -b)
                    if (default_last <= std::size_t(-*second))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the upper limit given is larger than the "
                            "number of existing resources");
                        return result;
                    }
                    for (std::int64_t i = *first; i <= -*second; ++i)
                        result.push_back(i);
                }
                else
                {
                    // just min and max
                    if (default_last <= std::size_t(*second))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the upper limit given is larger than the "
                            "number of existing resources");
                        return result;
                    }
                    result.push_back(*first);
                    result.push_back(*second);
                }
                first = second;
            }
            else
            {
                // one element only
                if (default_last <= std::size_t(*first))
                {
                    result.clear();
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "extract_bounds",
                        "the resource id given is larger than the number "
                        "of existing resources");
                    return result;
                }
                result.push_back(*first);
            }
            ++first;
        }

        if (&ec != &throws)
            ec = make_success_code();

        return result;
    }
×
279

280
    ///////////////////////////////////////////////////////////////////////////
281
    //                  index,       mask
282
    typedef hpx::tuple<std::size_t, mask_type> mask_info;
283

284
    inline std::size_t get_index(mask_info const& smi)
×
285
    {
286
        return hpx::get<0>(smi);
×
287
    }
288
    inline mask_cref_type get_mask(mask_info const& smi)
×
289
    {
290
        return hpx::get<1>(smi);
×
291
    }
292

293
    ///////////////////////////////////////////////////////////////////////////
294
    std::vector<mask_info> extract_socket_masks(
×
295
        topology const& t, bounds_type const& b)
296
    {
297
        std::vector<mask_info> masks;
×
298
        for (std::int64_t index : b)
×
299
        {
300
            masks.push_back(hpx::make_tuple(static_cast<std::size_t>(index),
×
301
                t.init_socket_affinity_mask_from_socket(
×
302
                    static_cast<std::size_t>(index))));
×
303
        }
304
        return masks;
×
305
    }
×
306

307
    std::vector<mask_info> extract_numanode_masks(
×
308
        topology const& t, bounds_type const& b)
309
    {
310
        std::vector<mask_info> masks;
×
311
        for (std::int64_t index : b)
×
312
        {
313
            masks.push_back(hpx::make_tuple(static_cast<std::size_t>(index),
×
314
                t.init_numa_node_affinity_mask_from_numa_node(
×
315
                    static_cast<std::size_t>(index))));
×
316
        }
317
        return masks;
×
318
    }
×
319

320
    // Return the affinity mask covering the whole machine (all PUs).
    mask_cref_type extract_machine_mask(topology const& t, error_code& ec)
    {
        return t.get_machine_affinity_mask(ec);
    }
324

325
    std::vector<mask_info> extract_socket_or_numanode_masks(
×
326
        topology const& t, spec_type const& s, error_code& ec)
327
    {
328
        switch (s.type_)
×
329
        {
330
        case spec_type::type::socket:
331
            // requested top level is a socket
332
            {
333
                std::size_t num_sockets = t.get_number_of_sockets();
×
334
                return extract_socket_masks(
×
335
                    t, extract_bounds(s, num_sockets, ec));
×
336
            }
337

338
        case spec_type::type::numanode:
339
            // requested top level is a NUMA node
340
            {
341
                std::size_t num_numanodes = t.get_number_of_numa_nodes();
×
342
                return extract_numanode_masks(
×
343
                    t, extract_bounds(s, num_numanodes, ec));
×
344
            }
345

346
        case spec_type::type::unknown:
347
        {
348
            std::vector<mask_info> masks;
×
349
            masks.push_back(
×
350
                hpx::make_tuple(std::size_t(-1), extract_machine_mask(t, ec)));
×
351
            return masks;
×
352
        }
×
353

354
        default:
355
            HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
356
                "extract_socket_or_numanode_mask",
357
                "unexpected specification type {}",
358
                spec_type::type_name(s.type_));
359
            break;
×
360
        }
361

362
        return std::vector<mask_info>();
×
363
    }
×
364

365
    // Compute (core-index, mask) pairs for the core part 's' of a pu-spec.
    //
    // 'socket' is the enclosing socket/NUMA-node index, or std::size_t(-1)
    // when no socket was given. With a socket, core numbers in 's' are
    // relative to that socket ('base' is the number of cores preceding it);
    // without one they refer to the whole machine. Every resulting mask is
    // intersected with 'socket_mask'.
    std::vector<mask_info> extract_core_masks(topology const& t,
        spec_type const& s, std::size_t socket, mask_cref_type socket_mask,
        error_code& ec)
    {
        std::vector<mask_info> masks;

        switch (s.type_)
        {
        case spec_type::type::core:
        {
            std::size_t base = 0;
            std::size_t num_cores = 0;

            if (socket != std::size_t(-1))
            {
                for (std::size_t i = 0; i != socket; ++i)
                {
                    // The number of NUMA nodes might be zero if hwloc
                    // doesn't detect a NUMA domain. This might be the case when
                    // there is no NUMA support configured, or when there are
                    // just sockets, but no direct numa domains. The bind
                    // description might relate to sockets, not NUMA domains.
                    if (t.get_number_of_numa_nodes() == 0)
                        base += t.get_number_of_socket_cores(i);
                    else
                        base += t.get_number_of_numa_node_cores(i);
                }
                if (t.get_number_of_numa_nodes() == 0)
                    num_cores = t.get_number_of_socket_cores(socket);
                else
                    num_cores = t.get_number_of_numa_node_cores(socket);
            }
            else
            {
                num_cores = t.get_number_of_cores();
            }

            bounds_type bounds = extract_bounds(s, num_cores, ec);
            if (ec)
                break;

            // translate each relative core index into a machine-wide core
            // ('index + base') and clip its mask to the enclosing socket
            for (std::int64_t index : bounds)
            {
                mask_type mask = t.init_core_affinity_mask_from_core(
                    static_cast<std::size_t>(index + base));
                masks.push_back(hpx::make_tuple(
                    static_cast<std::size_t>(index), mask & socket_mask));
            }
        }
        break;

        case spec_type::type::unknown:
        {
            // no core part given: use the machine mask clipped to the socket
            mask_type mask = extract_machine_mask(t, ec);
            masks.push_back(
                hpx::make_tuple(std::size_t(-1), mask & socket_mask));
        }
        break;

        default:
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "extract_core_mask",
                "unexpected specification type {}",
                spec_type::type_name(s.type_));
            break;
        }

        return masks;
    }
×
433

434
    // Compute (pu-index, mask) pairs for the pu part 's' of a pu-spec.
    //
    // 'socket' and 'core' are the enclosing socket and core indices, each
    // std::size_t(-1) when not specified. With a core, PU numbers in 's' are
    // relative to that core; without one the code searches which core the
    // given PU number falls into. Every resulting mask is intersected with
    // 'core_mask'.
    std::vector<mask_info> extract_pu_masks(topology const& t,
        spec_type const& s, std::size_t socket, std::size_t core,
        mask_cref_type core_mask, error_code& ec)
    {
        std::vector<mask_info> masks;

        switch (s.type_)
        {
        case spec_type::type::pu:
        {
            std::size_t num_pus = 0;
            std::size_t socket_base = 0;
            if (std::size_t(-1) != socket)
            {
                // core number is relative to socket
                for (std::size_t i = 0; i != socket; ++i)
                {
                    if (t.get_number_of_numa_nodes() == 0)
                        socket_base += t.get_number_of_socket_cores(i);
                    else
                        socket_base += t.get_number_of_numa_node_cores(i);
                }
            }

            if (std::size_t(-1) != core)
            {
                num_pus = t.get_number_of_core_pus(core);
            }
            else
            {
                num_pus = t.get_number_of_pus();
            }
            bounds_type bounds = extract_bounds(s, num_pus, ec);
            if (ec)
                break;

            std::size_t num_cores = t.get_number_of_cores();
            for (std::int64_t index : bounds)
            {
                std::size_t base_core = socket_base;
                if (std::size_t(-1) != core)
                {
                    base_core += core;
                }
                else
                {
                    // find core the given pu belongs to
                    std::size_t base = 0;
                    for (/**/; base_core < num_cores; ++base_core)
                    {
                        std::size_t num_core_pus =
                            t.get_number_of_core_pus(base_core);
                        if (base + num_core_pus > std::size_t(index))
                            break;
                        base += num_core_pus;
                    }
                }

                mask_type mask = t.init_thread_affinity_mask(
                    base_core, static_cast<std::size_t>(index));
                masks.push_back(hpx::make_tuple(
                    static_cast<std::size_t>(index), mask & core_mask));
            }
        }
        break;

        case spec_type::type::unknown:
        {
            // no pu part given: use the machine mask clipped to the core
            mask_type mask = extract_machine_mask(t, ec);
            masks.push_back(hpx::make_tuple(std::size_t(-1), mask & core_mask));
        }
        break;

        default:
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "extract_pu_mask",
                "unexpected specification type {}",
                spec_type::type_name(s.type_));
            break;
        }

        return masks;
    }
×
516

517
    ///////////////////////////////////////////////////////////////////////////
518
    // sanity checks
519
    void mappings_sanity_checks(full_mapping_type& fmt, std::size_t /* size */,
×
520
        bounds_type const& b, error_code& ec)
521
    {
522
        mapping_type& m = fmt.second;
×
523
        if (m.size() != 3)
×
524
        {
525
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "decode_mapping",
×
526
                "bad size of mappings specification array");
527
            return;
×
528
        }
529

530
        if (b.begin() == b.end())
×
531
        {
532
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "decode_mapping",
×
533
                "no {1} mapping bounds are specified",
534
                spec_type::type_name(fmt.first.type_));
535
            return;
×
536
        }
537

538
        if (&ec != &throws)
×
539
            ec = make_success_code();
×
540
    }
×
541

542
    // for each given core-mask extract all required pu-masks
    //
    // 'specs' is the three-element (socket, core, pu) array of one mapping;
    // specs[2] is the pu part. A core index of std::size_t(-1) in
    // 'core_masks' denotes 'all cores'; in that case one entry is processed
    // and the loop terminates.
    void extract_pu_affinities(topology const& t,
        std::vector<spec_type> const& specs, std::size_t socket,
        std::vector<mask_info> const& core_masks,
        std::vector<mask_type>& affinities, error_code& ec)
    {
        // get the core masks for each of the sockets
        for (mask_info const& cmi : core_masks)
        {
            if (get_index(cmi) == std::size_t(-1))
            {
                // all cores
                if (specs[2].type_ == spec_type::type::unknown)
                {
                    // no pu information
                    affinities.push_back(get_mask(cmi));
                }
                else
                {
                    // handle pu information in the absence of core information
                    std::vector<mask_info> pu_masks = extract_pu_masks(t,
                        specs[2], socket, std::size_t(-1), get_mask(cmi), ec);
                    if (ec)
                        break;

                    for (mask_info const& pmi : pu_masks)
                    {
                        affinities.push_back(get_mask(pmi));
                    }
                }
                // the 'all cores' sentinel subsumes any remaining entries
                break;
            }
            else
            {
                // just this core
                std::vector<mask_info> pu_masks = extract_pu_masks(
                    t, specs[2], socket, get_index(cmi), get_mask(cmi), ec);
                if (ec)
                    break;

                for (mask_info const& pmi : pu_masks)
                {
                    affinities.push_back(get_mask(pmi));
                }
            }
        }
    }
×
589

590
    // for each given socket-mask extract all required pu-masks
    //
    // 'specs' is the three-element (socket, core, pu) array of one mapping.
    // A socket index of std::size_t(-1) denotes 'all NUMA domains'; in that
    // case the core part (specs[1]) is interpreted relative to the whole
    // machine and the loop terminates after one entry.
    void extract_core_affinities(topology const& t,
        std::vector<spec_type> const& specs,
        std::vector<mask_info> const& socket_masks,
        std::vector<mask_type>& affinities, error_code& ec)
    {
        // get the core masks for each of the sockets
        for (mask_info const& smi : socket_masks)
        {
            if (get_index(smi) == std::size_t(-1))
            {
                // all NUMA domains
                if (specs[1].type_ == spec_type::type::unknown)
                {
                    // no core information
                    if (specs[2].type_ == spec_type::type::unknown)
                    {
                        // no pu information
                        affinities.push_back(get_mask(smi));
                    }
                    else
                    {
                        // handle pu information in the absence of core/socket
                        std::vector<mask_info> pu_masks =
                            extract_pu_masks(t, specs[2], std::size_t(-1),
                                std::size_t(-1), get_mask(smi), ec);
                        if (ec)
                            break;

                        for (mask_info const& pmi : pu_masks)
                        {
                            affinities.push_back(get_mask(pmi));
                        }
                    }
                }
                else
                {
                    // no socket given, assume cores are numbered for whole
                    // machine
                    if (specs[2].type_ == spec_type::type::unknown)
                    {
                        // no pu information
                        std::vector<mask_info> core_masks = extract_core_masks(
                            t, specs[1], std::size_t(-1), get_mask(smi), ec);
                        if (ec)
                            break;

                        for (mask_info const& cmi : core_masks)
                        {
                            affinities.push_back(get_mask(cmi));
                        }
                    }
                    else
                    {
                        std::vector<mask_info> core_masks = extract_core_masks(
                            t, specs[1], std::size_t(-1), get_mask(smi), ec);
                        if (ec)
                            break;

                        // get the pu masks (i.e. overall affinity masks) for
                        // all of the core masks
                        extract_pu_affinities(t, specs, std::size_t(-1),
                            core_masks, affinities, ec);
                        if (ec)
                            break;
                    }
                }
                // the 'all domains' sentinel subsumes any remaining entries
                break;
            }
            else
            {
                std::vector<mask_info> core_masks = extract_core_masks(
                    t, specs[1], get_index(smi), get_mask(smi), ec);
                if (ec)
                    break;

                // get the pu masks (i.e. overall affinity masks) for
                // all of the core masks
                extract_pu_affinities(
                    t, specs, get_index(smi), core_masks, affinities, ec);
                if (ec)
                    break;
            }
        }
    }
×
675

676
    void decode_mappings(topology const& t, full_mapping_type& m,
×
677
        std::vector<mask_type>& affinities, std::size_t num_threads,
678
        error_code& ec)
679
    {
680
        // The core numbers are interpreted differently depending on whether a
681
        // socket/numanode is given or not. If no socket(s) is(are) given, then
682
        // the core numbering covers the whole locality, otherwise the core
683
        // numbering is relative to the given socket.
684

685
        // generate overall masks for each of the given sockets
686
        std::vector<mask_info> socket_masks =
687
            extract_socket_or_numanode_masks(t, m.second[0], ec);
×
688

689
        HPX_ASSERT(!socket_masks.empty());
×
690

691
        extract_core_affinities(t, m.second, socket_masks, affinities, ec);
×
692

693
        // special case, all threads share the same options
694
        if (affinities.size() == 1 && num_threads > 1)
×
695
        {
696
            affinities.resize(num_threads, affinities[0]);
×
697
        }
×
698
    }
×
699

700
    ///////////////////////////////////////////////////////////////////////////
701
    bool pu_in_process_mask(bool use_process_mask, topology& t,
4,374✔
702
        std::size_t num_core, std::size_t num_pu)
703
    {
704
        if (!use_process_mask)
4,374✔
705
        {
706
            return true;
4,370✔
707
        }
708

709
        threads::mask_type proc_mask = t.get_cpubind_mask();
4✔
710
        threads::mask_type pu_mask =
711
            t.init_thread_affinity_mask(num_core, num_pu);
4✔
712

713
        return threads::bit_and(proc_mask, pu_mask);
4✔
714
    }
4,374✔
715

716
    void check_num_threads(bool use_process_mask, topology& t,
1,222✔
717
        std::size_t num_threads, error_code& ec)
718
    {
719
        if (use_process_mask)
1,222✔
720
        {
721
            threads::mask_type proc_mask = t.get_cpubind_mask();
1✔
722
            std::size_t num_pus_proc_mask = threads::count(proc_mask);
1✔
723

724
            if (num_threads > num_pus_proc_mask)
1✔
725
            {
726
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
727
                    "check_num_threads",
728
                    "specified number of threads ({1}) is larger than number "
729
                    "of processing units available in process mask ({2})",
730
                    num_threads, num_pus_proc_mask);
731
            }
×
732
        }
1✔
733
        else
734
        {
735
            std::size_t num_threads_available = threads::hardware_concurrency();
1,221✔
736

737
            if (num_threads > num_threads_available)
1,221✔
738
            {
739
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
740
                    "check_num_threads",
741
                    "specified number of threads ({1}) is larger than number "
742
                    "of available processing units ({2})",
743
                    num_threads, num_threads_available);
744
            }
×
745
        }
746
    }
1,222✔
747

748
    ///////////////////////////////////////////////////////////////////////////
    // Assign PUs to threads in 'compact' order: fill each core's PUs in turn,
    // starting at core 'used_cores'. On success affinities[i] holds the mask
    // for thread i and num_pus[i] the corresponding PU number. Entries in
    // 'affinities' must be empty on entry; a non-empty entry is reported as
    // hpx::error::bad_parameter.
    void decode_compact_distribution(topology& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        std::size_t num_threads = affinities.size();

        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            // with a process mask all cores are eligible; the mask itself
            // filters PUs below
            used_cores = 0;
            max_cores = t.get_number_of_cores();
        }

        std::size_t num_cores = (std::min)(max_cores, t.get_number_of_cores());
        num_pus.resize(num_threads);

        // num_thread is advanced inside the innermost loop, once per
        // successfully assigned PU
        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
        {
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
            {
                std::size_t num_core_pus =
                    t.get_number_of_core_pus(num_core + used_cores);
                for (std::size_t num_pu = 0; num_pu < num_core_pus; ++num_pu)
                {
                    // skip PUs excluded by the process mask
                    if (!pu_in_process_mask(
                            use_process_mask, t, num_core, num_pu))
                    {
                        continue;
                    }

                    if (any(affinities[num_thread]))
                    {
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "decode_compact_distribution",
                            "affinity mask for thread {1} has already been set",
                            num_thread);
                        return;
                    }

                    num_pus[num_thread] =
                        t.get_pu_number(num_core + used_cores, num_pu);
                    affinities[num_thread] = t.init_thread_affinity_mask(
                        num_core + used_cores, num_pu);

                    if (++num_thread == num_threads)
                        return;
                }
            }
        }
    }
×
801

802
    void decode_scatter_distribution(topology& t,
×
803
        std::vector<mask_type>& affinities, std::size_t used_cores,
804
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
805
        bool use_process_mask, error_code& ec)
806
    {
807
        std::size_t num_threads = affinities.size();
×
808

809
        check_num_threads(use_process_mask, t, num_threads, ec);
×
810

811
        if (use_process_mask)
×
812
        {
813
            used_cores = 0;
×
814
            max_cores = t.get_number_of_cores();
×
815
        }
×
816

817
        std::size_t num_cores = (std::min)(max_cores, t.get_number_of_cores());
×
818

819
        std::vector<std::size_t> next_pu_index(num_cores, 0);
×
820
        num_pus.resize(num_threads);
×
821

822
        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
×
823
        {
824
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
×
825
            {
826
                if (any(affinities[num_thread]))
×
827
                {
828
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
829
                        "decode_scatter_distribution",
830
                        "affinity mask for thread {1} has already been set",
831
                        num_thread);
832
                    return;
×
833
                }
834

835
                std::size_t num_core_pus = t.get_number_of_core_pus(num_core);
×
836
                std::size_t pu_index = next_pu_index[num_core];
×
837
                bool use_pu = false;
×
838

839
                // Find the next PU on this core which is in the process mask
840
                while (pu_index < num_core_pus)
×
841
                {
842
                    use_pu = pu_in_process_mask(
×
843
                        use_process_mask, t, num_core, pu_index);
×
844
                    ++pu_index;
×
845

846
                    if (use_pu)
×
847
                    {
848
                        break;
×
849
                    }
850
                }
851

852
                next_pu_index[num_core] = pu_index;
×
853

854
                if (!use_pu)
×
855
                {
856
                    continue;
×
857
                }
858

859
                num_pus[num_thread] = t.get_pu_number(
×
860
                    num_core + used_cores, next_pu_index[num_core] - 1);
×
861
                affinities[num_thread] = t.init_thread_affinity_mask(
×
862
                    num_core + used_cores, next_pu_index[num_core] - 1);
×
863

864
                if (++num_thread == num_threads)
×
865
                    return;
×
866
            }
×
867
        }
868
    }
×
869

870
    ///////////////////////////////////////////////////////////////////////////
871
    // Distribute worker threads as evenly as possible across the available
    // cores ("balanced"): first compute how many PUs each core will host,
    // then assign threads core by core so thread numbers are consecutive
    // within a core.
    //
    // t                 topology used to enumerate cores and PUs
    // affinities        [in/out] one (initially empty) mask per worker
    //                   thread; filled with the computed binding
    // used_cores        offset of the first core to use (cores consumed by
    //                   other localities); forced to 0 with a process mask
    // max_cores         upper limit on the number of cores to occupy
    // num_pus           [out] resized to the thread count; receives the pu
    //                   number each thread was bound to
    // use_process_mask  when true, only PUs in the external process mask are
    //                   eligible and all cores of the machine are considered
    // ec                [out] set to bad_parameter if a thread's mask was
    //                   already assigned
    void decode_balanced_distribution(topology& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        std::size_t num_threads = affinities.size();

        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            used_cores = 0;
            max_cores = t.get_number_of_cores();
        }

        std::size_t num_cores = (std::min)(max_cores, t.get_number_of_cores());

        // num_pus_cores[c]: how many threads core c will host
        // next_pu_index[c]: first PU index on core c not yet considered
        // pu_indexes[c]:    the PU indices selected on core c, in order
        std::vector<std::size_t> num_pus_cores(num_cores, 0);
        std::vector<std::size_t> next_pu_index(num_cores, 0);
        std::vector<std::vector<std::size_t>> pu_indexes(num_cores);
        num_pus.resize(num_threads);

        // At first, calculate the number of used pus per core. This needs to be
        // done to make sure that we occupy all the available cores
        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
        {
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
            {
                // NOTE(review): the PU search below addresses the core as
                // 'num_core' while the final binding uses
                // 'num_core + used_cores' — presumably benign because
                // used_cores is nonzero only without a process mask (where
                // pu_in_process_mask accepts everything); verify against the
                // topology API.
                std::size_t num_core_pus = t.get_number_of_core_pus(num_core);
                std::size_t pu_index = next_pu_index[num_core];
                bool use_pu = false;

                // Find the next PU on this core which is in the process mask
                while (pu_index < num_core_pus)
                {
                    use_pu = pu_in_process_mask(
                        use_process_mask, t, num_core, pu_index);
                    ++pu_index;

                    if (use_pu)
                    {
                        break;
                    }
                }

                // cursor now points one past the accepted PU (or at the end)
                next_pu_index[num_core] = pu_index;

                if (!use_pu)
                {
                    continue;
                }

                pu_indexes[num_core].push_back(next_pu_index[num_core] - 1);

                num_pus_cores[num_core]++;
                if (++num_thread == num_threads)
                    break;
            }
        }

        // Iterate over the cores and assigned pus per core. this additional
        // loop is needed so that we have consecutive worker thread numbers
        std::size_t num_thread = 0;
        for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
        {
            for (std::size_t num_pu = 0; num_pu < num_pus_cores[num_core];
                 ++num_pu)
            {
                if (any(affinities[num_thread]))
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "decode_balanced_distribution",
                        "affinity mask for thread {1} has already been set",
                        num_thread);
                    return;
                }

                num_pus[num_thread] = t.get_pu_number(
                    num_core + used_cores, pu_indexes[num_core][num_pu]);
                affinities[num_thread] = t.init_thread_affinity_mask(
                    num_core + used_cores, pu_indexes[num_core][num_pu]);
                ++num_thread;
            }
        }
    }
956

957
    ///////////////////////////////////////////////////////////////////////////
958
    // Distribute worker threads across NUMA domains in proportion to the
    // number of mask-eligible PUs each domain owns ("numa-balanced"), then
    // spread each domain's share of threads across that domain's cores.
    //
    // t                 topology used to enumerate NUMA domains/cores/PUs
    // affinities        [in/out] one (initially empty) mask per worker thread
    // used_cores        cores consumed by other localities; forced to 0 when
    //                   the process mask is honored
    // max_cores         unused by this scheme
    // num_pus           [out] resized to the thread count; receives the pu
    //                   number each thread was bound to
    // use_process_mask  when true, only PUs in the external process mask
    //                   count towards a domain's share
    // ec                [out] set to bad_parameter if a thread's mask was
    //                   already assigned
    void decode_numabalanced_distribution(topology& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        HPX_UNUSED(max_cores);
        std::size_t num_threads = affinities.size();

        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            used_cores = 0;
        }

        num_pus.resize(num_threads);

        // numa nodes
        // treat a machine without NUMA information as one domain
        std::size_t num_numas =
            (std::max)(std::size_t(1), t.get_number_of_numa_nodes());
        std::vector<std::size_t> num_cores_numa(num_numas, 0);
        std::vector<std::size_t> num_pus_numa(num_numas, 0);
        std::vector<std::size_t> num_threads_numa(num_numas, 0);
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            num_cores_numa[n] = t.get_number_of_numa_node_cores(n);
        }

        // count the eligible PUs per domain; core_offset turns a
        // domain-local core index into a machine-wide one
        std::size_t core_offset = 0;
        std::size_t pus_t = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                 ++num_core)
            {
                std::size_t num_pus =
                    t.get_number_of_core_pus(num_core + core_offset);
                for (std::size_t num_pu = 0; num_pu < num_pus; ++num_pu)
                {
                    if (pu_in_process_mask(use_process_mask, t,
                            num_core + core_offset, num_pu))
                    {
                        ++num_pus_numa[n];
                    }
                }
            }

            pus_t += num_pus_numa[n];
            core_offset += num_cores_numa[n];
        }

        // how many threads should go on each domain
        // (proportional to the domain's share of eligible PUs)
        std::size_t pus_t2 = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            std::size_t temp = static_cast<std::size_t>(
                std::round(static_cast<double>(num_threads * num_pus_numa[n]) /
                    static_cast<double>(pus_t)));

            // due to rounding up, we might have too many threads
            if ((pus_t2 + temp) > num_threads)
                temp = num_threads - pus_t2;
            pus_t2 += temp;
            num_threads_numa[n] = temp;

            // HPX_ASSERT(num_threads_numa[n] <= num_pus_numa[n]);
        }

        // HPX_ASSERT(num_threads <= pus_t2);

        // assign threads to cores on each numa domain
        std::size_t num_thread = 0;
        core_offset = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            // per-domain bookkeeping, same scheme as the balanced decoder
            std::vector<std::size_t> num_pus_cores(num_cores_numa[n], 0);
            std::vector<std::size_t> next_pu_index(num_cores_numa[n], 0);
            std::vector<std::vector<std::size_t>> pu_indexes(num_cores_numa[n]);

            // iterate once and count pus/core
            for (std::size_t num_thread_numa = 0;
                 num_thread_numa < num_threads_numa[n];
                /**/)
            {
                for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                     ++num_core)
                {
                    // NOTE(review): this PU count uses the domain-local
                    // 'num_core' while pu_in_process_mask below uses
                    // 'num_core + core_offset' — looks inconsistent for
                    // machines with heterogeneous PU counts per core; verify
                    // against the topology API.
                    std::size_t num_core_pus =
                        t.get_number_of_core_pus(num_core);
                    std::size_t pu_index = next_pu_index[num_core];
                    bool use_pu = false;

                    // Find the next PU on this core which is in the process mask
                    while (pu_index < num_core_pus)
                    {
                        use_pu = pu_in_process_mask(use_process_mask, t,
                            num_core + core_offset, pu_index);
                        ++pu_index;

                        if (use_pu)
                        {
                            break;
                        }
                    }

                    next_pu_index[num_core] = pu_index;

                    if (!use_pu)
                    {
                        continue;
                    }

                    pu_indexes[num_core].push_back(next_pu_index[num_core] - 1);

                    num_pus_cores[num_core]++;
                    if (++num_thread_numa == num_threads_numa[n])
                        break;
                }
            }

            // Iterate over the cores and assigned pus per core. this additional
            // loop is needed so that we have consecutive worker thread numbers
            for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                 ++num_core)
            {
                for (std::size_t num_pu = 0; num_pu < num_pus_cores[num_core];
                     ++num_pu)
                {
                    if (any(affinities[num_thread]))
                    {
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "decode_numabalanced_distribution",
                            "affinity mask for thread {1} has already been set",
                            num_thread);
                        return;
                    }
                    // NOTE(review): get_pu_number omits 'core_offset' while
                    // init_thread_affinity_mask adds it — presumably the pu
                    // number should address the same (offset) core as the
                    // mask; confirm whether this is a latent bug.
                    num_pus[num_thread] = t.get_pu_number(
                        num_core + used_cores, pu_indexes[num_core][num_pu]);
                    affinities[num_thread] = t.init_thread_affinity_mask(
                        num_core + used_cores + core_offset,
                        pu_indexes[num_core][num_pu]);
                    ++num_thread;
                }
            }
            core_offset += num_cores_numa[n];
        }
    }
1105

1106
    ///////////////////////////////////////////////////////////////////////////
1107
    void decode_distribution(distribution_type d, topology& t,
1,222✔
1108
        std::vector<mask_type>& affinities, std::size_t used_cores,
1109
        std::size_t max_cores, std::size_t num_threads,
1110
        std::vector<std::size_t>& num_pus, bool use_process_mask,
1111
        error_code& ec)
1112
    {
1113
        affinities.resize(num_threads);
1,222✔
1114
        switch (d)
1,222✔
1115
        {
1116
        case distribution_type::compact:
1117
            decode_compact_distribution(t, affinities, used_cores, max_cores,
×
1118
                num_pus, use_process_mask, ec);
×
1119
            break;
×
1120

1121
        case distribution_type::scatter:
1122
            decode_scatter_distribution(t, affinities, used_cores, max_cores,
×
1123
                num_pus, use_process_mask, ec);
×
1124
            break;
×
1125

1126
        case distribution_type::balanced:
1127
            decode_balanced_distribution(t, affinities, used_cores, max_cores,
2,444✔
1128
                num_pus, use_process_mask, ec);
1,222✔
1129
            break;
1,222✔
1130

1131
        case distribution_type::numa_balanced:
1132
            decode_numabalanced_distribution(t, affinities, used_cores,
×
1133
                max_cores, num_pus, use_process_mask, ec);
×
1134
            break;
×
1135

1136
        default:
1137
            HPX_ASSERT(false);
×
1138
        }
×
1139
    }
1,222✔
1140
}    // namespace hpx::threads::detail
1141

1142
namespace hpx::threads {
1143

1144
    ///////////////////////////////////////////////////////////////////////////
1145
    void parse_affinity_options(std::string const& spec,
1,242✔
1146
        std::vector<mask_type>& affinities, std::size_t used_cores,
1147
        std::size_t max_cores, std::size_t num_threads,
1148
        std::vector<std::size_t>& num_pus, bool use_process_mask,
1149
        error_code& ec)
1150
    {
1151
        detail::mappings_type mappings;
1,242✔
1152
        detail::parse_mappings(spec, mappings, ec);
1,242✔
1153
        if (ec)
1,242✔
1154
            return;
20✔
1155

1156
        // We need to instantiate a new topology object as the runtime has not
1157
        // been initialized yet
1158
        threads::topology& t = threads::create_topology();
1,222✔
1159

1160
        switch (mappings.which())
1,222✔
1161
        {
1162
        case 0:
1163
        {
1164
            detail::decode_distribution(
1,222✔
1165
                boost::get<detail::distribution_type>(mappings), t, affinities,
1,222✔
1166
                used_cores, max_cores, num_threads, num_pus, use_process_mask,
1,222✔
1167
                ec);
1,222✔
1168
            if (ec)
1,222✔
1169
                return;
×
1170
        }
1171
        break;
1,222✔
1172

1173
        case 1:
1174
        {
1175
            if (use_process_mask)
×
1176
            {
1177
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
1178
                    "parse_affinity_options",
1179
                    "can't use --hpx:use-process-mask with custom thread "
1180
                    "bindings");
1181
            }
×
1182
            detail::mappings_spec_type mappings_specs(
×
1183
                boost::get<detail::mappings_spec_type>(mappings));
×
1184

1185
            affinities.clear();
×
1186
            for (detail::full_mapping_type& m : mappings_specs)
×
1187
            {
1188
                if (m.first.type_ != detail::spec_type::type::thread)
×
1189
                {
1190
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
1191
                        "parse_affinity_options",
1192
                        "bind specification ({1}) is ill formatted", spec);
1193
                    return;
×
1194
                }
1195

1196
                if (m.second.size() != 3)
×
1197
                {
1198
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
1199
                        "parse_affinity_options",
1200
                        "bind specification ({1}) is ill formatted", spec);
1201
                    return;
×
1202
                }
1203

1204
                if (m.second[0].type_ == detail::spec_type::type::unknown &&
×
1205
                    m.second[1].type_ == detail::spec_type::type::unknown &&
×
1206
                    m.second[2].type_ == detail::spec_type::type::unknown)
×
1207
                {
1208
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
1209
                        "parse_affinity_options",
1210
                        "bind specification ({1}) is ill formatted", spec);
1211
                    return;
×
1212
                }
1213

1214
                // repeat for each of the threads in the affinity specification
1215
                detail::bounds_type thread_bounds =
1216
                    extract_bounds(m.first, num_threads, ec);
×
1217
                if (ec)
×
1218
                    return;
×
1219

1220
                mappings_sanity_checks(m, num_threads, thread_bounds, ec);
×
1221
                if (ec)
×
1222
                    return;
×
1223

1224
                detail::decode_mappings(
×
1225
                    t, m, affinities, thread_bounds.size(), ec);
×
1226
                if (ec)
×
1227
                    return;
×
1228
            }
×
1229

1230
            if (num_pus.empty())
×
1231
            {
1232
                num_pus.resize(affinities.size());
×
1233
                for (std::size_t i = 0; i != affinities.size(); ++i)
×
1234
                {
1235
                    num_pus[i] = threads::find_first(affinities[i]);
×
1236
                }
×
1237
            }
×
1238
        }
×
1239
        break;
×
1240
        }
1241
    }
1,242✔
1242
}    // namespace hpx::threads
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc