• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.17
/libs/core/thread_pool_util/src/thread_pool_suspension_helpers.cpp
1
//  Copyright (c) 2019 Mikael Simberg
2
//  Copyright (c) 2024 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#include <hpx/modules/async_local.hpp>
9
#include <hpx/modules/errors.hpp>
10
#include <hpx/modules/executors.hpp>
11
#include <hpx/modules/functional.hpp>
12
#include <hpx/modules/futures.hpp>
13
#include <hpx/modules/threading_base.hpp>
14
#include <hpx/thread_pool_util/thread_pool_suspension_helpers.hpp>
15

16
#include <cstddef>
17
#include <utility>
18

19
namespace hpx::threads {
20

21
    // Asynchronously resume the processing unit `virt_core` of `pool`.
    //
    // pool:      thread pool owning the processing unit. It is captured by
    //            reference by the task handed to hpx::async, so it has to
    //            stay alive until that task has run.
    // virt_core: index passed on to has_scheduler_mode() and
    //            resume_processing_unit_direct().
    //
    // Returns the future produced by hpx::async for the task that calls
    // pool.resume_processing_unit_direct(virt_core, throws). If the scheduler
    // does not report scheduler_mode::enable_elasticity for `virt_core`, an
    // exceptional future carrying an invalid_status error is returned
    // instead.
    //
    // Throws an invalid_status exception right away if
    // threads::get_self_ptr() is null, i.e. the call is made from outside
    // HPX.
    hpx::future<void> resume_processing_unit(
        thread_pool_base& pool, std::size_t virt_core)
    {
        if (!threads::get_self_ptr())
        {
            // Adjacent string literals are concatenated verbatim, hence the
            // trailing space after "use" to keep the two words separated.
            HPX_THROW_EXCEPTION(hpx::error::invalid_status,
                "resume_processing_unit",
                "cannot call resume_processing_unit from outside HPX, use "
                "resume_processing_unit_cb instead");
        }
        if (!pool.get_scheduler()->has_scheduler_mode(
                policies::scheduler_mode::enable_elasticity, virt_core))
        {
            // Unlike the check above, this failure is reported through the
            // returned future rather than by throwing.
            return hpx::make_exceptional_future<void>(HPX_GET_EXCEPTION(
                hpx::error::invalid_status, "resume_processing_unit",
                "this thread pool does not support suspending "
                "processing units"));
        }

        return hpx::async([&pool, virt_core]() -> void {
            return pool.resume_processing_unit_direct(virt_core, throws);
        });
    }
44

45
    void resume_processing_unit_cb(thread_pool_base& pool,
×
46
        hpx::function<void()> callback, std::size_t virt_core, error_code& ec)
47
    {
48
        if (!pool.get_scheduler()->has_scheduler_mode(
49
                policies::scheduler_mode::enable_elasticity, virt_core))
×
50
        {
51
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
52
                "resume_processing_unit_cb",
×
53
                "this thread pool does not support suspending "
54
                "processing units");
55
            return;
56
        }
×
57

58
        auto resume_direct_wrapper = [&pool, virt_core,
59
                                         callback = HPX_MOVE(callback)]() {
×
60
            pool.resume_processing_unit_direct(virt_core, throws);
×
61
            callback();
×
62
        };
63

×
64
        if (threads::get_self_ptr())
65
        {
×
66
            hpx::post(HPX_MOVE(resume_direct_wrapper));
67
        }
68
        else
69
        {
70
            std::thread(HPX_MOVE(resume_direct_wrapper)).detach();
71
        }
×
72
    }
73

74
    // Asynchronously suspend the processing unit `virt_core` of `pool`.
    //
    // pool:      thread pool owning the processing unit. It is captured by
    //            reference by the task handed to hpx::async, so it has to
    //            stay alive until that task has run.
    // virt_core: index passed on to has_scheduler_mode() and
    //            suspend_processing_unit_direct().
    //
    // Returns the future produced by hpx::async for the task that calls
    // pool.suspend_processing_unit_direct(virt_core, throws). An exceptional
    // future carrying an invalid_status error is returned instead when
    //   - the scheduler does not report scheduler_mode::enable_elasticity
    //     for `virt_core`, or
    //   - the scheduler does not report scheduler_mode::enable_stealing for
    //     `virt_core` and the calling thread runs on `pool` itself.
    //
    // Throws an invalid_status exception right away if
    // threads::get_self_ptr() is null, i.e. the call is made from outside
    // HPX.
    hpx::future<void> suspend_processing_unit(
        thread_pool_base& pool, std::size_t virt_core)
    {
        if (!threads::get_self_ptr())
        {
            // Adjacent string literals are concatenated verbatim, hence the
            // trailing space after "use" to keep the two words separated.
            HPX_THROW_EXCEPTION(hpx::error::invalid_status,
                "suspend_processing_unit",
                "cannot call suspend_processing_unit from outside HPX, use "
                "suspend_processing_unit_cb instead");
        }
        if (!pool.get_scheduler()->has_scheduler_mode(
                policies::scheduler_mode::enable_elasticity, virt_core))
        {
            return hpx::make_exceptional_future<void>(HPX_GET_EXCEPTION(
                hpx::error::invalid_status, "suspend_processing_unit",
                "this thread pool does not support suspending "
                "processing units"));
        }
        if (!pool.get_scheduler()->has_scheduler_mode(
                policies::scheduler_mode::enable_stealing, virt_core) &&
            hpx::this_thread::get_pool() == &pool)
        {
            // Suspending from within the same pool is rejected when stealing
            // is disabled.
            return hpx::make_exceptional_future<void>(HPX_GET_EXCEPTION(
                hpx::error::invalid_status, "suspend_processing_unit",
                "this thread pool does not support suspending "
                "processing units from itself (no thread stealing)"));
        }

        return hpx::async([&pool, virt_core]() -> void {
            return pool.suspend_processing_unit_direct(virt_core, throws);
        });
    }
106

107
    void suspend_processing_unit_cb(thread_pool_base& pool,
108
        hpx::function<void()> callback, std::size_t virt_core, error_code& ec)
×
109
    {
110
        if (!pool.get_scheduler()->has_scheduler_mode(
111
                policies::scheduler_mode::enable_elasticity, virt_core))
112
        {
×
113
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
114
                "suspend_processing_unit_cb",
115
                "this thread pool does not support suspending processing "
×
116
                "units");
117
            return;
118
        }
119

×
120
        auto suspend_direct_wrapper = [&pool, virt_core,
121
                                          callback = HPX_MOVE(callback)]() {
122
            pool.suspend_processing_unit_direct(virt_core, throws);
×
123
            callback();
×
124
        };
×
125

126
        if (threads::get_self_ptr())
×
127
        {
128
            if (!pool.get_scheduler()->has_scheduler_mode(
×
129
                    policies::scheduler_mode::enable_stealing, virt_core) &&
130
                hpx::this_thread::get_pool() == &pool)
×
131
            {
×
132
                HPX_THROW_EXCEPTION(hpx::error::invalid_status,
×
133
                    "suspend_processing_unit_cb",
134
                    "this thread pool does not support suspending "
×
135
                    "processing units from itself (no thread stealing)");
136
            }
137

138
            hpx::post(HPX_MOVE(suspend_direct_wrapper));
139
        }
140
        else
141
        {
142
            std::thread(HPX_MOVE(suspend_direct_wrapper)).detach();
143
        }
144
    }
×
145

146
    // Asynchronously resume `pool`.
    //
    // pool: thread pool to resume; captured by reference by the task handed
    //       to hpx::async.
    //
    // Returns the future produced by hpx::async for the task that calls
    // pool.resume_direct(throws).
    //
    // Throws an invalid_status exception if threads::get_self_ptr() is null,
    // i.e. the call is made from outside HPX.
    future<void> resume_pool(thread_pool_base& pool)
    {
        bool const outside_hpx = !threads::get_self_ptr();
        if (outside_hpx)
        {
            HPX_THROW_EXCEPTION(hpx::error::invalid_status, "resume_pool",
                "cannot call resume_pool from outside HPX, use resume_pool_cb "
                "or the member function resume_direct instead");
        }

        auto resume_task = [&pool]() { pool.resume_direct(throws); };
        return hpx::async(HPX_MOVE(resume_task));
    }
158

159
    void resume_pool_cb(thread_pool_base& pool, hpx::function<void()> callback,
29✔
160
        error_code& /* ec */)
161
    {
162
        auto resume_direct_wrapper =
×
163
            [&pool, callback = HPX_MOVE(callback)]() -> void {
164
            pool.resume_direct(throws);
165
            callback();
166
        };
×
167

×
168
        if (threads::get_self_ptr())
169
        {
×
170
            hpx::post(HPX_MOVE(resume_direct_wrapper));
171
        }
×
172
        else
173
        {
174
            std::thread(HPX_MOVE(resume_direct_wrapper)).detach();
175
        }
176
    }
177

×
178
    // Asynchronously suspend `pool`.
    //
    // pool: thread pool to suspend; captured by reference by the task handed
    //       to hpx::async.
    //
    // Returns the future produced by hpx::async for the task that calls
    // pool.suspend_direct(throws). If the calling thread runs on `pool`
    // itself, an exceptional future carrying a bad_parameter error is
    // returned instead.
    //
    // Throws an invalid_status exception if threads::get_self_ptr() is null,
    // i.e. the call is made from outside HPX.
    future<void> suspend_pool(thread_pool_base& pool)
    {
        bool const outside_hpx = !threads::get_self_ptr();
        if (outside_hpx)
        {
            HPX_THROW_EXCEPTION(hpx::error::invalid_status, "suspend_pool",
                "cannot call suspend_pool from outside HPX, use "
                "suspend_pool_cb or the member function suspend_direct "
                "instead");
        }

        bool const suspending_own_pool = hpx::this_thread::get_pool() == &pool;
        if (suspending_own_pool)
        {
            return hpx::make_exceptional_future<void>(
                HPX_GET_EXCEPTION(hpx::error::bad_parameter, "suspend_pool",
                    "cannot suspend a pool from itself"));
        }

        auto suspend_task = [&pool]() { pool.suspend_direct(throws); };
        return hpx::async(HPX_MOVE(suspend_task));
    }
197

198
    void suspend_pool_cb(
199
        thread_pool_base& pool, hpx::function<void()> callback, error_code& ec)
×
200
    {
201
        if (threads::get_self_ptr() && hpx::this_thread::get_pool() == &pool)
202
        {
×
203
            HPX_THROWS_IF(ec, hpx::error::bad_parameter, "suspend_pool_cb",
204
                "cannot suspend a pool from itself");
205
            return;
×
206
        }
207

×
208
        auto suspend_direct_wrapper = [&pool, callback = HPX_MOVE(callback)]() {
209
            pool.suspend_direct(throws);
×
210
            callback();
211
        };
212

×
213
        if (threads::get_self_ptr())
×
214
        {
215
            hpx::post(HPX_MOVE(suspend_direct_wrapper));
×
216
        }
217
        else
×
218
        {
219
            std::thread(HPX_MOVE(suspend_direct_wrapper)).detach();
220
        }
221
    }
222
}    // namespace hpx::threads
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc