• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #864

12 Jan 2023 03:29PM UTC coverage: 86.354% (-0.2%) from 86.533%
#864

push

web-flow
Merge pull request #6133 from STEllAR-GROUP/background_scheduler

Adding abridged static scheduler that supports running background threads only

263 of 263 new or added lines in 4 files covered. (100.0%)

174319 of 201865 relevant lines covered (86.35%)

1939452.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.86
/libs/core/execution/include/hpx/execution/algorithms/split.hpp
1
//  Copyright (c) 2021 ETH Zurich
2
//  Copyright (c) 2022 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#pragma once
9

10
#include <hpx/config.hpp>
11
#include <hpx/allocator_support/allocator_deleter.hpp>
12
#include <hpx/allocator_support/internal_allocator.hpp>
13
#include <hpx/allocator_support/traits/is_allocator.hpp>
14
#include <hpx/assert.hpp>
15
#include <hpx/concepts/concepts.hpp>
16
#include <hpx/datastructures/detail/small_vector.hpp>
17
#include <hpx/datastructures/tuple.hpp>
18
#include <hpx/datastructures/variant.hpp>
19
#include <hpx/errors/try_catch_exception_ptr.hpp>
20
#include <hpx/execution/algorithms/detail/inject_scheduler.hpp>
21
#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
22
#include <hpx/execution/algorithms/detail/single_result.hpp>
23
#include <hpx/execution/algorithms/run_loop.hpp>
24
#include <hpx/execution_base/completion_scheduler.hpp>
25
#include <hpx/execution_base/completion_signatures.hpp>
26
#include <hpx/execution_base/operation_state.hpp>
27
#include <hpx/execution_base/receiver.hpp>
28
#include <hpx/execution_base/sender.hpp>
29
#include <hpx/functional/bind_front.hpp>
30
#include <hpx/functional/detail/tag_priority_invoke.hpp>
31
#include <hpx/functional/invoke_fused.hpp>
32
#include <hpx/functional/move_only_function.hpp>
33
#include <hpx/modules/memory.hpp>
34
#include <hpx/synchronization/spinlock.hpp>
35
#include <hpx/thread_support/atomic_count.hpp>
36
#include <hpx/type_support/meta.hpp>
37
#include <hpx/type_support/pack.hpp>
38

39
#include <atomic>
40
#include <cstddef>
41
#include <exception>
42
#include <memory>
43
#include <mutex>
44
#include <type_traits>
45
#include <utility>
46

47
namespace hpx::execution::experimental {
48

49
    namespace detail {
50

51
        enum class submission_type
52
        {
53
            eager,
54
            lazy
55
        };
56

57
        template <typename Receiver>
58
        struct error_visitor
×
59
        {
60
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
61

62
            template <typename Error>
63
            void operator()(Error const& error) noexcept
4✔
64
            {
65
                // FIXME: check whether it is ok to move the receiver
66
                hpx::execution::experimental::set_error(
4✔
67
                    HPX_MOVE(receiver), error);
4✔
68
            }
4✔
69
        };
70

71
        template <typename Receiver>
72
        struct value_visitor
×
73
        {
74
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
75

76
            template <typename Ts>
77
            void operator()(Ts const& ts) noexcept
66✔
78
            {
79
                // FIXME: check whether it is ok to move the receiver
80
                hpx::invoke_fused(
66✔
81
                    hpx::bind_front(hpx::execution::experimental::set_value,
66✔
82
                        HPX_MOVE(receiver)),
66✔
83
                    ts);
66✔
84
            }
66✔
85
        };
86

87
        template <typename Sender, typename Allocator, submission_type Type,
88
            typename Scheduler = no_scheduler>
89
        struct split_sender
165✔
90
        {
91
            HPX_NO_UNIQUE_ADDRESS std::decay_t<Scheduler> scheduler;
92

93
            template <typename Tuple>
94
            struct value_types_helper
95
            {
96
                using const_type =
97
                    hpx::util::detail::transform_t<Tuple, std::add_const>;
98
                using type = hpx::util::detail::transform_t<const_type,
99
                    std::add_lvalue_reference>;
100
            };
101

102
            template <typename Env>
103
            struct generate_completion_signatures
104
            {
105
                template <template <typename...> typename Tuple,
106
                    template <typename...> typename Variant>
107
                using value_types = hpx::util::detail::transform_t<
108
                    value_types_of_t<Sender, Env, Tuple, Variant>,
109
                    value_types_helper>;
110

111
                template <template <typename...> typename Variant>
112
                using error_types =
113
                    hpx::util::detail::unique_t<hpx::util::detail::prepend_t<
114
                        error_types_of_t<Sender, Env, Variant>,
115
                        std::exception_ptr>>;
116

117
                static constexpr bool sends_stopped = true;
118
            };
119

120
            template <typename Env>
121
            friend auto tag_invoke(
122
                get_completion_signatures_t, split_sender const&, Env)
123
                -> generate_completion_signatures<Env>;
124

125
            // clang-format off
126
            template <typename CPO, typename Scheduler_ = Scheduler,
127
                HPX_CONCEPT_REQUIRES_(
128
                    hpx::execution::experimental::is_scheduler_v<Scheduler_> &&
129
                    is_receiver_cpo_v<std::decay_t<CPO>>
130
                )>
131
            // clang-format on
132
            friend constexpr auto tag_invoke(
12✔
133
                hpx::execution::experimental::get_completion_scheduler_t<CPO>,
134
                split_sender const& sender)
135
            {
136
                return sender.scheduler;
12✔
137
            }
138

139
            // TODO: add forwarding_sender_query
140

141
            struct shared_state
142
            {
143
                struct split_receiver;
308✔
144

145
                using allocator_type = typename std::allocator_traits<
146
                    Allocator>::template rebind_alloc<shared_state>;
147
                HPX_NO_UNIQUE_ADDRESS allocator_type alloc;
148

149
                hpx::spinlock mtx;
150
                hpx::util::atomic_count reference_count{0};
48✔
151
                std::atomic<bool> start_called{false};
48✔
152
                std::atomic<bool> predecessor_done{false};
48✔
153

154
                using operation_state_type =
155
                    std::decay_t<connect_result_t<Sender, split_receiver>>;
156
                operation_state_type os;
157

158
                using signatures = generate_completion_signatures<empty_env>;
159

160
                struct stopped_type
161
                {
162
                };
163
                using value_type = value_types_of_t<Sender, empty_env,
164
                    decayed_tuple, hpx::variant>;
165
                using error_type = detail::error_types_from<signatures,
166
                    meta::func<hpx::variant>>;
167

168
                hpx::variant<hpx::monostate, stopped_type, error_type,
169
                    value_type>
170
                    v;
171

172
                using continuation_type = hpx::move_only_function<void()>;
173
                hpx::detail::small_vector<continuation_type, 1> continuations;
174

175
                struct split_receiver
176
                {
177
                    hpx::intrusive_ptr<shared_state> state;
178

179
                    template <typename Error>
180
                    friend void tag_invoke(
4✔
181
                        set_error_t, split_receiver&& r, Error&& error) noexcept
182
                    {
183
                        HPX_MOVE(r).set_error(HPX_FORWARD(Error, error));
4✔
184
                    }
4✔
185

186
                    template <typename Error>
187
                    void set_error(Error&& error) && noexcept
4✔
188
                    {
189
                        try
190
                        {
191
                            state->v.template emplace<error_type>(
8✔
192
                                error_type(HPX_FORWARD(Error, error)));
4✔
193
                        }
4✔
194
                        catch (...)
195
                        {
196
                            // no way of reporting this error
197
                            std::terminate();
×
198
                        }
199
                        state->set_predecessor_done();
4✔
200
                        state.reset();
4✔
201
                    }
4✔
202

203
                    friend void tag_invoke(
×
204
                        set_stopped_t, split_receiver&& r) noexcept
205
                    {
206
                        if (r.state)
×
207
                        {
208
                            r.state->v.template emplace<stopped_type>();
×
209
                            r.state->set_predecessor_done();
×
210
                            r.state.reset();
×
211
                        }
×
212
                    };
×
213

214
                    // This typedef is duplicated from the parent struct. The
215
                    // parent typedef is not instantiated early enough for use
216
                    // here.
217
                    using value_type = value_types_of_t<Sender, empty_env,
218
                        decayed_tuple, hpx::variant>;
219

220
                    // different versions of clang-format disagree
221
                    // clang-format off
222
                    template <typename... Ts>
223
                    friend auto tag_invoke(
44✔
224
                        set_value_t, split_receiver&& r, Ts&&... ts) noexcept
225
                        -> decltype(
226
                            std::declval<
227
                                hpx::variant<hpx::monostate, value_type>>()
228
                                .template emplace<value_type>(
229
                                    hpx::tuple<std::decay_t<Ts>...>(
230
                                        HPX_FORWARD(Ts, ts)...)),
231
                            void())
232
                    // clang-format on
233
                    {
234
                        hpx::detail::try_catch_exception_ptr(
44✔
235
                            [&]() {
88✔
236
                                r.state->v.template emplace<value_type>(
88✔
237
                                    hpx::make_tuple(HPX_FORWARD(Ts, ts)...));
44✔
238
                                r.state->set_predecessor_done();
44✔
239
                                r.state.reset();
44✔
240
                            },
44✔
241
                            [&](std::exception_ptr ep) {
44✔
242
                                HPX_MOVE(r).set_error(HPX_MOVE(ep));
×
243
                            });
×
244
                    }
44✔
245
                };
246

247
                // clang-format off
248
                template <typename Sender_,
249
                    HPX_CONCEPT_REQUIRES_(
250
                        meta::value<meta::none_of<
251
                            shared_state, std::decay_t<Sender_>>>
252
                    )>
253
                // clang-format on
254
                shared_state(Sender_&& sender, allocator_type const& alloc)
48✔
255
                  : alloc(alloc)
48✔
256
                  , os(hpx::execution::experimental::connect(
96✔
257
                        HPX_FORWARD(Sender_, sender), split_receiver{this}))
48✔
258
                {
48✔
259
                }
48✔
260

261
                virtual ~shared_state()
48✔
262
                {
48✔
263
                    HPX_ASSERT_MSG(start_called,
48✔
264
                        "start was never called on the operation state of "
265
                        "split or ensure_started. Did you forget to connect the"
266
                        "sender to a receiver, or call start on the operation "
267
                        "state?");
268
                }
48✔
269

270
                template <typename Receiver>
271
                struct done_error_value_visitor
×
272
                {
273
                    HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
274

275
                    [[noreturn]] void operator()(hpx::monostate) const
×
276
                    {
277
                        HPX_UNREACHABLE;
×
278
                    }
×
279

280
                    void operator()(stopped_type)
×
281
                    {
282
                        hpx::execution::experimental::set_stopped(
×
283
                            HPX_MOVE(receiver));
×
284
                    }
×
285

286
                    void operator()(error_type const& error)
4✔
287
                    {
288
                        hpx::visit(error_visitor<Receiver>{HPX_FORWARD(
8✔
289
                                       Receiver, receiver)},
290
                            error);
4✔
291
                    }
4✔
292

293
                    void operator()(value_type const& ts)
66✔
294
                    {
295
                        hpx::visit(value_visitor<Receiver>{HPX_FORWARD(
132✔
296
                                       Receiver, receiver)},
297
                            ts);
66✔
298
                    }
66✔
299
                };
300

301
                virtual void set_predecessor_done()
48✔
302
                {
303
                    predecessor_done = true;
48✔
304

305
                    {
306
                        // We require taking the lock here to synchronize with
307
                        // threads attempting to add continuations to the vector
308
                        // of continuations. However, it is enough to take it
309
                        // once and release it immediately.
310
                        //
311
                        // Without the lock we may not see writes to the vector.
312
                        // With the lock threads attempting to add continuations
313
                        // will either:
314
                        // - See predecessor_done = true in which case they will
315
                        //   call the continuation directly without adding it to
316
                        //   the vector of continuations. Accessing the vector
317
                        //   below without the lock is safe in this case because
318
                        //   the vector is not modified.
319
                        // - See predecessor_done = false and proceed to take
320
                        //   the lock. If they see predecessor_done after taking
321
                        //   the lock they can again release the lock and call
322
                        //   the continuation directly. Accessing the vector
323
                        //   without the lock is again safe because the vector
324
                        //   is not modified.
325
                        // - See predecessor_done = false and proceed to take
326
                        //   the lock. If they see predecessor_done is still
327
                        //   false after taking the lock, they will proceed to
328
                        //   add a continuation to the vector. Since they keep
329
                        //   the lock they can safely write to the vector. This
330
                        //   thread will not proceed past the lock until they
331
                        //   have finished writing to the vector.
332
                        //
333
                        // Importantly, once this thread has taken and released
334
                        // this lock, threads attempting to add continuations to
335
                        // the vector must see predecessor_done = true after
336
                        // taking the lock in their threads and will not add
337
                        // continuations to the vector.
338
                        std::unique_lock l{mtx};
48✔
339
                    }
48✔
340

341
                    if (!continuations.empty())
48✔
342
                    {
343
                        for (auto const& continuation : continuations)
42✔
344
                        {
345
                            continuation();
24✔
346
                        }
347

348
                        continuations.clear();
18✔
349
                    }
18✔
350
                }
48✔
351

352
                template <typename Receiver>
353
                void add_continuation(Receiver& receiver) = delete;
354

355
                template <typename Receiver>
356
                void add_continuation(Receiver&& receiver)
70✔
357
                {
358
                    if (predecessor_done)
70✔
359
                    {
360
                        // If we read predecessor_done here it means that one of
361
                        // set_error/set_stopped/set_value has been called and
362
                        // values/errors have been stored into the shared state.
363
                        // We can trigger the continuation directly.
364
                        // TODO: Should this preserve the scheduler? It does not
365
                        // if we call set_* inline.
366
                        hpx::visit(
46✔
367
                            done_error_value_visitor<Receiver>{
46✔
368
                                HPX_FORWARD(Receiver, receiver)},
46✔
369
                            v);
46✔
370
                    }
46✔
371
                    else
372
                    {
373
                        // If predecessor_done is false, we have to take the
374
                        // lock to potentially add the continuation to the
375
                        // vector of continuations.
376
                        std::unique_lock l{mtx};
24✔
377

378
                        if (predecessor_done)
24✔
379
                        {
380
                            // By the time the lock has been taken,
381
                            // predecessor_done might already be true and we can
382
                            // release the lock early and call the continuation
383
                            // directly again.
384
                            l.unlock();
×
385
                            hpx::visit(
×
386
                                done_error_value_visitor<Receiver>{
×
387
                                    HPX_FORWARD(Receiver, receiver)},
×
388
                                v);
×
389
                        }
×
390
                        else
391
                        {
392
                            // If predecessor_done is still false, we add the
393
                            // continuation to the vector of continuations. This
394
                            // has to be done while holding the lock, since
395
                            // other threads may also try to add continuations
396
                            // to the vector and the vector is not threadsafe in
397
                            // itself. The continuation will be called later
398
                            // when set_error/set_stopped/set_value is called.
399
                            continuations.emplace_back(
48✔
400
                                [this,
48✔
401
                                    receiver = HPX_FORWARD(
24✔
402
                                        Receiver, receiver)]() mutable {
403
                                    hpx::visit(
24✔
404
                                        done_error_value_visitor<Receiver>{
24✔
405
                                            HPX_MOVE(receiver)},
24✔
406
                                        v);
24✔
407
                                });
24✔
408
                        }
409
                    }
24✔
410
                }
70✔
411

412
                void start() & noexcept
60✔
413
                {
414
                    if (!start_called.exchange(true))
60✔
415
                    {
416
                        hpx::execution::experimental::start(os);
48✔
417
                    }
48✔
418
                }
60✔
419

420
                friend void intrusive_ptr_add_ref(shared_state* p) noexcept
160✔
421
                {
422
                    ++p->reference_count;
160✔
423
                }
160✔
424

425
                friend void intrusive_ptr_release(shared_state* p) noexcept
160✔
426
                {
427
                    if (--p->reference_count == 0)
160✔
428
                    {
429
                        allocator_type other_alloc(p->alloc);
48✔
430
                        std::allocator_traits<allocator_type>::destroy(
48✔
431
                            other_alloc, p);
48✔
432
                        std::allocator_traits<allocator_type>::deallocate(
48✔
433
                            other_alloc, p, 1);
48✔
434
                    }
48✔
435
                }
160✔
436
            };
437

438
            struct shared_state_run_loop : shared_state
439
            {
440
                run_loop& loop;
441

442
                // clang-format off
443
                template <typename Sender_,
444
                    HPX_CONCEPT_REQUIRES_(
445
                        meta::value<meta::none_of<
446
                            shared_state_run_loop, std::decay<Sender_>>>
447
                    )>
448
                // clang-format on
449
                shared_state_run_loop(Sender_&& sender,
14✔
450
                    typename shared_state::allocator_type const& alloc,
451
                    run_loop& loop)
452
                  : shared_state(HPX_FORWARD(Sender_, sender), alloc)
14✔
453
                  , loop(loop)
14✔
454
                {
28✔
455
                }
14✔
456

457
                ~shared_state_run_loop() override = default;
14✔
458

459
                void set_predecessor_done() override
14✔
460
                {
461
                    shared_state::set_predecessor_done();
14✔
462
                    loop.finish();
14✔
463
                }
14✔
464
            };
465

466
            hpx::intrusive_ptr<shared_state> state;
467

468
            template <typename Sender_, typename Scheduler_ = no_scheduler>
469
            split_sender(Sender_&& sender, Allocator const& allocator,
34✔
470
                Scheduler_&& scheduler = Scheduler_{})
471
              : scheduler(HPX_FORWARD(Scheduler_, scheduler))
34✔
472
            {
473
                using allocator_type = Allocator;
474
                using other_allocator = typename std::allocator_traits<
475
                    allocator_type>::template rebind_alloc<shared_state>;
476
                using allocator_traits = std::allocator_traits<other_allocator>;
477
                using unique_ptr = std::unique_ptr<shared_state,
478
                    util::allocator_deleter<other_allocator>>;
479

480
                other_allocator alloc(allocator);
34✔
481
                unique_ptr p(allocator_traits::allocate(alloc, 1),
34✔
482
                    hpx::util::allocator_deleter<other_allocator>{alloc});
34✔
483

484
                allocator_traits::construct(
34✔
485
                    alloc, p.get(), HPX_FORWARD(Sender_, sender), allocator);
34✔
486
                state = p.release();
34✔
487

488
                // Eager submission means that we start the predecessor
489
                // operation state already when creating the sender. We don't
490
                // wait for another receiver to be connected.
491
                if constexpr (Type == submission_type::eager)
492
                {
493
                    state->start();
18✔
494
                }
495
            }
34✔
496

497
            template <typename Sender_>
498
            split_sender(Sender_&& sender, Allocator const& allocator,
14✔
499
                run_loop_scheduler const& sched)
500
              : scheduler(sched)
14✔
501
            {
502
                using allocator_type = Allocator;
503
                using other_allocator =
504
                    typename std::allocator_traits<allocator_type>::
505
                        template rebind_alloc<shared_state_run_loop>;
506
                using allocator_traits = std::allocator_traits<other_allocator>;
507
                using unique_ptr = std::unique_ptr<shared_state_run_loop,
508
                    util::allocator_deleter<other_allocator>>;
509

510
                other_allocator alloc(allocator);
14✔
511
                unique_ptr p(allocator_traits::allocate(alloc, 1),
14✔
512
                    hpx::util::allocator_deleter<other_allocator>{alloc});
14✔
513

514
                allocator_traits::construct(alloc, p.get(),
28✔
515
                    HPX_FORWARD(Sender_, sender), allocator,
14✔
516
                    sched.get_run_loop());
14✔
517
                state = p.release();
14✔
518

519
                // Eager submission means that we start the predecessor
520
                // operation state already when creating the sender. We don't
521
                // wait for another receiver to be connected.
522
                if constexpr (Type == submission_type::eager)
523
                {
524
                    state->start();
7✔
525
                }
526
            }
14✔
527

528
            split_sender(split_sender const&) = default;
52✔
529
            split_sender& operator=(split_sender const&) = default;
530
            split_sender(split_sender&&) = default;
65✔
531
            split_sender& operator=(split_sender&&) = default;
532

533
            template <typename Receiver>
534
            struct operation_state
70✔
535
            {
536
                HPX_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
537
                hpx::intrusive_ptr<shared_state> state;
538

539
                template <typename Receiver_>
540
                operation_state(Receiver_&& receiver,
70✔
541
                    hpx::intrusive_ptr<shared_state> state)
542
                  : receiver(HPX_FORWARD(Receiver_, receiver))
70✔
543
                  , state(HPX_MOVE(state))
70✔
544
                {
545
                }
70✔
546

547
                operation_state(operation_state&&) = delete;
548
                operation_state& operator=(operation_state&&) = delete;
549
                operation_state(operation_state const&) = delete;
550
                operation_state& operator=(operation_state const&) = delete;
551

552
                friend void tag_invoke(start_t, operation_state& os) noexcept
70✔
553
                {
554
                    // Lazy submission means that we wait to start the
555
                    // predecessor operation state when a downstream operation
556
                    // state is started, i.e. this start function is called.
557
                    if constexpr (Type == submission_type::lazy)
558
                    {
559
                        os.state->start();
35✔
560
                    }
561

562
                    os.state->add_continuation(HPX_MOVE(os.receiver));
70✔
563
                }
70✔
564
            };
565

566
            template <typename Receiver>
567
            friend operation_state<Receiver> tag_invoke(
58✔
568
                connect_t, split_sender&& s, Receiver&& receiver)
569
            {
570
                return {HPX_FORWARD(Receiver, receiver), HPX_MOVE(s.state)};
58✔
571
            }
×
572

573
            template <typename Receiver>
574
            friend operation_state<Receiver> tag_invoke(
12✔
575
                connect_t, split_sender& s, Receiver&& receiver)
576
            {
577
                return {HPX_FORWARD(Receiver, receiver), s.state};
12✔
578
            }
×
579
        };
580
    }    // namespace detail
581

582
    // execution::split is used to adapt an arbitrary sender into a sender that
583
    // can be connected multiple times.
584
    //
585
    // If the provided sender is a multi-shot sender, returns that sender.
586
    // Otherwise, returns a multi-shot sender which sends values equivalent to
587
    // the values sent by the provided sender.
588
    //
589
    // A single-shot sender can only be connected to a receiver at most once.
590
    // Its implementation of execution::connect only has overloads for an
591
    // rvalue-qualified sender. Callers must pass the sender as an rvalue to the
592
    // call to execution::connect, indicating that the call consumes the sender.
593
    //
594
    // A multi-shot sender can be connected to multiple receivers and can be
595
    // launched multiple times. Multi-shot senders customise execution::connect
596
    // to accept an lvalue reference to the sender. Callers can indicate that
597
    // they want the sender to remain valid after the call to execution::connect
598
    // by passing an lvalue reference to the sender to call these overloads.
599
    // Multi-shot senders should also define overloads of execution::connect
600
    // that accept rvalue-qualified senders to allow the sender to be also used
601
    // in places where only a single-shot sender is required.
602
    inline constexpr struct split_t final
603
      : hpx::functional::detail::tag_priority<split_t>
604
    {
605
    private:
606
        // clang-format off
607
        template <typename Sender,
608
            typename Allocator = hpx::util::internal_allocator<>,
609
            HPX_CONCEPT_REQUIRES_(
610
                is_sender_v<Sender> &&
611
                hpx::traits::is_allocator_v<Allocator> &&
612
                experimental::detail::is_completion_scheduler_tag_invocable_v<
613
                    hpx::execution::experimental::set_value_t,
614
                    Sender, split_t, Allocator
615
                >
616
            )>
617
        // clang-format on
618
        friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
619
            split_t, Sender&& sender, Allocator const& allocator = {})
620
        {
621
            auto scheduler =
622
                hpx::execution::experimental::get_completion_scheduler<
623
                    hpx::execution::experimental::set_value_t>(sender);
624

625
            return hpx::functional::tag_invoke(split_t{}, HPX_MOVE(scheduler),
626
                HPX_FORWARD(Sender, sender), allocator);
627
        }
628

629
        // clang-format off
630
        template <typename Sender,
631
            typename Allocator = hpx::util::internal_allocator<>,
632
            HPX_CONCEPT_REQUIRES_(
633
                hpx::execution::experimental::is_sender_v<Sender> &&
634
                hpx::traits::is_allocator_v<Allocator>
635
            )>
636
        // clang-format on
637
        friend constexpr HPX_FORCEINLINE auto tag_invoke(split_t,
7✔
638
            hpx::execution::experimental::run_loop_scheduler const& sched,
639
            Sender&& sender, Allocator const& allocator = {})
640
        {
641
            return detail::split_sender<Sender, Allocator,
7✔
642
                detail::submission_type::lazy,
643
                hpx::execution::experimental::run_loop_scheduler>{
644
                HPX_FORWARD(Sender, sender), allocator, sched};
7✔
645
        }
646

647
        // clang-format off
648
        template <typename Sender,
649
            typename Allocator = hpx::util::internal_allocator<>,
650
            HPX_CONCEPT_REQUIRES_(
651
                hpx::execution::experimental::is_sender_v<Sender> &&
652
                hpx::traits::is_allocator_v<Allocator>
653
            )>
654
        // clang-format on
655
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
16✔
656
            split_t, Sender&& sender, Allocator const& allocator = {})
657
        {
658
            return detail::split_sender<Sender, Allocator,
16✔
659
                detail::submission_type::lazy>{
660
                HPX_FORWARD(Sender, sender), allocator};
16✔
661
        }
662

663
        template <typename Sender, typename Allocator>
664
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(split_t,
6✔
665
            detail::split_sender<Sender, Allocator,
666
                detail::submission_type::lazy>
667
                sender,
668
            Allocator const& = {})
669
        {
670
            return sender;
6✔
671
        }
672

673
        // clang-format off
674
        template <typename Scheduler, typename Allocator,
675
            HPX_CONCEPT_REQUIRES_(
676
                hpx::execution::experimental::is_scheduler_v<Scheduler> &&
677
                hpx::traits::is_allocator_v<Allocator>
678
            )>
679
        // clang-format on
680
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
681
            split_t, Scheduler&& scheduler, Allocator const& allocator = {})
682
        {
683
            return hpx::execution::experimental::detail::inject_scheduler<
684
                split_t, Scheduler, Allocator>{
685
                HPX_FORWARD(Scheduler, scheduler), allocator};
686
        }
687

688
        // clang-format off
689
        template <typename Allocator = hpx::util::internal_allocator<>,
690
            HPX_CONCEPT_REQUIRES_(
691
                hpx::traits::is_allocator_v<Allocator>
692
            )>
693
        // clang-format on
694
        friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
22✔
695
            split_t, Allocator const& allocator = {})
696
        {
697
            return detail::partial_algorithm<split_t, Allocator>{allocator};
22✔
698
        }
699
    } split{};
700
}    // namespace hpx::execution::experimental
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc