• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/libs/core/synchronization/src/stop_token.cpp
1
//  Copyright (c) 2020 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/config.hpp>
8
#include <hpx/assert.hpp>
9
#include <hpx/modules/execution_base.hpp>
10
#include <hpx/modules/thread_support.hpp>
11
#include <hpx/synchronization/mutex.hpp>
12
#include <hpx/synchronization/stop_token.hpp>
13

14
#include <atomic>
15
#include <cstddef>
16
#include <cstdint>
17
#include <mutex>
18

19
namespace hpx::detail {
20

21
    ///////////////////////////////////////////////////////////////////////////
22
    void intrusive_ptr_add_ref(stop_state* p) noexcept
×
23
    {
24
        p->state_.fetch_add(
25
            stop_state::token_ref_increment, std::memory_order_relaxed);
26
    }
×
27

28
    // Drop one token reference; destroys the state when the last reference
    // goes away. The release decrement publishes this thread's prior writes,
    // and the acquire fence below makes every other releaser's writes visible
    // before the destructor runs.
    void intrusive_ptr_release(stop_state* p) noexcept
    {
        auto const prev = p->state_.fetch_sub(
            stop_state::token_ref_increment, std::memory_order_release);

        bool const was_last_reference =
            (prev & stop_state::token_ref_mask) ==
            stop_state::token_ref_increment;

        if (was_last_reference)
        {
            // The thread that takes the count to zero must not begin
            // destruction until all writes made under earlier references
            // have drained.
            std::atomic_thread_fence(std::memory_order_acquire);

            delete p;
        }
    }
44

×
45
    ///////////////////////////////////////////////////////////////////////////
×
46
    void stop_callback_base::add_this_callback(
47
        stop_callback_base*& callbacks) noexcept
×
48
    {
49
        next_ = callbacks;
×
50
        if (next_ != nullptr)
×
51
        {
×
52
            next_->prev_ = &next_;
53
        }
54
        prev_ = &callbacks;
×
55
        callbacks = this;
56
    }
×
57

58
    // returns true if the callback was successfully removed
59
    bool stop_callback_base::remove_this_callback() const noexcept
×
60
    {
×
61
        if (prev_ != nullptr)
62
        {
×
63
            // Still registered, not yet executed: just remove from the list.
64
            *prev_ = next_;
×
65
            if (next_ != nullptr)
66
            {
67
                next_->prev_ = prev_;
68
            }
69
            return true;
70
        }
×
71
        return false;
72
    }
73

74
    ///////////////////////////////////////////////////////////////////////////
75
    void stop_state::lock() noexcept
×
76
    {
77
        auto old_state = state_.load(std::memory_order_relaxed);
×
78
        do
79
        {
80
            for (std::size_t k = 0; is_locked(old_state); ++k)
81
            {
×
82
                hpx::execution_base::this_thread::yield_k(
83
                    k, "stop_state::lock");
84
                old_state = state_.load(std::memory_order_relaxed);
×
85
            }
86
        } while (!state_.compare_exchange_weak(old_state,
87
            old_state | stop_state::locked_flag, std::memory_order_acquire,
×
88
            std::memory_order_relaxed));
89
    }
90

91
    ///////////////////////////////////////////////////////////////////////////
    // Atomically set the stop_requested_flag AND acquire the state lock in a
    // single CAS. Returns false (lock NOT held) if some other thread already
    // requested a stop; returns true with the lock held otherwise. Combining
    // both flags in one CAS means no thread can observe the stop request
    // before this thread owns the lock.
    bool stop_state::lock_and_request_stop() noexcept
    {
        std::uint64_t old_state = state_.load(std::memory_order_acquire);

        // Fast path: stop was already requested, nothing to do.
        if (stop_requested(old_state))
            return false;

        do
        {
            // Spin with progressive yielding while another thread holds the
            // lock, re-checking for a concurrent stop request on every
            // reload so we can bail out early.
            for (std::size_t k = 0; is_locked(old_state); ++k)
            {
                hpx::execution_base::this_thread::yield_k(
                    k, "stop_state::lock_and_request_stop");
                old_state = state_.load(std::memory_order_acquire);

                if (stop_requested(old_state))
                    return false;
            }
        } while (!state_.compare_exchange_weak(old_state,
            old_state | stop_state::stop_requested_flag |
                stop_state::locked_flag,
            std::memory_order_acquire, std::memory_order_relaxed));
        return true;
    }
116

117
    ///////////////////////////////////////////////////////////////////////////
    // Acquire the state lock unless registration is pointless:
    //   - if stop has already been requested, run the callback synchronously
    //     on this thread, mark it finished, and return false (lock not held);
    //   - if stop can never be requested, return false (lock not held);
    //   - otherwise return true with the lock held so the caller may add the
    //     callback to the list.
    // NOTE: the yield description intentionally names the public entry point
    // ("stop_state::add_callback"), which is this function's only caller.
    bool stop_state::lock_if_not_stopped(stop_callback_base* cb) noexcept
    {
        std::uint64_t old_state = state_.load(std::memory_order_acquire);

        if (stop_requested(old_state))
        {
            // Too late to register: invoke immediately and signal completion
            // for any thread spinning in remove_callback().
            cb->execute();
            cb->callback_finished_executing_.store(
                true, std::memory_order_release);
            return false;
        }
        else if (!stop_possible(old_state))
        {
            return false;
        }

        do
        {
            // Spin while another thread holds the lock, re-evaluating both
            // bail-out conditions on every reload.
            for (std::size_t k = 0; is_locked(old_state); ++k)
            {
                hpx::execution_base::this_thread::yield_k(
                    k, "stop_state::add_callback");
                old_state = state_.load(std::memory_order_acquire);

                if (stop_requested(old_state))
                {
                    cb->execute();
                    cb->callback_finished_executing_.store(
                        true, std::memory_order_release);
                    return false;
                }
                else if (!stop_possible(old_state))
                {
                    return false;
                }
            }
        } while (!state_.compare_exchange_weak(old_state,
            old_state | stop_state::locked_flag, std::memory_order_acquire,
            std::memory_order_relaxed));

        return true;
    }
160

161
    ///////////////////////////////////////////////////////////////////////////
162
    struct scoped_lock_if_not_stopped
×
163
    {
164
        scoped_lock_if_not_stopped(
165
            stop_state& state, stop_callback_base* cb) noexcept
166
          : state_(state)
167
          , has_lock_(state_.lock_if_not_stopped(cb))
×
168
        {
169
        }
170
        ~scoped_lock_if_not_stopped()
171
        {
172
            if (has_lock_)
173
                state_.unlock();
174
        }
175

176
        explicit operator bool() const noexcept
177
        {
178
            return has_lock_;
179
        }
180

×
181
        stop_state& state_;
182
        bool has_lock_;
183
    };
×
184

185
    // Register a callback to be invoked when stop is requested. Returns
    // false when registration is impossible (stop already requested -- in
    // which case the guard has already executed the callback -- or stop not
    // possible); returns true once the callback is linked into the list.
    bool stop_state::add_callback(stop_callback_base* cb) noexcept
    {
        scoped_lock_if_not_stopped const guard(*this, cb);
        if (guard)
        {
            // Lock is held and no stop is pending: push onto the list head.
            cb->add_this_callback(callbacks_);
            return true;
        }
        return false;
    }
195

196
    ///////////////////////////////////////////////////////////////////////////
    // Deregister a callback. Must not return while the callback could still
    // be executing on another thread, because the caller is about to destroy
    // it. Three cases:
    //   1. still in the list            -> unlink under the lock, done;
    //   2. executing on THIS thread     -> we are inside the callback itself
    //      (self-deregistration); tell request_stop() via is_removed_ not to
    //      touch the object after the callback returns;
    //   3. executing on another thread  -> spin until it signals completion.
    void stop_state::remove_callback(stop_callback_base const* cb) noexcept
    {
        {
            // stop_state itself satisfies BasicLockable (lock()/unlock()).
            std::lock_guard<stop_state> l(*this);
            if (cb->remove_this_callback())
                return;
        }

        // Callback has either already executed or is executing concurrently
        // on another thread.
        if (signalling_thread_ == hpx::threads::get_self_id())
        {
            // Callback executed on this thread or is still currently executing
            // and is unregistering itself from within the callback.
            if (cb->is_removed_ != nullptr)
            {
                // Currently inside the callback, let the request_stop() method
                // know the object is about to be destructed and that it should
                // not try to access the object when the callback returns.
                *cb->is_removed_ = true;
            }
        }
        else
        {
            // Callback is currently executing on another thread,
            // block until it finishes executing.
            for (std::size_t k = 0; !cb->callback_finished_executing_.load(
                     std::memory_order_relaxed);
                ++k)
            {
                hpx::execution_base::this_thread::yield_k(
                    k, "stop_state::remove_callback");
            }
        }
    }
232

233
    ///////////////////////////////////////////////////////////////////////////
×
234
    struct scoped_lock_and_request_stop
235
    {
236
        explicit scoped_lock_and_request_stop(stop_state& state) noexcept
237
          : state_(state)
238
          , has_lock_(state_.lock_and_request_stop())
×
239
        {
240
        }
241
        ~scoped_lock_and_request_stop()
242
        {
243
            if (has_lock_)
244
                state_.unlock();
245
        }
246

247
        scoped_lock_and_request_stop(
248
            scoped_lock_and_request_stop const&) = delete;
249
        scoped_lock_and_request_stop(scoped_lock_and_request_stop&&) = delete;
250
        scoped_lock_and_request_stop& operator=(
251
            scoped_lock_and_request_stop const&) = delete;
252
        scoped_lock_and_request_stop& operator=(
253
            scoped_lock_and_request_stop&&) = delete;
254

255
        explicit operator bool() const noexcept
256
        {
×
257
            return has_lock_;
258
        }
259

260
        void unlock() const noexcept
×
261
        {
262
            state_.unlock();
263
        }
264

265
    private:
266
        stop_state& state_;
×
267
        bool has_lock_;
268
    };
269

×
270
    // Request a stop and synchronously invoke every registered callback.
    // Returns false if stop had already been requested (callbacks are run at
    // most once, by the single winning thread); returns true after all
    // callbacks have run.
    bool stop_state::request_stop() noexcept
    {
        // Set the 'stop_requested' signal and acquired the lock.
        scoped_lock_and_request_stop const l(*this);
        if (!l)
            return false;    // stop has already been requested.

        HPX_ASSERT_LOCKED(
            l, stop_requested(state_.load(std::memory_order_acquire)));

        // Remember which thread runs the callbacks so remove_callback() can
        // detect self-deregistration from inside a callback.
        signalling_thread_ = hpx::threads::get_self_id();

        // invoke registered callbacks
        while (callbacks_ != nullptr)
        {
            // Dequeue the head of the queue
            auto* cb = callbacks_;
            callbacks_ = cb->next_;

            if (callbacks_ != nullptr)
                callbacks_->prev_ = &callbacks_;

            // Mark this item as removed from the list.
            cb->prev_ = nullptr;

            // Stack flag lets the callback tell us (via remove_callback())
            // that it destroyed itself while running; outlives the callback
            // because we block in execute() below.
            bool is_removed = false;
            cb->is_removed_ = &is_removed;    //-V506

            {
                // Don't hold lock while executing callback so we don't
                // block other threads from unregistering callbacks.
                unlock_guard ul(*this);
                cb->execute();
            }

            // Only touch *cb if it wasn't destroyed from within execute().
            if (!is_removed)
            {
                cb->is_removed_ = nullptr;
                cb->callback_finished_executing_.store(
                    true, std::memory_order_release);
            }
        }

        return true;
    }
315
}    // namespace hpx::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc