• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #860

06 Jan 2023 04:19PM UTC coverage: 85.965% (+0.08%) from 85.881%
#860

push

StellarBot
Merge #6124

6124: Fixing use of any_sender in combination with when_all r=hkaiser a=hkaiser



Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

15 of 15 new or added lines in 2 files covered. (100.0%)

173505 of 201833 relevant lines covered (85.96%)

1905847.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.79
/libs/core/threading_base/src/set_thread_state.cpp
1
//  Copyright (c) 2007-2022 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/config.hpp>
8
#include <hpx/assert.hpp>
9
#include <hpx/coroutines/coroutine.hpp>
10
#include <hpx/functional/bind.hpp>
11
#include <hpx/modules/errors.hpp>
12
#include <hpx/modules/format.hpp>
13
#include <hpx/modules/logging.hpp>
14
#include <hpx/threading_base/create_work.hpp>
15
#include <hpx/threading_base/register_thread.hpp>
16
#include <hpx/threading_base/set_thread_state.hpp>
17
#include <hpx/threading_base/thread_data.hpp>
18
#include <hpx/threading_base/threading_base_fwd.hpp>
19

20
#include <cstddef>
21
#include <functional>
22
#include <string>
23
#include <utility>
24

25
namespace hpx::threads::detail {
26

27
    ///////////////////////////////////////////////////////////////////////////
28
    /// Re-issue a state change for a thread that was active when the change
    /// was first requested.
    ///
    /// set_thread_state() binds this function into the work item it creates
    /// ("set state for active thread") when it finds the target thread in the
    /// active state and was asked to retry.
    ///
    /// \param thrd           thread whose state should be changed; a null id
    ///                       raises hpx::error::null_thread_id
    /// \param newstate       schedule state that was originally requested
    /// \param newstate_ex    restart state that was originally requested
    /// \param priority       priority forwarded to set_thread_state()
    /// \param previous_state state the target was observed in when the
    ///                       original request was made
    ///
    /// \returns always (thread_schedule_state::terminated, invalid_thread_id)
    thread_result_type set_active_state(thread_id_ref_type thrd,
        thread_schedule_state newstate, thread_restart_state newstate_ex,
        thread_priority priority, thread_state previous_state)
    {
        // all exit paths hand back the very same result
        auto const finished = []() {
            return thread_result_type(
                thread_schedule_state::terminated, invalid_thread_id);
        };

        if (HPX_UNLIKELY(!thrd))
        {
            HPX_THROW_EXCEPTION(hpx::error::null_thread_id,
                "threads::detail::set_active_state",
                "null thread id encountered");
            return finished();
        }

        auto* const target = get_thread_id_data(thrd);
        thread_state const observed = target->get_state();

        // Same schedule state but a different overall thread_state value: the
        // thread has been suspended and set active again in the meantime, so
        // the original request is dropped.
        bool const same_schedule_state =
            observed.state() == previous_state.state();
        if (same_schedule_state && observed != previous_state)
        {
            // NOLINTNEXTLINE(bugprone-branch-clone)
            LTM_(warning).format(
                "set_active_state: thread is still active, however it was "
                "non-active since the original set_state request was issued, "
                "aborting state change, thread({}), description({}), new "
                "state({})",
                thrd, target->get_description(),
                get_thread_state_name(newstate));
            return finished();
        }

        // Simply try again; set_thread_state() schedules yet another work item
        // if the target is still active. Errors are collected in a local
        // lightweight error_code (no exception is thrown) and are not
        // inspected.
        error_code ec(throwmode::lightweight);
        detail::set_thread_state(thrd.noref(), newstate, newstate_ex, priority,
            thread_schedule_hint(), true, ec);

        return finished();
    }
68

69
    ///////////////////////////////////////////////////////////////////////////
70
    thread_state set_thread_state(thread_id_type const& thrd,
1,144,851✔
71
        thread_schedule_state new_state, thread_restart_state new_state_ex,
72
        thread_priority priority, thread_schedule_hint schedulehint,
73
        bool retry_on_active, error_code& ec)
74
    {
75
        if (HPX_UNLIKELY(!thrd))
1,144,850✔
76
        {
77
            HPX_THROWS_IF(ec, hpx::error::null_thread_id,
51✔
78
                "threads::detail::set_thread_state",
79
                "null thread id encountered");
80
            return thread_state(
×
81
                thread_schedule_state::unknown, thread_restart_state::unknown);
82
        }
83

84
        // set_state can't be used to force a thread into active state
85
        if (new_state == thread_schedule_state::active)
1,144,847✔
86
        {
87
            HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
88
                "threads::detail::set_thread_state", "invalid new state: {}",
89
                new_state);
90
            return thread_state(
×
91
                thread_schedule_state::unknown, thread_restart_state::unknown);
92
        }
93

94
        thread_state previous_state;
1,144,839✔
95
        std::size_t k = 0;
1,144,839✔
96
        do
1,144,839✔
97
        {
98
            // action depends on the current state
99
            previous_state = get_thread_id_data(thrd)->get_state();
1,147,505✔
100
            thread_schedule_state previous_state_val = previous_state.state();
1,147,505✔
101

102
            // nothing to do here if the state doesn't change
103
            if (new_state == previous_state_val)
1,147,505✔
104
            {
105
                // NOLINTNEXTLINE(bugprone-branch-clone)
106
                LTM_(warning).format(
350✔
107
                    "set_thread_state: old thread state is the same as new "
175✔
108
                    "thread state, aborting state change, thread({}), "
109
                    "description({}), new state({})",
110
                    thrd, get_thread_id_data(thrd)->get_description(),
175✔
111
                    get_thread_state_name(new_state));
175✔
112

113
                if (&ec != &throws)
175✔
114
                    ec = make_success_code();
×
115

116
                return thread_state(new_state, previous_state.state_ex());
175✔
117
            }
118

119
            // the thread to set the state for is currently running, so we
120
            // schedule another thread to execute the pending set_state
121
            switch (previous_state_val)
1,147,332✔
122
            {
123
            case thread_schedule_state::active:
124
            {
125
                if (retry_on_active)
2,702✔
126
                {
127
                    // schedule a new thread to set the state
128
                    // NOLINTNEXTLINE(bugprone-branch-clone)
129
                    LTM_(warning).format(
51✔
130
                        "set_thread_state: thread is currently active, "
24✔
131
                        "scheduling new thread, thread({}), description({}), "
132
                        "new state({})",
133
                        thrd, get_thread_id_data(thrd)->get_description(),
24✔
134
                        get_thread_state_name(new_state));
24✔
135

136
                    thread_init_data data(
27✔
137
                        hpx::bind(&set_active_state, thread_id_ref_type(thrd),
27✔
138
                            new_state, new_state_ex, priority, previous_state),
139
                        "set state for active thread", priority);
27✔
140

141
                    create_work(get_thread_id_data(thrd)->get_scheduler_base(),
54✔
142
                        data, ec);
27✔
143

144
                    if (&ec != &throws)
27✔
145
                        ec = make_success_code();
×
146
                }
27✔
147
                else
148
                {
149
                    hpx::execution_base::this_thread::yield_k(
2,675✔
150
                        k, "hpx::threads::detail::set_thread_state");
2,675✔
151
                    ++k;
2,675✔
152

153
                    // NOLINTNEXTLINE(bugprone-branch-clone)
154
                    LTM_(warning).format(
5,350✔
155
                        "set_thread_state: thread is currently active, but not "
2,675✔
156
                        "scheduling new thread because retry_on_active = "
157
                        "false, thread({}), description({}), new state({})",
158
                        thrd, get_thread_id_data(thrd)->get_description(),
2,675✔
159
                        get_thread_state_name(new_state));
2,675✔
160

161
                    continue;
2,675✔
162
                }
163

164
                if (&ec != &throws)
27✔
165
                    ec = make_success_code();
×
166

167
                return previous_state;    // done
27✔
168
            }
169
            break;
170

171
            case thread_schedule_state::terminated:
172
            {
173
                // NOLINTNEXTLINE(bugprone-branch-clone)
174
                LTM_(warning).format(
2✔
175
                    "set_thread_state: thread is terminated, aborting state "
1✔
176
                    "change, thread({}), description({}), new state({})",
177
                    thrd, get_thread_id_data(thrd)->get_description(),
1✔
178
                    get_thread_state_name(new_state));
1✔
179

180
                if (&ec != &throws)
1✔
181
                    ec = make_success_code();
×
182

183
                // If the thread has been terminated while this set_state was
184
                // pending nothing has to be done anymore.
185
                return previous_state;
1✔
186
            }
51✔
187
            break;
188

189
            case thread_schedule_state::pending:
190
                [[fallthrough]];
191
            case thread_schedule_state::pending_boost:
192
                if (thread_schedule_state::suspended == new_state)
51✔
193
                {
194
                    // we do not allow explicit resetting of a state to suspended
195
                    // without the thread being executed.
196
                    std::string str = hpx::util::format(
51✔
197
                        "set_thread_state: invalid new state, can't demote a "
51✔
198
                        "pending thread, thread({}), description({}), new "
199
                        "state({})",
200
                        thrd, get_thread_id_data(thrd)->get_description(),
51✔
201
                        new_state);
202

203
                    // NOLINTNEXTLINE(bugprone-branch-clone)
204
                    LTM_(fatal) << str;
51✔
205

206
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
51✔
207
                        "threads::detail::set_thread_state", str);
208
                    return thread_state(thread_schedule_state::unknown,
×
209
                        thread_restart_state::unknown);
210
                }
51✔
211
                break;
×
212

213
            case thread_schedule_state::suspended:
214
                break;    // fine, just set the new state
1,144,578✔
215

216
            case thread_schedule_state::pending_do_not_schedule:
217
                [[fallthrough]];
218
            default:
219
            {
220
                HPX_ASSERT_MSG(false,
×
221
                    hpx::util::format("set_thread_state: previous state was {}",
222
                        previous_state_val));    // should not happen
223
                break;
×
224
            }
225
            }
226

227
            // If the previous state was pending we are supposed to remove the
228
            // thread from the queue. But in order to avoid linearly looking
229
            // through the queue we defer this to the thread function, which at
230
            // some point will ignore this thread by simply skipping it (if it's
231
            // not pending anymore).
232

233
            // NOLINTNEXTLINE(bugprone-branch-clone)
234
            LTM_(info).format("set_thread_state: thread({}), description({}), "
1,144,577✔
235
                              "new state({}), old state({})",
236
                thrd, get_thread_id_data(thrd)->get_description(),
1,144,376✔
237
                get_thread_state_name(new_state),
1,144,354✔
238
                get_thread_state_name(previous_state_val));
1,144,354✔
239

240
            // So all what we do here is to set the new state.
241
            if (get_thread_id_data(thrd)->restore_state(
2,289,210✔
242
                    new_state, new_state_ex, previous_state))
1,144,604✔
243
            {
244
                break;
1,144,604✔
245
            }
246

247
            // state has changed since we fetched it from the thread, retry
248
            // NOLINTNEXTLINE(bugprone-branch-clone)
249
            LTM_(warning).format(
×
250
                "set_thread_state: state has been changed since it was "
×
251
                "fetched, retrying, thread({}), description({}), new "
252
                "state({}), old state({})",
253
                get_thread_id_data(thrd),
×
254
                get_thread_id_data(thrd)->get_description(),
×
255
                get_thread_state_name(new_state),
×
256
                get_thread_state_name(previous_state_val));
×
257
        } while (true);
2,675✔
258

259
        thread_schedule_state previous_state_val = previous_state.state();
1,144,604✔
260
        if (!(previous_state_val == thread_schedule_state::pending ||
1,144,604✔
261
                previous_state_val == thread_schedule_state::pending_boost) &&
1,144,604✔
262
            (new_state == thread_schedule_state::pending ||
1,144,604✔
263
                new_state == thread_schedule_state::pending_boost))
×
264
        {
265
            // REVIEW: Passing a specific target thread may interfere with the
266
            // round robin queuing.
267

268
            auto* thrd_data = get_thread_id_data(thrd);
1,144,605✔
269
            auto* scheduler = thrd_data->get_scheduler_base();
1,144,605✔
270
            scheduler->schedule_thread(
2,289,208✔
271
                thrd, schedulehint, false, thrd_data->get_priority());
1,144,605✔
272

273
            // NOTE: Don't care if the hint is a NUMA hint, just want to wake up
274
            // a thread.
275
            scheduler->do_some_work(schedulehint.hint);
1,144,603✔
276
        }
1,144,603✔
277

278
        if (&ec != &throws)
1,144,603✔
279
            ec = make_success_code();
12,172✔
280

281
        return previous_state;
1,144,604✔
282
    }
1,144,858✔
283
}    // namespace hpx::threads::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc