• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/libs/core/resource_partitioner/examples/async_customization.cpp
1
//  Copyright (c) 2017-2018 John Biddiscombe
2
//  Copyright (c) 2024 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#define GUIDED_EXECUTOR_DEBUG 1
9

10
#include <hpx/execution.hpp>
#include <hpx/functional.hpp>
#include <hpx/future.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/debugging.hpp>
#include <hpx/modules/pack_traversal.hpp>
#include <hpx/modules/resource_partitioner.hpp>
#include <hpx/modules/testing.hpp>
#include <hpx/tuple.hpp>

#include <chrono>
#include <complex>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
29

30
// --------------------------------------------------------------------
31
// custom executor async/then/when/dataflow specialization example
32
// --------------------------------------------------------------------
33
using namespace hpx;
34

35
struct test_async_executor
36
{
37
    // --------------------------------------------------------------------
38
    // helper structs to make future<tuple<f1, f2, f3, ...>>>
39
    // detection of futures simpler
40
    // --------------------------------------------------------------------
41
    template <typename TupleOfFutures>
42
    struct is_tuple_of_futures;
43

44
    template <typename... Futures>
45
    struct is_tuple_of_futures<hpx::tuple<Futures...>>
46
      : util::all_of<traits::is_future<std::remove_reference_t<Futures>>...>
47
    {
48
    };
49

50
    template <typename Future>
51
    struct is_future_of_tuple_of_futures
52
      : std::integral_constant<bool,
53
            traits::is_future_v<Future> &&
54
                is_tuple_of_futures<typename traits::future_traits<
55
                    std::remove_reference_t<Future>>::result_type>::value>
56
    {
57
    };
58

59
    // --------------------------------------------------------------------
60
    // function that returns a const ref to the contents of a future
61
    // without calling .get() on the future so that we can use the value
62
    // and then pass the original future on to the intended destination.
63
    // --------------------------------------------------------------------
64
    struct future_extract_value
65
    {
66
        template <typename T, template <typename> class Future>
67
        T const& operator()(Future<T> const& el) const
68
        {
69
            using shared_state_ptr =
70
                traits::detail::shared_state_ptr_for_t<Future<T>>;
71
            shared_state_ptr const& state =
72
                traits::detail::get_shared_state(el);
×
73
            return *state->get_result();
74
        }
75
    };
76

77
private:
78
    // --------------------------------------------------------------------
79
    // async execute specialized for simple arguments typical
80
    // of a normal async call with arbitrary arguments
81
    // --------------------------------------------------------------------
82
    template <typename F, typename... Ts>
×
83
    friend future<util::invoke_result_t<F, Ts...>> tag_invoke(
84
        hpx::parallel::execution::async_execute_t,
85
        test_async_executor const& exec, F&& f, Ts&&... ts)
86
    {
87
        using result_type = util::detail::invoke_deferred_result_t<F, Ts...>;
88

89
        using namespace hpx::util::debug;
90
        std::cout << "async_execute : Function    : " << print_type<F>()
×
91
                  << "\n";
92
        std::cout << "async_execute : Arguments   : "
×
93
                  << print_type<Ts...>(" | ") << "\n";
94
        std::cout << "async_execute : Result      : "
×
95
                  << print_type<result_type>() << "\n";
96

97
        // forward the task execution on to the real internal executor
×
98
        return hpx::parallel::execution::async_execute(exec.executor_,
99
            hpx::annotated_function(std::forward<F>(f), "custom"),
×
100
            std::forward<Ts>(ts)...);
101
    }
102

103
    // --------------------------------------------------------------------
104
    // .then() execute specialized for a future<P> predecessor argument
105
    // note that future<> and shared_future<> are both supported
106
    // --------------------------------------------------------------------
107
    template <typename F, typename Future, typename... Ts,
108
        typename = std::enable_if_t<
109
            traits::is_future_v<std::remove_reference_t<Future>>>>
×
110
    friend auto tag_invoke(hpx::parallel::execution::then_execute_t,
111
        test_async_executor const& exec, F&& f, Future&& predecessor,
112
        Ts&&... ts)
113
        -> future<util::detail::invoke_deferred_result_t<F, Future, Ts...>>
114
    {
115
        using result_type =
116
            util::detail::invoke_deferred_result_t<F, Future, Ts...>;
117

118
        using namespace hpx::util::debug;
119
        std::cout << "then_execute : Function     : " << print_type<F>()
×
120
                  << "\n";
121
        std::cout << "then_execute : Predecessor  : " << print_type<Future>()
×
122
                  << "\n";
123
        std::cout
124
            << "then_execute : Future       : "
125
            << print_type<typename traits::future_traits<Future>::result_type>()
×
126
            << "\n";
127
        std::cout << "then_execute : Arguments    : "
×
128
                  << print_type<Ts...>(" | ") << "\n";
129
        std::cout << "then_execute : Result       : "
×
130
                  << print_type<result_type>() << "\n";
131

×
132
        return hpx::parallel::execution::then_execute(exec.executor_,
133
            std::forward<F>(f), std::forward<Future>(predecessor),
×
134
            std::forward<Ts>(ts)...);
135
    }
136

137
    // --------------------------------------------------------------------
138
    // .then() execute specialized for a when_all dispatch for any future types
139
    // future< tuple< is_future<a>::type, is_future<b>::type, ...> >
140
    // --------------------------------------------------------------------
141
    // clang-format off
142
    template <typename F, template <typename> class OuterFuture,
143
        typename... InnerFutures, typename... Ts,
144
        typename = std::enable_if_t<is_future_of_tuple_of_futures<
145
            OuterFuture<hpx::tuple<InnerFutures...>>>::value>,
146
        typename = std::enable_if_t<
×
147
            is_tuple_of_futures<hpx::tuple<InnerFutures...>>::value>>
148
    friend auto tag_invoke(hpx::parallel::execution::then_execute_t,
149
        test_async_executor const& exec, F&& f,
150
        OuterFuture<hpx::tuple<InnerFutures...>>&& predecessor, Ts&&... ts)
151
            -> future<util::detail::invoke_deferred_result_t<F,
152
                        OuterFuture<hpx::tuple<InnerFutures...>>, Ts...>>
153
    // clang-format on
154
    {
155
        using result_type = util::detail::invoke_deferred_result_t<F,
156
            OuterFuture<hpx::tuple<InnerFutures...>>, Ts...>;
157

158
        // create a tuple of the unwrapped future values
159
        auto unwrapped_futures_tuple = util::map_pack(future_extract_value{},
160
            future_extract_value().operator()(predecessor));
161

162
        using namespace hpx::util::debug;
×
163
        std::cout << "when_all(fut) : Predecessor : "
164
                  << print_type<OuterFuture<hpx::tuple<InnerFutures...>>>()
165
                  << "\n";
×
166
        std::cout << "when_all(fut) : unwrapped   : "
167
                  << print_type<decltype(unwrapped_futures_tuple)>(" | ")
×
168
                  << "\n";
169
        std::cout << "when_all(fut) : Arguments   : "
×
170
                  << print_type<Ts...>(" | ") << "\n";
171
        std::cout << "when_all(fut) : Result      : "
172
                  << print_type<result_type>() << "\n";
173

×
174
        // invoke a function with the unwrapped tuple future types to demonstrate
175
        // that we can access them
×
176
        std::cout << "when_all(fut) : tuple       : ";
×
177
        hpx::invoke_fused(
178
            [](auto const&... ts) {
179
                std::cout << print_type<decltype(ts)...>(" | ") << "\n";
180
            },
181
            unwrapped_futures_tuple);
182

183
        // forward the task execution on to the real internal executor
184
        return hpx::parallel::execution::then_execute(exec.executor_,
×
185
            hpx::annotated_function(std::forward<F>(f), "custom then"),
186
            std::forward<OuterFuture<hpx::tuple<InnerFutures...>>>(predecessor),
187
            std::forward<Ts>(ts)...);
188
    }
189

190
    // --------------------------------------------------------------------
191
    // execute specialized for a dataflow dispatch
192
    // dataflow unwraps the outer future for us but passes a dataflowframe
193
    // function type, result type and tuple of futures as arguments
194
    // --------------------------------------------------------------------
195
    template <typename F, typename... InnerFutures,
×
196
        typename = std::enable_if_t<
197
            traits::is_future_tuple_v<hpx::tuple<InnerFutures...>>>>
198
    friend auto tag_invoke(hpx::parallel::execution::async_execute_t,
199
        test_async_executor const& exec, F&& f,
200
        hpx::tuple<InnerFutures...>&& predecessor)
201
        -> future<util::detail::invoke_deferred_result_t<F,
202
            hpx::tuple<InnerFutures...>>>
203
    {
204
        using result_type = util::detail::invoke_deferred_result_t<F,
205
            hpx::tuple<InnerFutures...>>;
206

207
        auto unwrapped_futures_tuple =
208
            util::map_pack(future_extract_value{}, predecessor);
209

×
210
        using namespace hpx::util::debug;
211
        std::cout << "dataflow      : Predecessor : "
212
                  << print_type<hpx::tuple<InnerFutures...>>() << "\n";
×
213
        std::cout << "dataflow      : unwrapped   : "
214
                  << print_type<decltype(unwrapped_futures_tuple)>(" | ")
×
215
                  << "\n";
216
        std::cout << "dataflow-frame: Result      : "
217
                  << print_type<result_type>() << "\n";
218

×
219
        // invoke a function with the unwrapped tuple future types to demonstrate
220
        // that we can access them
×
221
        std::cout << "dataflow      : tuple       : ";
×
222
        hpx::invoke_fused(
223
            [](auto const&... ts) {
224
                std::cout << print_type<decltype(ts)...>(" | ") << "\n";
225
            },
226
            unwrapped_futures_tuple);
×
227

228
        // forward the task execution on to the real internal executor
×
229
        return hpx::parallel::execution::async_execute(exec.executor_,
230
            hpx::annotated_function(std::forward<F>(f), "custom async"),
231
            std::forward<hpx::tuple<InnerFutures...>>(predecessor));
232
    }
233

234
private:
235
    hpx::execution::parallel_executor executor_;
236
};
237

238
// --------------------------------------------------------------------
239
// set traits for executor to say it is an async executor
240
// --------------------------------------------------------------------
241
namespace hpx::execution::experimental {
242

243
    template <>
244
    struct is_two_way_executor<test_async_executor> : std::true_type
245
    {
246
    };
×
247
}    // namespace hpx::execution::experimental
248

249
// Returns `val` unchanged after sleeping for 100 milliseconds.
//
// The sleep goes through std::this_thread, which is declared in <thread>;
// that header has to be included directly rather than relied upon
// transitively.
template <typename T>
T dummy_task(T val)
{
    // using std::thread here is intentional
    constexpr std::chrono::milliseconds delay(100);
    std::this_thread::sleep_for(delay);
    return val;
}
256
// --------------------------------------------------------------------
×
257
// test various execution modes
258
// --------------------------------------------------------------------
259
template <typename Executor>
×
260
int test(std::string const& message, Executor& exec)
×
261
{
×
262
    // test 1
×
263
    std::cout << "============================" << std::endl;
264
    std::cout << message << std::endl;
265
    std::cout << "============================" << std::endl;
×
266
    std::cout << "Test 1 : async()" << std::endl;
×
267
    hpx::future<char const*> fa = async(
×
268
        exec,
×
269
        [](int a, double b, char const* c) {
270
            std::cout << "Inside async " << c << std::endl;
×
271
            HPX_TEST_EQ(a == 1 && b == 2.2 && std::string(c) == "Hello", true);
×
272
            return "async";
×
273
        },
274
        1, 2.2, "Hello");
275
    HPX_TEST_EQ(std::string(fa.get()), std::string("async"));
×
276
    std::cout << std::endl;
×
277

×
278
    // test 2a
×
279
    std::cout << "============================" << std::endl;
280
    std::cout << "Test 2a : .then()" << std::endl;
281
    int testval = 5;
×
282
    future<decltype(testval)> f =
283
        hpx::async(&dummy_task<decltype(testval)>, testval);
×
284
    //
285
    future<std::string> ft = f.then(exec, [testval](auto&& f) {
×
286
        std::cout << "Inside .then() " << std::endl;
×
287
        HPX_TEST_EQ_MSG(
×
288
            f.is_ready(), true, "Continuation run before future ready");
×
289
        decltype(testval) r = f.get();
290
        std::cout << "expected " << testval << " got " << r << std::endl;
×
291
        HPX_TEST_EQ(r, testval);
×
292
        return std::string("then");
293
    });
294
    HPX_TEST_EQ(ft.get(), std::string("then"));
×
295
    std::cout << std::endl;
×
296

×
297
    // test 2b
298
    std::cout << "============================" << std::endl;
×
299
    std::cout << "Test 2b : .then(shared)" << std::endl;
300
    auto fs = hpx::async(&dummy_task<decltype(testval)>, testval).share();
×
301
    //
302
    future<std::string> fts = fs.then(exec, [testval](auto&& f) {
×
303
        std::cout << "Inside .then(shared)" << std::endl;
×
304
        HPX_TEST_EQ_MSG(
×
305
            f.is_ready(), true, "Continuation run before future ready");
×
306
        decltype(testval) r = f.get();
307
        std::cout << "expected " << testval << " got " << r << std::endl;
×
308
        HPX_TEST_EQ(r, testval);
×
309
        return std::string("then(shared)");
310
    });
311
    HPX_TEST_EQ(fts.get(), std::string("then(shared)"));
312
    std::cout << std::endl;
×
313

×
314
#if !defined(HPX_MSVC)
×
315
    // test 3a
×
316
    std::cout << "============================" << std::endl;
×
317
    std::cout << "Test 3a : when_all()" << std::endl;
×
318
    int testval2 = 123;
319
    double testval3 = 4.567;
×
320
    auto fw1 = hpx::async(&dummy_task<decltype(testval2)>, testval2);
×
321
    auto fw2 = hpx::async(&dummy_task<decltype(testval3)>, testval3);
322
    //
323
    auto fw = hpx::when_all(fw1, fw2).then(exec,
×
324
        [testval2, testval3](
325
            future<hpx::tuple<future<int>, future<double>>>&& f) {
×
326
            std::cout << "Inside when_all : " << std::endl;
×
327
            HPX_TEST_EQ_MSG(
×
328
                f.is_ready(), true, "Continuation run before future ready");
×
329
            auto tup = f.get();
×
330
            auto cmplx = std::complex<double>(
×
331
                double(hpx::get<0>(tup).get()), hpx::get<1>(tup).get());
×
332
            auto cmplxe = std::complex<double>(double(testval2), testval3);
333
            std::cout << "expected " << cmplxe << " got " << cmplx << std::endl;
×
334
            HPX_TEST_EQ(cmplx, cmplxe);
×
335
            return std::string("when_all");
336
        });
337
    HPX_TEST_EQ(fw.get(), "when_all");
×
338
    std::cout << std::endl;
×
339

×
340
    // test 3b
×
341
    std::cout << "============================" << std::endl;
×
342
    std::cout << "Test 3b : when_all(shared)" << std::endl;
×
343
    std::uint64_t testval4 = 666;
344
    float testval5 = 876.5;
×
345
    auto fws1 = hpx::async(&dummy_task<decltype(testval4)>, testval4);
346
    auto fws2 = hpx::async(&dummy_task<decltype(testval5)>, testval5).share();
347
    //
×
348
    auto fws =
349
        hpx::when_all(fws1, fws2)
350
            .then(exec,
×
351
                [testval4, testval5](future<hpx::tuple<future<std::uint64_t>,
352
                        shared_future<float>>>&& f) {
×
353
                    std::cout << "Inside when_all(shared) : " << std::endl;
×
354
                    HPX_TEST_EQ_MSG(f.is_ready(), true,
×
355
                        "Continuation run before future ready");
×
356
                    auto tup = f.get();
×
357
                    auto cmplx =
358
                        std::complex<double>(double(hpx::get<0>(tup).get()),
×
359
                            double(hpx::get<1>(tup).get()));
360
                    auto cmplxe = std::complex<double>(
×
361
                        double(testval4), double(testval5));
×
362
                    std::cout << "expected " << cmplxe << " got " << cmplx
363
                              << std::endl;
×
364
                    HPX_TEST_EQ(cmplx, cmplxe);
×
365
                    return std::string("when_all(shared)");
366
                });
367
    HPX_TEST_EQ(fws.get(), "when_all(shared)");
368
    std::cout << std::endl;
×
369
#endif
×
370

×
371
    // test 4a
×
372
    std::cout << "============================" << std::endl;
×
373
    std::cout << "Test 4a : dataflow()" << std::endl;
×
374
    std::uint16_t testval6 = 333;
375
    double testval7 = 777.777;
376
    auto f1 = hpx::async(&dummy_task<decltype(testval6)>, testval6);
377
    auto f2 = hpx::async(&dummy_task<decltype(testval7)>, testval7);
×
378
    //
379
    hpx::future<std::string> fd = dataflow(
×
380
        exec,
381
        [testval6, testval7](future<std::uint16_t>&& f1, future<double>&& f2) {
×
382
            std::cout << "Inside dataflow : " << std::endl;
×
383
            HPX_TEST_EQ_MSG(f1.is_ready() && f2.is_ready(), true,
384
                "Continuation run before future ready");
×
385
            double r1 = f1.get();
×
386
            double r2 = f2.get();
×
387
            auto cmplx = std::complex<double>(r1, r2);
×
388
            auto cmplxe = std::complex<double>(double(testval6), testval7);
389
            std::cout << "expected " << cmplxe << " got " << cmplx << std::endl;
390
            HPX_TEST_EQ(cmplx, cmplxe);
×
391
            return std::string("dataflow");
×
392
        },
393
        f1, f2);
394
    HPX_TEST_EQ(fd.get(), std::string("dataflow"));
×
395
    std::cout << std::endl;
×
396

×
397
    // test 4b
×
398
    std::cout << "============================" << std::endl;
×
399
    std::cout << "Test 4b : dataflow(shared)" << std::endl;
×
400
    std::uint32_t testval8 = 987;
401
    double testval9 = 654.321;
402
    auto fs1 = hpx::async(&dummy_task<decltype(testval8)>, testval8);
403
    auto fs2 = hpx::async(&dummy_task<decltype(testval9)>, testval9);
×
404
    //
405
    hpx::future<std::string> fds = dataflow(
406
        exec,
×
407
        [testval8, testval9](
408
            future<std::uint32_t>&& f1, shared_future<double>&& f2) {
×
409
            std::cout << "Inside dataflow(shared) : " << std::endl;
×
410
            HPX_TEST_EQ_MSG(f1.is_ready() && f2.is_ready(), true,
411
                "Continuation run before future ready");
×
412
            double r1 = f1.get();
×
413
            double r2 = f2.get();
×
414
            auto cmplx = std::complex<double>(r1, r2);
×
415
            auto cmplxe = std::complex<double>(double(testval8), testval9);
416
            std::cout << "expected " << cmplxe << " got " << cmplx << std::endl;
417
            HPX_TEST_EQ(cmplx, cmplxe);
×
418
            return std::string("dataflow(shared)");
419
        },
×
420
        fs1, fs2);
×
421
    HPX_TEST_EQ(fds.get(), std::string("dataflow(shared)"));
×
422

×
423
    std::cout << "============================" << std::endl;
×
424
    std::cout << "Complete" << std::endl;
425
    std::cout << "============================" << std::endl << std::endl;
426
    return 0;
427
}
428

429
// Empty tag type; it only serves as the template argument that selects the
// pool_numa_hint specialization defined in this file.
struct dummy_tag {};
432

433
namespace hpx::execution::experimental {
434

435
    template <>
×
436
    struct pool_numa_hint<dummy_tag>
437
    {
438
        int operator()() const
439
        {
440
            std::cout << "Hint 0 \n";
×
441
            return 0;
442
        }
443
        int operator()(int const, double const, char const*) const
444
        {
445
            std::cout << "Hint 1 \n";
×
446
            return 1;
447
        }
448
        int operator()(int const) const
449
        {
450
            std::cout << "Hint 2 \n";
×
451
            return 2;
452
        }
453
        int operator()(hpx::tuple<future<int>, future<double>> const&) const
454
        {
455
            std::cout << "Hint 3(a) \n";
456
            return 3;
457
        }
×
458
        int operator()(
459
            hpx::tuple<future<std::uint64_t>, shared_future<float>> const&)
460
            const
461
        {
462
            std::cout << "Hint 3(b) \n";
×
463
            return 3;
464
        }
465
        int operator()(std::uint16_t const, double const) const
466
        {
467
            std::cout << "Hint 4(a) \n";
×
468
            return 4;
469
        }
470
        int operator()(std::uint32_t const, double const&) const
471
        {
472
            std::cout << "Hint 4(b) \n";
473
            return 4;
×
474
        }
475
    };
476
}    // namespace hpx::execution::experimental
477

×
478
int hpx_main()
×
479
{
480
    try
×
481
    {
482
        test_async_executor exec;
×
483
        test("Testing async custom executor", exec);
×
484
    }
485
    catch (std::exception& e)
486
    {
487
        std::cout << "Exception " << e.what() << std::endl;
488
    }
489

×
490
    typedef hpx::execution::experimental::pool_numa_hint<dummy_tag> dummy_hint;
×
491
    try
492
    {
×
493
        hpx::execution::experimental::guided_pool_executor<dummy_hint> exec2(
494
            &hpx::resource::get_thread_pool("default"));
×
495
        test("Testing guided_pool_executor<dummy_hint>", exec2);
×
496
    }
497
    catch (std::exception& e)
498
    {
499
        std::cout << "Exception " << e.what() << std::endl;
500
    }
×
501

×
502
    try
503
    {
×
504
        hpx::execution::experimental::guided_pool_executor_shim<dummy_hint>
505
            exec3(true, &hpx::resource::get_thread_pool("default"));
×
506
        test("Testing guided_pool_executor_shim<dummy_hint>", exec3);
×
507
    }
508
    catch (std::exception& e)
×
509
    {
×
510
        std::cout << "Exception " << e.what() << std::endl;
511
    }
512

×
513
    std::cout << "Tests done \n";
514
    return hpx::local::finalize();
515
}
×
516

517
int main(int argc, char* argv[])
518
{
×
519
    // Initialize and run HPX
520
    HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0,
521
        "HPX main exited with non-zero status");
522

523
    return hpx::util::report_errors();
524
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc