• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #877

28 Jan 2023 02:50AM UTC coverage: 86.595% (+0.2%) from 86.427%
#877

push

StellarBot
Merge #6057

6057: Create a dedicated thread pool to run LCI_progress. r=hkaiser a=JiakunYan

The `LCI_progress` function is not thread-safe and must be called frequently. Previously, we created a dedicated progress thread using pthread when initializing LCI. However, HPX does not know about this progress thread and thus cannot bind threads to cores correctly.

This PR uses the new resource partitioner hook in the parcelport to create a thread pool "lci-progress-pool" (which only contains one core) to run LCI_progress.

Fixed:
- The thread pool was allocated even if users were not using the LCI parcelport.

Co-authored-by: Jiakun Yan <jiakunyan1998@gmail.com>

174895 of 201970 relevant lines covered (86.59%)

1858228.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.43
/libs/full/actions/tests/unit/thread_affinity.cpp
1
///////////////////////////////////////////////////////////////////////////////
2
//  Copyright (c) 2007-2017 Hartmut Kaiser
3
//  Copyright (c) 2011 Bryce Adelstein-Lelbach
4
//  Copyright (c) 2012-2016 Thomas Heller
5
//
6
//  SPDX-License-Identifier: BSL-1.0
7
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
8
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9
///////////////////////////////////////////////////////////////////////////////
10

11
#include <hpx/config.hpp>
12
#if !defined(HPX_COMPUTE_DEVICE_CODE)
13
#include <hpx/functional/bind.hpp>
14
#include <hpx/hpx_init.hpp>
15
#include <hpx/include/actions.hpp>
16
#include <hpx/include/components.hpp>
17
#include <hpx/include/lcos.hpp>
18
#include <hpx/include/runtime.hpp>
19
#include <hpx/include/threads.hpp>
20
#include <hpx/modules/testing.hpp>
21

22
#include <cstddef>
23
#include <functional>
24
#include <list>
25
#include <set>
26
#include <vector>
27

28
#if !defined(__APPLE__)
29
#include <hwloc.h>
30
#endif
31

32
std::size_t thread_affinity_worker(std::size_t desired)
4✔
33
{
34
    // Returns the OS-thread number of the worker that is running this
35
    // PX-thread.
36
    std::size_t current = hpx::get_worker_thread_num();
4✔
37
    if (current == desired)
4✔
38
    {
39
#if !defined(__APPLE__)
40
        // extract the desired affinity mask
41
        hpx::runtime& rt = hpx::get_runtime();
×
42
        hpx::threads::topology const& t = rt.get_topology();
×
43
        hpx::threads::mask_type desired_mask = t.get_thread_affinity_mask(
×
44
            hpx::resource::get_partitioner().get_pu_num(current));
×
45

46
        std::size_t logical_idx = hpx::threads::find_first(desired_mask);
×
47

48
        std::size_t idx = 0;
×
49

50
        hwloc_topology_t topo;
51
        hwloc_topology_init(&topo);
×
52
        hwloc_topology_load(topo);
×
53

54
        int const pu_depth = hwloc_get_type_or_below_depth(topo, HWLOC_OBJ_PU);
×
55
        hwloc_obj_t const pu_obj =
×
56
            hwloc_get_obj_by_depth(topo, pu_depth, logical_idx);
×
57
        idx = pu_obj->os_index;
×
58

59
        // retrieve the current affinity mask
60
        hwloc_cpuset_t cpuset = hwloc_bitmap_alloc();
×
61
        hwloc_bitmap_zero(cpuset);
×
62
        if (0 == hwloc_get_cpubind(topo, cpuset, HWLOC_CPUBIND_THREAD))
×
63
        {
64
            // sadly get_cpubind is not implemented for Windows based systems
65
            hwloc_cpuset_t cpuset_cmp = hwloc_bitmap_alloc();
×
66
            hwloc_bitmap_zero(cpuset_cmp);
×
67
            hwloc_bitmap_only(cpuset_cmp, unsigned(idx));
×
68
            HPX_TEST_EQ(hwloc_bitmap_compare(cpuset, cpuset_cmp), 0);
×
69
            hwloc_bitmap_free(cpuset_cmp);
×
70
        }
×
71
        else
72
        {
73
            HPX_TEST(false && "hwloc_get_cpubind(topo, cpuset, \
×
74
                        HWLOC_CPUBIND_THREAD) failed!");
75
        }
76

77
        hwloc_bitmap_free(cpuset);
×
78
        hwloc_topology_destroy(topo);
×
79
#endif
80
        return desired;
×
81
    }
×
82

83
    // This PX-thread has been run by the wrong OS-thread, make the foreman
84
    // try again by rescheduling it.
85
    return std::size_t(-1);
4✔
86
}
4✔
87

88
// Declares thread_affinity_worker_action for thread_affinity_worker; the
// foreman below passes it to hpx::async as the action type.
HPX_PLAIN_ACTION(thread_affinity_worker, thread_affinity_worker_action)
11✔
89

90
// Marks worker number `t` as done by removing it from `attendance`.
// A value of std::size_t(-1) means "no worker to check in" and leaves the
// set untouched.
void check_in(std::set<std::size_t>& attendance, std::size_t t)
{
    constexpr std::size_t no_worker = static_cast<std::size_t>(-1);

    if (t == no_worker)
        return;

    attendance.erase(t);
}
4✔
95

96
void thread_affinity_foreman()
1✔
97
{
98
    // Get the number of worker OS-threads in use by this locality.
99
    std::size_t const os_threads = hpx::get_os_thread_count();
1✔
100

101
    // Find the global name of the current locality.
102
    hpx::id_type const here = hpx::find_here();
1✔
103

104
    // Populate a set with the OS-thread numbers of all OS-threads on this
105
    // locality. When the hello world message has been printed on a particular
106
    // OS-thread, we will remove it from the set.
107
    std::set<std::size_t> attendance;
1✔
108
    for (std::size_t os_thread = 0; os_thread < os_threads; ++os_thread)
5✔
109
        attendance.insert(os_thread);
4✔
110

111
    // As long as there are still elements in the set, we must keep scheduling
112
    // PX-threads. Because HPX features work-stealing task schedulers, we have
113
    // no way of enforcing which worker OS-thread will actually execute
114
    // each PX-thread.
115
    while (!attendance.empty())
2✔
116
    {
117
        // Each iteration, we create a task for each element in the set of
118
        // OS-threads that have not said "Hello world". Each of these tasks
119
        // is encapsulated in a future.
120
        std::vector<hpx::future<std::size_t>> futures;
1✔
121
        futures.reserve(attendance.size());
1✔
122

123
        for (std::size_t worker : attendance)
5✔
124
        {
125
            // Asynchronously start a new task. The task is encapsulated in a
126
            // future, which we can query to determine if the task has
127
            // completed.
128
            typedef thread_affinity_worker_action action_type;
129
            futures.push_back(hpx::async<action_type>(here, worker));
4✔
130
        }
131

132
        // Wait for all of the futures to finish. The callback version of the
133
        // hpx::wait_each function takes two arguments: a vector of futures,
134
        // and a binary callback.  The callback takes two arguments; the first
135
        // is the index of the future in the vector, and the second is the
136
        // return value of the future. hpx::wait_each doesn't return until
137
        // all the futures in the vector have returned.
138
        using hpx::placeholders::_1;
139
        hpx::wait_each(
1✔
140
            hpx::unwrapping(hpx::bind(&check_in, std::ref(attendance), _1)),
1✔
141
            futures);
142
    }
1✔
143
}
1✔
144

145
// Declares thread_affinity_foreman_action for thread_affinity_foreman;
// hpx_main below passes it to hpx::async as the action type.
HPX_PLAIN_ACTION(thread_affinity_foreman, thread_affinity_foreman_action)
3✔
146

147
///////////////////////////////////////////////////////////////////////////////
148
// HPX entry point: launches one thread_affinity_foreman_action per locality,
// waits for all of them, then shuts the runtime down and returns the value of
// hpx::util::report_errors().
int hpx_main(hpx::program_options::variables_map& /*vm*/)
{
    {
        // All localities that take part in this run.
        std::vector<hpx::id_type> localities = hpx::find_all_localities();

        // One pending foreman per locality.
        std::vector<hpx::future<void>> foremen;
        foremen.reserve(localities.size());

        for (hpx::id_type const& locality : localities)
        {
            foremen.push_back(
                hpx::async<thread_affinity_foreman_action>(locality));
        }

        // Block until every foreman has finished.
        hpx::wait_all(foremen);
    }

    // Initiate shutdown of the runtime system.
    hpx::finalize();
    return hpx::util::report_errors();
}
×
177

178
///////////////////////////////////////////////////////////////////////////////
179
int main(int argc, char* argv[])
1✔
180
{
181
    // Configure application-specific options.
182
    hpx::program_options::options_description desc_commandline(
1✔
183
        "usage: " HPX_APPLICATION_STRING " [options]");
1✔
184

185
    // Initialize and run HPX.
186
    hpx::init_params init_args;
1✔
187
    init_args.desc_cmdline = desc_commandline;
1✔
188

189
    return hpx::init(argc, argv, init_args);
1✔
190
}
1✔
191
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc