
STEllAR-GROUP / hpx · build #862 (push) · committed by StellarBot
10 Jan 2023 05:30PM UTC · coverage: 86.582% (-0.05%) from 86.634%

Merge #6130

6130: Remove the mutex lock in the critical path of get_partitioner. r=hkaiser a=JiakunYan

Remove the mutex lock in the critical path of hpx::resource::detail::get_partitioner.

The protected variable `partitioner_ref` is only set once during initialization.

Co-authored-by: Jiakun Yan <jiakunyan1998@gmail.com>
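
The change described above follows a common pattern: when a variable is guaranteed to be written exactly once during single-threaded start-up, readers on the hot path no longer need the mutex that originally guarded it. The sketch below is illustrative only and is not the HPX implementation; the names (partitioner, partitioner_ref, get_partitioner) are taken from the PR description, everything else is assumed for the example.

#include <cassert>
#include <mutex>

struct partitioner {};

namespace {
    partitioner* partitioner_ref = nullptr;    // written exactly once during init
    std::mutex partitioner_mtx;                // still protects the single write
}

// Called once during initialization, before any worker threads exist.
void set_partitioner(partitioner* p)
{
    std::lock_guard<std::mutex> lock(partitioner_mtx);
    partitioner_ref = p;
}

// Critical path: previously this would lock partitioner_mtx just to read the
// pointer; since the value never changes after initialization, a plain read
// is safe once thread start-up establishes the necessary ordering.
partitioner& get_partitioner()
{
    assert(partitioner_ref != nullptr);
    return *partitioner_ref;
}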

6 of 6 new or added lines in 1 file covered. (100.0%)

174767 of 201851 relevant lines covered (86.58%)

2069816.07 hits per line

Source File

/libs/core/affinity/src/affinity_data.cpp · file coverage: 69.7%

//  Copyright (c) 2007-2022 Hartmut Kaiser
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/affinity/affinity_data.hpp>
#include <hpx/affinity/parse_affinity_options.hpp>
#include <hpx/assert.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/topology/cpu_mask.hpp>
#include <hpx/topology/topology.hpp>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

namespace hpx::threads::policies::detail {

    inline std::size_t count_initialized(
        std::vector<mask_type> const& masks) noexcept
    {
        std::size_t count = 0;
        for (mask_cref_type m : masks)
        {
            if (threads::any(m))
                ++count;
        }
        return count;
    }

    affinity_data::affinity_data()
      : num_threads_(0)
      , pu_offset_(std::size_t(-1))
      , pu_step_(1)
      , used_cores_(0)
      , affinity_domain_("pu")
      , affinity_masks_()
      , pu_nums_()
      , no_affinity_()
      , use_process_mask_(false)
      , num_pus_needed_(0)
    {
        threads::resize(
            no_affinity_, static_cast<std::size_t>(hardware_concurrency()));
    }

    affinity_data::~affinity_data()
    {
        --instance_number_counter_;
    }

    void affinity_data::init(std::size_t num_threads, std::size_t max_cores,
        std::size_t pu_offset, std::size_t pu_step, std::size_t used_cores,
        std::string affinity_domain,    // -V813
        std::string const& affinity_description, bool use_process_mask)
    {
#if defined(__APPLE__)
        use_process_mask = false;
#endif

        use_process_mask_ = use_process_mask;
        num_threads_ = num_threads;
        std::size_t num_system_pus =
            static_cast<std::size_t>(hardware_concurrency());

        if (pu_offset == std::size_t(-1))
        {
            pu_offset_ = 0;
        }
        else
        {
            pu_offset_ = pu_offset;
        }

        if (num_system_pus > 1)
        {
            pu_step_ = pu_step % num_system_pus;
        }

        affinity_domain_ = HPX_MOVE(affinity_domain);
        pu_nums_.clear();

        init_cached_pu_nums(num_system_pus);

        auto const& topo = threads::create_topology();

        if (affinity_description == "none")
        {
            // don't use any affinity for any of the os-threads
            threads::resize(no_affinity_, num_system_pus);
            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::set(no_affinity_, get_pu_num(i));
        }
        else if (!affinity_description.empty())
        {
            affinity_masks_.clear();
            affinity_masks_.resize(num_threads_, mask_type{});

            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::resize(affinity_masks_[i], num_system_pus);

            parse_affinity_options(affinity_description, affinity_masks_,
                used_cores, max_cores, num_threads_, pu_nums_,
                use_process_mask_);

            std::size_t num_initialized = count_initialized(affinity_masks_);
            if (num_initialized != num_threads_)
            {
                HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
                    "affinity_data::affinity_data",
                    "The number of OS threads requested ({1}) does not match "
                    "the number of threads to bind ({2})",
                    num_threads_, num_initialized);
            }
        }
        else if (pu_offset == std::size_t(-1))
        {
            // calculate the pu offset based on the used cores, but only if its
            // not explicitly specified
            for (std::size_t num_core = 0; num_core != used_cores; ++num_core)
            {
                pu_offset_ += topo.get_number_of_core_pus(num_core);
            }
        }

        // correct used_cores from config data if appropriate
        if (used_cores_ == 0)
        {
            used_cores_ = used_cores;
        }

        pu_offset_ %= num_system_pus;

        std::vector<std::size_t> cores;
        cores.reserve(num_threads_);
        for (std::size_t i = 0; i != num_threads_; ++i)
        {
            std::size_t add_me = topo.get_core_number(get_pu_num(i));
            cores.push_back(add_me);
        }

        std::sort(cores.begin(), cores.end());
        std::vector<std::size_t>::iterator it =
            std::unique(cores.begin(), cores.end());
        cores.erase(it, cores.end());

        std::size_t num_unique_cores = cores.size();

        num_pus_needed_ = (std::max)(num_unique_cores, max_cores);
    }

    mask_type affinity_data::get_pu_mask(
        threads::topology const& topo, std::size_t global_thread_num) const
    {
        // --hpx:bind=none disables all affinity
        if (threads::test(no_affinity_, global_thread_num))
        {
            mask_type m = mask_type();
            threads::resize(
                m, static_cast<std::size_t>(hardware_concurrency()));
            threads::set(m, get_pu_num(global_thread_num));
            return m;
        }

        // if we have individual, predefined affinity masks, return those
        if (!affinity_masks_.empty())
            return affinity_masks_[global_thread_num];

        // otherwise return mask based on affinity domain
        std::size_t pu_num = get_pu_num(global_thread_num);
        if (0 == std::string("pu").find(affinity_domain_))
        {
            // The affinity domain is 'processing unit', just convert the
            // pu-number into a bit-mask.
            return topo.get_thread_affinity_mask(pu_num);
        }
        if (0 == std::string("core").find(affinity_domain_))
        {
            // The affinity domain is 'core', return a bit mask corresponding
            // to all processing units of the core containing the given
            // pu_num.
            return topo.get_core_affinity_mask(pu_num);
        }
        if (0 == std::string("numa").find(affinity_domain_))
        {
            // The affinity domain is 'numa', return a bit mask corresponding
            // to all processing units of the NUMA domain containing the
            // given pu_num.
            return topo.get_numa_node_affinity_mask(pu_num);
        }

        // The affinity domain is 'machine', return a bit mask corresponding
        // to all processing units of the machine.
        HPX_ASSERT(0 == std::string("machine").find(affinity_domain_));
        return topo.get_machine_affinity_mask();
    }

    mask_type affinity_data::get_used_pus_mask(
        threads::topology const& topo, std::size_t pu_num) const
    {
        auto overall_threads = static_cast<std::size_t>(hardware_concurrency());

        mask_type ret = mask_type();
        threads::resize(ret, overall_threads);

        // --hpx:bind=none disables all affinity
        if (threads::test(no_affinity_, pu_num))
        {
            threads::set(ret, pu_num);
            return ret;
        }

        for (std::size_t thread_num = 0; thread_num != num_threads_;
             ++thread_num)
        {
            auto thread_mask = get_pu_mask(topo, thread_num);
            for (std::size_t i = 0; i != overall_threads; ++i)
            {
                if (threads::test(thread_mask, i))
                {
                    threads::set(ret, i);
                }
            }
        }

        return ret;
    }

    std::size_t affinity_data::get_thread_occupancy(
        threads::topology const& topo, std::size_t pu_num) const
    {
        std::size_t count = 0;
        if (threads::test(no_affinity_, pu_num))
        {
            ++count;
        }
        else
        {
            mask_type pu_mask = mask_type();

            threads::resize(
                pu_mask, static_cast<std::size_t>(hardware_concurrency()));
            threads::set(pu_mask, pu_num);

            for (std::size_t num_thread = 0; num_thread != num_threads_;
                 ++num_thread)
            {
                mask_cref_type affinity_mask = get_pu_mask(topo, num_thread);
                if (threads::any(pu_mask & affinity_mask))
                    ++count;
            }
        }
        return count;
    }

    // means of adding a processing unit after initialization
    void affinity_data::add_punit(std::size_t virt_core, std::size_t thread_num)
    {
        std::size_t num_system_pus =
            static_cast<std::size_t>(hardware_concurrency());

        // initialize affinity_masks and set the mask for the given virt_core
        if (affinity_masks_.empty())
        {
            affinity_masks_.resize(num_threads_);
            for (std::size_t i = 0; i != num_threads_; ++i)
                threads::resize(affinity_masks_[i], num_system_pus);
        }
        threads::set(affinity_masks_[virt_core], thread_num);

        // find first used pu, which is then stored as the pu_offset
        std::size_t first_pu = std::size_t(-1);
        for (std::size_t i = 0; i != num_threads_; ++i)
        {
            std::size_t first = threads::find_first(affinity_masks_[i]);
            first_pu = (std::min)(first_pu, first);
        }
        if (first_pu != std::size_t(-1))
            pu_offset_ = first_pu;

        init_cached_pu_nums(num_system_pus);
    }

    void affinity_data::init_cached_pu_nums(std::size_t hardware_concurrency)
    {
        if (pu_nums_.empty())
        {
            pu_nums_.resize(num_threads_);
            for (std::size_t i = 0; i != num_threads_; ++i)
            {
                pu_nums_[i] = get_pu_num(i, hardware_concurrency);
            }
        }
    }

    std::size_t affinity_data::get_pu_num(
        std::size_t num_thread, std::size_t hardware_concurrency) const
    {
        // The offset shouldn't be larger than the number of available
        // processing units.
        HPX_ASSERT(pu_offset_ < hardware_concurrency);

        // The distance between assigned processing units shouldn't be zero
        HPX_ASSERT(pu_step_ > 0 && pu_step_ <= hardware_concurrency);

        // We 'scale' the thread number to compute the corresponding
        // processing unit number.
        //
        // The base line processing unit number is computed from the given
        // pu-offset and pu-step.
        std::size_t num_pu = pu_offset_ + pu_step_ * num_thread;

        // We add an additional offset, which allows to 'roll over' if the
        // pu number would get larger than the number of available
        // processing units. Note that it does not make sense to 'roll over'
        // farther than the given pu-step.
        std::size_t offset = (num_pu / hardware_concurrency) % pu_step_;

        // The resulting pu number has to be smaller than the available
        // number of processing units.
        return (num_pu + offset) % hardware_concurrency;
    }

    std::atomic<int> affinity_data::instance_number_counter_(-1);
}    // namespace hpx::threads::policies::detail