• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.67
/libs/core/synchronization/src/detail/condition_variable.cpp
1
//  Copyright (c) 2007-2025 Hartmut Kaiser
2
//  Copyright (c) 2013-2015 Agustin Berge
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#include <hpx/assert.hpp>
9
#include <hpx/modules/errors.hpp>
10
#include <hpx/modules/execution_base.hpp>
11
#include <hpx/modules/logging.hpp>
12
#include <hpx/modules/memory.hpp>
13
#include <hpx/modules/thread_support.hpp>
14
#include <hpx/modules/threading_base.hpp>
15
#include <hpx/modules/timing.hpp>
16
#include <hpx/modules/type_support.hpp>
17
#include <hpx/synchronization/detail/condition_variable.hpp>
18
#include <hpx/synchronization/no_mutex.hpp>
19
#include <hpx/synchronization/spinlock.hpp>
20

21
#include <atomic>
22
#include <cstddef>
23
#include <exception>
24
#include <mutex>
25
#include <utility>
26

27
namespace hpx::lcos::local::detail {
28

29
    ///////////////////////////////////////////////////////////////////////////
30
    struct condition_variable::queue_entry
31
    {
32
        constexpr queue_entry(
33
            hpx::execution_base::agent_ref ctx, void* q) noexcept
34
          : ctx_(ctx)
35
          , q_(q)
27,770✔
36
        {
27,770✔
37
        }
38

39
        hpx::execution_base::agent_ref ctx_;
40
        void* q_;
41

42
        queue_entry* next = nullptr;
43
        queue_entry* prev = nullptr;
44
    };
45

46
    struct condition_variable::reset_queue_entry
47
    {
48
        explicit constexpr reset_queue_entry(
49
            condition_variable::queue_entry& e) noexcept
50
          : e_(e)
51
        {
27,770✔
52
        }
53

54
        reset_queue_entry(reset_queue_entry const&) = delete;
55
        reset_queue_entry(reset_queue_entry&&) = delete;
27,770✔
56
        reset_queue_entry& operator=(reset_queue_entry const&) = delete;
57
        reset_queue_entry& operator=(reset_queue_entry&&) = delete;
27,770✔
58

59
        ~reset_queue_entry()
×
60
        {
61
            if (e_.ctx_)
62
            {
63
                auto* q = static_cast<condition_variable::queue_type*>(e_.q_);
27,770✔
64
                q->erase(&e_);    // remove entry from queue
65
            }
66
        }
67

68
        condition_variable::queue_entry& e_;
69
    };
56,698✔
70

71
    ///////////////////////////////////////////////////////////////////////////
56,698✔
72
    // Destructor: if waiters are still queued, log a fatal error and abort
    // all of them. A hpx::no_mutex is used as the lock for abort_all.
    condition_variable::~condition_variable()
    {
        if (queue_.empty())
            return;

        LERR_(fatal).format(
            "~condition_variable: queue is not empty, aborting threads");

        hpx::no_mutex dummy_mtx;
        std::unique_lock<hpx::no_mutex> dummy_lock(dummy_mtx);

        // MSVC static analysis: failing to release lock in function
#if defined(HPX_MSVC)
#pragma warning(push)
#pragma warning(disable : 26115)
#endif
        abort_all<hpx::no_mutex>(HPX_MOVE(dummy_lock));
#if defined(HPX_MSVC)
#pragma warning(pop)
#endif
    }
93

94
    bool condition_variable::empty(
95
        [[maybe_unused]] std::unique_lock<mutex_type>& lock) const noexcept
×
96
    {
97
        HPX_ASSERT_OWNS_LOCK(lock);
98
        return queue_.empty();
×
99
    }
100

101
    std::size_t condition_variable::size(
102
        [[maybe_unused]] std::unique_lock<mutex_type>& lock) const noexcept
×
103
    {
104
        HPX_ASSERT_OWNS_LOCK(lock);
105
        return queue_.size();
106
    }
107

93,845✔
108
    // Return false if no more threads are waiting (returns true if queue
109
    // is non-empty).
110
    bool condition_variable::notify_one(std::unique_lock<mutex_type>& lock,
111
        threads::thread_priority priority, bool unlock, error_code& ec)
112
    {
113
        // Caller failing to hold lock 'lock' before calling function
114
#if defined(HPX_MSVC)
115
#pragma warning(push)
116
#pragma warning(disable : 26110)
117
#endif
118

93,845✔
119
        HPX_ASSERT_OWNS_LOCK(lock);
120

27,673✔
121
        if (!queue_.empty())
122
        {
123
            auto const ctx = queue_.front()->ctx_;
124

125
            // remove item from queue before error handling
126
            queue_.front()->ctx_.reset();
27,673✔
127
            queue_.pop_front();
128

×
129
            if (HPX_UNLIKELY(!ctx))
130
            {
×
131
                lock.unlock();
132

133
                HPX_THROWS_IF(ec, hpx::error::null_thread_id,
×
134
                    "condition_variable::notify_one",
135
                    "null thread id encountered");
136
                return false;
27,673✔
137
            }
27,673✔
138

139
            bool const not_empty = !queue_.empty();
27,673✔
140
            if (unlock)
141
                lock.unlock();
27,673✔
142

143
            ctx.resume(priority);
144

66,172✔
145
            return not_empty;
×
146
        }
147

148
        if (&ec != &throws)
149
            ec = make_success_code();
150

151
        if (unlock)
152
            lock.unlock();
153

154
        return false;
158✔
155

156
#if defined(HPX_MSVC)
157
#pragma warning(pop)
158
#endif
159
    }
160

161
    void condition_variable::notify_all(std::unique_lock<mutex_type>& lock,
162
        threads::thread_priority priority, bool unlock, error_code& ec)
163
    {
164
        // Caller failing to hold lock 'lock' before calling function
165
#if defined(HPX_MSVC)
166
#pragma warning(push)
167
#pragma warning(disable : 26110)
168
#endif
158✔
169
        HPX_ASSERT_OWNS_LOCK(lock);
170

171
        // swap the list
172
        queue_type queue;
173
        queue.swap(queue_);
174

175
        if (!queue.empty())
176
        {
177
            // update reference to queue for all queue entries
178
            for (queue_entry* qe = queue_.front(); qe != nullptr; qe = qe->next)
97✔
179
            {
180
                qe->q_ = &queue;    //-V506
181
            }
182

183
            do
184
            {
97✔
185
                auto ctx = queue.front()->ctx_;
186

×
187
                // remove item from queue before error handling
×
188
                queue.front()->ctx_.reset();
189
                queue.pop_front();
×
190

191
                if (HPX_UNLIKELY(!ctx))
192
                {
×
193
                    prepend_entries(lock, queue);
194
                    lock.unlock();
195

196
                    HPX_THROWS_IF(ec, hpx::error::null_thread_id,
197
                        "condition_variable::notify_all",
198
                        "null thread id encountered");
97✔
199
                    return;
200
                }
97✔
201

202
                [[maybe_unused]] util::ignore_while_checking const il(&lock);
203

158✔
204
                ctx.resume(priority);
×
205

206
            } while (!queue.empty());
207
        }
208

209
        if (&ec != &throws)
210
            ec = make_success_code();
211

×
212
        if (unlock)
213
            lock.unlock();
214

215
#if defined(HPX_MSVC)
×
216
#pragma warning(pop)
×
217
#endif
218
    }
27,770✔
219

220
    void condition_variable::abort_all(std::unique_lock<mutex_type> lock)
221
    {
222
        HPX_ASSERT_OWNS_LOCK(lock);
223

224
        abort_all<mutex_type>(HPX_MOVE(lock));
225
    }
27,770✔
226

27,770✔
227
    // Queue the calling agent and suspend it until it is resumed.
    //
    // 'lock' must be owned on entry; it is released while the agent is
    // suspended and re-acquired before returning. The description and ec
    // arguments are unused.
    //
    // Returns thread_restart_state::signaled if the entry's agent reference
    // was reset while waiting (as the notify functions do), otherwise
    // thread_restart_state::timeout.
    threads::thread_restart_state condition_variable::wait(
        std::unique_lock<mutex_type>& lock, char const* /* description */,
        error_code& /* ec */)
    {
        HPX_ASSERT_OWNS_LOCK(lock);

        // register this agent as a waiter
        auto const self = hpx::execution_base::this_thread::agent();
        queue_entry entry(self, &queue_);
        queue_.push_back(entry);

        // erases the entry on scope exit if it was not consumed by a notifier
        reset_queue_entry cleanup(entry);
        {
            // release the lock for the duration of the suspension
            unlock_guard<std::unique_lock<mutex_type>> unlocked(lock);
            self.suspend();
        }

        if (entry.ctx_)
            return threads::thread_restart_state::timeout;

        return threads::thread_restart_state::signaled;
    }
248

×
249
    // Queue the calling agent and let it sleep until 'abs_time'.
    //
    // 'lock' must be owned on entry; it is released while the agent sleeps
    // and re-acquired before returning. The description and ec arguments
    // are unused.
    //
    // Returns thread_restart_state::signaled if the entry's agent reference
    // was reset while waiting (as the notify functions do), otherwise
    // thread_restart_state::timeout.
    threads::thread_restart_state condition_variable::wait_until(
        std::unique_lock<mutex_type>& lock,
        hpx::chrono::steady_time_point const& abs_time,
        char const* /* description */, error_code& /* ec */)
    {
        HPX_ASSERT_OWNS_LOCK(lock);

        // register this agent as a waiter
        auto self = hpx::execution_base::this_thread::agent();
        queue_entry entry(self, &queue_);
        queue_.push_back(entry);

        // erases the entry on scope exit if it was not consumed by a notifier
        reset_queue_entry cleanup(entry);
        {
            // release the lock while sleeping
            unlock_guard<std::unique_lock<mutex_type>> unlocked(lock);
            self.sleep_until(abs_time.value());
        }

        if (entry.ctx_)
            return threads::thread_restart_state::timeout;

        return threads::thread_restart_state::signaled;
    }
271

272
    template <typename Mutex>
273
    void condition_variable::abort_all(std::unique_lock<Mutex> lock)
274
    {
275
        // new threads might have been added while we were notifying
276
        while (!queue_.empty())
277
        {
278
            // swap the list
279
            queue_type queue;
×
280
            queue.swap(queue_);
281

×
282
            // update reference to queue for all queue entries
283
            for (queue_entry* qe = queue_.front(); qe != nullptr; qe = qe->next)
284
            {
285
                qe->q_ = &queue;    //-V506
286
            }
287

×
288
            while (!queue.empty())
289
            {
×
290
                auto ctx = queue.front()->ctx_;
291

×
292
                // remove item from queue before error handling
293
                queue.front()->ctx_.reset();
294
                queue.pop_front();
×
295

296
                if (HPX_UNLIKELY(!ctx))
297
                {
298
                    LERR_(fatal).format("condition_variable::abort_all: null "
299
                                        "thread id encountered");
300
                    continue;
301
                }
×
302

303
                LERR_(fatal).format(
304
                    "condition_variable::abort_all: pending thread: {}", ctx);
×
305

306
                // unlock while notifying thread as this can suspend
307
                unlock_guard<std::unique_lock<Mutex>> unlock(lock);
×
308

309
                // forcefully abort thread, do not throw
310
                ctx.abort();
311
            }
312
        }
313
    }
314

×
315
    // re-add the remaining items to the original queue
316
    void condition_variable::prepend_entries(
317
        [[maybe_unused]] std::unique_lock<mutex_type>& lock,
114✔
318
        queue_type& queue) noexcept
319
    {
320
        HPX_ASSERT_OWNS_LOCK(lock);
114✔
321
        queue.splice(queue_);
322
        queue_.swap(queue);
119✔
323
    }
324

119✔
325
    ///////////////////////////////////////////////////////////////////////////
326
    // Reference counting hook: add one reference to *p.
    void intrusive_ptr_add_ref(condition_variable_data* p) noexcept
    {
        auto& refcount = p->count_;
        refcount.increment();
    }
330

331
    // Reference counting hook: drop one reference to *p and delete the
    // object once the count has reached zero.
    void intrusive_ptr_release(condition_variable_data* p) noexcept
    {
        // other references are still alive, nothing else to do
        if (p->count_.decrement() != 0)
            return;

        // The thread that brings the count down to zero has to perform an
        // acquire so that destruction does not start before all previous
        // writes to the object have drained.
        std::atomic_thread_fence(std::memory_order_acquire);

        delete p;
    }
343
}    // namespace hpx::lcos::local::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc