• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #872

24 Jan 2023 07:29PM UTC coverage: 85.694% (-0.9%) from 86.624%
#872

push

StellarBot
Merge #6148

6148: Investigate the failure of the LCI parcelport. r=hkaiser a=JiakunYan



Co-authored-by: Jiakun Yan <jiakunyan1998@gmail.com>

173076 of 201969 relevant lines covered (85.69%)

2110584.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/libs/core/execution/tests/unit/algorithm_run_loop.cpp
1
//  Copyright (c) 2022 Hartmut Kaiser
2
//  Copyright (c) 2020 ETH Zurich
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#include <hpx/config.hpp>
9

10
// Clang V11 ICEs (hits an internal compiler error) on this test
11
#if !defined(HPX_CLANG_VERSION) || (HPX_CLANG_VERSION / 10000) != 11
12

13
#include <hpx/local/condition_variable.hpp>
14
#include <hpx/local/execution.hpp>
15
#include <hpx/local/functional.hpp>
16
#include <hpx/local/init.hpp>
17
#include <hpx/local/mutex.hpp>
18
#include <hpx/local/thread.hpp>
19
#include <hpx/modules/preprocessor.hpp>
20
#include <hpx/modules/testing.hpp>
21

22
#include "algorithm_test_utils.hpp"
23

24
#include <atomic>
25
#include <chrono>
26
#include <cstddef>
27
#include <exception>
28
#include <iostream>
29
#include <mutex>
30
#include <string>
31
#include <type_traits>
32
#include <unordered_set>
33
#include <utility>
34
#include <vector>
35

36
namespace ex = hpx::execution::experimental;
37
namespace tt = hpx::this_thread::experimental;
38

39
void test_concepts()
×
40
{
41
    ex::run_loop loop;
×
42

43
    auto sched = loop.get_scheduler();
×
44
    static_assert(ex::is_scheduler_v<decltype(sched)>,
45
        "ex::is_scheduler_v<decltype(sched)>");
46

47
    auto s = ex::schedule(sched);
×
48
    static_assert(
49
        std::is_same_v<ex::schedule_result_t<decltype(sched)>, decltype(s)>,
50
        "ex::schedule_result_t<decltype(sched)> must be result of "
51
        "ex::schedule(sched)");
52
    static_assert(ex::is_sender_v<decltype(s)>, "ex::is_sender_v<decltype(s)>");
53
    static_assert(
54
        ex::is_sender_of_v<decltype(s)>, "ex::is_sender_of_v<decltype(s)>");
55

56
    static_assert(std::is_same_v<ex::error_types_of_t<decltype(s)>,
57
                      hpx::variant<std::exception_ptr>>,
58
        "ex::error_types_of_t<decltype(s)> must be "
59
        "variant<std::exception_ptr>");
60
    static_assert(ex::sends_stopped_of_v<decltype(s)>,
61
        "ex::sends_stopped_of_v<decltype(s)> must be true");
62

63
    loop.finish();
×
64
    loop.run();
×
65
}
×
66

67
void test_execute()
×
68
{
69
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
70

71
    ex::run_loop loop;
×
72
    ex::execute(loop.get_scheduler(),
×
73
        [parent_id]() { HPX_TEST_EQ(hpx::this_thread::get_id(), parent_id); });
×
74

75
    loop.finish();
×
76
    loop.run();
×
77
}
×
78

79
struct check_context_receiver
×
80
{
81
    hpx::thread::id parent_id;
82
    ex::run_loop& loop;
83
    bool& executed;
84

85
    template <typename E>
86
    friend void tag_invoke(
×
87
        ex::set_error_t, check_context_receiver&&, E&&) noexcept
88
    {
89
        HPX_TEST(false);
×
90
    }
×
91

92
    friend void tag_invoke(ex::set_stopped_t, check_context_receiver&&) noexcept
×
93
    {
94
        HPX_TEST(false);
×
95
    }
×
96

97
    template <typename... Ts>
98
    friend void tag_invoke(ex::set_value_t, check_context_receiver&& r, Ts&&...)
×
99
    {
100
        HPX_TEST_EQ(r.parent_id, hpx::this_thread::get_id());
×
101
        HPX_TEST_NEQ(hpx::thread::id(hpx::threads::invalid_thread_id),
×
102
            hpx::this_thread::get_id());
103

104
        r.executed = true;
×
105
        r.loop.finish();
×
106
    }
×
107
};
108

109
void test_sender_receiver_basic()
×
110
{
111
    ex::run_loop loop;
×
112
    auto sched = loop.get_scheduler();
×
113

114
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
115
    bool executed{false};
×
116

117
    auto begin = ex::schedule(sched);
×
118
    auto os = ex::connect(
×
119
        std::move(begin), check_context_receiver{parent_id, loop, executed});
×
120
    ex::start(os);
×
121

122
    loop.run();
×
123

124
    HPX_TEST(executed);
×
125
}
×
126

127
// Thread id recorded by the first continuation of the sender/receiver tests;
// later continuations compare their own thread id against it.
hpx::thread::id sender_receiver_then_thread_id;
128

129
void test_sender_receiver_then()
×
130
{
131
    ex::run_loop loop;
×
132
    auto sched = loop.get_scheduler();
×
133

134
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
135
    bool executed{false};
×
136

137
    auto begin = ex::schedule(sched);
×
138
    auto work1 = ex::then(std::move(begin), [=]() {
×
139
        sender_receiver_then_thread_id = hpx::this_thread::get_id();
×
140
        HPX_TEST_EQ(sender_receiver_then_thread_id, parent_id);
×
141
    });
×
142
    auto work2 = ex::then(std::move(work1), []() {
×
143
        HPX_TEST_EQ(sender_receiver_then_thread_id, hpx::this_thread::get_id());
×
144
    });
×
145
    auto os = ex::connect(
×
146
        std::move(work2), check_context_receiver{parent_id, loop, executed});
×
147
    ex::start(os);
×
148

149
    loop.run();
×
150

151
    HPX_TEST(executed);
×
152
}
×
153

154
void test_sender_receiver_then_wait()
×
155
{
156
    ex::run_loop loop;
×
157
    auto sched = loop.get_scheduler();
×
158

159
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
160
    std::atomic<std::size_t> then_count{0};
×
161
    bool executed{false};
×
162

163
    auto begin = ex::schedule(sched);
×
164

165
    static_assert(
166
        ex::detail::is_completion_scheduler_tag_invocable_v<ex::set_value_t,
167
            decltype(begin), tt::sync_wait_t>);
168
    auto compl_sched_begin =
169
        ex::get_completion_scheduler<ex::set_value_t>(begin);
×
170
    HPX_TEST(sched == compl_sched_begin);
×
171

172
    auto work1 = ex::then(std::move(begin), [&then_count, parent_id]() {
×
173
        sender_receiver_then_thread_id = hpx::this_thread::get_id();
×
174
        HPX_TEST_EQ(sender_receiver_then_thread_id, parent_id);
×
175
        ++then_count;
×
176
    });
×
177

178
    static_assert(
179
        ex::detail::is_completion_scheduler_tag_invocable_v<ex::set_value_t,
180
            decltype(work1), tt::sync_wait_t>);
181
    auto compl_sched_work1 =
182
        ex::get_completion_scheduler<ex::set_value_t>(work1);
×
183
    HPX_TEST(sched == compl_sched_work1);
×
184

185
    auto work2 = ex::then(std::move(work1), [&then_count, &executed]() {
×
186
        HPX_TEST_EQ(sender_receiver_then_thread_id, hpx::this_thread::get_id());
×
187
        ++then_count;
×
188
        executed = true;
×
189
    });
×
190

191
    static_assert(
192
        ex::detail::is_completion_scheduler_tag_invocable_v<ex::set_value_t,
193
            decltype(work2), tt::sync_wait_t>);
194
    auto compl_sched_work2 =
195
        ex::get_completion_scheduler<ex::set_value_t>(work2);
×
196
    HPX_TEST(sched == compl_sched_work2);
×
197

198
    tt::sync_wait(std::move(work2));
×
199

200
    HPX_TEST_EQ(then_count, std::size_t(2));
×
201
    HPX_TEST(executed);
×
202
}
×
203

204
void test_sender_receiver_then_sync_wait()
×
205
{
206
    ex::run_loop loop;
×
207
    auto sched = loop.get_scheduler();
×
208

209
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
210
    std::atomic<std::size_t> then_count{0};
×
211

212
    auto begin = ex::schedule(sched);
×
213
    auto work = ex::then(std::move(begin), [&then_count, parent_id]() {
×
214
        sender_receiver_then_thread_id = hpx::this_thread::get_id();
×
215
        HPX_TEST_EQ(sender_receiver_then_thread_id, parent_id);
×
216
        ++then_count;
×
217
        return 42;
×
218
    });
219
    auto work_result = tt::sync_wait(std::move(work));
×
220
    auto result = hpx::get<0>(*work_result);
×
221
    HPX_TEST_EQ(then_count, std::size_t(1));
×
222
    static_assert(
223
        std::is_same<int, typename std::decay<decltype(result)>::type>::value,
224
        "result should be an int");
225
    HPX_TEST_EQ(result, 42);
×
226
}
×
227

228
void test_sender_receiver_then_arguments()
×
229
{
230
    ex::run_loop loop;
×
231
    auto sched = loop.get_scheduler();
×
232

233
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
234
    std::atomic<std::size_t> then_count{0};
×
235

236
    auto begin = ex::schedule(sched);
×
237
    auto work1 = ex::then(std::move(begin), [&then_count, parent_id]() {
×
238
        sender_receiver_then_thread_id = hpx::this_thread::get_id();
×
239
        HPX_TEST_EQ(sender_receiver_then_thread_id, parent_id);
×
240
        ++then_count;
×
241
        return 3;
×
242
    });
243
    auto work2 =
244
        ex::then(std::move(work1), [&then_count](int x) -> std::string {
×
245
            HPX_TEST_EQ(
×
246
                sender_receiver_then_thread_id, hpx::this_thread::get_id());
247
            ++then_count;
×
248
            return std::string("hello") + std::to_string(x);
×
249
        });
×
250
    auto work3 = ex::then(std::move(work2), [&then_count](std::string s) {
×
251
        HPX_TEST_EQ(sender_receiver_then_thread_id, hpx::this_thread::get_id());
×
252
        ++then_count;
×
253
        return 2 * s.size();
×
254
    });
255
    auto work_result = tt::sync_wait(std::move(work3));
×
256
    auto result = hpx::get<0>(*work_result);
×
257
    HPX_TEST_EQ(then_count, std::size_t(3));
×
258
    static_assert(std::is_same<std::size_t,
259
                      typename std::decay<decltype(result)>::type>::value,
260
        "result should be a std::size_t");
261
    HPX_TEST_EQ(result, std::size_t(12));
×
262
}
×
263

264
void test_transfer_basic()
×
265
{
266
    ex::run_loop loop;
×
267
    auto sched = loop.get_scheduler();
×
268

269
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
270
    hpx::thread::id current_id;
×
271

272
    auto begin = ex::schedule(sched);
×
273
    auto work1 = ex::then(begin, [=, &current_id]() {
×
274
        current_id = hpx::this_thread::get_id();
×
275
        HPX_TEST_EQ(current_id, parent_id);
×
276
    });
×
277
    auto work2 = ex::then(work1, [=, &current_id]() {
×
278
        HPX_TEST_EQ(current_id, hpx::this_thread::get_id());
×
279
    });
×
280
    auto transfer1 = ex::transfer(work2, sched);
×
281
    auto work3 = ex::then(transfer1, [=, &current_id]() {
×
282
        hpx::thread::id new_id = hpx::this_thread::get_id();
×
283
        HPX_TEST_EQ(current_id, new_id);
×
284
        current_id = new_id;
×
285
        HPX_TEST_EQ(current_id, parent_id);
×
286
    });
×
287
    auto work4 = ex::then(work3, [=, &current_id]() {
×
288
        HPX_TEST_EQ(current_id, hpx::this_thread::get_id());
×
289
    });
×
290
    auto transfer2 = ex::transfer(work4, sched);
×
291
    auto work5 = ex::then(transfer2, [=, &current_id]() {
×
292
        hpx::thread::id new_id = hpx::this_thread::get_id();
×
293
        HPX_TEST_EQ(current_id, new_id);
×
294
        current_id = new_id;
×
295
        HPX_TEST_EQ(current_id, parent_id);
×
296
    });
×
297

298
    tt::sync_wait(work5);
×
299
}
×
300

301
void test_transfer_arguments()
×
302
{
303
    ex::run_loop loop;
×
304
    auto sched = loop.get_scheduler();
×
305

306
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
307
    hpx::thread::id current_id;
×
308

309
    auto begin = ex::schedule(sched);
×
310
    auto work1 = ex::then(begin, [=, &current_id]() {
×
311
        current_id = hpx::this_thread::get_id();
×
312
        HPX_TEST_EQ(current_id, parent_id);
×
313
        return 3;
×
314
    });
315
    auto work2 = ex::then(work1, [=, &current_id](int x) {
×
316
        HPX_TEST_EQ(current_id, hpx::this_thread::get_id());
×
317
        return x / 2.0;
×
318
    });
319
    auto transfer1 = ex::transfer(work2, sched);
×
320
    auto work3 = ex::then(transfer1, [=, &current_id](double x) {
×
321
        hpx::thread::id new_id = hpx::this_thread::get_id();
×
322
        HPX_TEST_EQ(current_id, new_id);
×
323
        current_id = new_id;
×
324
        HPX_TEST_EQ(current_id, parent_id);
×
325
        return x / 2;
×
326
    });
327
    auto work4 = ex::then(work3, [=, &current_id](int x) {
×
328
        HPX_TEST_EQ(current_id, hpx::this_thread::get_id());
×
329
        return "result: " + std::to_string(x);
×
330
    });
×
331
    auto transfer2 = ex::transfer(work4, sched);
×
332
    auto work5 = ex::then(transfer2, [=, &current_id](std::string s) {
×
333
        hpx::thread::id new_id = hpx::this_thread::get_id();
×
334
        HPX_TEST_EQ(current_id, new_id);
×
335
        current_id = new_id;
×
336
        HPX_TEST_EQ(current_id, parent_id);
×
337
        return s + "!";
×
338
    });
339

340
    auto work_result = tt::sync_wait(work5);
×
341
    auto result = hpx::get<0>(*work_result);
×
342
    static_assert(std::is_same_v<std::string, std::decay_t<decltype(result)>>,
343
        "result should be a std::string");
344
    HPX_TEST_EQ(result, std::string("result: 0!"));
×
345
}
×
346

347
void test_just_void()
×
348
{
349
    ex::run_loop loop;
×
350

351
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
352

353
    auto begin = ex::just();
×
354
    auto transfer1 = ex::transfer(begin, loop.get_scheduler());
×
355
    auto work1 = ex::then(transfer1,
×
356
        [parent_id]() { HPX_TEST_EQ(parent_id, hpx::this_thread::get_id()); });
×
357

358
    tt::sync_wait(work1);
×
359
}
×
360

361
void test_just_one_arg()
×
362
{
363
    ex::run_loop loop;
×
364

365
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
366

367
    auto begin = ex::just(3);
×
368
    auto transfer1 = ex::transfer(begin, loop.get_scheduler());
×
369
    auto work1 = ex::then(transfer1, [parent_id](int x) {
×
370
        HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
371
        HPX_TEST_EQ(x, 3);
×
372
    });
×
373

374
    tt::sync_wait(work1);
×
375
}
×
376

377
void test_just_two_args()
×
378
{
379
    ex::run_loop loop;
×
380

381
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
382

383
    auto begin = ex::just(3, std::string("hello"));
×
384
    auto transfer1 = ex::transfer(begin, loop.get_scheduler());
×
385
    auto work1 = ex::then(transfer1, [parent_id](int x, std::string y) {
×
386
        HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
387
        HPX_TEST_EQ(x, 3);
×
388
        HPX_TEST_EQ(y, std::string("hello"));
×
389
    });
×
390

391
    tt::sync_wait(work1);
×
392
}
×
393

394
void test_transfer_just_void()
×
395
{
396
    ex::run_loop loop;
×
397
    auto sched = loop.get_scheduler();
×
398

399
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
400

401
    auto begin = ex::transfer_just(sched);
×
402
    auto work1 = ex::then(begin,
×
403
        [parent_id]() { HPX_TEST_EQ(parent_id, hpx::this_thread::get_id()); });
×
404

405
    tt::sync_wait(work1);
×
406
}
×
407

408
void test_transfer_just_one_arg()
×
409
{
410
    ex::run_loop loop;
×
411
    auto sched = loop.get_scheduler();
×
412

413
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
414

415
    auto begin = ex::transfer_just(sched, 3);
×
416
    auto work1 = ex::then(begin, [parent_id](int x) {
×
417
        HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
418
        HPX_TEST_EQ(x, 3);
×
419
    });
×
420

421
    tt::sync_wait(work1);
×
422
}
×
423

424
void test_transfer_just_two_args()
×
425
{
426
    ex::run_loop loop;
×
427
    auto sched = loop.get_scheduler();
×
428

429
    hpx::thread::id parent_id = hpx::this_thread::get_id();
×
430

431
    auto begin = ex::transfer_just(sched, 3, std::string("hello"));
×
432
    auto work1 = ex::then(begin, [parent_id](int x, std::string y) {
×
433
        HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
434
        HPX_TEST_EQ(x, 3);
×
435
        HPX_TEST_EQ(y, std::string("hello"));
×
436
    });
×
437

438
    tt::sync_wait(work1);
×
439
}
×
440

441
// Note: when_all does not propagate the completion scheduler, for this reason
442
// any senders coming after it need to be explicitly provided with the required
443
// scheduler again.
444
void test_when_all()
×
445
{
446
    ex::run_loop loop;
×
447
    auto sched = loop.get_scheduler();
×
448

449
    {
450
        hpx::thread::id parent_id = hpx::this_thread::get_id();
×
451

452
        auto work1 = ex::schedule(sched) | ex::then([parent_id]() {
×
453
            HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
454
            return 42;
×
455
        });
456

457
        auto work2 = ex::schedule(sched) | ex::then([parent_id]() {
×
458
            HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
459
            return std::string("hello");
×
460
        });
×
461

462
        auto work3 = ex::schedule(sched) | ex::then([parent_id]() {
×
463
            HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
464
            return 3.14;
×
465
        });
466

467
        auto when1 =
468
            ex::when_all(std::move(work1), std::move(work2), std::move(work3));
×
469

470
        bool executed{false};
×
471
        std::move(when1) |
×
472
            ex::then([parent_id, &executed](int x, std::string y, double z) {
×
473
                HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
474
                HPX_TEST_EQ(x, 42);
×
475
                HPX_TEST_EQ(y, std::string("hello"));
×
476
                HPX_TEST_EQ(z, 3.14);
×
477
                executed = true;
×
478
            }) |
×
479
            tt::sync_wait(sched);
×
480

481
        HPX_TEST(executed);
×
482
    }
483

484
    {
485
        hpx::thread::id parent_id = hpx::this_thread::get_id();
×
486

487
        // The exception is likely to be thrown before set_value from the second
488
        // sender is called because the second sender sleeps.
489
        auto work1 = ex::schedule(sched) | ex::then([parent_id]() -> int {
×
490
            HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
491
            throw std::runtime_error("error");
×
492
        });
×
493

494
        auto work2 = ex::schedule(sched) | ex::then([parent_id]() {
×
495
            HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
496
            hpx::this_thread::sleep_for(std::chrono::milliseconds(100));
×
497
            return std::string("hello");
×
498
        });
×
499

500
        bool exception_thrown = false;
×
501

502
        try
503
        {
504
            ex::when_all(std::move(work1), std::move(work2)) |
×
505
                ex::then([parent_id](int x, std::string y) {
×
506
                    HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
507
                    HPX_TEST_EQ(x, 42);
×
508
                    HPX_TEST_EQ(y, std::string("hello"));
×
509
                }) |
×
510
                tt::sync_wait(sched);
×
511

512
            HPX_TEST(false);
×
513
        }
×
514
        catch (std::runtime_error const& e)
515
        {
516
            HPX_TEST_EQ(std::string(e.what()), std::string("error"));
×
517
            exception_thrown = true;
×
518
        }
×
519

520
        HPX_TEST(exception_thrown);
×
521
    }
522

523
    {
524
        hpx::thread::id parent_id = hpx::this_thread::get_id();
×
525

526
        // The exception is likely to be thrown after set_value from the second
527
        // sender is called because the first sender sleeps before throwing.
528
        auto work1 = ex::schedule(sched) | ex::then([parent_id]() -> int {
×
529
            HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
530
            hpx::this_thread::sleep_for(std::chrono::milliseconds(100));
×
531
            throw std::runtime_error("error");
×
532
        });
×
533

534
        auto work2 = ex::schedule(sched) | ex::then([parent_id]() {
×
535
            HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
536
            return std::string("hello");
×
537
        });
×
538

539
        bool exception_thrown = false;
×
540

541
        try
542
        {
543
            ex::when_all(std::move(work1), std::move(work2)) |
×
544
                ex::then([parent_id](int x, std::string y) {
×
545
                    HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
546
                    HPX_TEST_EQ(x, 42);
×
547
                    HPX_TEST_EQ(y, std::string("hello"));
×
548
                }) |
×
549
                tt::sync_wait(sched);
×
550

551
            HPX_TEST(false);
×
552
        }
×
553
        catch (std::runtime_error const& e)
554
        {
555
            HPX_TEST_EQ(std::string(e.what()), std::string("error"));
×
556
            exception_thrown = true;
×
557
        }
×
558

559
        HPX_TEST(exception_thrown);
×
560
    }
561
}
×
562

563
// Note: make_future does not propagate the completion scheduler, for this
564
// reason any senders coming after it need to be explicitly provided with the
565
// required scheduler again.
566
void test_future_sender()
×
567
{
568
    std::cout << "1\n";
×
569
    // senders as futures
570
    {
571
        ex::run_loop loop;
×
572
        auto sched = loop.get_scheduler();
×
573

574
        auto s = ex::transfer_just(sched, 3);
×
575
        auto f = ex::make_future(std::move(s));
×
576
        HPX_TEST_EQ(f.get(), 3);
×
577
    }
×
578

579
    std::cout << "2\n";
×
580
    {
581
        ex::run_loop loop;
×
582
        auto sched = loop.get_scheduler();
×
583

584
        auto f = ex::transfer_just(sched, 3) | ex::make_future();
×
585
        HPX_TEST_EQ(f.get(), 3);
×
586
    }
×
587

588
    std::cout << "3\n";
×
589
    {
590
        ex::run_loop loop;
×
591
        auto sched = loop.get_scheduler();
×
592

593
        auto f = ex::just(3) | ex::make_future(sched);
×
594
        HPX_TEST_EQ(f.get(), 3);
×
595
    }
×
596

597
    std::cout << "4\n";
×
598
    {
599
        ex::run_loop loop;
×
600
        auto sched = loop.get_scheduler();
×
601

602
        std::atomic<bool> called{false};
×
603
        auto s = ex::schedule(sched) | ex::then([&] { called = true; });
×
604
        auto f = ex::make_future(std::move(s));
×
605
        f.get();
×
606
        HPX_TEST(called);
×
607
    }
×
608

609
    std::cout << "5\n";
×
610
    {
611
        ex::run_loop loop;
×
612
        auto sched = loop.get_scheduler();
×
613

614
        auto s1 = ex::transfer_just(sched, std::size_t(42));
×
615
        auto s2 = ex::transfer_just(sched, 3.14);
×
616
        auto s3 = ex::transfer_just(sched, std::string("hello"));
×
617
        auto f = ex::make_future(sched,
×
618
            ex::then(ex::when_all(std::move(s1), std::move(s2), std::move(s3)),
×
619
                [](std::size_t x, double, std::string z) {
×
620
                    return z.size() + x;
×
621
                }));
622
        HPX_TEST_EQ(f.get(), std::size_t(47));
×
623
    }
×
624

625
    // mixing senders and futures
626
    std::cout << "6\n";
×
627
    {
628
        ex::run_loop loop;
×
629
        auto sched = loop.get_scheduler();
×
630

631
        auto result = tt::sync_wait(sched,
×
632
            ex::as_sender(ex::make_future(ex::transfer_just(sched, 42))));
×
633
        HPX_TEST_EQ(hpx::get<0>(*result), 42);
×
634
    }
×
635

636
    std::cout << "7\n";
×
637
    {
638
        ex::run_loop loop;
×
639
        auto sched = loop.get_scheduler();
×
640

641
        auto f = hpx::async([]() {
×
642
            hpx::this_thread::sleep_for(std::chrono::seconds(1));
×
643
            return 42;
×
644
        });
645

646
        HPX_TEST_EQ(
×
647
            ex::make_future(ex::transfer(ex::as_sender(std::move(f)), sched))
648
                .get(),
649
            42);
650
    }
×
651

652
    std::cout << "8\n";
×
653
    {
654
        ex::run_loop loop;
×
655
        auto sched = loop.get_scheduler();
×
656

657
        auto s1 = ex::transfer_just(sched, std::size_t(42));
×
658
        auto s2 = ex::transfer_just(sched, 3.14);
×
659
        auto s3 = ex::transfer_just(sched, std::string("hello"));
×
660
        auto f = ex::make_future(sched,
×
661
            ex::then(ex::when_all(std::move(s1), std::move(s2), std::move(s3)),
×
662
                [](std::size_t x, double, std::string z) {
×
663
                    return z.size() + x;
×
664
                }));
665
        auto sf = f.then([](auto&& f) { return f.get() - 40; }).share();
×
666
        auto t1 = sf.then([](auto&& sf) { return sf.get() + 1; });
×
667
        auto t2 = sf.then([](auto&& sf) { return sf.get() + 2; });
×
668
        auto t1s = ex::then(
×
669
            ex::as_sender(std::move(t1)), [](std::size_t x) { return x + 1; });
×
670
        auto t1f = ex::make_future(sched, std::move(t1s));
×
671
        auto last = hpx::dataflow(
×
672
            hpx::unwrapping([](std::size_t x, std::size_t y) { return x + y; }),
×
673
            t1f, t2);
674

675
        HPX_TEST_EQ(last.get(), std::size_t(18));
×
676
    }
×
677
}
×
678

679
void test_ensure_started()
×
680
{
681
    {
682
        ex::run_loop loop;
×
683
        auto sched = loop.get_scheduler();
×
684

685
        ex::schedule(sched) | ex::ensure_started() | tt::sync_wait();
×
686
    }
×
687

688
    {
689
        ex::run_loop loop;
×
690
        auto sched = loop.get_scheduler();
×
691

692
        auto s = ex::transfer_just(sched, 42) | ex::ensure_started();
×
693
        auto result = tt::sync_wait(std::move(s));
×
694
        HPX_TEST_EQ(hpx::get<0>(*result), 42);
×
695
    }
×
696

697
    {
698
        ex::run_loop loop;
×
699
        auto sched = loop.get_scheduler();
×
700

701
        auto s = ex::transfer_just(sched, 42) | ex::ensure_started() |
×
702
            ex::transfer(sched);
×
703
        auto result = tt::sync_wait(std::move(s));
×
704
        HPX_TEST_EQ(hpx::get<0>(*result), 42);
×
705
    }
×
706

707
    {
708
        ex::run_loop loop;
×
709
        auto sched = loop.get_scheduler();
×
710

711
        auto s = ex::transfer_just(sched, 42) | ex::ensure_started();
×
712
        HPX_TEST_EQ(hpx::get<0>(*tt::sync_wait(s)), 42);
×
713
        HPX_TEST_EQ(hpx::get<0>(*tt::sync_wait(s)), 42);
×
714
        HPX_TEST_EQ(hpx::get<0>(*tt::sync_wait(s)), 42);
×
715
        HPX_TEST_EQ(hpx::get<0>(*tt::sync_wait(std::move(s))), 42);
×
716
    }
×
717
}
×
718

719
void test_ensure_started_when_all()
×
720
{
721
    {
722
        ex::run_loop loop;
×
723
        auto sched = loop.get_scheduler();
×
724

725
        std::atomic<std::size_t> first_task_calls{0};
×
726
        std::atomic<std::size_t> successor_task_calls{0};
×
727
        hpx::mutex mtx;
×
728
        hpx::condition_variable cond;
×
729
        bool started{false};
×
730
        auto s = ex::schedule(sched) | ex::then([&]() {
×
731
            ++first_task_calls;
×
732
            std::lock_guard l{mtx};
×
733
            started = true;
×
734
            cond.notify_one();
×
735
        }) | ex::ensure_started();
×
736
        {
737
            std::unique_lock l{mtx};
×
738
            cond.wait(l, [&]() { return started; });
×
739
        }
×
740
        auto succ1 = s | ex::then([&]() {
×
741
            ++successor_task_calls;
×
742
            return 1;
×
743
        });
744
        auto succ2 = s | ex::then([&]() {
×
745
            ++successor_task_calls;
×
746
            return 2;
×
747
        });
748
        HPX_TEST_EQ(
×
749
            hpx::get<0>(*(ex::when_all(succ1, succ2) |
750
                ex::then([](int const& x, int const& y) { return x + y; }) |
751
                tt::sync_wait(sched))),
752
            3);
753
        HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
754
        HPX_TEST_EQ(successor_task_calls, std::size_t(2));
×
755
    }
×
756

757
    {
758
        ex::run_loop loop;
×
759
        auto sched = loop.get_scheduler();
×
760

761
        std::atomic<std::size_t> first_task_calls{0};
×
762
        std::atomic<std::size_t> successor_task_calls{0};
×
763
        hpx::mutex mtx;
×
764
        hpx::condition_variable cond;
×
765
        bool started{false};
×
766
        auto s = ex::schedule(sched) | ex::then([&]() {
×
767
            ++first_task_calls;
×
768
            std::lock_guard l{mtx};
×
769
            started = true;
×
770
            cond.notify_one();
×
771
            return 3;
772
        }) | ex::ensure_started();
×
773
        {
774
            std::unique_lock l{mtx};
×
775
            cond.wait(l, [&]() { return started; });
×
776
        }
×
777
        HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
778
        auto succ1 = s | ex::then([&](int const& x) {
×
779
            ++successor_task_calls;
×
780
            return x + 1;
×
781
        });
782
        auto succ2 = s | ex::then([&](int const& x) {
×
783
            ++successor_task_calls;
×
784
            return x + 2;
×
785
        });
786
        HPX_TEST_EQ(
×
787
            hpx::get<0>(*(ex::when_all(succ1, succ2) |
788
                ex::then([](int const& x, int const& y) { return x + y; }) |
789
                tt::sync_wait(sched))),
790
            9);
791
        HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
792
        HPX_TEST_EQ(successor_task_calls, std::size_t(2));
×
793
    }
×
794

795
    {
796
        ex::run_loop loop;
×
797
        auto sched = loop.get_scheduler();
×
798

799
        std::atomic<std::size_t> first_task_calls{0};
×
800
        std::atomic<std::size_t> successor_task_calls{0};
×
801
        hpx::mutex mtx;
×
802
        hpx::condition_variable cond;
×
803
        bool started{false};
×
804
        auto s = ex::schedule(sched) | ex::then([&]() {
×
805
            ++first_task_calls;
×
806
            std::lock_guard l{mtx};
×
807
            started = true;
×
808
            cond.notify_one();
×
809
            return 3;
810
        }) | ex::ensure_started();
×
811
        {
812
            std::unique_lock l{mtx};
×
813
            cond.wait(l, [&]() { return started; });
×
814
        }
×
815
        auto succ1 = s | ex::transfer(sched) | ex::then([&](int const& x) {
×
816
            ++successor_task_calls;
×
817
            return x + 1;
×
818
        });
819
        auto succ2 = s | ex::transfer(sched) | ex::then([&](int const& x) {
×
820
            ++successor_task_calls;
×
821
            return x + 2;
×
822
        });
823
        HPX_TEST_EQ(
×
824
            hpx::get<0>(*(ex::when_all(succ1, succ2) |
825
                ex::then([](int const& x, int const& y) { return x + y; }) |
826
                tt::sync_wait(sched))),
827
            9);
828
        HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
829
        HPX_TEST_EQ(successor_task_calls, std::size_t(2));
×
830
    }
×
831
}
×
832

833
void test_split()
×
834
{
835
    {
836
        ex::run_loop loop;
×
837
        auto sched = loop.get_scheduler();
×
838

839
        ex::schedule(sched) | ex::split() | tt::sync_wait();
×
840
    }
×
841

842
    {
843
        ex::run_loop loop;
×
844
        auto sched = loop.get_scheduler();
×
845

846
        auto s = ex::transfer_just(sched, 42) | ex::split();
×
847
        HPX_TEST_EQ(hpx::get<0>(*tt::sync_wait(std::move(s))), 42);
×
848
    }
×
849

850
    {
851
        ex::run_loop loop;
×
852
        auto sched = loop.get_scheduler();
×
853

854
        auto s =
855
            ex::transfer_just(sched, 42) | ex::split() | ex::transfer(sched);
×
856
        HPX_TEST_EQ(hpx::get<0>(*tt::sync_wait(std::move(s))), 42);
×
857
    }
×
858

859
    {
860
        ex::run_loop loop;
×
861
        auto sched = loop.get_scheduler();
×
862

863
        auto s = ex::transfer_just(sched, 42) | ex::split();
×
864
        HPX_TEST_EQ(hpx::get<0>(*tt::sync_wait(s)), 42);
×
865
        HPX_TEST_EQ(hpx::get<0>(*tt::sync_wait(s)), 42);
×
866
        HPX_TEST_EQ(hpx::get<0>(*tt::sync_wait(s)), 42);
×
867
        HPX_TEST_EQ(hpx::get<0>(*tt::sync_wait(std::move(s))), 42);
×
868
    }
×
869
}
×
870

871
void test_split_when_all()
×
872
{
873
    {
874
        ex::run_loop loop;
×
875
        auto sched = loop.get_scheduler();
×
876

877
        std::atomic<std::size_t> first_task_calls{0};
×
878
        std::atomic<std::size_t> successor_task_calls{0};
×
879
        auto s = ex::schedule(sched) | ex::then([&]() {
×
880
            HPX_TEST_EQ(first_task_calls, std::size_t(0));
×
881
            HPX_TEST_EQ(successor_task_calls, std::size_t(0));
×
882
            ++first_task_calls;
×
883
        }) | ex::split();
×
884
        auto succ1 = s | ex::then([&]() {
×
885
            HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
886
            HPX_TEST_EQ(successor_task_calls, std::size_t(0));
×
887
            ++successor_task_calls;
×
888
            return 1;
×
889
        });
890
        auto succ2 = s | ex::then([&]() {
×
891
            HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
892
            HPX_TEST_EQ(successor_task_calls, std::size_t(1));
×
893
            ++successor_task_calls;
×
894
            return 2;
×
895
        });
896
        HPX_TEST_EQ(
×
897
            hpx::get<0>(*(ex::when_all(succ1, succ2) |
898
                ex::then([](int const& x, int const& y) { return x + y; }) |
899
                tt::sync_wait(sched))),
900
            3);
901
        HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
902
        HPX_TEST_EQ(successor_task_calls, std::size_t(2));
×
903
    }
×
904

905
    {
906
        ex::run_loop loop;
×
907
        auto sched = loop.get_scheduler();
×
908

909
        std::atomic<std::size_t> first_task_calls{0};
×
910
        std::atomic<std::size_t> successor_task_calls{0};
×
911
        auto s = ex::schedule(sched) | ex::then([&]() {
×
912
            HPX_TEST_EQ(first_task_calls, std::size_t(0));
×
913
            HPX_TEST_EQ(successor_task_calls, std::size_t(0));
×
914
            ++first_task_calls;
×
915
            return 3;
×
916
        }) | ex::split();
×
917
        auto succ1 = s | ex::then([&](int const& x) {
×
918
            HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
919
            HPX_TEST_EQ(successor_task_calls, std::size_t(0));
×
920
            ++successor_task_calls;
×
921
            return x + 1;
×
922
        });
923
        auto succ2 = s | ex::then([&](int const& x) {
×
924
            HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
925
            HPX_TEST_EQ(successor_task_calls, std::size_t(1));
×
926
            ++successor_task_calls;
×
927
            return x + 2;
×
928
        });
929
        HPX_TEST_EQ(
×
930
            hpx::get<0>(*(ex::when_all(succ1, succ2) |
931
                ex::then([](int const& x, int const& y) { return x + y; }) |
932
                tt::sync_wait(sched))),
933
            9);
934
        HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
935
        HPX_TEST_EQ(successor_task_calls, std::size_t(2));
×
936
    }
×
937

938
    {
939
        ex::run_loop loop;
×
940
        auto sched = loop.get_scheduler();
×
941

942
        std::atomic<std::size_t> first_task_calls{0};
×
943
        std::atomic<std::size_t> successor_task_calls{0};
×
944
        auto s = ex::schedule(sched) | ex::then([&]() {
×
945
            HPX_TEST_EQ(first_task_calls, std::size_t(0));
×
946
            HPX_TEST_EQ(successor_task_calls, std::size_t(0));
×
947
            ++first_task_calls;
×
948
            return 3;
×
949
        }) | ex::split();
×
950
        auto succ1 = s | ex::transfer(sched) | ex::then([&](int const& x) {
×
951
            HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
952
            HPX_TEST_EQ(successor_task_calls, std::size_t(0));
×
953
            ++successor_task_calls;
×
954
            return x + 1;
×
955
        });
956
        auto succ2 = s | ex::transfer(sched) | ex::then([&](int const& x) {
×
957
            HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
958
            HPX_TEST_EQ(successor_task_calls, std::size_t(1));
×
959
            ++successor_task_calls;
×
960
            return x + 2;
×
961
        });
962
        HPX_TEST_EQ(
×
963
            hpx::get<0>(*(ex::when_all(succ1, succ2) |
964
                ex::then([](int const& x, int const& y) { return x + y; }) |
965
                tt::sync_wait(sched))),
966
            9);
967
        HPX_TEST_EQ(first_task_calls, std::size_t(1));
×
968
        HPX_TEST_EQ(successor_task_calls, std::size_t(2));
×
969
    }
×
970
}
×
971

972
void test_let_value()
×
973
{
974
    // void predecessor
975
    {
976
        ex::run_loop loop;
×
977
        auto sched = loop.get_scheduler();
×
978

979
        auto result = hpx::get<0>(*(ex::schedule(sched) |
×
980
            ex::let_value([]() { return ex::just(42); }) | tt::sync_wait()));
×
981
        HPX_TEST_EQ(result, 42);
×
982
    }
×
983

984
    {
985
        ex::run_loop loop;
×
986
        auto sched = loop.get_scheduler();
×
987

988
        auto result = hpx::get<0>(*(ex::schedule(sched) | ex::let_value([=]() {
×
989
            return ex::transfer_just(sched, 42);
×
990
        }) | tt::sync_wait()));
×
991
        HPX_TEST_EQ(result, 42);
×
992
    }
×
993

994
    {
995
        ex::run_loop loop;
×
996
        auto sched = loop.get_scheduler();
×
997

998
        auto result = hpx::get<0>(*(ex::just() | ex::let_value([=]() {
×
999
            return ex::transfer_just(sched, 42);
×
1000
        }) | tt::sync_wait(sched)));
×
1001
        HPX_TEST_EQ(result, 42);
×
1002
    }
×
1003

1004
    // int predecessor, value ignored
1005
    {
1006
        ex::run_loop loop;
×
1007
        auto sched = loop.get_scheduler();
×
1008

1009
        auto result = hpx::get<0>(*(ex::transfer_just(sched, 43) |
×
1010
            ex::let_value([](int&) { return ex::just(42); }) |
×
1011
            tt::sync_wait()));
×
1012
        HPX_TEST_EQ(result, 42);
×
1013
    }
×
1014

1015
    {
1016
        ex::run_loop loop;
×
1017
        auto sched = loop.get_scheduler();
×
1018

1019
        auto result = hpx::get<0>(*(ex::transfer_just(sched, 43) |
×
1020
            ex::let_value([=](int&) { return ex::transfer_just(sched, 42); }) |
×
1021
            tt::sync_wait()));
×
1022
        HPX_TEST_EQ(result, 42);
×
1023
    }
×
1024

1025
    {
1026
        ex::run_loop loop;
×
1027
        auto sched = loop.get_scheduler();
×
1028

1029
        auto result = hpx::get<0>(*(ex::just(43) | ex::let_value([=](int&) {
×
1030
            return ex::transfer_just(sched, 42);
×
1031
        }) | tt::sync_wait(sched)));
×
1032
        HPX_TEST_EQ(result, 42);
×
1033
    }
×
1034

1035
    // int predecessor, value used
1036
    {
1037
        ex::run_loop loop;
×
1038
        auto sched = loop.get_scheduler();
×
1039

1040
        auto result = hpx::get<0>(
×
1041
            *(ex::transfer_just(sched, 43) | ex::let_value([](int& x) {
×
1042
                return ex::just(42) | ex::then([&](int y) { return x + y; });
×
1043
            }) | tt::sync_wait()));
×
1044
        HPX_TEST_EQ(result, 85);
×
1045
    }
×
1046

1047
    {
1048
        ex::run_loop loop;
×
1049
        auto sched = loop.get_scheduler();
×
1050

1051
        auto result = hpx::get<0>(
×
1052
            *(ex::transfer_just(sched, 43) | ex::let_value([=](int& x) {
×
1053
                return ex::transfer_just(sched, 42) |
×
1054
                    ex::then([&](int y) { return x + y; });
×
1055
            }) | tt::sync_wait()));
×
1056
        HPX_TEST_EQ(result, 85);
×
1057
    }
×
1058

1059
    {
1060
        ex::run_loop loop;
×
1061
        auto sched = loop.get_scheduler();
×
1062

1063
        auto result = hpx::get<0>(*(ex::just(43) | ex::let_value([=](int& x) {
×
1064
            return ex::transfer_just(sched, 42) |
×
1065
                ex::then([&](int y) { return x + y; });
×
1066
        }) | tt::sync_wait(sched)));
×
1067
        HPX_TEST_EQ(result, 85);
×
1068
    }
×
1069

1070
    // predecessor throws, let sender is ignored
1071
    {
1072
        ex::run_loop loop;
×
1073
        auto sched = loop.get_scheduler();
×
1074

1075
        bool exception_thrown = false;
×
1076

1077
        try
1078
        {
1079
            ex::transfer_just(sched, 43) | ex::then([](int) -> int {
×
1080
                throw std::runtime_error("error");
×
1081
            }) | ex::let_value([](int&) {
×
1082
                HPX_TEST(false);
×
1083
                return ex::just(0);
×
1084
            }) | tt::sync_wait();
×
1085

1086
            HPX_TEST(false);
×
1087
        }
×
1088
        catch (std::runtime_error const& e)
1089
        {
1090
            HPX_TEST_EQ(std::string(e.what()), std::string("error"));
×
1091
            exception_thrown = true;
×
1092
        }
×
1093

1094
        HPX_TEST(exception_thrown);
×
1095
    }
×
1096
}
×
1097

1098
void check_exception_ptr_message(
×
1099
    std::exception_ptr ep, std::string const& message)
1100
{
1101
    try
1102
    {
1103
        std::rethrow_exception(ep);
×
1104
    }
×
1105
    catch (std::runtime_error const& e)
1106
    {
1107
        HPX_TEST_EQ(std::string(e.what()), message);
×
1108
        return;
1109
    }
×
1110

1111
    HPX_TEST(false);
×
1112
}
×
1113

1114
void test_let_error()
×
1115
{
1116
    // void predecessor
1117
    {
1118
        ex::run_loop loop;
×
1119
        auto sched = loop.get_scheduler();
×
1120

1121
        std::atomic<bool> called{false};
×
1122
        ex::schedule(sched) | ex::then([]() {
×
1123
            throw std::runtime_error("error");
×
1124
        }) | ex::let_error([&called](std::exception_ptr& ep) {
×
1125
            called = true;
×
1126
            check_exception_ptr_message(ep, "error");
×
1127
            return ex::just();
×
1128
        }) | tt::sync_wait();
×
1129
        HPX_TEST(called);
×
1130
    }
×
1131

1132
    {
1133
        ex::run_loop loop;
×
1134
        auto sched = loop.get_scheduler();
×
1135

1136
        std::atomic<bool> called{false};
×
1137
        ex::schedule(sched) | ex::then([]() {
×
1138
            throw std::runtime_error("error");
×
1139
        }) | ex::let_error([=, &called](std::exception_ptr& ep) {
×
1140
            called = true;
×
1141
            check_exception_ptr_message(ep, "error");
×
1142
            return ex::transfer_just(sched);
×
1143
        }) | tt::sync_wait();
×
1144
        HPX_TEST(called);
×
1145
    }
×
1146

1147
    {
1148
        ex::run_loop loop;
×
1149
        auto sched = loop.get_scheduler();
×
1150

1151
        std::atomic<bool> called{false};
×
1152
        ex::just() | ex::then([]() { throw std::runtime_error("error"); }) |
×
1153
            ex::let_error([=, &called](std::exception_ptr& ep) {
×
1154
                called = true;
×
1155
                check_exception_ptr_message(ep, "error");
×
1156
                return ex::transfer_just(sched);
×
1157
            }) |
×
1158
            tt::sync_wait(sched);
×
1159
        HPX_TEST(called);
×
1160
    }
×
1161

1162
    // int predecessor
1163
    {
1164
        ex::run_loop loop;
×
1165
        auto sched = loop.get_scheduler();
×
1166

1167
        auto result = hpx::get<0>(*(ex::schedule(sched) | ex::then([]() {
×
1168
            throw std::runtime_error("error");
×
1169
            return 43;
1170
        }) | ex::let_error([](std::exception_ptr& ep) {
×
1171
            check_exception_ptr_message(ep, "error");
×
1172
            return ex::just(42);
×
1173
        }) | tt::sync_wait()));
×
1174
        HPX_TEST_EQ(result, 42);
×
1175
    }
×
1176

1177
    {
1178
        ex::run_loop loop;
×
1179
        auto sched = loop.get_scheduler();
×
1180

1181
        auto result = hpx::get<0>(*(ex::schedule(sched) | ex::then([]() {
×
1182
            throw std::runtime_error("error");
×
1183
            return 43;
1184
        }) | ex::let_error([=](std::exception_ptr& ep) {
×
1185
            check_exception_ptr_message(ep, "error");
×
1186
            return ex::transfer_just(sched, 42);
×
1187
        }) | tt::sync_wait()));
×
1188
        HPX_TEST_EQ(result, 42);
×
1189
    }
×
1190

1191
    {
1192
        ex::run_loop loop;
×
1193
        auto sched = loop.get_scheduler();
×
1194

1195
        auto result = hpx::get<0>(*(ex::just() | ex::then([]() {
×
1196
            throw std::runtime_error("error");
×
1197
            return 43;
1198
        }) | ex::let_error([=](std::exception_ptr& ep) {
×
1199
            check_exception_ptr_message(ep, "error");
×
1200
            return ex::transfer_just(sched, 42);
×
1201
        }) | tt::sync_wait(sched)));
×
1202
        HPX_TEST_EQ(result, 42);
×
1203
    }
×
1204

1205
    // predecessor doesn't throw, let sender is ignored
1206
    {
1207
        ex::run_loop loop;
×
1208
        auto sched = loop.get_scheduler();
×
1209

1210
        auto result = hpx::get<0>(*(ex::transfer_just(sched, 42) |
×
1211
            ex::let_error([](std::exception_ptr) {
×
1212
                HPX_TEST(false);
×
1213
                return ex::just(43);
×
1214
            }) |
×
1215
            tt::sync_wait()));
×
1216
        HPX_TEST_EQ(result, 42);
×
1217
    }
×
1218

1219
    {
1220
        ex::run_loop loop;
×
1221
        auto sched = loop.get_scheduler();
×
1222

1223
        auto result = hpx::get<0>(*(ex::transfer_just(sched, 42) |
×
1224
            ex::let_error([=](std::exception_ptr) {
×
1225
                HPX_TEST(false);
×
1226
                return ex::transfer_just(sched, 43);
×
1227
            }) |
×
1228
            tt::sync_wait()));
×
1229
        HPX_TEST_EQ(result, 42);
×
1230
    }
×
1231

1232
    {
1233
        ex::run_loop loop;
×
1234
        auto sched = loop.get_scheduler();
×
1235

1236
        auto result =
×
1237
            hpx::get<0>(*(ex::just(42) | ex::let_error([=](std::exception_ptr) {
×
1238
                HPX_TEST(false);
×
1239
                return ex::transfer_just(sched, 43);
×
1240
            }) | tt::sync_wait(sched)));
×
1241
        HPX_TEST_EQ(result, 42);
×
1242
    }
×
1243
}
×
1244

1245
void test_detach()
×
1246
{
1247
    {
1248
        ex::run_loop loop;
×
1249
        auto sched = loop.get_scheduler();
×
1250

1251
        bool called = false;
×
1252
        hpx::mutex mtx;
×
1253
        hpx::condition_variable cond;
×
1254
        ex::schedule(sched) | ex::then([&]() {
×
1255
            std::unique_lock l{mtx};
×
1256
            called = true;
×
1257
            cond.notify_one();
×
1258
        }) | ex::start_detached();
×
1259

1260
        {
1261
            std::unique_lock l{mtx};
×
1262
            HPX_TEST(cond.wait_for(
×
1263
                l, std::chrono::seconds(1), [&]() { return called; }));
1264
        }
×
1265
        HPX_TEST(called);
×
1266
    }
×
1267

1268
    // Values passed to set_value are ignored
1269
    {
1270
        ex::run_loop loop;
×
1271
        auto sched = loop.get_scheduler();
×
1272

1273
        bool called = false;
×
1274
        hpx::mutex mtx;
×
1275
        hpx::condition_variable cond;
×
1276
        ex::schedule(sched) | ex::then([&]() {
×
1277
            std::lock_guard l{mtx};
×
1278
            called = true;
×
1279
            cond.notify_one();
×
1280
            return 42;
1281
        }) | ex::start_detached();
×
1282

1283
        {
1284
            std::unique_lock l{mtx};
×
1285
            HPX_TEST(cond.wait_for(
×
1286
                l, std::chrono::seconds(1), [&]() { return called; }));
1287
        }
×
1288
        HPX_TEST(called);
×
1289
    }
×
1290
}
×
1291

1292
void test_keep_future_sender()
×
1293
{
1294
    // the future should be passed to then, not it's contained value
1295
    {
1296
        ex::run_loop loop;
×
1297
        auto sched = loop.get_scheduler();
×
1298

1299
        ex::keep_future(hpx::make_ready_future<void>()) |
×
1300
            ex::then([](hpx::future<void>&& f) { HPX_TEST(f.is_ready()); }) |
×
1301
            tt::sync_wait(sched);
×
1302
    }
×
1303

1304
    {
1305
        ex::run_loop loop;
×
1306
        auto sched = loop.get_scheduler();
×
1307

1308
        ex::keep_future(hpx::make_ready_future<void>().share()) |
×
1309
            ex::then(
×
1310
                [](hpx::shared_future<void>&& f) { HPX_TEST(f.is_ready()); }) |
×
1311
            tt::sync_wait(sched);
×
1312
    }
×
1313

1314
    {
1315
        ex::run_loop loop;
×
1316
        auto sched = loop.get_scheduler();
×
1317

1318
        ex::keep_future(hpx::make_ready_future<int>(42)) |
×
1319
            ex::then([](hpx::future<int>&& f) {
×
1320
                HPX_TEST(f.is_ready());
×
1321
                HPX_TEST_EQ(f.get(), 42);
×
1322
            }) |
×
1323
            tt::sync_wait(sched);
×
1324
    }
×
1325

1326
    {
1327
        ex::run_loop loop;
×
1328
        auto sched = loop.get_scheduler();
×
1329

1330
        ex::keep_future(hpx::make_ready_future<int>(42).share()) |
×
1331
            ex::then([](hpx::shared_future<int>&& f) {
×
1332
                HPX_TEST(f.is_ready());
×
1333
                HPX_TEST_EQ(f.get(), 42);
×
1334
            }) |
×
1335
            tt::sync_wait(sched);
×
1336
    }
×
1337

1338
    {
1339
        ex::run_loop loop;
×
1340
        auto sched = loop.get_scheduler();
×
1341

1342
        std::atomic<bool> called{false};
×
1343
        auto f = hpx::async([&]() { called = true; });
×
1344

1345
        auto r =
1346
            hpx::get<0>(*tt::sync_wait(sched, ex::keep_future(std::move(f))));
×
1347
        static_assert(
1348
            std::is_same<std::decay_t<decltype(r)>, hpx::future<void>>::value,
1349
            "sync_wait should return future<void>");
1350

1351
        HPX_TEST(called);
×
1352
        HPX_TEST(r.is_ready());
×
1353

1354
        bool exception_thrown = false;
×
1355
        try
1356
        {
1357
            // The move is intentional. sync_wait should throw.
1358
            // NOLINTNEXTLINE(bugprone-use-after-move)
1359
            tt::sync_wait(sched, ex::keep_future(std::move(f)));
×
1360
            HPX_TEST(false);
×
1361
        }
×
1362
        catch (...)
1363
        {
1364
            exception_thrown = true;
×
1365
        }
×
1366
        HPX_TEST(exception_thrown);
×
1367
    }
×
1368

1369
    {
1370
        ex::run_loop loop;
×
1371
        auto sched = loop.get_scheduler();
×
1372

1373
        std::atomic<bool> called{false};
×
1374
        auto f = hpx::async([&]() {
×
1375
            called = true;
×
1376
            return 42;
×
1377
        });
1378

1379
        auto r =
1380
            hpx::get<0>(*tt::sync_wait(sched, ex::keep_future(std::move(f))));
×
1381
        static_assert(
1382
            std::is_same<std::decay_t<decltype(r)>, hpx::future<int>>::value,
1383
            "sync_wait should return future<int>");
1384

1385
        HPX_TEST(called);
×
1386
        HPX_TEST(r.is_ready());
×
1387
        HPX_TEST_EQ(r.get(), 42);
×
1388

1389
        bool exception_thrown = false;
×
1390
        try
1391
        {
1392
            // The move is intentional. sync_wait should throw.
1393
            // NOLINTNEXTLINE(bugprone-use-after-move)
1394
            tt::sync_wait(sched, ex::keep_future(std::move(f)));
×
1395
            HPX_TEST(false);
×
1396
        }
×
1397
        catch (...)
1398
        {
1399
            exception_thrown = true;
×
1400
        }
×
1401
        HPX_TEST(exception_thrown);
×
1402
    }
×
1403

1404
    {
1405
        ex::run_loop loop;
×
1406
        auto sched = loop.get_scheduler();
×
1407

1408
        std::atomic<bool> called{false};
×
1409
        auto f = hpx::async([&]() {
×
1410
            called = true;
×
1411
            return 42;
×
1412
        });
1413

1414
        HPX_TEST_EQ(hpx::get<0>(*tt::sync_wait(sched,
×
1415
                        ex::then(ex::keep_future(std::move(f)),
1416
                            [](hpx::future<int>&& f) { return f.get() / 2; }))),
1417
            21);
1418
        HPX_TEST(called);
×
1419
    }
×
1420

1421
    {
1422
        ex::run_loop loop;
×
1423
        auto sched = loop.get_scheduler();
×
1424

1425
        std::atomic<std::size_t> calls{0};
×
1426
        auto sf = hpx::async([&]() { ++calls; }).share();
×
1427

1428
        tt::sync_wait(sched, ex::keep_future(sf));
×
1429
        tt::sync_wait(sched, ex::keep_future(sf));
×
1430
        tt::sync_wait(sched, ex::keep_future(std::move(sf)));
×
1431
        HPX_TEST_EQ(calls, std::size_t(1));
×
1432

1433
        bool exception_thrown = false;
×
1434
        try
1435
        {
1436
            tt::sync_wait(sched, ex::keep_future(sf));
×
1437
            HPX_TEST(false);
×
1438
        }
×
1439
        catch (...)
1440
        {
1441
            exception_thrown = true;
×
1442
        }
×
1443
        HPX_TEST(exception_thrown);
×
1444
    }
×
1445

1446
    {
1447
        ex::run_loop loop;
×
1448
        auto sched = loop.get_scheduler();
×
1449

1450
        std::atomic<std::size_t> calls{0};
×
1451
        auto sf = hpx::async([&]() {
×
1452
            ++calls;
×
1453
            return 42;
×
1454
        }).share();
×
1455

1456
        HPX_TEST_EQ(
×
1457
            hpx::get<0>(*tt::sync_wait(sched, ex::keep_future(sf))).get(), 42);
1458
        HPX_TEST_EQ(
×
1459
            hpx::get<0>(*tt::sync_wait(sched, ex::keep_future(sf))).get(), 42);
1460
        HPX_TEST_EQ(
×
1461
            hpx::get<0>(*tt::sync_wait(sched, ex::keep_future(std::move(sf))))
1462
                .get(),
1463
            42);
1464
        HPX_TEST_EQ(calls, std::size_t(1));
×
1465

1466
        bool exception_thrown = false;
×
1467
        try
1468
        {
1469
            tt::sync_wait(sched, ex::keep_future(sf));
×
1470
            HPX_TEST(false);
×
1471
        }
×
1472
        catch (...)
1473
        {
1474
            exception_thrown = true;
×
1475
        }
×
1476
        HPX_TEST(exception_thrown);
×
1477
    }
×
1478

1479
    // Keep future alive across on
1480
    {
1481
        ex::run_loop loop;
×
1482
        auto sched = loop.get_scheduler();
×
1483

1484
        auto f = hpx::async([&]() { return 42; });
×
1485

1486
        auto r = hpx::get<0>(*(ex::keep_future(std::move(f)) |
×
1487
            ex::transfer(sched) | tt::sync_wait()));
×
1488
        HPX_TEST(r.is_ready());
×
1489
        HPX_TEST_EQ(r.get(), 42);
×
1490
    }
×
1491

1492
    {
1493
        ex::run_loop loop;
×
1494
        auto sched = loop.get_scheduler();
×
1495

1496
        auto sf = hpx::async([&]() { return 42; }).share();
×
1497

1498
        auto r = hpx::get<0>(*(ex::keep_future(std::move(sf)) |
×
1499
            ex::transfer(sched) | tt::sync_wait()));
×
1500
        HPX_TEST(r.is_ready());
×
1501
        HPX_TEST_EQ(r.get(), 42);
×
1502
    }
×
1503

1504
    {
1505
        ex::run_loop loop;
×
1506
        auto sched = loop.get_scheduler();
×
1507

1508
        auto sf = hpx::async([&]() {
×
1509
            return custom_type_non_default_constructible_non_copyable{42};
×
1510
        }).share();
×
1511

1512
        // NOTE: Without keep_future this should fail to compile, since
1513
        // sync_wait would receive a const& to the value which requires a copy
1514
        // or storing a const&. The copy is not possible because the type is
1515
        // noncopyable, and storing a reference is not acceptable since the
1516
        // reference may outlive the value.
1517
        auto r = hpx::get<0>(*(ex::keep_future(std::move(sf)) |
×
1518
            ex::transfer(sched) | tt::sync_wait()));
×
1519
        HPX_TEST(r.is_ready());
×
1520
        HPX_TEST_EQ(r.get().x, 42);
×
1521
    }
×
1522

1523
    // Use unwrapping with keep_future
1524
    {
1525
        ex::run_loop loop;
×
1526
        auto sched = loop.get_scheduler();
×
1527

1528
        auto f = hpx::async([]() { return 42; });
×
1529
        auto sf = hpx::async([]() { return 3.14; }).share();
×
1530

1531
        auto fun = hpx::unwrapping(
×
1532
            [](int&& x, double const& y) { return x * 2 + (int(y) / 2); });
×
1533
        HPX_TEST_EQ(hpx::get<0>(*(ex::when_all(ex::keep_future(std::move(f)),
×
1534
                                      ex::keep_future(std::move(sf))) |
1535
                        ex::then(fun) | tt::sync_wait(sched))),
1536
            85);
1537
    }
×
1538

1539
    {
1540
        ex::run_loop loop;
×
1541
        auto sched = loop.get_scheduler();
×
1542

1543
        auto f = hpx::async([]() { return 42; });
×
1544
        auto sf = hpx::async([]() { return 3.14; }).share();
×
1545

1546
        auto fun = hpx::unwrapping(
×
1547
            [](int&& x, double const& y) { return x * 2 + (int(y) / 2); });
×
1548
        HPX_TEST_EQ(hpx::get<0>(*(ex::when_all(ex::keep_future(std::move(f)),
×
1549
                                      ex::keep_future(sf)) |
1550
                        ex::transfer(sched) | ex::then(fun) | tt::sync_wait())),
1551
            85);
1552
    }
×
1553
}
×
1554

1555
void test_bulk()
×
1556
{
1557
    std::vector<int> const ns = {0, 1, 10, 43};
×
1558

1559
    {
1560
        ex::run_loop loop;
×
1561
        auto sched = loop.get_scheduler();
×
1562

1563
        for (int n : ns)
×
1564
        {
1565
            std::vector<int> v(n, 0);
×
1566
            hpx::thread::id parent_id = hpx::this_thread::get_id();
×
1567

1568
            ex::schedule(sched) | ex::bulk(n, [&](int i) {
×
1569
                ++v[i];
×
1570
                HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
1571
            }) | tt::sync_wait();
×
1572

1573
            for (int i = 0; i < n; ++i)
×
1574
            {
1575
                HPX_TEST_EQ(v[i], 1);
×
1576
            }
×
1577
        }
×
1578
    }
×
1579

1580
    {
1581
        ex::run_loop loop;
×
1582
        auto sched = loop.get_scheduler();
×
1583

1584
        for (auto n : ns)
×
1585
        {
1586
            std::vector<int> v(n, -1);
×
1587
            hpx::thread::id parent_id = hpx::this_thread::get_id();
×
1588

1589
            auto v_out = hpx::get<0>(*(ex::transfer_just(sched, std::move(v)) |
×
1590
                ex::bulk(n,
×
1591
                    [&parent_id](int i, std::vector<int>& v) {
×
1592
                        v[i] = i;
×
1593
                        HPX_TEST_EQ(parent_id, hpx::this_thread::get_id());
×
1594
                    }) |
×
1595
                tt::sync_wait()));
×
1596

1597
            for (int i = 0; i < n; ++i)
×
1598
            {
1599
                HPX_TEST_EQ(v_out[i], i);
×
1600
            }
×
1601
        }
×
1602
    }
×
1603

1604
    {
1605
        ex::run_loop loop;
×
1606
        auto sched = loop.get_scheduler();
×
1607

1608
        std::unordered_set<std::string> string_map;
×
1609
        std::vector<std::string> v = {"hello", "brave", "new", "world"};
×
1610
        std::vector<std::string> v_ref = v;
×
1611

1612
        hpx::mutex mtx;
×
1613

1614
        ex::schedule(sched) | ex::bulk(std::move(v), [&](std::string const& s) {
×
1615
            std::lock_guard lk(mtx);
×
1616
            string_map.insert(s);
×
1617
        }) | tt::sync_wait();
×
1618

1619
        for (auto const& s : v_ref)
×
1620
        {
1621
            HPX_TEST(string_map.find(s) != string_map.end());
×
1622
        }
1623
    }
×
1624

1625
    {
1626
        ex::run_loop loop;
×
1627
        auto sched = loop.get_scheduler();
×
1628

1629
        for (auto n : ns)
×
1630
        {
1631
            int i_fail = 3;
×
1632

1633
            std::vector<int> v(n, -1);
×
1634
            bool const expect_exception = n > i_fail;
×
1635

1636
            try
1637
            {
1638
                ex::transfer_just(sched) | ex::bulk(n, [&v, i_fail](int i) {
×
1639
                    if (i == i_fail)
×
1640
                    {
1641
                        throw std::runtime_error("error");
×
1642
                    }
1643
                    v[i] = i;
×
1644
                }) | tt::sync_wait();
×
1645

1646
                if (expect_exception)
×
1647
                {
1648
                    HPX_TEST(false);
×
1649
                }
×
1650
            }
×
1651
            catch (std::runtime_error const& e)
1652
            {
1653
                if (!expect_exception)
×
1654
                {
1655
                    HPX_TEST(false);
×
1656
                }
×
1657

1658
                HPX_TEST_EQ(std::string(e.what()), std::string("error"));
×
1659
            }
×
1660

1661
            if (expect_exception)
×
1662
            {
1663
                HPX_TEST_EQ(v[i_fail], -1);
×
1664
            }
×
1665
            else
1666
            {
1667
                for (int i = 0; i < n; ++i)
×
1668
                {
1669
                    HPX_TEST_EQ(v[i], i);
×
1670
                }
×
1671
            }
1672
        }
×
1673
    }
×
1674
}
×
1675

1676
void test_completion_scheduler()
×
1677
{
1678
    ex::run_loop loop;
×
1679
    auto sched = loop.get_scheduler();
×
1680

1681
    {
1682
        auto sender = ex::schedule(sched);
×
1683
        auto completion_scheduler =
1684
            ex::get_completion_scheduler<ex::set_value_t>(sender);
×
1685
        static_assert(
1686
            std::is_same_v<std::decay_t<decltype(completion_scheduler)>,
1687
                decltype(sched)>,
1688
            "the completion scheduler should be a run_pool_scheduler");
1689
    }
1690

1691
    {
1692
        auto sender = ex::then(ex::schedule(sched), []() {});
×
1693
        using hpx::functional::tag_invoke;
1694
        auto completion_scheduler =
1695
            ex::get_completion_scheduler<ex::set_value_t>(sender);
×
1696
        static_assert(
1697
            std::is_same_v<std::decay_t<decltype(completion_scheduler)>,
1698
                decltype(sched)>,
1699
            "the completion scheduler should be a run_pool_scheduler");
1700
    }
1701

1702
    {
1703
        auto sender = ex::transfer_just(sched, 42);
×
1704
        auto completion_scheduler =
1705
            ex::get_completion_scheduler<ex::set_value_t>(sender);
×
1706
        static_assert(
1707
            std::is_same_v<std::decay_t<decltype(completion_scheduler)>,
1708
                decltype(sched)>,
1709
            "the completion scheduler should be a run_pool_scheduler");
1710
    }
1711

1712
    {
1713
        auto sender = ex::bulk(ex::schedule(sched), 10, [](int) {});
×
1714
        auto completion_scheduler =
1715
            ex::get_completion_scheduler<ex::set_value_t>(sender);
×
1716
        static_assert(
1717
            std::is_same_v<std::decay_t<decltype(completion_scheduler)>,
1718
                decltype(sched)>,
1719
            "the completion scheduler should be a run_pool_scheduler");
1720
    }
1721

1722
    {
1723
        auto sender = ex::then(
×
1724
            ex::bulk(ex::transfer_just(sched, 42), 10, [](int, int) {}),
×
1725
            [](int) {});
1726
        auto completion_scheduler =
1727
            ex::get_completion_scheduler<ex::set_value_t>(sender);
×
1728
        static_assert(
1729
            std::is_same_v<std::decay_t<decltype(completion_scheduler)>,
1730
                decltype(sched)>,
1731
            "the completion scheduler should be a run_pool_scheduler");
1732
    }
1733

1734
    {
1735
        auto sender =
1736
            ex::bulk(ex::then(ex::transfer_just(sched, 42), [](int) {}), 10,
×
1737
                [](int, int) {});
1738
        auto completion_scheduler =
1739
            ex::get_completion_scheduler<ex::set_value_t>(sender);
×
1740
        static_assert(
1741
            std::is_same_v<std::decay_t<decltype(completion_scheduler)>,
1742
                decltype(sched)>,
1743
            "the completion scheduler should be a run_pool_scheduler");
1744
    }
1745

1746
    loop.finish();
×
1747
    loop.run();
×
1748
}
×
1749

1750
// Prints `func_name` followed by a newline to std::cout, then invokes `func`.
// The name is printed first so the output shows which test was started.
void do_run_test(void (*func)(), char const* func_name)
{
    std::cout << func_name << '\n';
    (*func)();
}
×
1755

1756
// Calls do_run_test with the address of `func` and the stringized form of the
// macro argument as the name to print.
#define RUN_TEST(func) do_run_test(&func, HPX_PP_STRINGIZE(func))
1757

1758
int hpx_main()
×
1759
{
1760
    RUN_TEST(test_concepts);
×
1761
    RUN_TEST(test_execute);
×
1762
    RUN_TEST(test_sender_receiver_basic);
×
1763
    RUN_TEST(test_sender_receiver_then);
×
1764
    RUN_TEST(test_sender_receiver_then_wait);
×
1765
    RUN_TEST(test_sender_receiver_then_sync_wait);
×
1766
    RUN_TEST(test_sender_receiver_then_arguments);
×
1767
    RUN_TEST(test_transfer_basic);
×
1768
    RUN_TEST(test_transfer_arguments);
×
1769
    RUN_TEST(test_just_void);
×
1770
    RUN_TEST(test_just_one_arg);
×
1771
    RUN_TEST(test_just_two_args);
×
1772
    RUN_TEST(test_transfer_just_void);
×
1773
    RUN_TEST(test_transfer_just_one_arg);
×
1774
    RUN_TEST(test_transfer_just_two_args);
×
1775
    RUN_TEST(test_when_all);
×
1776
    RUN_TEST(test_future_sender);
×
1777
    RUN_TEST(test_keep_future_sender);
×
1778
    RUN_TEST(test_ensure_started);
×
1779
    RUN_TEST(test_ensure_started_when_all);
×
1780
    RUN_TEST(test_split);
×
1781
    RUN_TEST(test_split_when_all);
×
1782
    RUN_TEST(test_let_value);
×
1783
    RUN_TEST(test_let_error);
×
1784
    RUN_TEST(test_detach);
×
1785
    RUN_TEST(test_bulk);
×
1786

1787
    RUN_TEST(test_completion_scheduler);
×
1788

1789
    return hpx::local::finalize();
×
1790
}
1791

1792
int main(int argc, char* argv[])
×
1793
{
1794
    HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0,
×
1795
        "HPX main exited with non-zero status");
1796

1797
    return hpx::util::report_errors();
×
1798
}
×
1799

1800
#else
1801

1802
// Fallback for the preprocessor branch in which the tests above are compiled
// out: nothing is run and the program returns 0.
int main(int /*argc*/, char* /*argv*/[])
{
    return 0;
}
1806

1807
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc