• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.27
/libs/core/affinity/src/affinity_data.cpp
1
//  Copyright (c) 2007-2025 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/affinity/affinity_data.hpp>
#include <hpx/affinity/parse_affinity_options.hpp>
#include <hpx/assert.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/topology.hpp>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
19

20
namespace hpx::threads::policies::detail {
21

22
    namespace {
23

6✔
24
        inline std::size_t count_initialized(
25
            std::vector<mask_type> const& masks) noexcept
26
        {
27
            std::size_t count = 0;
12✔
28
            for (mask_cref_type m : masks)
29
            {
6✔
30
                if (threads::any(m))
6✔
31
                    ++count;
32
            }
6✔
33
            return count;
34
        }
35
    }    // namespace
128✔
36

128✔
37
    // Default-construct with neutral settings: no worker threads yet, no
    // explicit pu-offset (-1 is the "not specified" sentinel), one pu per
    // thread step, and the finest-grained "pu" affinity domain.
    affinity_data::affinity_data()
      : num_threads_(0)
      , pu_offset_(static_cast<std::size_t>(-1))    // -1: not specified yet
      , pu_step_(1)
      , used_cores_(0)
      , affinity_domain_("pu")
      , no_affinity_()
      , disable_affinities_(false)
      , use_process_mask_(false)
      , num_pus_needed_(0)
    {
        // Size the no-affinity mask to cover all hardware threads; bits for
        // threads that should stay unbound are set later in init().
        threads::resize(
            no_affinity_, static_cast<std::size_t>(hardware_concurrency()));
    }
    
64✔
51

×
52
    // Copy/move operations are the compiler-generated memberwise defaults;
    // all members are regular value types (vectors, string, masks, scalars).
    affinity_data::affinity_data(affinity_data const&) = default;
    affinity_data::affinity_data(affinity_data&&) noexcept = default;
    affinity_data& affinity_data::operator=(affinity_data const&) = default;
    affinity_data& affinity_data::operator=(affinity_data&&) noexcept = default;
    
56

128✔
57
    affinity_data::~affinity_data()
    {
        // NOTE(review): only the decrement of the instance counter is visible
        // in this file -- the matching increment is presumably performed
        // elsewhere (header or initialization path); confirm the pairing
        // before relying on the counter's value.
        --instance_number_counter_;
    }
    
61

62
    /// Initialize this affinity configuration from the given settings.
    ///
    /// \param num_threads          number of worker (OS-) threads to cover
    /// \param max_cores            upper bound on the number of cores to use
    /// \param pu_offset            first processing unit to bind to;
    ///                             std::size_t(-1) means "not specified"
    /// \param pu_step              distance between processing units bound to
    ///                             consecutive worker threads
    /// \param used_cores           number of cores assumed to be in use already
    /// \param affinity_domain      binding granularity: "pu", "core", "numa",
    ///                             or "machine" (prefixes accepted by
    ///                             get_pu_mask)
    /// \param affinity_description --hpx:bind value: "none" disables affinity,
    ///                             an empty string selects offset/step based
    ///                             binding, anything else is parsed as an
    ///                             explicit bind specification
    /// \param use_process_mask     whether to honor the external process mask
    ///
    /// \throws hpx::error::bad_parameter if the parsed bind specification does
    ///         not yield exactly one non-empty mask per worker thread
    void affinity_data::init(std::size_t const num_threads,
        std::size_t const max_cores, std::size_t const pu_offset,
        std::size_t const pu_step, std::size_t const used_cores,
        std::string affinity_domain,    // -V813
        std::string const& affinity_description, bool use_process_mask)
    {
#if defined(__APPLE__)
        // macOS: force affinities off and ignore the process mask
        use_process_mask = false;
        disable_affinities_ = true;
#endif

        use_process_mask_ = use_process_mask;
        num_threads_ = num_threads;
        std::size_t const num_system_pus =
            static_cast<std::size_t>(hardware_concurrency());

        // -1 acts as the "not specified" sentinel for the pu-offset; it may
        // be recomputed from used_cores further below
        if (pu_offset == static_cast<std::size_t>(-1))
        {
            pu_offset_ = 0;
        }
        else
        {
            pu_offset_ = pu_offset;
        }

        // keep the step smaller than the number of available pus
        if (num_system_pus > 1)
        {
            pu_step_ = pu_step % num_system_pus;
        }

        affinity_domain_ = HPX_MOVE(affinity_domain);
        pu_nums_.clear();

        // rebuild the thread -> pu cache from pu_offset_/pu_step_
        // NOTE(review): this runs before the used_cores based adjustment of
        // pu_offset_ below, so the cached pu numbers do not reflect that
        // adjustment -- confirm this ordering is intended.
        init_cached_pu_nums(num_system_pus);

        auto const& topo = threads::create_topology();

        if (affinity_description == "none")
        {
            // don't use any affinity for any of the os-threads
            threads::resize(no_affinity_, num_system_pus);
            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::set(no_affinity_, get_pu_num(i));
            disable_affinities_ = true;
        }
        else if (!affinity_description.empty())
        {
            // explicit bind description: parse it into one mask per thread
            affinity_masks_.clear();
            affinity_masks_.resize(num_threads_, mask_type{});

            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::resize(affinity_masks_[i], num_system_pus);

            parse_affinity_options(affinity_description, affinity_masks_,
                used_cores, max_cores, num_threads_, pu_nums_,
                use_process_mask_);

            // every worker thread must have received a non-empty mask
            if (std::size_t const num_initialized =
                    count_initialized(affinity_masks_);
                num_initialized != num_threads_)
            {
                HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
                    "affinity_data::affinity_data",
                    "The number of OS threads requested ({1}) does not match "
                    "the number of threads to bind ({2})",
                    num_threads_, num_initialized);
            }
        }
        else if (pu_offset == static_cast<std::size_t>(-1))
        {
            // calculate the pu offset based on the used cores, but only if it's
            // not explicitly specified
            for (std::size_t num_core = 0; num_core != used_cores; ++num_core)
            {
                pu_offset_ += topo.get_number_of_core_pus(num_core);
            }
        }

        // correct used_cores from config data if appropriate
        if (used_cores_ == 0)
        {
            used_cores_ = used_cores;
        }

        // keep the offset inside the range of available processing units
        pu_offset_ %= num_system_pus;

        // determine the number of distinct cores the worker threads land on
        std::vector<std::size_t> cores;
        cores.reserve(num_threads_);
        for (std::size_t i = 0; i != num_threads_; ++i)
        {
            std::size_t const add_me = topo.get_core_number(get_pu_num(i));
            cores.push_back(add_me);
        }

        std::sort(cores.begin(), cores.end());
        auto const it = std::unique(cores.begin(), cores.end());
        cores.erase(it, cores.end());

        std::size_t const num_unique_cores = cores.size();
        num_pus_needed_ = (std::max) (num_unique_cores, max_cores);
    }
    
163

1,852✔
164
    void affinity_data::set_num_threads(size_t const num_threads) noexcept
1,852✔
165
    {
1,852✔
166
        num_threads_ = num_threads;
167
    }
168

169
    void affinity_data::set_affinity_masks(
170
        std::vector<threads::mask_type> const& affinity_masks)
171
    {
114✔
172
        affinity_masks_ = affinity_masks;
173
    }
174

175
    void affinity_data::set_affinity_masks(
176
        std::vector<threads::mask_type>&& affinity_masks) noexcept
×
177
    {
178
        affinity_masks_ = HPX_MOVE(affinity_masks);
179
    }
180

×
181
    /// Return the bit mask describing the processing units the given worker
    /// thread should be bound to.
    ///
    /// \param topo               topology used to translate pu numbers into
    ///                           affinity masks
    /// \param global_thread_num  zero-based worker thread index
    mask_type affinity_data::get_pu_mask(threads::topology const& topo,
        std::size_t const global_thread_num) const
    {
        // --hpx:bind=none disables all affinity
        if (threads::test(no_affinity_, global_thread_num))
        {
            auto m = mask_type();
            threads::resize(
                m, static_cast<std::size_t>(hardware_concurrency()));
            threads::set(m, get_pu_num(global_thread_num));
            return m;
        }

        // if we have individual, predefined affinity masks, return those
        if (!affinity_masks_.empty())
            return affinity_masks_[global_thread_num];

        // otherwise return mask based on affinity domain
        std::size_t const pu_num = get_pu_num(global_thread_num);

        // The configured domain may be any prefix of the full domain name
        // (an empty string matches the first alternative, "pu"). Using a
        // compile-time string_view avoids materializing a temporary
        // std::string per comparison on this hot path.
        auto const matches_domain = [&](std::string_view const domain) noexcept {
            return domain.find(affinity_domain_) == 0;
        };

        if (matches_domain("pu"))
        {
            // The affinity domain is 'processing unit', just convert the
            // pu-number into a bit-mask.
            return topo.get_thread_affinity_mask(pu_num);
        }
        if (matches_domain("core"))
        {
            // The affinity domain is 'core', return a bit mask corresponding to
            // all processing units of the core containing the given pu_num.
            return topo.get_core_affinity_mask(pu_num);
        }
        if (matches_domain("numa"))
        {
            // The affinity domain is 'numa', return a bit mask corresponding to
            // all processing units of the NUMA domain containing the given
            // pu_num.
            return topo.get_numa_node_affinity_mask(pu_num);
        }

        // The affinity domain is 'machine', return a bit mask corresponding
        // to all processing units of the machine.
        HPX_ASSERT(matches_domain("machine"));
        return topo.get_machine_affinity_mask();
    }
    
225

27,072✔
226
    /// Compute the union of all processing units used by any worker thread.
    ///
    /// \param topo    topology used to resolve per-thread affinity masks
    /// \param pu_num  pu to report alone when affinity is disabled for it;
    ///                pass std::size_t(-1) to skip that shortcut
    /// \returns a mask sized to the total number of hardware threads
    mask_type affinity_data::get_used_pus_mask(
        threads::topology const& topo, std::size_t const pu_num) const
    {
        auto const overall_threads =
            static_cast<std::size_t>(hardware_concurrency());

        auto ret = mask_type();
        threads::resize(ret, overall_threads);

        // --hpx:bind=none disables all affinity
        if (static_cast<std::size_t>(-1) != pu_num &&
            threads::test(no_affinity_, pu_num))
        {
            // affinity is disabled for this pu: report only the pu itself
            threads::set(ret, pu_num);
            return ret;
        }

        // accumulate, over all worker threads, every pu that is either
        // unbound (set in no_affinity_) or part of some thread's mask
        for (std::size_t thread_num = 0; thread_num != num_threads_;
            ++thread_num)
        {
            auto const thread_mask = get_pu_mask(topo, thread_num);
            for (std::size_t i = 0; i != overall_threads; ++i)
            {
                if (threads::test(no_affinity_, i) ||
                    threads::test(thread_mask, i))
                {
                    threads::set(ret, i);
                }
            }
        }
        return ret;
    }
    
258

259
    std::size_t affinity_data::get_thread_occupancy(
130✔
260
        threads::topology const& topo, std::size_t const pu_num) const
261
    {
262
        std::size_t count = 0;
263
        if (threads::test(no_affinity_, pu_num))
×
264
        {
265
            ++count;
266
        }
×
267
        else
268
        {
269
            auto pu_mask = mask_type();
×
270

271
            threads::resize(
×
272
                pu_mask, static_cast<std::size_t>(hardware_concurrency()));
×
273
            threads::set(pu_mask, pu_num);
274

275
            // clang-format off
276
            for (std::size_t num_thread = 0; num_thread != num_threads_;
277
                ++num_thread)
278
            // clang-format on
×
279
            {
×
280
                mask_cref_type affinity_mask = get_pu_mask(topo, num_thread);
281
                if (threads::any(pu_mask & affinity_mask))
×
282
                    ++count;
×
283
            }
284
        }
×
285
        return count;
×
286
    }
287

×
288
    /// Return the cached processing unit number assigned to the given worker
    /// thread.
    ///
    /// Precondition: the pu_nums_ cache has been filled (init() /
    /// init_cached_pu_nums()) and num_thread is in range -- checked by
    /// assertion in debug builds only.
    std::size_t affinity_data::get_pu_num(
        std::size_t const num_thread) const noexcept
    {
        HPX_ASSERT(num_thread < pu_nums_.size());
        return pu_nums_[num_thread];
    }
    
294

64✔
295
    void affinity_data::set_pu_nums(std::vector<std::size_t> const& pu_nums)
194✔
296
    {
297
        pu_nums_ = pu_nums;
130✔
298
    }
299

300
    void affinity_data::set_pu_nums(std::vector<std::size_t>&& pu_nums) noexcept
64✔
301
    {
302
        pu_nums_ = HPX_MOVE(pu_nums);
130✔
303
    }
304

305
    // means of adding a processing unit after initialization
    //
    // Sets the bit for 'thread_num' in the affinity mask of worker thread
    // 'virt_core' (allocating one empty mask per worker thread first if
    // none exist yet), then re-derives pu_offset_ as the smallest pu used
    // by any mask.
    void affinity_data::add_punit(
        std::size_t const virt_core, std::size_t const thread_num)
    {
        std::size_t const num_system_pus =
            static_cast<std::size_t>(hardware_concurrency());

        // initialize affinity_masks and set the mask for the given virt_core
        if (affinity_masks_.empty())
        {
            affinity_masks_.resize(num_threads_);
            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::resize(affinity_masks_[i], num_system_pus);
        }
        threads::set(affinity_masks_[virt_core], thread_num);

        // find first used pu, which is then stored as the pu_offset
        // NOTE(review): -1 doubles as the "nothing found" sentinel; this
        // assumes find_first returns a value that never exceeds it.
        std::size_t first_pu = static_cast<std::size_t>(-1);
        for (std::size_t i = 0; i != num_threads_; ++i)
        {
            std::size_t first = threads::find_first(affinity_masks_[i]);
            first_pu = (std::min) (first_pu, first);
        }
        if (first_pu != static_cast<std::size_t>(-1))
            pu_offset_ = first_pu;

        // NOTE(review): init_cached_pu_nums() is a no-op when pu_nums_ is
        // already populated, so an updated pu_offset_ does not propagate to
        // an existing cache -- confirm this is the intended behavior.
        init_cached_pu_nums(num_system_pus);
    }
    
333

334
    void affinity_data::init_cached_pu_nums(
335
        std::size_t const hardware_concurrency)
336
    {
337
        if (pu_nums_.empty())
338
        {
339
            pu_nums_.resize(num_threads_);
340
            for (std::size_t i = 0; i != num_threads_; ++i)
341
            {
342
                pu_nums_[i] = get_pu_num(i, hardware_concurrency);
343
            }
344
        }
345
    }
346

347
    /// Compute the processing unit number for a worker thread from the
    /// configured pu-offset and pu-step (used to fill the pu_nums_ cache).
    ///
    /// \param num_thread            zero-based worker thread index
    /// \param hardware_concurrency  total number of processing units
    /// \returns a pu number in [0, hardware_concurrency)
    std::size_t affinity_data::get_pu_num(std::size_t const num_thread,
        std::size_t const hardware_concurrency) const
    {
        // The offset shouldn't be larger than the number of available
        // processing units.
        HPX_ASSERT(pu_offset_ < hardware_concurrency);

        // The distance between assigned processing units shouldn't be zero
        HPX_ASSERT(pu_step_ > 0 && pu_step_ <= hardware_concurrency);

        // We 'scale' the thread number to compute the corresponding processing
        // unit number.
        //
        // The baseline processing unit number is computed from the given
        // pu-offset and pu-step.
        std::size_t const num_pu = pu_offset_ + pu_step_ * num_thread;

        // We add an offset, which allows to 'roll over' if the pu number would
        // get larger than the number of available processing units. Note that
        // it does not make sense to 'roll over' farther than the given pu-step.
        std::size_t const offset = (num_pu / hardware_concurrency) % pu_step_;

        // The resulting pu number has to be smaller than the available
        // number of processing units.
        return (num_pu + offset) % hardware_concurrency;
    }
    
373

374
    // Instance counter, starting at -1. NOTE(review): only decrements of this
    // counter are visible in this file -- confirm where it is incremented and
    // how the -1 initial value is consumed.
    std::atomic<int> affinity_data::instance_number_counter_(-1);
    
375
}    // namespace hpx::threads::policies::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc