• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #852

17 Dec 2022 04:43PM UTC coverage: 85.912% (-0.7%) from 86.568%
#852

push

StellarBot
Merge #6106

6106: Modernizing modules of levels 0 to 5 r=hkaiser a=hkaiser

- flyby: HPX_FORWARD/HPX_MOVE now expand to std::forward and std::move if those are implemented as builtin functions

working towards #5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

87 of 87 new or added lines in 24 files covered. (100.0%)

173152 of 201546 relevant lines covered (85.91%)

1910264.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.23
/libs/core/execution/include/hpx/execution/algorithms/sync_wait.hpp
1
//  Copyright (c) 2020 ETH Zurich
2
//  Copyright (c) 2022 Hartmut Kaiser
3
//  Copyright (c) 2022 Chuanqiu He
4
//
5
//  SPDX-License-Identifier: BSL-1.0
6
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
7
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8

9
#pragma once
10

11
#include <hpx/config.hpp>
12
#include <hpx/assert.hpp>
13
#include <hpx/concepts/concepts.hpp>
14
#include <hpx/datastructures/optional.hpp>
15
#include <hpx/datastructures/tuple.hpp>
16
#include <hpx/datastructures/variant.hpp>
17
#include <hpx/execution/algorithms/detail/inject_scheduler.hpp>
18
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
19
#include <hpx/execution/algorithms/detail/single_result.hpp>
20
#include <hpx/execution/algorithms/run_loop.hpp>
21
#include <hpx/execution/queries/get_delegatee_scheduler.hpp>
22
#include <hpx/execution/queries/get_scheduler.hpp>
23
#include <hpx/execution_base/completion_signatures.hpp>
24
#include <hpx/execution_base/operation_state.hpp>
25
#include <hpx/execution_base/receiver.hpp>
26
#include <hpx/execution_base/sender.hpp>
27
#include <hpx/functional/detail/tag_priority_invoke.hpp>
28
#include <hpx/type_support/meta.hpp>
29
#include <hpx/type_support/pack.hpp>
30
#include <hpx/type_support/unused.hpp>
31

32
#include <atomic>
33
#include <exception>
34
#include <mutex>
35
#include <system_error>
36
#include <type_traits>
37
#include <utility>
38

39
namespace hpx::execution::experimental::detail {
40

41
    // Distinguishes the two result shapes that select_result (below) can
    // compute for a sync_wait-style receiver.
    enum class sync_wait_type
    {
        // result is a one-alternative variant holding the decayed value tuple
        single,
        // result is the predecessor's variant-of-tuples type, unchanged
        variant
    };
46

47
    struct sync_wait_error_visitor
48
    {
49
        void operator()(std::exception_ptr ep) const
23✔
50
        {
51
            std::rethrow_exception(HPX_MOVE(ep));
23✔
52
        }
23✔
53

54
        template <typename Error>
55
        void operator()(Error& error) const
56
        {
57
            throw error;
58
        }
59
    };
60

61
    struct sync_wait_receiver_env
62
    {
63
        using scheduler_type =
64
            decltype(std::declval<run_loop>().get_scheduler());
65

66
        scheduler_type sched;
67

68
        friend auto tag_invoke(hpx::execution::experimental::get_scheduler_t,
69
            sync_wait_receiver_env const& env) noexcept -> scheduler_type
70
        {
71
            return env.sched;
72
        }
73

74
        friend auto tag_invoke(
75
            hpx::execution::experimental::get_delegatee_scheduler_t,
76
            sync_wait_receiver_env const& env) noexcept -> scheduler_type
77
        {
78
            return env.sched;
79
        }
80
    };
81

82
    // Rewrites an instantiation Container<Elements...> of a variadic class
    // template into Container<decay(Elements)...>. The primary template is
    // intentionally left undefined, so any other kind of type is rejected at
    // compile time.
    template <typename Pack>
    struct make_decayed_pack;

    template <template <typename...> typename Container,
        typename... Elements>
    struct make_decayed_pack<Container<Elements...>>
    {
        using type = Container<typename std::decay<Elements>::type...>;
    };

    // Convenience alias for make_decayed_pack<Pack>::type.
    template <typename Pack>
    using make_decayed_pack_t = typename make_decayed_pack<Pack>::type;
93

94
    ///////////////////////////////////////////////////////////////////////////
95
    template <sync_wait_type Type, typename T>
96
    struct select_result;
97

98
    template <typename T>
99
    struct select_result<sync_wait_type::single, T>
100
    {
101
        using type = hpx::variant<make_decayed_pack_t<single_variant_t<T>>>;
102
    };
103

104
    template <typename T>
105
    struct select_result<sync_wait_type::variant, T>
106
    {
107
        using type = T;
108
    };
109

110
    template <sync_wait_type Type, typename T>
111
    using select_result_t = typename select_result<Type, T>::type;
112

113
    ///////////////////////////////////////////////////////////////////////////
114
    template <typename Sender, sync_wait_type Type>
115
    struct sync_wait_receiver
116
    {
117
        // value and error_types of the predecessor sender
118
        template <template <typename...> class Tuple,
119
            template <typename...> class Variant>
120
        using predecessor_value_types =
121
            value_types_of_t<Sender, sync_wait_receiver_env, Tuple, Variant>;
122

123
        template <template <typename...> class Variant>
124
        using predecessor_error_types =
125
            error_types_of_t<Sender, sync_wait_receiver_env, Variant>;
126

127
        // forcing static_assert ensuring variant has exactly one tuple
128
        //
129
        // FIXME: using make_decayed_pack is a workaround for the impedance
130
        // mismatch between the different techniques we use for calculating
131
        // value_types for a sender. In particular, split() explicitly adds a
132
        // const& to all tuple members in a way that prevent simply passing
133
        // decayed_tuple to predecessor_value_types.
134

135
        // The template should compute the result type of whatever returned from
136
        // sync_wait or sync_wait_with_variant by checking sync_wait_type is
137
        // single or variant
138
        using result_type = select_result_t<Type,
139
            predecessor_value_types<hpx::tuple, hpx::variant>>;
140

141
        // The type of errors to store in the variant. This in itself is a
142
        // variant.
143
        using error_type =
144
            hpx::util::detail::unique_t<hpx::util::detail::prepend_t<
145
                predecessor_error_types<hpx::variant>, std::exception_ptr>>;
146

147
        using stopped_type = hpx::execution::experimental::set_stopped_t;
148

149
        struct shared_state
3,672✔
150
        {
151
            hpx::variant<hpx::monostate, error_type, result_type, stopped_type>
152
                value;
153

154
            auto get_value()
3,672✔
155
            {
156
                if (hpx::holds_alternative<result_type>(value))
3,672✔
157
                {
158
                    // pull the tuple out of the variant and wrap it into an
159
                    // optional, make sure to remove the references
160
                    if constexpr (Type == sync_wait_type::single)
161
                    {
162
                        using single_result_type = make_decayed_pack_t<
163
                            single_variant_t<predecessor_value_types<hpx::tuple,
164
                                meta::pack>>>;
165

166
                        return hpx::optional<single_result_type>(hpx::get<0>(
3,635✔
167
                            hpx::get<result_type>(HPX_MOVE(value))));
3,635✔
168
                    }
169
                    else
170
                    {
171
                        return hpx::optional(
12✔
172
                            hpx::get<result_type>(HPX_MOVE(value)));
12✔
173
                    }
174
                }
175
                else if (hpx::holds_alternative<error_type>(value))
25✔
176
                {
177
                    hpx::visit(
×
178
                        sync_wait_error_visitor{}, hpx::get<error_type>(value));
×
179
                    HPX_UNREACHABLE;
23✔
180
                }
181

182
                // Something went very wrong if this assert fired. Essentially
183
                // this means that none of set_value/set_error/set_stopped was
184
                // called.
185
                HPX_ASSERT(hpx::holds_alternative<stopped_type>(value));
2✔
186
                if constexpr (Type == sync_wait_type::single)
187
                {
188
                    using single_result_type =
189
                        make_decayed_pack_t<single_variant_t<
190
                            predecessor_value_types<hpx::tuple, meta::pack>>>;
191
                    return hpx::optional<single_result_type>();
1✔
192
                }
193
                else
194
                {
195
                    return hpx::optional<result_type>();
1✔
196
                }
197
            }
3,649✔
198
        };
199

200
        shared_state& state;
201
        run_loop& loop;
202

203
        template <typename Error>
204
        friend void tag_invoke(
23✔
205
            set_error_t, sync_wait_receiver&& r, Error&& error) noexcept
206
        {
207
            using error_t = std::decay_t<Error>;
208
            if constexpr (std::is_same_v<error_t, std::exception_ptr>)
209
            {
210
                r.state.value.template emplace<error_type>(
46✔
211
                    HPX_FORWARD(Error, error));
23✔
212
            }
213
            else if constexpr (std::is_same_v<error_t, std::error_code>)
214
            {
215
                r.state.value.template emplace<error_type>(
216
                    std::exception_ptr(std::system_error(error)));
217
            }
218
            else
219
            {
220
                try
221
                {
222
                    throw error;
223
                }
224
                catch (...)
225
                {
226
                    r.state.value.template emplace<error_type>(
227
                        std::current_exception());
228
                }
229
            }
230

231
            r.loop.finish();
23✔
232
        }
23✔
233

234
        friend void tag_invoke(
2✔
235
            set_stopped_t tag, sync_wait_receiver&& r) noexcept
236
        {
237
            r.state.value.template emplace<stopped_type>(tag);
2✔
238
            r.loop.finish();
2✔
239
        }
2✔
240

241
        template <typename... Us>
242
        friend void tag_invoke(
3,647✔
243
            set_value_t, sync_wait_receiver&& r, Us&&... us) noexcept
244
        {
245
            r.state.value.template emplace<result_type>(
7,294✔
246
                hpx::forward_as_tuple(HPX_FORWARD(Us, us)...));
3,647✔
247
            r.loop.finish();
3,647✔
248
        }
3,647✔
249

250
        friend sync_wait_receiver_env tag_invoke(
×
251
            hpx::execution::experimental::get_env_t,
252
            sync_wait_receiver const& r) noexcept
253
        {
254
            return {r.loop.get_scheduler()};
×
255
        }
256
    };
257
}    // namespace hpx::execution::experimental::detail
258

259
namespace hpx::this_thread::experimental {
260

261
    // this_thread::sync_wait is a sender consumer that submits the work
262
    // described by the provided sender for execution, similarly to
263
    // ensure_started, except that it blocks the current std::thread or thread
264
    // of main until the work is completed, and returns an optional tuple of
265
    // values that were sent by the provided sender on its completion of work.
266
    // Where 4.20.1 execution::schedule and 4.20.3 execution::transfer_just are
267
    // meant to enter the domain of senders, sync_wait is meant to exit the
268
    // domain of senders, retrieving the result of the task graph.
269
    //
270
    // If the provided sender sends an error instead of values, sync_wait throws
271
    // that error as an exception, or rethrows the original exception if the
272
    // error is of type std::exception_ptr.
273
    //
274
    // If the provided sender sends the "stopped" signal instead of values,
275
    // sync_wait returns an empty optional.
276
    //
277
    // For an explanation of the requires clause, see 5.8 All senders are typed.
278
    // That clause also explains another sender consumer, built on top of
279
    // sync_wait: sync_wait_with_variant.
280
    //
281
    // Note: This function is specified inside hpx::this_thread::experimental,
282
    // and not inside hpx::execution::experimental. This is because sync_wait
283
    // has to block the current execution agent, but determining what the
284
    // current execution agent is is not reliable. Since the standard does not
285
    // specify any functions on the current execution agent other than those in
286
    // std::this_thread, this is the flavor of this function that is being
287
    // proposed.
288

289
    // this_thread::sync_wait and this_thread::sync_wait_with_variant are used
290
    // to block a current thread until a sender passed into it as an argument
291
    // has completed, and to obtain the values (if any) it completed with.
292
    //
293
    // For any receiver r created by an implementation of sync_wait and
294
    // sync_wait_with_variant, the expressions get_scheduler(get_env(r)) and
295
    // get_delegatee_scheduler(get_env(r)) shall be well-formed. For a receiver
296
    // created by the default implementation of this_thread::sync_wait, these
297
    // expressions shall return a scheduler to the same thread-safe,
298
    // first-in-first-out queue of work such that tasks scheduled to the queue
299
    // execute on the thread of the caller of sync_wait. [Note: The scheduler
300
    // for an instance of execution::run_loop that is a local variable within
301
    // sync_wait is one valid implementation. -- end note]
302
    //
303
    // The templates sync-wait-type and sync-wait-with-variant-type are used to
304
    // determine the return types of this_thread::sync_wait and
305
    // this_thread::sync_wait_with_variant. Let sync-wait-env be the type of the
306
    // expression get_env(r) where r is an instance of the receiver created by
307
    // the default implementation of sync_wait. Then:
308
    //
309
    // template<sender<sync-wait-env> S> using sync-wait-type =
310
    //  optional<execution::value_types_of_t< S, sync-wait-env, decayed-tuple,
311
    //  type_identity_t>>;
312
    //
313
    //  template<sender<sync-wait-env> S> using sync-wait-with-variant-type =
314
    //  optional<execution::into-variant-type<S, sync-wait-env>>;
315
    //
316
    // The name this_thread::sync_wait denotes a customization point object. For
317
    // some subexpression s, let S be decltype((s)). If execution::sender<S,
318
    // sync-wait-env> is false, or the number of the arguments
319
    // completion_signatures_of_t<S, sync-wait-env>::value_types passed into the
320
    // Variant template parameter is not 1, this_thread::sync_wait is
321
    // ill-formed. Otherwise, this_thread::sync_wait is expression-equivalent
322
    // to:
323
    //
324
    // 1. tag_invoke(this_thread::sync_wait,
325
    //          execution::get_completion_scheduler< execution::set_value_t>(s),
326
    //          s), if this expression is valid.
327
    //
328
    //      - Mandates: The type of the tag_invoke expression above is
329
    //                  sync-wait-type<S, sync-wait-env>.
330
    //
331
    // 2. Otherwise, tag_invoke(this_thread::sync_wait, s), if this expression
332
    //    is valid.
333
    //
334
    //      - Mandates: The type of the tag_invoke expression above is
335
    //                  sync-wait-type<S, sync-wait-env>.
336
    //
337
    // 3. Otherwise:
338
    //
339
    //      1. Constructs a receiver r.
340
    //
341
    //      2. Calls execution::connect(s, r), resulting in an operation state
342
    //         op_state, then calls execution::start(op_state).
343
    //
344
    //      3. Blocks the current thread until a receiver completion-signal of r
345
    //         is called. When it is:
346
    //
347
    //          1. If execution::set_value(r, ts...) has been called, returns
348
    //              sync-wait-type<S, sync-wait-env>{
349
    //                  decayed-tuple<decltype(ts)...>{ts...}}.
350
    //              If that expression exits exceptionally, the exception is
351
    //              propagated to the caller of sync_wait.
352
    //
353
    //          2. If execution::set_error(r, e) has been called, let E be the
354
    //             decayed type of e. If E is exception_ptr, calls
355
    //             std::rethrow_exception(e). Otherwise, if the E is error_code,
356
    //             throws system_error(e). Otherwise, throws e.
357
    //
358
    //          3. If execution::set_stopped(r) has been called, returns
359
    //             sync-wait-type<S, sync-wait-env>{}.
360
    //
361
    inline constexpr struct sync_wait_t final
362
      : hpx::functional::detail::tag_priority<sync_wait_t>
363
    {
364
    private:
365
        // clang-format off
366
        template <typename Sender,
367
            HPX_CONCEPT_REQUIRES_(
368
                hpx::execution::experimental::is_sender_v<Sender,
369
                    hpx::execution::experimental::detail::sync_wait_receiver_env> &&
370
                hpx::execution::experimental::detail::
371
                    is_completion_scheduler_tag_invocable_v<
372
                        hpx::execution::experimental::set_value_t,
373
                        Sender, sync_wait_t
374
                    >
375
            )>
376
        // clang-format on
377
        friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
×
378
            sync_wait_t, Sender&& sender)
379
        {
380
            auto scheduler =
381
                hpx::execution::experimental::get_completion_scheduler<
×
382
                    hpx::execution::experimental::set_value_t>(sender);
×
383

384
            return hpx::functional::tag_invoke(sync_wait_t{},
×
385
                HPX_MOVE(scheduler), HPX_FORWARD(Sender, sender));
×
386
        }
387

388
        // clang-format off
389
        template <typename Sender,
390
            HPX_CONCEPT_REQUIRES_(
391
                hpx::execution::experimental::is_sender_v<Sender,
392
                    hpx::execution::experimental::detail::sync_wait_receiver_env>
393
            )>
394
        // clang-format on
395
        friend auto tag_invoke(sync_wait_t,
×
396
            hpx::execution::experimental::run_loop_scheduler const& sched,
397
            Sender&& sender)
398
        {
399
            using hpx::execution::experimental::detail::sync_wait_type;
400
            using receiver_type =
401
                hpx::execution::experimental::detail::sync_wait_receiver<Sender,
402
                    sync_wait_type::single>;
403
            using state_type = typename receiver_type::shared_state;
404

405
            hpx::execution::experimental::run_loop& loop = sched.get_run_loop();
×
406
            state_type state{};
×
407
            auto op_state = hpx::execution::experimental::connect(
×
408
                HPX_FORWARD(Sender, sender), receiver_type{state, loop});
×
409
            hpx::execution::experimental::start(op_state);
×
410

411
            // Wait for the variant to be filled in.
412
            loop.run();
×
413

414
            return state.get_value();
×
415
        }
×
416

417
        // clang-format off
418
        template <typename Sender,
419
            HPX_CONCEPT_REQUIRES_(
420
                hpx::execution::experimental::is_sender_v<Sender,
421
                    hpx::execution::experimental::detail::sync_wait_receiver_env>
422
            )>
423
        // clang-format on
424
        friend HPX_FORCEINLINE auto tag_fallback_invoke(
3,658✔
425
            sync_wait_t, Sender&& sender)
426
        {
427
            using hpx::execution::experimental::detail::sync_wait_type;
428
            using receiver_type =
429
                hpx::execution::experimental::detail::sync_wait_receiver<Sender,
430
                    sync_wait_type::single>;
431
            using state_type = typename receiver_type::shared_state;
432

433
            hpx::execution::experimental::run_loop loop{};
3,658✔
434
            state_type state{};
3,658✔
435
            auto op_state = hpx::execution::experimental::connect(
3,658✔
436
                HPX_FORWARD(Sender, sender), receiver_type{state, loop});
3,658✔
437
            hpx::execution::experimental::start(op_state);
3,658✔
438

439
            // Wait for the variant to be filled in.
440
            loop.run();
3,658✔
441

442
            return state.get_value();
3,658✔
443
        }
3,658✔
444

445
        // clang-format off
446
        template <typename Scheduler,
447
            HPX_CONCEPT_REQUIRES_(
448
                hpx::execution::experimental::is_scheduler_v<Scheduler>
449
            )>
450
        // clang-format on
451
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
×
452
            sync_wait_t, Scheduler&& scheduler)
453
        {
454
            return hpx::execution::experimental::detail::inject_scheduler<
×
455
                sync_wait_t, Scheduler>{HPX_FORWARD(Scheduler, scheduler)};
×
456
        }
457

458
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(sync_wait_t)
219✔
459
        {
460
            return hpx::execution::experimental::detail::partial_algorithm<
219✔
461
                sync_wait_t>{};
462
        }
463
    } sync_wait{};
464

465
    ////////////////////////////////////////////////////////////////////
466
    // CPO for sync_wait_with_variant
467

468
    // this_thread::sync_wait_with_variant is a sender consumer that submits
469
    // the work described by the provided sender for execution, similarly to
470
    // ensure_started, except that it blocks the current std::thread or
471
    // thread of main until the work is completed, and returns an optional
472
    // of variant of tuples that were sent by the provided sender on its
473
    // completion of work.
474
    inline constexpr struct sync_wait_with_variant_t final
475
      : hpx::functional::detail::tag_priority<sync_wait_with_variant_t>
476
    {
477
    private:
478
        // clang-format off
479
        template <typename Sender,
480
            HPX_CONCEPT_REQUIRES_(
481
                hpx::execution::experimental::is_sender_v<Sender> &&
482
                hpx::execution::experimental::detail::
483
                    is_completion_scheduler_tag_invocable_v<
484
                        hpx::execution::experimental::set_value_t,
485
                        Sender, sync_wait_with_variant_t
486
                    >
487
            )>
488
        // clang-format on
489
        friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
490
            sync_wait_with_variant_t, Sender&& sender)
491
        {
492
            auto scheduler =
493
                hpx::execution::experimental::get_completion_scheduler<
494
                    hpx::execution::experimental::set_value_t>(sender);
495

496
            return hpx::functional::tag_invoke(sync_wait_with_variant_t{},
497
                HPX_MOVE(scheduler), HPX_FORWARD(Sender, sender));
498
        }
499

500
        // clang-format off
501
        template <typename Sender,
502
            HPX_CONCEPT_REQUIRES_(
503
                hpx::execution::experimental::is_sender_v<Sender>
504
            )>
505
        // clang-format on
506
        friend auto tag_invoke(sync_wait_with_variant_t,
507
            hpx::execution::experimental::run_loop_scheduler const& sched,
508
            Sender&& sender)
509
        {
510
            using hpx::execution::experimental::detail::sync_wait_type;
511
            using receiver_type =
512
                hpx::execution::experimental::detail::sync_wait_receiver<Sender,
513
                    sync_wait_type::variant>;
514
            using state_type = typename receiver_type::shared_state;
515

516
            hpx::execution::experimental::run_loop& loop = sched.get_run_loop();
517
            state_type state{};
518
            auto op_state = hpx::execution::experimental::connect(
519
                HPX_FORWARD(Sender, sender), receiver_type{state, loop});
520
            hpx::execution::experimental::start(op_state);
521

522
            // Wait for the variant to be filled in.
523
            loop.run();
524

525
            return state.get_value();
526
        }
527

528
        // clang-format off
529
        template <typename Sender,
530
            HPX_CONCEPT_REQUIRES_(
531
                hpx::execution::experimental::is_sender_v<Sender>
532
            )>
533
        // clang-format on
534
        friend HPX_FORCEINLINE auto tag_fallback_invoke(
14✔
535
            sync_wait_with_variant_t, Sender&& sender)
536
        {
537
            using hpx::execution::experimental::detail::sync_wait_type;
538
            using receiver_type =
539
                hpx::execution::experimental::detail::sync_wait_receiver<Sender,
540
                    sync_wait_type::variant>;
541
            using state_type = typename receiver_type::shared_state;
542

543
            hpx::execution::experimental::run_loop loop{};
14✔
544
            state_type state{};
14✔
545
            auto op_state = hpx::execution::experimental::connect(
14✔
546
                HPX_FORWARD(Sender, sender), receiver_type{state, loop});
14✔
547
            hpx::execution::experimental::start(op_state);
14✔
548

549
            // Wait for the variant to be filled in.
550
            loop.run();
14✔
551

552
            return state.get_value();
14✔
553
        }
14✔
554

555
        // clang-format off
556
        template <typename Scheduler,
557
            HPX_CONCEPT_REQUIRES_(
558
                hpx::execution::experimental::is_scheduler_v<Scheduler>
559
            )>
560
        // clang-format on
561
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
562
            sync_wait_with_variant_t, Scheduler&& scheduler)
563
        {
564
            return hpx::execution::experimental::detail::inject_scheduler<
565
                sync_wait_with_variant_t, Scheduler>{
566
                HPX_FORWARD(Scheduler, scheduler)};
567
        }
568

569
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
5✔
570
            sync_wait_with_variant_t)
571
        {
572
            return hpx::execution::experimental::detail::partial_algorithm<
5✔
573
                sync_wait_with_variant_t>{};
574
        }
575
    } sync_wait_with_variant{};
576
}    // namespace hpx::this_thread::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc