• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #853

19 Dec 2022 01:01AM UTC coverage: 86.287% (+0.4%) from 85.912%
#853

push

StellarBot
Merge #6109

6109: Modernize serialization module r=hkaiser a=hkaiser

- flyby separate serialization of Boost types

working towards https://github.com/STEllAR-GROUP/hpx/issues/5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

53 of 53 new or added lines in 6 files covered. (100.0%)

173939 of 201582 relevant lines covered (86.29%)

1931657.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.48
/libs/core/affinity/src/parse_affinity_options.cpp
1
//  Copyright (c) 2007-2017 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/affinity/parse_affinity_options.hpp>
8
#include <hpx/assert.hpp>
9
#include <hpx/datastructures/tuple.hpp>
10
#include <hpx/modules/errors.hpp>
11
#include <hpx/topology/topology.hpp>
12

13
#include <hpx/affinity/detail/partlit.hpp>
14

15
#include <hwloc.h>
16

17
#include <boost/fusion/include/adapt_struct.hpp>
18
#include <boost/fusion/include/std_pair.hpp>
19
#include <boost/spirit/home/x3/auxiliary.hpp>
20
#include <boost/spirit/home/x3/char.hpp>
21
#include <boost/spirit/home/x3/core.hpp>
22
#include <boost/spirit/home/x3/nonterminal.hpp>
23
#include <boost/spirit/home/x3/numeric.hpp>
24
#include <boost/spirit/home/x3/operator.hpp>
25
#include <boost/spirit/home/x3/string.hpp>
26

27
#include <algorithm>
28
#include <cstddef>
29
#include <cstdint>
30
#include <string>
31
#include <vector>
32

33
///////////////////////////////////////////////////////////////////////////////
34
// clang-format off
// Adapt spec_type as a Fusion sequence (type_, index_bounds_) so that the
// Spirit X3 grammar below can fill it directly as a parse attribute.
BOOST_FUSION_ADAPT_STRUCT(
    hpx::threads::detail::spec_type,
    type_,
    index_bounds_
)
// clang-format on
41

42
namespace {

    ///////////////////////////////////////////////////////////////////////////
    // Spirit X3 grammar for affinity option strings:
    //
    //    mappings:
    //        distribution
    //        mapping(;mapping)*
    //
    //    distribution:
    //        compact
    //        scatter
    //        balanced
    //        numa-balanced
    //
    //    mapping:
    //        thread-spec=pu-specs
    //
    //    thread-spec:
    //        t:int
    //        t:int-int
    //        t:all
    //
    //    pu-specs:
    //        pu-spec(.pu-spec)*
    //
    //    pu-spec:
    //        type:range-specs
    //        ~pu-spec
    //
    //    range-specs:
    //        range-spec(,range-spec)*
    //
    //    range-spec:
    //        int
    //        int-int
    //        all
    //
    //    type:
    //        socket | numanode
    //        core
    //        pu
    //
    namespace x3 = boost::spirit::x3;
    using namespace hpx::threads::detail;

    // Rule declarations: x3::rule<Tag, Attribute>. Attribute types
    // (mappings_type, spec_type, ...) come from hpx::threads::detail.
    x3::rule<class mappings, mappings_type> mappings = "mappings";
    x3::rule<class distribution, distribution_type> distribution =
        "distribution";
    x3::rule<class mapping, full_mapping_type> mapping = "mapping";
    x3::rule<class thread_spec, spec_type> thread_spec = "thread_spec";
    x3::rule<class pu_specs, mapping_type> pu_specs = "pu_specs";
    x3::rule<class socket_spec, spec_type> socket_spec = "socket_spec";
    x3::rule<class core_spec, spec_type> core_spec = "core_spec";
    x3::rule<class pu_spec, spec_type> pu_spec = "pu_spec";
    x3::rule<class specs, bounds_type> specs = "specs";
    x3::rule<class spec, bounds_type> spec = "spec";

    // Top level: either a named distribution or a ';'-separated mapping list.
    auto mappings_def = distribution | (mapping % ';');

    auto mapping_def = thread_spec >> '=' >> pu_specs;

    // partlit("kw", value) matches the keyword and exposes 'value' as the
    // rule's attribute (see detail/partlit.hpp; presumably it also accepts
    // unambiguous prefixes of the keyword -- confirm there).
    // clang-format off
    auto distribution_def =
            partlit("compact", compact)
        |   partlit("scatter", scatter)
        |   partlit("balanced", balanced)
        |   partlit("numa-balanced", numa_balanced)
        ;

    auto thread_spec_def =
            partlit("thread", spec_type::thread) >> ':' >>  specs
        ;

    // Exactly three levels: socket/numanode, core, pu. Omitted levels are
    // synthesized as 'unknown' by the x3::attr alternatives below.
    auto pu_specs_def =
            (socket_spec >> core_spec >> pu_spec)
//        |   ('~' >> pu_spec)
        ;

    auto socket_spec_def =
            (partlit("socket", spec_type::socket) >> ':' >> specs)
        |   (partlit("numanode", spec_type::numanode) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::unknown))
        ;

    auto core_spec_def =
           (-x3::lit('.') >> partlit("core", spec_type::core) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::unknown))
        ;

    auto pu_spec_def =
           (-x3::lit('.') >> partlit("pu", spec_type::pu) >> ':' >> specs)
        |   x3::attr(spec_type(spec_type::unknown))
        ;

    auto specs_def = spec % ',';

    // 'a-b' parses as uint_ followed by a signed int_, i.e. the upper bound
    // is captured negated: "2-5" yields {2, -5}. extract_bounds() decodes
    // this encoding.
    auto spec_def =
            (x3::uint_ >> -(x3::int_))
        |   partlit("all", bounds_type{spec_type::all_entities()})
        ;
    // clang-format on

    // Tie each rule to its *_def definition.
    BOOST_SPIRIT_DEFINE(mappings, distribution, mapping, thread_spec, pu_specs,
        socket_spec, core_spec, pu_spec, specs, spec)

}    // namespace
148

149
namespace hpx { namespace threads { namespace detail {
150
    static char const* const type_names[] = {
151
        "unknown", "thread", "socket", "numanode", "core", "pu"};
152

153
    char const* spec_type::type_name(spec_type::type t)
×
154
    {
155
        if (t < spec_type::unknown || t > spec_type::pu)
×
156
            return type_names[0];
×
157
        return type_names[t];
×
158
    }
×
159

160
    // Run the X3 'mappings' grammar over [begin, end), filling 'm'.
    // 'begin' is advanced past the consumed input, which lets the caller
    // detect trailing unparsed characters. Returns whether the grammar
    // matched.
    template <typename Iterator>
    inline bool parse(Iterator& begin, Iterator end, mappings_type& m)
    {
        return x3::parse(begin, end, mappings, m);
    }
165

166
    ///////////////////////////////////////////////////////////////////////////
167
    void parse_mappings(
1,368✔
168
        std::string const& spec, mappings_type& mappings, error_code& ec)
169
    {
170
        std::string::const_iterator begin = spec.begin();
1,368✔
171
        if (!detail::parse(begin, spec.end(), mappings) || begin != spec.end())
1,368✔
172
        {
173
            HPX_THROWS_IF(ec, hpx::error::bad_parameter,
20✔
174
                "parse_affinity_options",
175
                "failed to parse affinity specification: " + spec);
176
            return;
20✔
177
        }
178

179
        if (&ec != &throws)
1,348✔
180
            ec = make_success_code();
128✔
181
    }
1,368✔
182

183
    ///////////////////////////////////////////////////////////////////////////
    // Expand the index bounds stored in a spec into an explicit list of
    // resource indices.
    //
    // Encoding produced by the parser (see spec_def above):
    //   {i}       single index i
    //   {i, -j}   the range 'i-j' from the input (upper bound stored negated),
    //             expanded to every index i..j inclusive
    //   {i, j}    explicit min/max pair (j > 0), returned as the two entries
    //   {all}     spec_type::all_entities(): every available resource
    //
    // 'default_last' is the number of resources available at this level; an
    // index at or beyond it reports hpx::error::bad_parameter via 'ec' and
    // yields an empty result.
    bounds_type extract_bounds(
        spec_type const& m, std::size_t default_last, error_code& ec)
    {
        bounds_type result;

        // nothing was specified at this level
        if (m.index_bounds_.empty())
            return result;

        bounds_type::const_iterator first = m.index_bounds_.begin();
        bounds_type::const_iterator last = m.index_bounds_.end();

        while (first != last)
        {
            if (*first == spec_type::all_entities())
            {
                // bind all entities: enumerate 0..default_last-1
                result.clear();
                for (std::size_t i = 0; i != default_last; ++i)
                    result.push_back(i);
                break;    // we will not get more than 'all'
            }

            bounds_type::const_iterator second = first;
            if (++second != last)
            {
                if (*second == 0 || *second == spec_type::all_entities())
                {
                    // one element only
                    if (default_last <= std::size_t(*first))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the resource id given is larger than the "
                            "number of existing resources");
                        return result;
                    }
                    result.push_back(*first);
                }
                else if (*second < 0)
                {
                    // negative second element encodes a range: expand all
                    // elements between min and -max (inclusive)
                    if (default_last <= std::size_t(-*second))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the upper limit given is larger than the "
                            "number of existing resources");
                        return result;
                    }
                    for (std::int64_t i = *first; i <= -*second; ++i)
                        result.push_back(i);
                }
                else
                {
                    // just min and max
                    // NOTE(review): this pair is returned as two entries, not
                    // expanded like the negative-encoded range above --
                    // confirm callers expect that.
                    if (default_last <= std::size_t(*second))
                    {
                        result.clear();
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "extract_bounds",
                            "the upper limit given is larger than the "
                            "number of existing resources");
                        return result;
                    }
                    result.push_back(*first);
                    result.push_back(*second);
                }
                // the pair consumed two elements; skip over the second one
                first = second;
            }
            else
            {
                // trailing one element only
                if (default_last <= std::size_t(*first))
                {
                    result.clear();
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "extract_bounds",
                        "the resource id given is larger than the number "
                        "of existing resources");
                    return result;
                }
                result.push_back(*first);
            }
            ++first;
        }

        if (&ec != &throws)
            ec = make_success_code();

        return result;
    }
×
277

278
    ///////////////////////////////////////////////////////////////////////////
279
    //                  index,       mask
280
    typedef hpx::tuple<std::size_t, mask_type> mask_info;
281

282
    inline std::size_t get_index(mask_info const& smi)
×
283
    {
284
        return hpx::get<0>(smi);
×
285
    }
286
    inline mask_cref_type get_mask(mask_info const& smi)
×
287
    {
288
        return hpx::get<1>(smi);
×
289
    }
290

291
    ///////////////////////////////////////////////////////////////////////////
292
    std::vector<mask_info> extract_socket_masks(
×
293
        topology const& t, bounds_type const& b)
294
    {
295
        std::vector<mask_info> masks;
×
296
        for (std::int64_t index : b)
×
297
        {
298
            masks.push_back(hpx::make_tuple(static_cast<std::size_t>(index),
×
299
                t.init_socket_affinity_mask_from_socket(
×
300
                    static_cast<std::size_t>(index))));
×
301
        }
302
        return masks;
×
303
    }
×
304

305
    std::vector<mask_info> extract_numanode_masks(
×
306
        topology const& t, bounds_type const& b)
307
    {
308
        std::vector<mask_info> masks;
×
309
        for (std::int64_t index : b)
×
310
        {
311
            masks.push_back(hpx::make_tuple(static_cast<std::size_t>(index),
×
312
                t.init_numa_node_affinity_mask_from_numa_node(
×
313
                    static_cast<std::size_t>(index))));
×
314
        }
315
        return masks;
×
316
    }
×
317

318
    // Machine-wide affinity mask (t.get_machine_affinity_mask); used as the
    // fallback whenever a level of the specification was left unspecified.
    mask_cref_type extract_machine_mask(topology const& t, error_code& ec)
    {
        return t.get_machine_affinity_mask(ec);
    }
322

323
    std::vector<mask_info> extract_socket_or_numanode_masks(
×
324
        topology const& t, spec_type const& s, error_code& ec)
325
    {
326
        switch (s.type_)
×
327
        {
328
        case spec_type::socket:
329
            // requested top level is a socket
330
            {
331
                std::size_t num_sockets = t.get_number_of_sockets();
×
332
                return extract_socket_masks(
×
333
                    t, extract_bounds(s, num_sockets, ec));
×
334
            }
335

336
        case spec_type::numanode:
337
            // requested top level is a NUMA node
338
            {
339
                std::size_t num_numanodes = t.get_number_of_numa_nodes();
×
340
                return extract_numanode_masks(
×
341
                    t, extract_bounds(s, num_numanodes, ec));
×
342
            }
343

344
        case spec_type::unknown:
345
        {
346
            std::vector<mask_info> masks;
×
347
            masks.push_back(
×
348
                hpx::make_tuple(std::size_t(-1), extract_machine_mask(t, ec)));
×
349
            return masks;
×
350
        }
×
351

352
        default:
353
            HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
354
                "extract_socket_or_numanode_mask",
355
                "unexpected specification type {}",
356
                spec_type::type_name(s.type_));
357
            break;
×
358
        }
359

360
        return std::vector<mask_info>();
×
361
    }
×
362

363
    // Expand the 'core' level of a pu-spec into (core-index, mask) pairs;
    // each resulting mask is intersected with the enclosing 'socket_mask'.
    //
    // 'socket' is the socket/NUMA domain the core numbers are relative to,
    // or std::size_t(-1) when none was given (core numbers then refer to the
    // whole machine). An 'unknown' spec yields one machine-wide entry.
    std::vector<mask_info> extract_core_masks(topology const& t,
        spec_type const& s, std::size_t socket, mask_cref_type socket_mask,
        error_code& ec)
    {
        std::vector<mask_info> masks;

        switch (s.type_)
        {
        case spec_type::core:
        {
            // 'base' is the machine-wide index of the first core belonging
            // to the requested socket/NUMA domain.
            std::size_t base = 0;
            std::size_t num_cores = 0;

            if (socket != std::size_t(-1))
            {
                for (std::size_t i = 0; i != socket; ++i)
                {
                    // The number of NUMA nodes might be zero if there hwloc
                    // doesn't detect a NUMA domain. This might be the case
                    // when there is no NUMA support configured, or when
                    // there are just sockets, but no direct numa domains.
                    // The bind description might relate to sockets, not
                    // NUMA domains.
                    if (t.get_number_of_numa_nodes() == 0)
                        base += t.get_number_of_socket_cores(i);
                    else
                        base += t.get_number_of_numa_node_cores(i);
                }
                if (t.get_number_of_numa_nodes() == 0)
                    num_cores = t.get_number_of_socket_cores(socket);
                else
                    num_cores = t.get_number_of_numa_node_cores(socket);
            }
            else
            {
                num_cores = t.get_number_of_cores();
            }

            // decode the requested core indices (relative to 'base')
            bounds_type bounds = extract_bounds(s, num_cores, ec);
            if (ec)
                break;

            for (std::int64_t index : bounds)
            {
                mask_type mask = t.init_core_affinity_mask_from_core(
                    static_cast<std::size_t>(index + base));
                // keep the relative index, but restrict the mask to the
                // enclosing socket
                masks.push_back(hpx::make_tuple(
                    static_cast<std::size_t>(index), mask & socket_mask));
            }
        }
        break;

        case spec_type::unknown:
        {
            // no core level given: use the machine mask limited to the socket
            mask_type mask = extract_machine_mask(t, ec);
            masks.push_back(
                hpx::make_tuple(std::size_t(-1), mask & socket_mask));
        }
        break;

        default:
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "extract_core_mask",
                "unexpected specification type {}",
                spec_type::type_name(s.type_));
            break;
        }

        return masks;
    }
×
432

433
    // Expand the 'pu' level of a pu-spec into (pu-index, mask) pairs; each
    // resulting mask is intersected with the enclosing 'core_mask'.
    //
    // 'socket'/'core' carry the enclosing levels, or std::size_t(-1) when
    // unspecified. With no core given, pu numbers refer to the whole
    // socket/machine and the owning core is searched for below. An
    // 'unknown' spec yields one machine-wide entry.
    std::vector<mask_info> extract_pu_masks(topology const& t,
        spec_type const& s, std::size_t socket, std::size_t core,
        mask_cref_type core_mask, error_code& ec)
    {
        std::vector<mask_info> masks;

        switch (s.type_)
        {
        case spec_type::pu:
        {
            std::size_t num_pus = 0;
            std::size_t socket_base = 0;
            if (std::size_t(-1) != socket)
            {
                // core number is relative to socket: compute the machine-wide
                // index of the socket's first core
                for (std::size_t i = 0; i != socket; ++i)
                {
                    if (t.get_number_of_numa_nodes() == 0)
                        socket_base += t.get_number_of_socket_cores(i);
                    else
                        socket_base += t.get_number_of_numa_node_cores(i);
                }
            }

            if (std::size_t(-1) != core)
            {
                num_pus = t.get_number_of_core_pus(core);
            }
            else
            {
                num_pus = t.get_number_of_pus();
            }
            // decode the requested pu indices
            bounds_type bounds = extract_bounds(s, num_pus, ec);
            if (ec)
                break;

            std::size_t num_cores = t.get_number_of_cores();
            for (std::int64_t index : bounds)
            {
                std::size_t base_core = socket_base;
                if (std::size_t(-1) != core)
                {
                    base_core += core;
                }
                else
                {
                    // find core the given pu belongs to by accumulating the
                    // per-core pu counts until 'index' falls inside one
                    std::size_t base = 0;
                    for (/**/; base_core < num_cores; ++base_core)
                    {
                        std::size_t num_core_pus =
                            t.get_number_of_core_pus(base_core);
                        if (base + num_core_pus > std::size_t(index))
                            break;
                        base += num_core_pus;
                    }
                }

                mask_type mask = t.init_thread_affinity_mask(
                    base_core, static_cast<std::size_t>(index));
                masks.push_back(hpx::make_tuple(
                    static_cast<std::size_t>(index), mask & core_mask));
            }
        }
        break;

        case spec_type::unknown:
        {
            // no pu level given: machine mask restricted to the core
            mask_type mask = extract_machine_mask(t, ec);
            masks.push_back(hpx::make_tuple(std::size_t(-1), mask & core_mask));
        }
        break;

        default:
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "extract_pu_mask",
                "unexpected specification type {}",
                spec_type::type_name(s.type_));
            break;
        }

        return masks;
    }
×
515

516
    ///////////////////////////////////////////////////////////////////////////
517
    // sanity checks
518
    void mappings_sanity_checks(full_mapping_type& fmt, std::size_t /* size */,
×
519
        bounds_type const& b, error_code& ec)
520
    {
521
        mapping_type& m = fmt.second;
×
522
        if (m.size() != 3)
×
523
        {
524
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "decode_mapping",
×
525
                "bad size of mappings specification array");
526
            return;
×
527
        }
528

529
        if (b.begin() == b.end())
×
530
        {
531
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "decode_mapping",
×
532
                "no {1} mapping bounds are specified",
533
                spec_type::type_name(fmt.first.type_));
534
            return;
×
535
        }
536

537
        if (&ec != &throws)
×
538
            ec = make_success_code();
×
539
    }
×
540

541
    // For each given core-mask extract all required pu-masks and append the
    // resulting overall affinity masks to 'affinities'.
    //
    // specs[2] is the pu-level spec; 'socket' is the enclosing socket index
    // (std::size_t(-1) if unspecified). A core entry with index
    // std::size_t(-1) stands for "all cores" and terminates the loop after
    // it has been handled.
    void extract_pu_affinities(topology const& t,
        std::vector<spec_type> const& specs, std::size_t socket,
        std::vector<mask_info> const& core_masks,
        std::vector<mask_type>& affinities, error_code& ec)
    {
        // get the core masks for each of the sockets
        for (mask_info const& cmi : core_masks)
        {
            if (get_index(cmi) == std::size_t(-1))
            {
                // all cores
                if (specs[2].type_ == spec_type::unknown)
                {
                    // no pu information: the core mask already is the result
                    affinities.push_back(get_mask(cmi));
                }
                else
                {
                    // handle pu information in the absence of core information
                    std::vector<mask_info> pu_masks = extract_pu_masks(t,
                        specs[2], socket, std::size_t(-1), get_mask(cmi), ec);
                    if (ec)
                        break;

                    for (mask_info const& pmi : pu_masks)
                    {
                        affinities.push_back(get_mask(pmi));
                    }
                }
                break;
            }
            else
            {
                // just this core
                std::vector<mask_info> pu_masks = extract_pu_masks(
                    t, specs[2], socket, get_index(cmi), get_mask(cmi), ec);
                if (ec)
                    break;

                for (mask_info const& pmi : pu_masks)
                {
                    affinities.push_back(get_mask(pmi));
                }
            }
        }
    }
×
588

589
    // For each given socket-mask extract all required pu-masks and append
    // the resulting overall affinity masks to 'affinities'.
    //
    // specs[1]/specs[2] are the core/pu level specs. A socket entry with
    // index std::size_t(-1) stands for "whole machine" and terminates the
    // loop after it has been handled; in that case core numbers are taken
    // to cover the whole machine.
    void extract_core_affinities(topology const& t,
        std::vector<spec_type> const& specs,
        std::vector<mask_info> const& socket_masks,
        std::vector<mask_type>& affinities, error_code& ec)
    {
        // get the core masks for each of the sockets
        for (mask_info const& smi : socket_masks)
        {
            if (get_index(smi) == std::size_t(-1))
            {
                // all NUMA domains
                if (specs[1].type_ == spec_type::unknown)
                {
                    // no core information
                    if (specs[2].type_ == spec_type::unknown)
                    {
                        // no pu information: socket mask is the result
                        affinities.push_back(get_mask(smi));
                    }
                    else
                    {
                        // handle pu information in the absence of core/socket
                        std::vector<mask_info> pu_masks =
                            extract_pu_masks(t, specs[2], std::size_t(-1),
                                std::size_t(-1), get_mask(smi), ec);
                        if (ec)
                            break;

                        for (mask_info const& pmi : pu_masks)
                        {
                            affinities.push_back(get_mask(pmi));
                        }
                    }
                }
                else
                {
                    // no socket given, assume cores are numbered for whole
                    // machine
                    if (specs[2].type_ == spec_type::unknown)
                    {
                        // no pu information: core masks are the result
                        std::vector<mask_info> core_masks = extract_core_masks(
                            t, specs[1], std::size_t(-1), get_mask(smi), ec);
                        if (ec)
                            break;

                        for (mask_info const& cmi : core_masks)
                        {
                            affinities.push_back(get_mask(cmi));
                        }
                    }
                    else
                    {
                        std::vector<mask_info> core_masks = extract_core_masks(
                            t, specs[1], std::size_t(-1), get_mask(smi), ec);
                        if (ec)
                            break;

                        // get the pu masks (i.e. overall affinity masks) for
                        // all of the core masks
                        extract_pu_affinities(t, specs, std::size_t(-1),
                            core_masks, affinities, ec);
                        if (ec)
                            break;
                    }
                }
                break;
            }
            else
            {
                // a concrete socket: expand its cores, then the pus below
                std::vector<mask_info> core_masks = extract_core_masks(
                    t, specs[1], get_index(smi), get_mask(smi), ec);
                if (ec)
                    break;

                // get the pu masks (i.e. overall affinity masks) for
                // all of the core masks
                extract_pu_affinities(
                    t, specs, get_index(smi), core_masks, affinities, ec);
                if (ec)
                    break;
            }
        }
    }
×
674

675
    void decode_mappings(topology const& t, full_mapping_type& m,
×
676
        std::vector<mask_type>& affinities, std::size_t num_threads,
677
        error_code& ec)
678
    {
679
        // The core numbers are interpreted differently depending on whether a
680
        // socket/numanode is given or not. If no socket(s) is(are) given, then
681
        // the core numbering covers the whole locality, otherwise the core
682
        // numbering is relative to the given socket.
683

684
        // generate overall masks for each of the given sockets
685
        std::vector<mask_info> socket_masks =
686
            extract_socket_or_numanode_masks(t, m.second[0], ec);
×
687

688
        HPX_ASSERT(!socket_masks.empty());
×
689

690
        extract_core_affinities(t, m.second, socket_masks, affinities, ec);
×
691

692
        // special case, all threads share the same options
693
        if (affinities.size() == 1 && num_threads > 1)
×
694
        {
695
            affinities.resize(num_threads, affinities[0]);
×
696
        }
×
697
    }
×
698

699
    ///////////////////////////////////////////////////////////////////////////
700
    bool pu_in_process_mask(bool use_process_mask, topology& t,
4,370✔
701
        std::size_t num_core, std::size_t num_pu)
702
    {
703
        if (!use_process_mask)
4,370✔
704
        {
705
            return true;
4,366✔
706
        }
707

708
        threads::mask_type proc_mask = t.get_cpubind_mask();
4✔
709
        threads::mask_type pu_mask =
710
            t.init_thread_affinity_mask(num_core, num_pu);
4✔
711

712
        return threads::bit_and(proc_mask, pu_mask);
4✔
713
    }
4,370✔
714

715
    void check_num_threads(bool use_process_mask, topology& t,
1,220✔
716
        std::size_t num_threads, error_code& ec)
717
    {
718
        if (use_process_mask)
1,220✔
719
        {
720
            threads::mask_type proc_mask = t.get_cpubind_mask();
1✔
721
            std::size_t num_pus_proc_mask = threads::count(proc_mask);
1✔
722

723
            if (num_threads > num_pus_proc_mask)
1✔
724
            {
725
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
726
                    "check_num_threads",
727
                    "specified number of threads ({1}) is larger than number "
728
                    "of processing units available in process mask ({2})",
729
                    num_threads, num_pus_proc_mask);
730
            }
×
731
        }
1✔
732
        else
733
        {
734
            std::size_t num_threads_available = threads::hardware_concurrency();
1,219✔
735

736
            if (num_threads > num_threads_available)
1,219✔
737
            {
738
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
739
                    "check_num_threads",
740
                    "specified number of threads ({1}) is larger than number "
741
                    "of available processing units ({2})",
742
                    num_threads, num_threads_available);
743
            }
×
744
        }
745
    }
1,220✔
746

747
    ///////////////////////////////////////////////////////////////////////////
    // 'compact' distribution: assign threads to PUs in order, exhausting all
    // PUs of one core before moving to the next core (starting at core
    // 'used_cores'). Fills 'affinities' (one mask per thread, pre-sized by
    // the caller) and 'num_pus' (the machine pu number bound to each thread).
    void decode_compact_distribution(topology& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        std::size_t num_threads = affinities.size();

        check_num_threads(use_process_mask, t, num_threads, ec);

        // honoring the process mask: ignore the reserved-cores offset and
        // allow all cores, filtering per-pu below instead
        if (use_process_mask)
        {
            used_cores = 0;
            max_cores = t.get_number_of_cores();
        }

        std::size_t num_cores = (std::min)(max_cores, t.get_number_of_cores());
        num_pus.resize(num_threads);

        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
        {
            // NOTE(review): if no PU ever passes pu_in_process_mask (and
            // check_num_threads only set 'ec' instead of throwing), the
            // outer loop never advances num_thread -- verify callers
            // guarantee a non-empty usable mask.
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
            {
                std::size_t num_core_pus =
                    t.get_number_of_core_pus(num_core + used_cores);
                for (std::size_t num_pu = 0; num_pu < num_core_pus; ++num_pu)
                {
                    // skip PUs excluded by the process mask
                    if (!pu_in_process_mask(
                            use_process_mask, t, num_core, num_pu))
                    {
                        continue;
                    }

                    if (any(affinities[num_thread]))
                    {
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "decode_compact_distribution",
                            "affinity mask for thread {1} has already been set",
                            num_thread);
                        return;
                    }

                    // bind this thread to exactly this pu
                    num_pus[num_thread] =
                        t.get_pu_number(num_core + used_cores, num_pu);
                    affinities[num_thread] = t.init_thread_affinity_mask(
                        num_core + used_cores, num_pu);

                    if (++num_thread == num_threads)
                        return;
                }
            }
        }
    }
×
800

801
    // Place worker threads across cores round-robin ("scatter"): thread n is
    // assigned to core (n mod num_cores), so consecutive threads land on
    // different cores; a core's additional PUs are only consumed once every
    // core already carries a thread.
    //
    // t:                topology queried for core/PU counts and used to build
    //                   the affinity masks
    // affinities:       [in/out] one (initially empty) mask per worker thread;
    //                   its size determines how many threads are placed
    // used_cores:       offset of the first core available to this locality
    //                   (reset to zero when use_process_mask is given)
    // max_cores:        upper bound on the number of cores to use (widened to
    //                   all cores when use_process_mask is given)
    // num_pus:          [out] resized to the thread count; receives the PU
    //                   number assigned to each worker thread
    // use_process_mask: restrict placement to PUs contained in the external
    //                   process affinity mask
    // ec:               error reporting; set if a thread's mask was already
    //                   assigned
    void decode_scatter_distribution(topology& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        std::size_t num_threads = affinities.size();

        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            // the process mask already constrains placement, use the whole
            // machine as the candidate set
            used_cores = 0;
            max_cores = t.get_number_of_cores();
        }

        std::size_t num_cores = (std::min)(max_cores, t.get_number_of_cores());

        // per-core cursor: index of the next PU to try on each core
        std::vector<std::size_t> next_pu_index(num_cores, 0);
        num_pus.resize(num_threads);

        // Outer loop repeats the core sweep until all threads are placed; it
        // terminates via the early return below once num_threads is reached.
        // NOTE(review): if no core has an eligible PU left, this loops
        // forever — presumably check_num_threads guards against that; confirm.
        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
        {
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
            {
                if (any(affinities[num_thread]))
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "decode_scatter_distribution",
                        "affinity mask for thread {1} has already been set",
                        num_thread);
                    return;
                }

                std::size_t num_core_pus = t.get_number_of_core_pus(num_core);
                std::size_t pu_index = next_pu_index[num_core];
                bool use_pu = false;

                // Find the next PU on this core which is in the process mask
                while (pu_index < num_core_pus)
                {
                    use_pu = pu_in_process_mask(
                        use_process_mask, t, num_core, pu_index);
                    ++pu_index;

                    if (use_pu)
                    {
                        break;
                    }
                }

                next_pu_index[num_core] = pu_index;

                if (!use_pu)
                {
                    // this core has no eligible PU left, try the next core
                    continue;
                }

                // pu_index was post-incremented above, hence the - 1
                num_pus[num_thread] = t.get_pu_number(
                    num_core + used_cores, next_pu_index[num_core] - 1);
                affinities[num_thread] = t.init_thread_affinity_mask(
                    num_core + used_cores, next_pu_index[num_core] - 1);

                if (++num_thread == num_threads)
                    return;
            }
        }
    }
868

869
    ///////////////////////////////////////////////////////////////////////////
870
    // Place worker threads "balanced": threads are spread across cores as in
    // scatter, but a second pass assigns thread numbers core-by-core so that
    // consecutive worker-thread numbers end up on the same core.
    //
    // t:                topology queried for core/PU counts and used to build
    //                   the affinity masks
    // affinities:       [in/out] one (initially empty) mask per worker thread;
    //                   its size determines how many threads are placed
    // used_cores:       offset of the first core available to this locality
    //                   (reset to zero when use_process_mask is given)
    // max_cores:        upper bound on the number of cores to use (widened to
    //                   all cores when use_process_mask is given)
    // num_pus:          [out] resized to the thread count; receives the PU
    //                   number assigned to each worker thread
    // use_process_mask: restrict placement to PUs contained in the external
    //                   process affinity mask
    // ec:               error reporting; set if a thread's mask was already
    //                   assigned
    void decode_balanced_distribution(topology& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        std::size_t num_threads = affinities.size();

        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            // the process mask already constrains placement, use the whole
            // machine as the candidate set
            used_cores = 0;
            max_cores = t.get_number_of_cores();
        }

        std::size_t num_cores = (std::min)(max_cores, t.get_number_of_cores());

        // num_pus_cores[c]: how many threads core c will carry
        // next_pu_index[c]: cursor of the next PU to try on core c
        // pu_indexes[c]:    PU indices selected on core c, in order
        std::vector<std::size_t> num_pus_cores(num_cores, 0);
        std::vector<std::size_t> next_pu_index(num_cores, 0);
        std::vector<std::vector<std::size_t>> pu_indexes(num_cores);
        num_pus.resize(num_threads);

        // At first, calculate the number of used pus per core.
        // This needs to be done to make sure that we occupy all the available
        // cores
        for (std::size_t num_thread = 0; num_thread < num_threads; /**/)
        {
            for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
            {
                // NOTE(review): this pass queries core num_core while the
                // assignment pass below uses num_core + used_cores — verify
                // the offset should indeed be absent here.
                std::size_t num_core_pus = t.get_number_of_core_pus(num_core);
                std::size_t pu_index = next_pu_index[num_core];
                bool use_pu = false;

                // Find the next PU on this core which is in the process mask
                while (pu_index < num_core_pus)
                {
                    use_pu = pu_in_process_mask(
                        use_process_mask, t, num_core, pu_index);
                    ++pu_index;

                    if (use_pu)
                    {
                        break;
                    }
                }

                next_pu_index[num_core] = pu_index;

                if (!use_pu)
                {
                    // this core has no eligible PU left, try the next core
                    continue;
                }

                // pu_index was post-incremented above, hence the - 1
                pu_indexes[num_core].push_back(next_pu_index[num_core] - 1);

                num_pus_cores[num_core]++;
                if (++num_thread == num_threads)
                    break;
            }
        }

        // Iterate over the cores and assigned pus per core. this additional
        // loop is needed so that we have consecutive worker thread numbers
        std::size_t num_thread = 0;
        for (std::size_t num_core = 0; num_core < num_cores; ++num_core)
        {
            for (std::size_t num_pu = 0; num_pu < num_pus_cores[num_core];
                 ++num_pu)
            {
                if (any(affinities[num_thread]))
                {
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                        "decode_balanced_distribution",
                        "affinity mask for thread {1} has already been set",
                        num_thread);
                    return;
                }

                num_pus[num_thread] = t.get_pu_number(
                    num_core + used_cores, pu_indexes[num_core][num_pu]);
                affinities[num_thread] = t.init_thread_affinity_mask(
                    num_core + used_cores, pu_indexes[num_core][num_pu]);
                ++num_thread;
            }
        }
    }
956

957
    ///////////////////////////////////////////////////////////////////////////
958
    // Place worker threads "numa-balanced": first apportion the thread count
    // across NUMA domains proportionally to each domain's number of eligible
    // PUs, then run a balanced-style placement within each domain.
    //
    // t:                topology queried for NUMA/core/PU counts and used to
    //                   build the affinity masks
    // affinities:       [in/out] one (initially empty) mask per worker thread;
    //                   its size determines how many threads are placed
    // used_cores:       offset of the first core available to this locality
    //                   (reset to zero when use_process_mask is given)
    // max_cores:        unused by this policy
    // num_pus:          [out] resized to the thread count; receives the PU
    //                   number assigned to each worker thread
    // use_process_mask: restrict placement to PUs contained in the external
    //                   process affinity mask
    // ec:               error reporting; set if a thread's mask was already
    //                   assigned
    void decode_numabalanced_distribution(topology& t,
        std::vector<mask_type>& affinities, std::size_t used_cores,
        std::size_t max_cores, std::vector<std::size_t>& num_pus,
        bool use_process_mask, error_code& ec)
    {
        HPX_UNUSED(max_cores);
        std::size_t num_threads = affinities.size();

        check_num_threads(use_process_mask, t, num_threads, ec);

        if (use_process_mask)
        {
            used_cores = 0;
        }

        num_pus.resize(num_threads);

        // numa nodes
        // treat a machine without NUMA information as one domain
        std::size_t num_numas =
            (std::max)(std::size_t(1), t.get_number_of_numa_nodes());
        std::vector<std::size_t> num_cores_numa(num_numas, 0);
        std::vector<std::size_t> num_pus_numa(num_numas, 0);
        std::vector<std::size_t> num_threads_numa(num_numas, 0);
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            num_cores_numa[n] = t.get_number_of_numa_node_cores(n);
        }

        // count the eligible PUs per NUMA domain (pus_t: machine-wide total);
        // core_offset translates the per-domain core index into a global one
        std::size_t core_offset = 0;
        std::size_t pus_t = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                 ++num_core)
            {
                // NOTE(review): this local shadows the num_pus out-parameter —
                // intent appears to be "PUs on this core", but confirm.
                std::size_t num_pus =
                    t.get_number_of_core_pus(num_core + core_offset);
                for (std::size_t num_pu = 0; num_pu < num_pus; ++num_pu)
                {
                    if (pu_in_process_mask(use_process_mask, t,
                            num_core + core_offset, num_pu))
                    {
                        ++num_pus_numa[n];
                    }
                }
            }

            pus_t += num_pus_numa[n];
            core_offset += num_cores_numa[n];
        }

        // how many threads should go on each domain
        // (proportional share, rounded; clamped so the sum never exceeds
        // num_threads)
        std::size_t pus_t2 = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            std::size_t temp = static_cast<std::size_t>(
                std::round(static_cast<double>(num_threads * num_pus_numa[n]) /
                    static_cast<double>(pus_t)));

            // due to rounding up, we might have too many threads
            if ((pus_t2 + temp) > num_threads)
                temp = num_threads - pus_t2;
            pus_t2 += temp;
            num_threads_numa[n] = temp;

            // HPX_ASSERT(num_threads_numa[n] <= num_pus_numa[n]);
        }

        // HPX_ASSERT(num_threads <= pus_t2);

        // assign threads to cores on each numa domain
        std::size_t num_thread = 0;
        core_offset = 0;
        for (std::size_t n = 0; n < num_numas; ++n)
        {
            // per-domain bookkeeping, same roles as in
            // decode_balanced_distribution but indexed by domain-local core
            std::vector<std::size_t> num_pus_cores(num_cores_numa[n], 0);
            std::vector<std::size_t> next_pu_index(num_cores_numa[n], 0);
            std::vector<std::vector<std::size_t>> pu_indexes(num_cores_numa[n]);

            // iterate once and count pus/core
            for (std::size_t num_thread_numa = 0;
                 num_thread_numa < num_threads_numa[n];
                /**/)
            {
                for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                     ++num_core)
                {
                    // NOTE(review): queries core num_core while the mask test
                    // below uses num_core + core_offset — verify the offset
                    // should indeed be absent here.
                    std::size_t num_core_pus =
                        t.get_number_of_core_pus(num_core);
                    std::size_t pu_index = next_pu_index[num_core];
                    bool use_pu = false;

                    // Find the next PU on this core which is in the process mask
                    while (pu_index < num_core_pus)
                    {
                        use_pu = pu_in_process_mask(use_process_mask, t,
                            num_core + core_offset, pu_index);
                        ++pu_index;

                        if (use_pu)
                        {
                            break;
                        }
                    }

                    next_pu_index[num_core] = pu_index;

                    if (!use_pu)
                    {
                        continue;
                    }

                    // pu_index was post-incremented above, hence the - 1
                    pu_indexes[num_core].push_back(next_pu_index[num_core] - 1);

                    num_pus_cores[num_core]++;
                    if (++num_thread_numa == num_threads_numa[n])
                        break;
                }
            }

            // Iterate over the cores and assigned pus per core. this additional
            // loop is needed so that we have consecutive worker thread numbers
            for (std::size_t num_core = 0; num_core < num_cores_numa[n];
                 ++num_core)
            {
                for (std::size_t num_pu = 0; num_pu < num_pus_cores[num_core];
                     ++num_pu)
                {
                    if (any(affinities[num_thread]))
                    {
                        HPX_THROWS_IF(ec, hpx::error::bad_parameter,
                            "decode_numabalanced_distribution",
                            "affinity mask for thread {1} has already been set",
                            num_thread);
                        return;
                    }
                    // NOTE(review): the affinity mask below includes
                    // core_offset but this get_pu_number call does not —
                    // looks inconsistent; confirm which core index is meant.
                    num_pus[num_thread] = t.get_pu_number(
                        num_core + used_cores, pu_indexes[num_core][num_pu]);
                    affinities[num_thread] = t.init_thread_affinity_mask(
                        num_core + used_cores + core_offset,
                        pu_indexes[num_core][num_pu]);
                    ++num_thread;
                }
            }
            core_offset += num_cores_numa[n];
        }
    }
1105

1106
    ///////////////////////////////////////////////////////////////////////////
1107
    void decode_distribution(distribution_type d, topology& t,
1,220✔
1108
        std::vector<mask_type>& affinities, std::size_t used_cores,
1109
        std::size_t max_cores, std::size_t num_threads,
1110
        std::vector<std::size_t>& num_pus, bool use_process_mask,
1111
        error_code& ec)
1112
    {
1113
        affinities.resize(num_threads);
1,220✔
1114
        switch (d)
1,220✔
1115
        {
1116
        case compact:
1117
            decode_compact_distribution(t, affinities, used_cores, max_cores,
×
1118
                num_pus, use_process_mask, ec);
×
1119
            break;
×
1120

1121
        case scatter:
1122
            decode_scatter_distribution(t, affinities, used_cores, max_cores,
×
1123
                num_pus, use_process_mask, ec);
×
1124
            break;
×
1125

1126
        case balanced:
1127
            decode_balanced_distribution(t, affinities, used_cores, max_cores,
2,440✔
1128
                num_pus, use_process_mask, ec);
1,220✔
1129
            break;
1,220✔
1130

1131
        case numa_balanced:
1132
            decode_numabalanced_distribution(t, affinities, used_cores,
×
1133
                max_cores, num_pus, use_process_mask, ec);
×
1134
            break;
×
1135

1136
        default:
1137
            HPX_ASSERT(false);
×
1138
        }
×
1139
    }
1,220✔
1140
}}}    // namespace hpx::threads::detail
1141

1142
namespace hpx { namespace threads {
1143
    ///////////////////////////////////////////////////////////////////////////
1144
    void parse_affinity_options(std::string const& spec,
1,240✔
1145
        std::vector<mask_type>& affinities, std::size_t used_cores,
1146
        std::size_t max_cores, std::size_t num_threads,
1147
        std::vector<std::size_t>& num_pus, bool use_process_mask,
1148
        error_code& ec)
1149
    {
1150
        detail::mappings_type mappings;
1,240✔
1151
        detail::parse_mappings(spec, mappings, ec);
1,240✔
1152
        if (ec)
1,240✔
1153
            return;
20✔
1154

1155
        // We need to instantiate a new topology object as the runtime has not
1156
        // been initialized yet
1157
        threads::topology& t = threads::create_topology();
1,220✔
1158

1159
        switch (mappings.which())
1,220✔
1160
        {
1161
        case 0:
1162
        {
1163
            detail::decode_distribution(
1,220✔
1164
                boost::get<detail::distribution_type>(mappings), t, affinities,
1,220✔
1165
                used_cores, max_cores, num_threads, num_pus, use_process_mask,
1,220✔
1166
                ec);
1,220✔
1167
            if (ec)
1,220✔
1168
                return;
×
1169
        }
1170
        break;
1,220✔
1171

1172
        case 1:
1173
        {
1174
            if (use_process_mask)
×
1175
            {
1176
                HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
1177
                    "parse_affinity_options",
1178
                    "can't use --hpx:use-process-mask with custom thread "
1179
                    "bindings");
1180
            }
×
1181
            detail::mappings_spec_type mappings_specs(
×
1182
                boost::get<detail::mappings_spec_type>(mappings));
×
1183

1184
            affinities.clear();
×
1185
            for (detail::full_mapping_type& m : mappings_specs)
×
1186
            {
1187
                if (m.first.type_ != detail::spec_type::thread)
×
1188
                {
1189
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
1190
                        "parse_affinity_options",
1191
                        "bind specification ({1}) is ill formatted", spec);
1192
                    return;
×
1193
                }
1194

1195
                if (m.second.size() != 3)
×
1196
                {
1197
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
1198
                        "parse_affinity_options",
1199
                        "bind specification ({1}) is ill formatted", spec);
1200
                    return;
×
1201
                }
1202

1203
                if (m.second[0].type_ == detail::spec_type::unknown &&
×
1204
                    m.second[1].type_ == detail::spec_type::unknown &&
×
1205
                    m.second[2].type_ == detail::spec_type::unknown)
×
1206
                {
1207
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
1208
                        "parse_affinity_options",
1209
                        "bind specification ({1}) is ill formatted", spec);
1210
                    return;
×
1211
                }
1212

1213
                // repeat for each of the threads in the affinity specification
1214
                detail::bounds_type thread_bounds =
1215
                    extract_bounds(m.first, num_threads, ec);
×
1216
                if (ec)
×
1217
                    return;
×
1218

1219
                mappings_sanity_checks(m, num_threads, thread_bounds, ec);
×
1220
                if (ec)
×
1221
                    return;
×
1222

1223
                detail::decode_mappings(
×
1224
                    t, m, affinities, thread_bounds.size(), ec);
×
1225
                if (ec)
×
1226
                    return;
×
1227
            }
×
1228

1229
            if (num_pus.empty())
×
1230
            {
1231
                num_pus.resize(affinities.size());
×
1232
                for (std::size_t i = 0; i != affinities.size(); ++i)
×
1233
                {
1234
                    num_pus[i] = threads::find_first(affinities[i]);
×
1235
                }
×
1236
            }
×
1237
        }
×
1238
        break;
×
1239
        }
1240
    }
1,240✔
1241
}}    // namespace hpx::threads
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc