• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #844

02 Dec 2022 12:37AM UTC coverage: 85.8% (+0.2%) from 85.634%
#844

push

StellarBot
Merge #6084

6084: Renaming hpx::apply and friends to hpx::post r=hkaiser a=hkaiser

- this is needed to be able to rename invoke_fused to apply later

working towards https://github.com/STEllAR-GROUP/hpx/issues/5497



Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

468 of 468 new or added lines in 96 files covered. (100.0%)

171389 of 199753 relevant lines covered (85.8%)

1914550.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.91
/libs/core/thread_pool_util/src/thread_pool_suspension_helpers.cpp
1
//  Copyright (c) 2019 Mikael Simberg
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/async_local/async.hpp>
8
#include <hpx/async_local/post.hpp>
9
#include <hpx/functional/function.hpp>
10
#include <hpx/futures/future.hpp>
11
#include <hpx/thread_pool_util/thread_pool_suspension_helpers.hpp>
12
#include <hpx/threading_base/scheduler_base.hpp>
13
#include <hpx/threading_base/thread_data.hpp>
14
#include <hpx/threading_base/thread_pool_base.hpp>
15

16
#include <cstddef>
17
#include <utility>
18

19
namespace hpx { namespace threads {
20
    // Asynchronously resumes processing unit `virt_core` of `pool`.
    //
    // Throws invalid_status if threads::get_self_ptr() is null (the error
    // text describes this as being called "from outside HPX"). If the pool's
    // scheduler does not have scheduler_mode::enable_elasticity set, an
    // exceptional future holding invalid_status is returned instead.
    // Otherwise the returned future is the result of hpx::async running
    // pool.resume_processing_unit_direct(virt_core, throws).
    hpx::future<void> resume_processing_unit(
        thread_pool_base& pool, std::size_t virt_core)
    {
        if (!threads::get_self_ptr())
        {
            // Note the trailing space after "use": the two literals are
            // concatenated into a single message.
            HPX_THROW_EXCEPTION(invalid_status, "resume_processing_unit",
                "cannot call resume_processing_unit from outside HPX, use "
                "resume_processing_unit_cb instead");
        }

        if (!pool.get_scheduler()->has_scheduler_mode(
                policies::scheduler_mode::enable_elasticity))
        {
            return hpx::make_exceptional_future<void>(
                HPX_GET_EXCEPTION(invalid_status, "resume_processing_unit",
                    "this thread pool does not support suspending "
                    "processing units"));
        }

        // `pool` is captured by reference, so it has to stay valid until the
        // spawned task has run.
        return hpx::async([&pool, virt_core]() -> void {
            return pool.resume_processing_unit_direct(virt_core, throws);
        });
    }
710✔
42

43
    void resume_processing_unit_cb(thread_pool_base& pool,
×
44
        hpx::function<void(void)> callback, std::size_t virt_core,
45
        error_code& ec)
46
    {
47
        if (!pool.get_scheduler()->has_scheduler_mode(
×
48
                policies::scheduler_mode::enable_elasticity))
49
        {
50
            HPX_THROWS_IF(ec, invalid_status, "resume_processing_unit_cb",
×
51
                "this thread pool does not support suspending "
52
                "processing units");
53
            return;
×
54
        }
55

56
        auto resume_direct_wrapper = [&pool, virt_core,
×
57
                                         callback = HPX_MOVE(callback)]() {
×
58
            pool.resume_processing_unit_direct(virt_core, throws);
×
59
            callback();
×
60
        };
×
61

62
        if (threads::get_self_ptr())
×
63
        {
64
            hpx::post(HPX_MOVE(resume_direct_wrapper));
×
65
        }
×
66
        else
67
        {
68
            std::thread(HPX_MOVE(resume_direct_wrapper)).detach();
×
69
        }
70
    }
×
71

72
    // Asynchronously suspends processing unit `virt_core` of `pool`.
    //
    // Throws invalid_status if threads::get_self_ptr() is null (the error
    // text describes this as being called "from outside HPX"). An
    // exceptional future holding invalid_status is returned when
    //   - the pool's scheduler does not have
    //     scheduler_mode::enable_elasticity set, or
    //   - the scheduler does not have scheduler_mode::enable_stealing set
    //     and the calling thread's pool is `pool` itself.
    // Otherwise the returned future is the result of hpx::async running
    // pool.suspend_processing_unit_direct(virt_core, throws).
    hpx::future<void> suspend_processing_unit(
        thread_pool_base& pool, std::size_t virt_core)
    {
        if (!threads::get_self_ptr())
        {
            // Note the trailing space after "use": the two literals are
            // concatenated into a single message.
            HPX_THROW_EXCEPTION(invalid_status, "suspend_processing_unit",
                "cannot call suspend_processing_unit from outside HPX, use "
                "suspend_processing_unit_cb instead");
        }
        if (!pool.get_scheduler()->has_scheduler_mode(
                policies::scheduler_mode::enable_elasticity))
        {
            return hpx::make_exceptional_future<void>(
                HPX_GET_EXCEPTION(invalid_status, "suspend_processing_unit",
                    "this thread pool does not support suspending "
                    "processing units"));
        }
        if (!pool.get_scheduler()->has_scheduler_mode(
                policies::scheduler_mode::enable_stealing) &&
            hpx::this_thread::get_pool() == &pool)
        {
            return hpx::make_exceptional_future<void>(
                HPX_GET_EXCEPTION(invalid_status, "suspend_processing_unit",
                    "this thread pool does not support suspending "
                    "processing units from itself (no thread stealing)"));
        }

        // `pool` is captured by reference, so it has to stay valid until the
        // spawned task has run.
        return hpx::async([&pool, virt_core]() -> void {
            return pool.suspend_processing_unit_direct(virt_core, throws);
        });
    }
662✔
103

104
    void suspend_processing_unit_cb(thread_pool_base& pool,
×
105
        hpx::function<void(void)> callback, std::size_t virt_core,
106
        error_code& ec)
107
    {
108
        if (!pool.get_scheduler()->has_scheduler_mode(
×
109
                policies::scheduler_mode::enable_elasticity))
110
        {
111
            HPX_THROWS_IF(ec, invalid_status, "suspend_processing_unit_cb",
×
112
                "this thread pool does not support suspending processing "
113
                "units");
114
            return;
×
115
        }
116

117
        auto suspend_direct_wrapper = [&pool, virt_core,
×
118
                                          callback = HPX_MOVE(callback)]() {
×
119
            pool.suspend_processing_unit_direct(virt_core, throws);
×
120
            callback();
×
121
        };
×
122

123
        if (threads::get_self_ptr())
×
124
        {
125
            if (!pool.get_scheduler()->has_scheduler_mode(
×
126
                    policies::scheduler_mode::enable_stealing) &&
×
127
                hpx::this_thread::get_pool() == &pool)
×
128
            {
129
                HPX_THROW_EXCEPTION(invalid_status,
×
130
                    "suspend_processing_unit_cb",
131
                    "this thread pool does not support suspending "
132
                    "processing units from itself (no thread stealing)");
133
            }
134

135
            hpx::post(HPX_MOVE(suspend_direct_wrapper));
×
136
        }
×
137
        else
138
        {
139
            std::thread(HPX_MOVE(suspend_direct_wrapper)).detach();
×
140
        }
141
    }
×
142

143
    // Asynchronously resumes `pool`.
    //
    // Throws invalid_status if threads::get_self_ptr() is null (the error
    // text describes this as being called "from outside HPX"). Otherwise
    // returns the future produced by hpx::async running
    // pool.resume_direct(throws).
    future<void> resume_pool(thread_pool_base& pool)
    {
        if (!threads::get_self_ptr())
        {
            // HPX_THROW_EXCEPTION does not return, so no fallback return
            // statement is needed after it.
            HPX_THROW_EXCEPTION(invalid_status, "resume_pool",
                "cannot call resume_pool from outside HPX, use resume_pool_cb "
                "or the member function resume_direct instead");
        }

        // `pool` is captured by reference, so it has to stay valid until the
        // spawned task has run.
        return hpx::async(
            [&pool]() -> void { return pool.resume_direct(throws); });
    }
×
156

157
    void resume_pool_cb(thread_pool_base& pool,
16✔
158
        hpx::function<void(void)> callback, error_code& /* ec */)
159
    {
160
        auto resume_direct_wrapper =
161
            [&pool, callback = HPX_MOVE(callback)]() -> void {
128✔
162
            pool.resume_direct(throws);
16✔
163
            callback();
16✔
164
        };
16✔
165

166
        if (threads::get_self_ptr())
16✔
167
        {
168
            hpx::post(HPX_MOVE(resume_direct_wrapper));
8✔
169
        }
8✔
170
        else
171
        {
172
            std::thread(HPX_MOVE(resume_direct_wrapper)).detach();
8✔
173
        }
174
    }
16✔
175

176
    // Asynchronously suspends `pool`.
    //
    // Throws invalid_status if threads::get_self_ptr() is null (the error
    // text describes this as being called "from outside HPX"). Returns an
    // exceptional future holding bad_parameter when the calling thread's
    // pool is `pool` itself. Otherwise returns the future produced by
    // hpx::async running pool.suspend_direct(throws).
    future<void> suspend_pool(thread_pool_base& pool)
    {
        if (!threads::get_self_ptr())
        {
            // HPX_THROW_EXCEPTION does not return, so no fallback return
            // statement is needed after it.
            HPX_THROW_EXCEPTION(invalid_status, "suspend_pool",
                "cannot call suspend_pool from outside HPX, use "
                "suspend_pool_cb or the member function suspend_direct "
                "instead");
        }

        // get_self_ptr() is known to be non-null here, so only the pool
        // identity has to be checked.
        if (hpx::this_thread::get_pool() == &pool)
        {
            return hpx::make_exceptional_future<void>(
                HPX_GET_EXCEPTION(bad_parameter, "suspend_pool",
                    "cannot suspend a pool from itself"));
        }

        // `pool` is captured by reference, so it has to stay valid until the
        // spawned task has run.
        return hpx::async(
            [&pool]() -> void { return pool.suspend_direct(throws); });
    }
24✔
196

197
    void suspend_pool_cb(thread_pool_base& pool,
16✔
198
        hpx::function<void(void)> callback, error_code& ec)
199
    {
200
        if (threads::get_self_ptr() && hpx::this_thread::get_pool() == &pool)
16✔
201
        {
202
            HPX_THROWS_IF(ec, bad_parameter, "suspend_pool_cb",
×
203
                "cannot suspend a pool from itself");
204
            return;
×
205
        }
206

207
        auto suspend_direct_wrapper = [&pool, callback = HPX_MOVE(callback)]() {
128✔
208
            pool.suspend_direct(throws);
16✔
209
            callback();
16✔
210
        };
16✔
211

212
        if (threads::get_self_ptr())
16✔
213
        {
214
            hpx::post(HPX_MOVE(suspend_direct_wrapper));
8✔
215
        }
8✔
216
        else
217
        {
218
            std::thread(HPX_MOVE(suspend_direct_wrapper)).detach();
8✔
219
        }
220
    }
16✔
221

222
}}    // namespace hpx::threads
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc