
STEllAR-GROUP / hpx, build #882 (push)
31 Aug 2023 07:44PM UTC, coverage: 41.798% (-44.7%) from 86.546%
19442 of 46514 relevant lines covered (41.8%), 126375.38 hits per line

Source file: /libs/core/futures/src/future_data.cpp (51.09% covered)

//  Copyright (c) 2015-2025 Hartmut Kaiser
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/futures/detail/execute_thread.hpp>
#include <hpx/futures/detail/future_data.hpp>
#include <hpx/futures/future.hpp>
#include <hpx/futures/futures_factory.hpp>
#include <hpx/modules/async_base.hpp>
#include <hpx/modules/errors.hpp>
#include <hpx/modules/execution_base.hpp>
#include <hpx/modules/functional.hpp>
#include <hpx/modules/logging.hpp>
#include <hpx/modules/memory.hpp>

#include <cstddef>
#include <exception>
#include <mutex>
#include <utility>

namespace hpx::lcos::detail {

    namespace {
        run_on_completed_error_handler_type run_on_completed_error_handler;
    }

    void set_run_on_completed_error_handler(
        run_on_completed_error_handler_type f)
    {
        run_on_completed_error_handler = HPX_MOVE(f);
    }

    future_data_refcnt_base::~future_data_refcnt_base() = default;

    ///////////////////////////////////////////////////////////////////////////
    struct handle_continuation_recursion_count
    {
        handle_continuation_recursion_count() = default;

        std::size_t increment()
        {
            HPX_ASSERT(count_ == nullptr);
            count_ = &threads::get_continuation_recursion_count();
            return ++*count_;
        }

        handle_continuation_recursion_count(
            handle_continuation_recursion_count const&) = delete;
        handle_continuation_recursion_count(
            handle_continuation_recursion_count&&) = delete;
        handle_continuation_recursion_count& operator=(
            handle_continuation_recursion_count const&) = delete;
        handle_continuation_recursion_count& operator=(
            handle_continuation_recursion_count&&) = delete;

        ~handle_continuation_recursion_count()
        {
            if (count_ != nullptr)
            {
                --*count_;
            }
        }

    private:
        std::size_t* count_ = nullptr;
    };

    ///////////////////////////////////////////////////////////////////////////
    template <typename Callback>
    void run_on_completed_on_new_thread(Callback&& f)
    {
        lcos::local::futures_factory<void()> p(HPX_FORWARD(Callback, f));

        HPX_ASSERT(nullptr != hpx::threads::get_self_ptr());
        hpx::launch policy = launch::fork;

        policy.set_priority(threads::thread_priority::boost);
        policy.set_stacksize(threads::thread_stacksize::current);

        // launch a new thread executing the given function
        threads::thread_id_ref_type const tid =    //-V821
            p.post("run_on_completed_on_new_thread", policy);

        // make sure this thread is executed last
        this_thread::suspend(
            threads::thread_schedule_state::pending, tid.noref());

        // wait for the task to run
        return p.get_future().get();
    }

    ///////////////////////////////////////////////////////////////////////////
    future_data_base<traits::detail::future_data_void>::~future_data_base()
    {
        if (runs_child_ != threads::invalid_thread_id)
        {
            [[maybe_unused]] auto* thrd = get_thread_id_data(runs_child_);
            LTM_(debug).format(
                "task_object::~task_object({}), description({}): "
                "destroy runs_as_child thread",
                thrd, thrd->get_description());

            runs_child_ = threads::invalid_thread_id;
        }
    }

    // try to perform scoped execution of the associated thread (if any)
    bool future_data_base<traits::detail::future_data_void>::execute_thread()
    {
        // we try to directly execute the thread exactly once
        threads::thread_id_ref_type runs_child = runs_child_;
        if (!runs_child)
        {
            return false;
        }

        auto const state = this->state_.load(std::memory_order_acquire);
        if (state != future_data_base::empty)
        {
            return false;
        }

        // this thread would block on the future
        [[maybe_unused]] auto* thrd = get_thread_id_data(runs_child);

        LTM_(debug).format("task_object::get_result_void: attempting to "
                           "directly execute child({}), description({})",
            thrd, thrd->get_description());

        if (threads::detail::execute_thread(HPX_MOVE(runs_child)))
        {
            // don't try running this twice
            runs_child_.reset();

            LTM_(debug).format("task_object::get_result_void: successfully "
                               "directly executed child({}), description({})",
                thrd, thrd->get_description());

            // thread terminated, mark as being destroyed
            HPX_ASSERT(thrd->get_state().state() ==
                threads::thread_schedule_state::deleted);

            return true;
        }

        LTM_(debug).format("task_object::get_result_void: failed to "
                           "directly execute child({}), description({})",
            thrd, thrd->get_description());

        return false;
    }

    util::unused_type*
    future_data_base<traits::detail::future_data_void>::get_result_void(
        void const* storage, error_code& ec)
    {
        // yields control if needed
        state s = wait(ec);
        if (ec)
        {
            return nullptr;
        }

        // No locking is required. Once a future has been made ready, which is a
        // postcondition of wait, either:
        //
        // - there is only one writer (future), or
        // - there are multiple readers only (shared_future, lock hurts
        //   concurrency)

        // Avoid retrieving state twice. If wait() returns 'empty' then this
        // thread was suspended, in this case we need to load it again.
        if (s == empty)
        {
            s = state_.load(std::memory_order_relaxed);
        }

        if (s == value)
        {
            static util::unused_type unused_;
            return &unused_;
        }

        if (s == empty)
        {
            // the value has already been moved out of this future
            HPX_THROWS_IF(ec, hpx::error::no_state,
                "future_data_base::get_result",
                "this future has no valid shared state");
            return nullptr;
        }

        // the thread has been re-activated by one of the actions supported by
        // this promise (see promise::set_event and promise::set_exception).
        if (s == exception)
        {
            auto const* exception_ptr =
                static_cast<std::exception_ptr const*>(storage);

            // an error has been reported in the meantime, throw or set the
            // error code
            if (&ec == &throws)
            {
                std::rethrow_exception(*exception_ptr);
                // never reached
            }

            ec = make_error_code(*exception_ptr);
        }

        return nullptr;
    }

    // deferred execution of a given continuation
    void future_data_base<traits::detail::future_data_void>::run_on_completed(
        completed_callback_type&& on_completed) noexcept
    {
        hpx::detail::try_catch_exception_ptr(
            [&]() {
                hpx::scoped_annotation annotate(on_completed);
                HPX_MOVE(on_completed)();
            },
            [&](std::exception_ptr const& ep) {
                // If the completion handler throws an exception, there's
                // nothing we can do, report the exception and terminate.
                if (run_on_completed_error_handler)
                {
                    run_on_completed_error_handler(ep);
                }
                else
                {
                    std::terminate();
                }
            });
    }

    void future_data_base<traits::detail::future_data_void>::run_on_completed(
        completed_callback_vector_type&& on_completed) noexcept
    {
        for (auto&& func : HPX_MOVE(on_completed))
        {
            run_on_completed(HPX_MOVE(func));
        }
    }

    // make sure continuation invocation does not recurse deeper than allowed
    template <typename Callback>
    void handle_on_completed_impl(Callback&& on_completed)
    {
        // We need to run the completion on a new thread if we are on a non HPX
        // thread.
        bool const is_hpx_thread = nullptr != hpx::threads::get_self_ptr();
        bool recurse_asynchronously = false;

        handle_continuation_recursion_count cnt;
        if (is_hpx_thread)
        {
#if defined(HPX_HAVE_THREADS_GET_STACK_POINTER)
            recurse_asynchronously = !this_thread::has_sufficient_stack_space();
#else
            recurse_asynchronously =
                cnt.increment() > HPX_CONTINUATION_MAX_RECURSION_DEPTH;
#endif
        }

        using future_data_base =
            future_data_base<traits::detail::future_data_void>;

        if (!is_hpx_thread || !recurse_asynchronously)
        {
            // directly execute continuation on this thread
            future_data_base::run_on_completed(
                HPX_FORWARD(Callback, on_completed));
        }
        else
        {
            // re-spawn continuation on a new thread
            hpx::detail::try_catch_exception_ptr(
                [&]() {
                    // clang-format off
                    constexpr void (*p)(std::decay_t<Callback>&&) noexcept =
                        &future_data_base::run_on_completed;
                    // clang-format on
                    run_on_completed_on_new_thread(util::deferred_call(
                        p, HPX_FORWARD(Callback, on_completed)));
                },
                [&](std::exception_ptr const& ep) {
                    // If an exception while creating the new task or inside the
                    // completion handler is thrown, there is nothing we can do...
                    // ... but terminate and report the error
                    if (run_on_completed_error_handler)
                    {
                        run_on_completed_error_handler(ep);
                    }
                    else
                    {
                        std::rethrow_exception(ep);
                    }
                });
        }
    }

    void handle_on_completed(
        future_data_refcnt_base::completed_callback_type&& on_completed)
    {
        handle_on_completed_impl(HPX_MOVE(on_completed));
    }

    void handle_on_completed(
        future_data_refcnt_base::completed_callback_vector_type&& on_completed)
    {
        handle_on_completed_impl(HPX_MOVE(on_completed));
    }

    // Set the callback which needs to be invoked when the future becomes ready.
    // If the future is ready the function will be invoked immediately.
    void future_data_base<traits::detail::future_data_void>::set_on_completed(
        completed_callback_type&& data_sink)
    {
        if (!data_sink)
            return;

        hpx::intrusive_ptr<future_data_base> this_(this);    // keep alive
        if (is_ready(std::memory_order_relaxed))
        {
            // invoke the callback (continuation) function right away
            handle_on_completed_impl(HPX_MOVE(data_sink));
        }
        else
        {
            std::unique_lock l(mtx_);
            if (is_ready())
            {
                l.unlock();

                // invoke the callback (continuation) function
                handle_on_completed_impl(HPX_MOVE(data_sink));
            }
            else
            {
                on_completed_.push_back(HPX_MOVE(data_sink));
            }
        }
    }

    future_data_base<traits::detail::future_data_void>::state
    future_data_base<traits::detail::future_data_void>::wait(error_code& ec)
    {
        // block if this entry is empty
        state s = state_.load(std::memory_order_acquire);
        if (s == empty)
        {
            hpx::intrusive_ptr<future_data_base> this_(this);    // keep alive

            std::unique_lock l(mtx_);
            s = state_.load(std::memory_order_relaxed);
            if (s == empty)
            {
                cond_.wait(l, "future_data_base::wait", ec);
                if (ec)
                {
                    return s;
                }

                // reload the state, it's not empty anymore
                s = state_.load(std::memory_order_relaxed);
            }
        }

        if (&ec != &throws)
        {
            ec = make_success_code();
        }
        return s;
    }

    hpx::future_status
    future_data_base<traits::detail::future_data_void>::wait_until(
        std::chrono::steady_clock::time_point const& abs_time, error_code& ec)
    {
        // block if this entry is empty
        if (state_.load(std::memory_order_acquire) == empty)
        {
            hpx::intrusive_ptr<future_data_base> this_(this);    // keep alive

            std::unique_lock l(mtx_);
            if (state_.load(std::memory_order_relaxed) == empty)
            {
                threads::thread_restart_state const reason = cond_.wait_until(
                    l, abs_time, "future_data_base::wait_until", ec);
                if (ec)
                {
                    return hpx::future_status::uninitialized;
                }

                if (reason == threads::thread_restart_state::timeout &&
                    state_.load(std::memory_order_acquire) == empty)
                {
                    return hpx::future_status::timeout;
                }
            }
        }

        if (&ec != &throws)
        {
            ec = make_success_code();
        }
        return hpx::future_status::ready;    //-V110
    }
}    // namespace hpx::lcos::detail
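
For orientation, below is a minimal usage sketch of the machinery covered above. It installs the process-wide handler consumed by run_on_completed() via set_run_on_completed_error_handler() (an internal hook declared in <hpx/futures/detail/future_data.hpp>, not a stable public API) and attaches a continuation, which ultimately flows through future_data_base::set_on_completed() and handle_on_completed_impl(). The includes and exact behavior are assumptions based on a recent HPX release; this is illustrative, not part of the file above.

// Illustrative only: assumes a recent HPX build; the detail-namespace hook
// used below is internal and may change between releases.
#include <hpx/hpx_main.hpp>
#include <hpx/future.hpp>
#include <hpx/futures/detail/future_data.hpp>

#include <exception>
#include <iostream>

int main()
{
    // Install the handler invoked by run_on_completed() if a continuation
    // throws while completing; without one, std::terminate() is called.
    hpx::lcos::detail::set_run_on_completed_error_handler(
        [](std::exception_ptr const&) {
            std::cerr << "a continuation threw while completing a future\n";
        });

    // Attaching a continuation goes through set_on_completed(): if the
    // future is already ready the callback runs immediately, otherwise it
    // is queued and run once the shared state becomes ready.
    hpx::future<int> f = hpx::async([] { return 42; });
    hpx::future<void> g =
        f.then([](hpx::future<int>&& r) { std::cout << r.get() << '\n'; });

    g.get();
    return 0;
}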