
STEllAR-GROUP / hpx — build #856 (push)

28 Dec 2022 02:00AM UTC — coverage: 86.602% (+0.05%) from 86.55%

Commit: StellarBot — Merge #6119

6119: Update CMakeLists.txt r=hkaiser a=khuck

updating the default APEX version


Co-authored-by: Kevin Huck <khuck@cs.uoregon.edu>

174,566 of 201,573 relevant lines covered (86.6%)

1,876,093.78 hits per line

Source File: /libs/core/affinity/src/affinity_data.cpp (70.0% covered)

       Hits | Source   (blank = not a relevant line, × = relevant line not covered)
            | //  Copyright (c) 2007-2022 Hartmut Kaiser
            | //
            | //  SPDX-License-Identifier: BSL-1.0
            | //  Distributed under the Boost Software License, Version 1.0. (See accompanying
            | //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
            |
            | #include <hpx/affinity/affinity_data.hpp>
            | #include <hpx/affinity/parse_affinity_options.hpp>
            | #include <hpx/assert.hpp>
            | #include <hpx/modules/errors.hpp>
            | #include <hpx/topology/cpu_mask.hpp>
            | #include <hpx/topology/topology.hpp>
            |
            | #include <algorithm>
            | #include <atomic>
            | #include <cstddef>
            | #include <string>
            | #include <utility>
            | #include <vector>
            |
            | namespace hpx::threads::policies::detail {
            |
     1,222✔ |     inline std::size_t count_initialized(
            |         std::vector<mask_type> const& masks) noexcept
            |     {
     1,222✔ |         std::size_t count = 0;
     5,596✔ |         for (mask_cref_type m : masks)
            |         {
     4,374✔ |             if (threads::any(m))
     4,374✔ |                 ++count;
            |         }
     1,222✔ |         return count;
            |     }
            |
     2,443✔ |     affinity_data::affinity_data()
     2,443✔ |       : num_threads_(0)
     2,443✔ |       , pu_offset_(std::size_t(-1))
     2,443✔ |       , pu_step_(1)
     2,443✔ |       , used_cores_(0)
     2,443✔ |       , affinity_domain_("pu")
     2,443✔ |       , affinity_masks_()
     2,443✔ |       , pu_nums_()
     2,443✔ |       , no_affinity_()
     2,443✔ |       , use_process_mask_(false)
     2,443✔ |       , num_pus_needed_(0)
            |     {
     2,443✔ |         threads::resize(no_affinity_, hardware_concurrency());
     2,443✔ |     }
            |
     7,327✔ |     affinity_data::~affinity_data()
            |     {
     7,327✔ |         --instance_number_counter_;
     7,327✔ |     }
            |
     1,222✔ |     void affinity_data::init(std::size_t num_threads, std::size_t max_cores,
            |         std::size_t pu_offset, std::size_t pu_step, std::size_t used_cores,
            |         std::string affinity_domain,    // -V813
            |         std::string const& affinity_description, bool use_process_mask)
            |     {
            | #if defined(__APPLE__)
            |         use_process_mask = false;
            | #endif
            |
     1,222✔ |         use_process_mask_ = use_process_mask;
     1,222✔ |         num_threads_ = num_threads;
     1,222✔ |         std::size_t num_system_pus = hardware_concurrency();
            |
     1,222✔ |         if (pu_offset == std::size_t(-1))
            |         {
          × |             pu_offset_ = 0;
          × |         }
            |         else
            |         {
     1,222✔ |             pu_offset_ = pu_offset;
            |         }
            |
     1,222✔ |         if (num_system_pus > 1)
            |         {
     1,222✔ |             pu_step_ = pu_step % num_system_pus;
     1,222✔ |         }
            |
     1,222✔ |         affinity_domain_ = HPX_MOVE(affinity_domain);
     1,222✔ |         pu_nums_.clear();
            |
     1,222✔ |         init_cached_pu_nums(num_system_pus);
            |
     1,222✔ |         auto const& topo = threads::create_topology();
            |
     1,222✔ |         if (affinity_description == "none")
            |         {
            |             // don't use any affinity for any of the os-threads
          × |             threads::resize(no_affinity_, num_system_pus);
          × |             for (std::size_t i = 0; i != num_threads_; ++i)
          × |                 threads::set(no_affinity_, get_pu_num(i));
          × |         }
     1,222✔ |         else if (!affinity_description.empty())
            |         {
     1,222✔ |             affinity_masks_.clear();
     1,222✔ |             affinity_masks_.resize(num_threads_, mask_type{});
            |
     5,596✔ |             for (std::size_t i = 0; i != num_threads_; ++i)
     4,374✔ |                 threads::resize(affinity_masks_[i], num_system_pus);
            |
     2,444✔ |             parse_affinity_options(affinity_description, affinity_masks_,
     1,222✔ |                 used_cores, max_cores, num_threads_, pu_nums_,
     1,222✔ |                 use_process_mask_);
            |
     1,222✔ |             std::size_t num_initialized = count_initialized(affinity_masks_);
     1,222✔ |             if (num_initialized != num_threads_)
            |             {
          × |                 HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
            |                     "affinity_data::affinity_data",
            |                     "The number of OS threads requested ({1}) does not match "
            |                     "the number of threads to bind ({2})",
            |                     num_threads_, num_initialized);
            |             }
     1,222✔ |         }
          × |         else if (pu_offset == std::size_t(-1))
            |         {
            |             // calculate the pu offset based on the used cores, but only if its
            |             // not explicitly specified
          × |             for (std::size_t num_core = 0; num_core != used_cores; ++num_core)
            |             {
          × |                 pu_offset_ += topo.get_number_of_core_pus(num_core);
          × |             }
          × |         }
            |
            |         // correct used_cores from config data if appropriate
     1,222✔ |         if (used_cores_ == 0)
            |         {
     1,222✔ |             used_cores_ = used_cores;
     1,222✔ |         }
            |
     1,222✔ |         pu_offset_ %= num_system_pus;
            |
     1,222✔ |         std::vector<std::size_t> cores;
     1,222✔ |         cores.reserve(num_threads_);
     5,596✔ |         for (std::size_t i = 0; i != num_threads_; ++i)
            |         {
     4,374✔ |             std::size_t add_me = topo.get_core_number(get_pu_num(i));
     4,374✔ |             cores.push_back(add_me);
     4,374✔ |         }
            |
     1,222✔ |         std::sort(cores.begin(), cores.end());
            |         std::vector<std::size_t>::iterator it =
     1,222✔ |             std::unique(cores.begin(), cores.end());
     1,222✔ |         cores.erase(it, cores.end());
            |
     1,222✔ |         std::size_t num_unique_cores = cores.size();
            |
     1,222✔ |         num_pus_needed_ = (std::max)(num_unique_cores, max_cores);
     1,222✔ |     }
            |
   412,324✔ |     mask_type affinity_data::get_pu_mask(
            |         threads::topology const& topo, std::size_t global_thread_num) const
            |     {
            |         // --hpx:bind=none disables all affinity
   412,306✔ |         if (threads::test(no_affinity_, global_thread_num))
            |         {
          × |             mask_type m = mask_type();
          × |             threads::resize(m, hardware_concurrency());
          × |             threads::set(m, get_pu_num(global_thread_num));
          × |             return m;
          × |         }
            |
            |         // if we have individual, predefined affinity masks, return those
   412,295✔ |         if (!affinity_masks_.empty())
   412,314✔ |             return affinity_masks_[global_thread_num];
            |
            |         // otherwise return mask based on affinity domain
          × |         std::size_t pu_num = get_pu_num(global_thread_num);
          × |         if (0 == std::string("pu").find(affinity_domain_))
            |         {
            |             // The affinity domain is 'processing unit', just convert the
            |             // pu-number into a bit-mask.
          × |             return topo.get_thread_affinity_mask(pu_num);
            |         }
          × |         if (0 == std::string("core").find(affinity_domain_))
            |         {
            |             // The affinity domain is 'core', return a bit mask corresponding
            |             // to all processing units of the core containing the given
            |             // pu_num.
          × |             return topo.get_core_affinity_mask(pu_num);
            |         }
          × |         if (0 == std::string("numa").find(affinity_domain_))
            |         {
            |             // The affinity domain is 'numa', return a bit mask corresponding
            |             // to all processing units of the NUMA domain containing the
            |             // given pu_num.
          × |             return topo.get_numa_node_affinity_mask(pu_num);
            |         }
            |
            |         // The affinity domain is 'machine', return a bit mask corresponding
            |         // to all processing units of the machine.
          × |         HPX_ASSERT(0 == std::string("machine").find(affinity_domain_));
          × |         return topo.get_machine_affinity_mask();
   412,314✔ |     }
            |
    87,918✔ |     mask_type affinity_data::get_used_pus_mask(
            |         threads::topology const& topo, std::size_t pu_num) const
            |     {
    87,918✔ |         auto overall_threads = hardware_concurrency();
            |
    87,918✔ |         mask_type ret = mask_type();
    87,918✔ |         threads::resize(ret, overall_threads);
            |
            |         // --hpx:bind=none disables all affinity
    87,918✔ |         if (threads::test(no_affinity_, pu_num))
            |         {
          × |             threads::set(ret, pu_num);
          × |             return ret;
            |         }
            |
   402,582✔ |         for (std::size_t thread_num = 0; thread_num != num_threads_;
   314,664✔ |              ++thread_num)
            |         {
   314,664✔ |             auto thread_mask = get_pu_mask(topo, thread_num);
22,970,472✔ |             for (std::size_t i = 0; i != overall_threads; ++i)
            |             {
22,655,808✔ |                 if (threads::test(thread_mask, i))
            |                 {
   314,664✔ |                     threads::set(ret, i);
   314,664✔ |                 }
22,655,808✔ |             }
   314,664✔ |         }
            |
    87,918✔ |         return ret;
    87,918✔ |     }
            |
     4,370✔ |     std::size_t affinity_data::get_thread_occupancy(
            |         threads::topology const& topo, std::size_t pu_num) const
            |     {
     4,370✔ |         std::size_t count = 0;
     4,370✔ |         if (threads::test(no_affinity_, pu_num))
            |         {
          × |             ++count;
          × |         }
            |         else
            |         {
     4,370✔ |             mask_type pu_mask = mask_type();
            |
     4,370✔ |             threads::resize(pu_mask, hardware_concurrency());
     4,370✔ |             threads::set(pu_mask, pu_num);
            |
    91,182✔ |             for (std::size_t num_thread = 0; num_thread != num_threads_;
    86,812✔ |                  ++num_thread)
            |             {
    86,812✔ |                 mask_cref_type affinity_mask = get_pu_mask(topo, num_thread);
    86,812✔ |                 if (threads::any(pu_mask & affinity_mask))
     4,370✔ |                     ++count;
    86,812✔ |             }
     4,370✔ |         }
     4,370✔ |         return count;
          × |     }
            |
            |     // means of adding a processing unit after initialization
          × |     void affinity_data::add_punit(std::size_t virt_core, std::size_t thread_num)
            |     {
          × |         std::size_t num_system_pus = hardware_concurrency();
            |
            |         // initialize affinity_masks and set the mask for the given virt_core
          × |         if (affinity_masks_.empty())
            |         {
          × |             affinity_masks_.resize(num_threads_);
          × |             for (std::size_t i = 0; i != num_threads_; ++i)
          × |                 threads::resize(affinity_masks_[i], num_system_pus);
          × |         }
          × |         threads::set(affinity_masks_[virt_core], thread_num);
            |
            |         // find first used pu, which is then stored as the pu_offset
          × |         std::size_t first_pu = std::size_t(-1);
          × |         for (std::size_t i = 0; i != num_threads_; ++i)
            |         {
          × |             std::size_t first = threads::find_first(affinity_masks_[i]);
          × |             first_pu = (std::min)(first_pu, first);
          × |         }
          × |         if (first_pu != std::size_t(-1))
          × |             pu_offset_ = first_pu;
            |
          × |         init_cached_pu_nums(num_system_pus);
          × |     }
            |
     1,222✔ |     void affinity_data::init_cached_pu_nums(std::size_t hardware_concurrency)
            |     {
     1,222✔ |         if (pu_nums_.empty())
            |         {
     1,222✔ |             pu_nums_.resize(num_threads_);
     5,596✔ |             for (std::size_t i = 0; i != num_threads_; ++i)
            |             {
     4,374✔ |                 pu_nums_[i] = get_pu_num(i, hardware_concurrency);
     4,374✔ |             }
     1,222✔ |         }
     1,222✔ |     }
            |
     4,374✔ |     std::size_t affinity_data::get_pu_num(
            |         std::size_t num_thread, std::size_t hardware_concurrency) const
            |     {
            |         // The offset shouldn't be larger than the number of available
            |         // processing units.
     4,374✔ |         HPX_ASSERT(pu_offset_ < hardware_concurrency);
            |
            |         // The distance between assigned processing units shouldn't be zero
     4,374✔ |         HPX_ASSERT(pu_step_ > 0 && pu_step_ <= hardware_concurrency);
            |
            |         // We 'scale' the thread number to compute the corresponding
            |         // processing unit number.
            |         //
            |         // The base line processing unit number is computed from the given
            |         // pu-offset and pu-step.
     4,374✔ |         std::size_t num_pu = pu_offset_ + pu_step_ * num_thread;
            |
            |         // We add an additional offset, which allows to 'roll over' if the
            |         // pu number would get larger than the number of available
            |         // processing units. Note that it does not make sense to 'roll over'
            |         // farther than the given pu-step.
     4,374✔ |         std::size_t offset = (num_pu / hardware_concurrency) % pu_step_;
            |
            |         // The resulting pu number has to be smaller than the available
            |         // number of processing units.
     4,374✔ |         return (num_pu + offset) % hardware_concurrency;
            |     }
            |
            |     std::atomic<int> affinity_data::instance_number_counter_(-1);
            | }    // namespace hpx::threads::policies::detail
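
An aside on the mapping implemented in affinity_data::get_pu_num() above: a thread number is scaled by the pu-step, shifted by the pu-offset, and rolled over once the result exceeds the number of available processing units. The standalone sketch below (not HPX code; the helper name map_thread_to_pu and the 4-PU example configuration are illustrative assumptions) reproduces that arithmetic:

// Minimal sketch (not part of HPX): mirrors the arithmetic of
// affinity_data::get_pu_num() above. The helper name and the sample
// machine configuration are illustrative assumptions.
#include <cstddef>
#include <iostream>

std::size_t map_thread_to_pu(std::size_t num_thread, std::size_t pu_offset,
    std::size_t pu_step, std::size_t num_pus)
{
    // Baseline PU number from the offset and the step.
    std::size_t num_pu = pu_offset + pu_step * num_thread;
    // Extra offset so that wrapping around does not land on PUs the first
    // pass already used; it is always smaller than pu_step.
    std::size_t offset = (num_pu / num_pus) % pu_step;
    // Keep the result within the number of available processing units.
    return (num_pu + offset) % num_pus;
}

int main()
{
    // Assume 4 processing units, pu-offset 0, pu-step 2.
    // Threads 0..3 then map to PUs 0, 2, 1, 3: the first pass fills the
    // even PUs and the roll-over shifts the second pass onto the odd ones.
    for (std::size_t t = 0; t != 4; ++t)
        std::cout << "thread " << t << " -> pu "
                  << map_thread_to_pu(t, /*pu_offset=*/0, /*pu_step=*/2,
                         /*num_pus=*/4)
                  << '\n';
    return 0;
}

In this configuration the second pass lands exactly on the PUs the first pass skipped. A separate detail worth noting in get_pu_mask(): checks such as 0 == std::string("core").find(affinity_domain_) succeed whenever affinity_domain_ is a prefix of the domain name, so abbreviated settings like "c" or "co" select the core domain just as "core" does.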