• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #868

16 Jan 2023 08:21PM UTC coverage: 86.487%. Remained the same
#868

push

StellarBot
Merge #6137

6137: Adding example of a simple master/slave distributed application r=hkaiser a=hkaiser

The purpose of this example is to demonstrate how HPX actions can be used to build a simple master/slave application. The master (locality 0) assigns work to the slaves (all other localities). Note that if this application is run on only one locality, that locality performs both the master and the slave roles.

The slaves receive a message that encodes how many sub-tasks of a certain type they should spawn locally.


Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

72 of 72 new or added lines in 1 file covered. (100.0%)

174663 of 201952 relevant lines covered (86.49%)

1849169.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.46
/libs/core/threading_base/src/set_thread_state.cpp
1
//  Copyright (c) 2007-2022 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/config.hpp>
8
#include <hpx/assert.hpp>
9
#include <hpx/coroutines/coroutine.hpp>
10
#include <hpx/functional/bind.hpp>
11
#include <hpx/modules/errors.hpp>
12
#include <hpx/modules/format.hpp>
13
#include <hpx/modules/logging.hpp>
14
#include <hpx/threading_base/create_work.hpp>
15
#include <hpx/threading_base/register_thread.hpp>
16
#include <hpx/threading_base/set_thread_state.hpp>
17
#include <hpx/threading_base/thread_data.hpp>
18
#include <hpx/threading_base/threading_base_fwd.hpp>
19

20
#include <cstddef>
21
#include <functional>
22
#include <string>
23
#include <utility>
24

25
namespace hpx::threads::detail {
26

27
    ///////////////////////////////////////////////////////////////////////////
28
    thread_result_type set_active_state(thread_id_ref_type thrd,
52✔
29
        thread_schedule_state newstate, thread_restart_state newstate_ex,
30
        thread_priority priority, thread_state previous_state)
31
    {
32
        if (HPX_UNLIKELY(!thrd))
52✔
33
        {
34
            HPX_THROW_EXCEPTION(hpx::error::null_thread_id,
×
35
                "threads::detail::set_active_state",
36
                "null thread id encountered");
37
            return thread_result_type(
38
                thread_schedule_state::terminated, invalid_thread_id);
39
        }
40

41
        // make sure that the thread has not been suspended and set active again
42
        // in the meantime
43
        thread_state current_state = get_thread_id_data(thrd)->get_state();
52✔
44

45
        if (current_state.state() == previous_state.state() &&
52✔
46
            current_state != previous_state)
15✔
47
        {
48
            // NOLINTNEXTLINE(bugprone-branch-clone)
49
            LTM_(warning).format(
×
50
                "set_active_state: thread is still active, however it was "
×
51
                "non-active since the original set_state request was issued, "
52
                "aborting state change, thread({}), description({}), new "
53
                "state({})",
54
                thrd, get_thread_id_data(thrd)->get_description(),
×
55
                get_thread_state_name(newstate));
×
56
            return thread_result_type(
×
57
                thread_schedule_state::terminated, invalid_thread_id);
×
58
        }
59

60
        // just retry, set_state will create new thread if target is still active
61
        error_code ec(throwmode::lightweight);    // do not throw
52✔
62
        detail::set_thread_state(thrd.noref(), newstate, newstate_ex, priority,
104✔
63
            thread_schedule_hint(), true, ec);
52✔
64

65
        return thread_result_type(
52✔
66
            thread_schedule_state::terminated, invalid_thread_id);
52✔
67
    }
52✔
68

69
    ///////////////////////////////////////////////////////////////////////////
70
    thread_state set_thread_state(thread_id_type const& thrd,
1,144,599✔
71
        thread_schedule_state new_state, thread_restart_state new_state_ex,
72
        thread_priority priority, thread_schedule_hint schedulehint,
73
        bool retry_on_active, error_code& ec)
74
    {
75
        if (HPX_UNLIKELY(!thrd))
1,144,595✔
76
        {
77
            HPX_THROWS_IF(ec, hpx::error::null_thread_id,
52✔
78
                "threads::detail::set_thread_state",
79
                "null thread id encountered");
80
            return thread_state(
×
81
                thread_schedule_state::unknown, thread_restart_state::unknown);
82
        }
83

84
        // set_state can't be used to force a thread into active state
85
        if (new_state == thread_schedule_state::active)
1,144,595✔
86
        {
87
            HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
88
                "threads::detail::set_thread_state", "invalid new state: {}",
89
                new_state);
90
            return thread_state(
×
91
                thread_schedule_state::unknown, thread_restart_state::unknown);
92
        }
93

94
        thread_state previous_state;
1,144,588✔
95
        std::size_t k = 0;
1,144,588✔
96
        do
1,144,588✔
97
        {
98
            // action depends on the current state
99
            previous_state = get_thread_id_data(thrd)->get_state();
1,148,311✔
100
            thread_schedule_state previous_state_val = previous_state.state();
1,148,311✔
101

102
            // nothing to do here if the state doesn't change
103
            if (new_state == previous_state_val)
1,148,311✔
104
            {
105
                // NOLINTNEXTLINE(bugprone-branch-clone)
106
                LTM_(warning).format(
358✔
107
                    "set_thread_state: old thread state is the same as new "
179✔
108
                    "thread state, aborting state change, thread({}), "
109
                    "description({}), new state({})",
110
                    thrd, get_thread_id_data(thrd)->get_description(),
179✔
111
                    get_thread_state_name(new_state));
177✔
112

113
                if (&ec != &throws)
179✔
114
                    ec = make_success_code();
×
115

116
                return thread_state(new_state, previous_state.state_ex());
179✔
117
            }
118

119
            // the thread to set the state for is currently running, so we
120
            // schedule another thread to execute the pending set_state
121
            switch (previous_state_val)
1,148,138✔
122
            {
123
            case thread_schedule_state::active:
124
            {
125
                if (retry_on_active)
3,787✔
126
                {
127
                    // schedule a new thread to set the state
128
                    // NOLINTNEXTLINE(bugprone-branch-clone)
129
                    LTM_(warning).format(
85✔
130
                        "set_thread_state: thread is currently active, "
33✔
131
                        "scheduling new thread, thread({}), description({}), "
132
                        "new state({})",
133
                        thrd, get_thread_id_data(thrd)->get_description(),
33✔
134
                        get_thread_state_name(new_state));
33✔
135

136
                    thread_init_data data(
52✔
137
                        hpx::bind(&set_active_state, thread_id_ref_type(thrd),
52✔
138
                            new_state, new_state_ex, priority, previous_state),
139
                        "set state for active thread", priority);
52✔
140

141
                    create_work(get_thread_id_data(thrd)->get_scheduler_base(),
104✔
142
                        data, ec);
52✔
143

144
                    if (&ec != &throws)
52✔
145
                        ec = make_success_code();
16✔
146
                }
52✔
147
                else
148
                {
149
                    hpx::execution_base::this_thread::yield_k(
3,735✔
150
                        k, "hpx::threads::detail::set_thread_state");
3,735✔
151
                    ++k;
3,735✔
152

153
                    // NOLINTNEXTLINE(bugprone-branch-clone)
154
                    LTM_(warning).format(
7,453✔
155
                        "set_thread_state: thread is currently active, but not "
3,718✔
156
                        "scheduling new thread because retry_on_active = "
157
                        "false, thread({}), description({}), new state({})",
158
                        thrd, get_thread_id_data(thrd)->get_description(),
3,718✔
159
                        get_thread_state_name(new_state));
3,718✔
160

161
                    continue;
3,735✔
162
                }
163

164
                if (&ec != &throws)
52✔
165
                    ec = make_success_code();
16✔
166

167
                return previous_state;    // done
52✔
168
            }
169
            break;
170

171
            case thread_schedule_state::terminated:
172
            {
173
                // NOLINTNEXTLINE(bugprone-branch-clone)
174
                LTM_(warning).format(
4✔
175
                    "set_thread_state: thread is terminated, aborting state "
2✔
176
                    "change, thread({}), description({}), new state({})",
177
                    thrd, get_thread_id_data(thrd)->get_description(),
2✔
178
                    get_thread_state_name(new_state));
2✔
179

180
                if (&ec != &throws)
2✔
181
                    ec = make_success_code();
×
182

183
                // If the thread has been terminated while this set_state was
184
                // pending nothing has to be done anymore.
185
                return previous_state;
2✔
186
            }
52✔
187
            break;
188

189
            case thread_schedule_state::pending:
190
                [[fallthrough]];
191
            case thread_schedule_state::pending_boost:
192
                if (thread_schedule_state::suspended == new_state)
52✔
193
                {
194
                    // we do not allow explicit resetting of a state to suspended
195
                    // without the thread being executed.
196
                    std::string str = hpx::util::format(
52✔
197
                        "set_thread_state: invalid new state, can't demote a "
52✔
198
                        "pending thread, thread({}), description({}), new "
199
                        "state({})",
200
                        thrd, get_thread_id_data(thrd)->get_description(),
52✔
201
                        new_state);
202

203
                    // NOLINTNEXTLINE(bugprone-branch-clone)
204
                    LTM_(fatal) << str;
52✔
205

206
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
52✔
207
                        "threads::detail::set_thread_state", str);
208
                    return thread_state(thread_schedule_state::unknown,
×
209
                        thread_restart_state::unknown);
210
                }
52✔
211
                break;
×
212

213
            case thread_schedule_state::suspended:
214
                break;    // fine, just set the new state
1,144,297✔
215

216
            case thread_schedule_state::pending_do_not_schedule:
217
                [[fallthrough]];
218
            default:
219
            {
220
                HPX_ASSERT_MSG(false,
×
221
                    hpx::util::format("set_thread_state: previous state was {}",
222
                        previous_state_val));    // should not happen
223
                break;
×
224
            }
225
            }
226

227
            // If the previous state was pending we are supposed to remove the
228
            // thread from the queue. But in order to avoid linearly looking
229
            // through the queue we defer this to the thread function, which at
230
            // some point will ignore this thread by simply skipping it (if it's
231
            // not pending anymore).
232

233
            // NOLINTNEXTLINE(bugprone-branch-clone)
234
            LTM_(info).format("set_thread_state: thread({}), description({}), "
1,144,292✔
235
                              "new state({}), old state({})",
236
                thrd, get_thread_id_data(thrd)->get_description(),
1,144,083✔
237
                get_thread_state_name(new_state),
1,144,052✔
238
                get_thread_state_name(previous_state_val));
1,144,052✔
239

240
            // So all we do here is set the new state.
241
            if (get_thread_id_data(thrd)->restore_state(
2,288,646✔
242
                    new_state, new_state_ex, previous_state))
1,144,323✔
243
            {
244
                break;
1,144,323✔
245
            }
246

247
            // state has changed since we fetched it from the thread, retry
248
            // NOLINTNEXTLINE(bugprone-branch-clone)
249
            LTM_(warning).format(
×
250
                "set_thread_state: state has been changed since it was "
×
251
                "fetched, retrying, thread({}), description({}), new "
252
                "state({}), old state({})",
253
                get_thread_id_data(thrd),
×
254
                get_thread_id_data(thrd)->get_description(),
×
255
                get_thread_state_name(new_state),
×
256
                get_thread_state_name(previous_state_val));
×
257
        } while (true);
3,735✔
258

259
        thread_schedule_state previous_state_val = previous_state.state();
1,144,323✔
260
        if (!(previous_state_val == thread_schedule_state::pending ||
1,144,323✔
261
                previous_state_val == thread_schedule_state::pending_boost) &&
1,144,323✔
262
            (new_state == thread_schedule_state::pending ||
1,144,323✔
263
                new_state == thread_schedule_state::pending_boost))
×
264
        {
265
            // REVIEW: Passing a specific target thread may interfere with the
266
            // round robin queuing.
267

268
            auto* thrd_data = get_thread_id_data(thrd);
1,144,323✔
269
            auto* scheduler = thrd_data->get_scheduler_base();
1,144,323✔
270
            scheduler->schedule_thread(
2,288,646✔
271
                thrd, schedulehint, false, thrd_data->get_priority());
1,144,323✔
272

273
            // NOTE: Don't care if the hint is a NUMA hint, just want to wake up
274
            // a thread.
275
            scheduler->do_some_work(schedulehint.hint);
1,144,321✔
276
        }
1,144,321✔
277

278
        if (&ec != &throws)
1,144,322✔
279
            ec = make_success_code();
12,209✔
280

281
        return previous_state;
1,144,322✔
282
    }
1,144,607✔
283
}    // namespace hpx::threads::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc