• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #853

19 Dec 2022 01:01AM UTC coverage: 86.287% (+0.4%) from 85.912%
#853

push

StellarBot
Merge #6109

6109: Modernize serialization module r=hkaiser a=hkaiser

- flyby separate serialization of Boost types

working towards https://github.com/STEllAR-GROUP/hpx/issues/5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

53 of 53 new or added lines in 6 files covered. (100.0%)

173939 of 201582 relevant lines covered (86.29%)

1931657.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.46
/libs/core/futures/src/future_data.cpp
1
//  Copyright (c) 2015-2022 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/futures/detail/future_data.hpp>
8
#include <hpx/futures/future.hpp>
9

10
#include <hpx/config.hpp>
11
#include <hpx/async_base/launch_policy.hpp>
12
#include <hpx/errors/try_catch_exception_ptr.hpp>
13
#include <hpx/execution_base/this_thread.hpp>
14
#include <hpx/functional/deferred_call.hpp>
15
#include <hpx/functional/move_only_function.hpp>
16
#include <hpx/futures/futures_factory.hpp>
17
#include <hpx/modules/errors.hpp>
18
#include <hpx/modules/memory.hpp>
19
#include <hpx/threading_base/annotated_function.hpp>
20

21
#include <cstddef>
22
#include <exception>
23
#include <functional>
24
#include <mutex>
25
#include <utility>
26

27
namespace hpx { namespace lcos { namespace detail {
28

29
    // File-local handler that receives the exception_ptr when running (or
    // re-spawning) a completion callback fails. It is installed through
    // set_run_on_completed_error_handler(). When it is empty,
    // run_on_completed() calls std::terminate() and handle_on_completed()
    // rethrows the exception instead.
    static run_on_completed_error_handler_type run_on_completed_error_handler;
1,249✔
30

31
    void set_run_on_completed_error_handler(
1,219✔
32
        run_on_completed_error_handler_type f)
33
    {
34
        run_on_completed_error_handler = f;
1,219✔
35
    }
1,219✔
36

37
    // Out-of-line definition of the defaulted destructor; it is emitted in
    // this translation unit rather than inline in the header.
    future_data_refcnt_base::~future_data_refcnt_base() = default;
4,699,664✔
38

39
    ///////////////////////////////////////////////////////////////////////////
40
    struct handle_continuation_recursion_count
41
    {
42
        handle_continuation_recursion_count() noexcept
43
          : count_(threads::get_continuation_recursion_count())
44
        {
45
            ++count_;
46
        }
47
        ~handle_continuation_recursion_count()
48
        {
49
            --count_;
50
        }
51

52
        std::size_t& count_;
53
    };
54

55
    ///////////////////////////////////////////////////////////////////////////
56
    template <typename Callback>
57
    static void run_on_completed_on_new_thread(Callback&& f)
223✔
58
    {
59
        lcos::local::futures_factory<void()> p(HPX_FORWARD(Callback, f));
223✔
60

61
        bool is_hpx_thread = nullptr != hpx::threads::get_self_ptr();
223✔
62
        hpx::launch policy = launch::fork;
223✔
63
        if (!is_hpx_thread)
223✔
64
        {
65
            policy = launch::async;
197✔
66
        }
197✔
67

68
        policy.set_priority(threads::thread_priority::boost);
223✔
69
        policy.set_stacksize(threads::thread_stacksize::current);
223✔
70

71
        // launch a new thread executing the given function
72
        threads::thread_id_ref_type tid =
73
            p.post("run_on_completed_on_new_thread", policy);
223✔
74

75
        // wait for the task to run
76
        if (is_hpx_thread)
223✔
77
        {
78
            // make sure this thread is executed last
79
            this_thread::suspend(
26✔
80
                threads::thread_schedule_state::pending, tid.noref());
26✔
81
            return p.get_future().get();
26✔
82
        }
83

84
        // If we are not on a HPX thread, we need to return immediately, to
85
        // allow the newly spawned thread to execute.
86
    }
223✔
87

88
    ///////////////////////////////////////////////////////////////////////////
89
    // Out-of-line definition of the defaulted destructor of the
    // future_data_void specialization.
    future_data_base<traits::detail::future_data_void>::~future_data_base() =
4,699,464✔
90
        default;
91

92
    // Placeholder object; get_result_void() returns its address to signal
    // that the shared state holds a value.
    static util::unused_type unused_;
93

94
    // Wait for the shared state to become ready and report its outcome.
    //
    // storage: when the state is 'exception', this is read as a pointer to
    //          the stored std::exception_ptr.
    // ec:      error reporting channel; if it is the special 'throws'
    //          instance a stored exception is rethrown, otherwise it is
    //          converted into an error code.
    //
    // Returns the address of a placeholder object if the state holds a
    // value, nullptr in every other case.
    util::unused_type*
    future_data_base<traits::detail::future_data_void>::get_result_void(
        void const* storage, error_code& ec)
    {
        // may yield control until the state changes
        state current = wait(ec);
        if (ec)
        {
            return nullptr;
        }

        // From here on no lock is needed: after wait() the future is ready
        // and there is either a single writer (future) or only readers
        // (shared_future, where locking would hurt concurrency).

        // wait() reporting 'empty' means this thread was suspended in the
        // meantime, so the state has to be read once more. Otherwise the
        // value returned by wait() is reused to avoid a second load.
        if (current == empty)
        {
            current = state_.load(std::memory_order_relaxed);
        }

        if (current == value)
        {
            return &unused_;
        }

        if (current == empty)
        {
            // the value has already been moved out of this future
            HPX_THROWS_IF(ec, hpx::error::no_state,
                "future_data_base::get_result",
                "this future has no valid shared state");
            return nullptr;
        }

        // The thread was re-activated by one of the actions supported by
        // this promise (see promise::set_event and promise::set_exception).
        if (current == exception)
        {
            auto const* stored_error =
                static_cast<std::exception_ptr const*>(storage);

            // an error was reported in the meantime: either translate it
            // into the caller's error code ...
            if (&ec != &throws)
            {
                ec = make_error_code(*stored_error);
                return nullptr;
            }

            // ... or rethrow it (does not return)
            std::rethrow_exception(*stored_error);
        }

        return nullptr;
    }
3,206,673✔
156

157
    // deferred execution of a given continuation
158
    // Invoke a single completion callback.
    //
    // The call is wrapped in hpx::detail::try_catch_exception_ptr: the first
    // lambda runs the callback, the second one receives the exception_ptr if
    // the callback throws. The function is noexcept, so no exception leaves
    // it towards the caller unless the error handler itself lets one escape
    // (which would terminate).
    void future_data_base<traits::detail::future_data_void>::run_on_completed(
3,272,074✔
159
        completed_callback_type&& on_completed) noexcept
160
    {
161
        hpx::detail::try_catch_exception_ptr(
3,272,074✔
162
            [&]() {
6,544,145✔
163
                // NOTE(review): presumably applies the callback's annotation
                // for the duration of the call -- confirm against
                // annotated_function.hpp
                hpx::scoped_annotation annotate(on_completed);
3,272,103✔
164
                on_completed();
3,272,103✔
165
            },
3,272,103✔
166
            [&](std::exception_ptr ep) {
×
167
                // If the completion handler throws an exception, there's
168
                // nothing we can do locally: hand the exception to the
                // installed error handler, or terminate if there is none.
169
                if (run_on_completed_error_handler)
×
170
                {
171
                    run_on_completed_error_handler(HPX_MOVE(ep));
×
172
                }
×
173
                else
174
                {
175
                    std::terminate();
×
176
                }
177
            });
×
178
    }
3,272,075✔
179

180
    // Invoke every callback stored in the given vector, in order, by
    // forwarding each element to the single-callback overload.
    void future_data_base<traits::detail::future_data_void>::run_on_completed(
        completed_callback_vector_type&& on_completed) noexcept
    {
        for (auto& callback : on_completed)
        {
            // each callback is handed over as an rvalue
            run_on_completed(HPX_MOVE(callback));
        }
    }
2,531,635✔
188

189
    // make sure continuation invocation does not recurse deeper than
190
    // allowed
191
    // Run the given completion callback(s), either directly on the calling
    // thread or, if that is deemed unsafe, on a newly spawned thread.
    //
    // Callback is either a single callback or a vector of callbacks; it is
    // forwarded to the matching run_on_completed() overload.
    template <typename Callback>
192
    void
193
    future_data_base<traits::detail::future_data_void>::handle_on_completed(
2,885,376✔
194
        Callback&& on_completed)
195
    {
196
        // Decide whether the completion has to run on a new thread. With
197
        // stack pointer support this depends only on the remaining stack
        // space; otherwise it depends on the continuation recursion depth
        // and on whether we are on a non HPX thread.
198
#if defined(HPX_HAVE_THREADS_GET_STACK_POINTER)
199
        bool recurse_asynchronously =
2,885,346✔
200
            !this_thread::has_sufficient_stack_space();
2,885,346✔
201
#else
202
        // guard increments the recursion counter for the rest of this scope
        handle_continuation_recursion_count cnt;
203
        bool recurse_asynchronously =
204
            cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH ||
205
            (hpx::threads::get_self_ptr() == nullptr);
206
#endif
207
        if (!recurse_asynchronously)
2,885,346✔
208
        {
209
            // directly execute continuation on this thread
210
            run_on_completed(HPX_FORWARD(Callback, on_completed));
2,885,143✔
211
        }
2,885,143✔
212
        else
213
        {
214
            // re-spawn continuation on a new thread
215

216
            hpx::detail::try_catch_exception_ptr(
223✔
217
                [&]() {
446✔
218
                    // The typed function pointer selects the
                    // run_on_completed overload matching Callback.
                    // NOTE(review): a plain function pointer is used, so
                    // run_on_completed is presumably a static member --
                    // confirm in the header.
                    constexpr void (*p)(Callback &&) =
223✔
219
                        &future_data_base::run_on_completed;
220
                    run_on_completed_on_new_thread(util::deferred_call(
223✔
221
                        p, HPX_FORWARD(Callback, on_completed)));
223✔
222
                },
223✔
223
                [&](std::exception_ptr ep) {
×
224
                    // If an exception while creating the new task or inside the
225
                    // completion handler is thrown, there is nothing we can do
226
                    // here: report it to the installed error handler, or
                    // rethrow it to the caller if no handler is installed.
227
                    if (run_on_completed_error_handler)
×
228
                    {
229
                        run_on_completed_error_handler(HPX_MOVE(ep));
×
230
                    }
×
231
                    else
232
                    {
233
                        std::rethrow_exception(HPX_MOVE(ep));
×
234
                    }
235
                });
×
236
        }
237
    }
2,885,367✔
238

239
    // We need only one explicit instantiation here as the second version
240
    // (single callback) is implicitly instantiated below.
241
    // Shorthand for the callback vector type declared in
    // future_data_refcnt_base, used by the instantiation below.
    using completed_callback_vector_type =
242
        future_data_refcnt_base::completed_callback_vector_type;
243

244
    // Explicit, exported instantiation of handle_on_completed() for a vector
    // of callbacks.
    template HPX_CORE_EXPORT void
245
    future_data_base<traits::detail::future_data_void>::handle_on_completed<
246
        completed_callback_vector_type>(completed_callback_vector_type&&);
247

248
    /// Set the callback which needs to be invoked when the future becomes
249
    /// ready. If the future is ready the function will be invoked
250
    /// immediately.
251
    void future_data_base<traits::detail::future_data_void>::set_on_completed(
        completed_callback_type data_sink)
    {
        // an empty callback is silently ignored
        if (!data_sink)
        {
            return;
        }

        // Fast path: the future is already ready, so the callback
        // (continuation) is invoked right away without taking the lock.
        if (is_ready(std::memory_order_relaxed))
        {
            handle_on_completed(HPX_MOVE(data_sink));
            return;
        }

        // Slow path: re-check the state while holding the mutex.
        std::unique_lock l(mtx_);
        if (!is_ready())
        {
            // still pending: store the callback for later invocation
            on_completed_.push_back(HPX_MOVE(data_sink));
            return;
        }

        // The future became ready in the meantime; release the lock before
        // invoking the callback (continuation).
        l.unlock();
        handle_on_completed(HPX_MOVE(data_sink));
    }
3,272,081✔
278

279
    // Block the caller while the shared state is 'empty' and return the
    // state that was observed last.
    //
    // ec: passed through to cond_.wait(). If it is set after waiting, the
    //     state loaded before the wait is returned as is. On the normal path
    //     it is reset to success unless it is the special 'throws' instance.
    future_data_base<traits::detail::future_data_void>::state
280
    future_data_base<traits::detail::future_data_void>::wait(error_code& ec)
3,890,534✔
281
    {
282
        // block if this entry is empty
283
        state s = state_.load(std::memory_order_acquire);
3,890,544✔
284
        if (s == empty)
3,890,544✔
285
        {
286
            std::unique_lock l(mtx_);
922,288✔
287
            // check again now that the mutex is held
            s = state_.load(std::memory_order_relaxed);
922,288✔
288
            if (s == empty)
922,288✔
289
            {
290
                cond_.wait(l, "future_data_base::wait", ec);
922,260✔
291
                if (ec)
922,260✔
292
                {
293
                    // waiting failed; 's' still holds the pre-wait state
                    return s;
×
294
                }
295

296
                // reload the state, it's not empty anymore
297
                s = state_.load(std::memory_order_relaxed);
922,260✔
298
            }
922,260✔
299
        }
922,297✔
300

301
        // only touch 'ec' if the caller supplied their own error_code
        if (&ec != &throws)
3,890,538✔
302
        {
303
            ec = make_success_code();
37,293✔
304
        }
37,293✔
305
        return s;
3,890,531✔
306
    }
3,890,531✔
307

308
    // Block the caller while the shared state is 'empty', but no longer than
    // until abs_time.
    //
    // abs_time: absolute deadline handed to cond_.wait_until().
    // ec:       passed through to cond_.wait_until(); reset to success on
    //           the 'ready' path unless it is the special 'throws' instance.
    //
    // Returns future_status::uninitialized if 'ec' is set after waiting,
    // future_status::timeout if the wait timed out and the state is still
    // 'empty', and future_status::ready otherwise.
    hpx::future_status
309
    future_data_base<traits::detail::future_data_void>::wait_until(
21✔
310
        std::chrono::steady_clock::time_point const& abs_time, error_code& ec)
311
    {
312
        // block if this entry is empty
313
        if (state_.load(std::memory_order_acquire) == empty)
21✔
314
        {
315
            std::unique_lock l(mtx_);
19✔
316
            // check again now that the mutex is held
            if (state_.load(std::memory_order_relaxed) == empty)
19✔
317
            {
318
                threads::thread_restart_state const reason = cond_.wait_until(
38✔
319
                    l, abs_time, "future_data_base::wait_until", ec);
19✔
320
                if (ec)
19✔
321
                {
322
                    return hpx::future_status::uninitialized;
×
323
                }
324

325
                // A timeout only counts if the state is still empty; if the
                // state changed in the meantime the future is reported as
                // ready below.
                // NOTE(review): any non-timeout wake-up is treated as ready
                // without re-checking the state -- confirm that cond_ cannot
                // wake up for other reasons.
                if (reason == threads::thread_restart_state::timeout &&
19✔
326
                    state_.load(std::memory_order_acquire) == empty)
8✔
327
                {
328
                    return hpx::future_status::timeout;
8✔
329
                }
330
            }
11✔
331
        }
19✔
332

333
        // only touch 'ec' if the caller supplied their own error_code
        if (&ec != &throws)
13✔
334
        {
335
            ec = make_success_code();
×
336
        }
×
337
        return hpx::future_status::ready;    //-V110
13✔
338
    }
21✔
339
}}}    // namespace hpx::lcos::detail
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc