• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #844

02 Dec 2022 12:37AM UTC coverage: 85.8% (+0.2%) from 85.634%
#844

push

StellarBot
Merge #6084

6084: Renaming hpx::apply and friends to hpx::post r=hkaiser a=hkaiser

- this is needed to be able to rename invoke_fused to apply later

working towards https://github.com/STEllAR-GROUP/hpx/issues/5497



Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

468 of 468 new or added lines in 96 files covered. (100.0%)

171389 of 199753 relevant lines covered (85.8%)

1914550.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.46
/tests/performance/local/future_overhead.cpp
1
//  Copyright (c) 2018-2020 Mikael Simberg
2
//  Copyright (c) 2018-2019 John Biddiscombe
3
//  Copyright (c) 2011 Bryce Adelstein-Lelbach
4
//
5
//  SPDX-License-Identifier: BSL-1.0
6
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
7
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8

9
#include <hpx/config.hpp>
10
#if defined(HPX_HAVE_DISTRIBUTED_RUNTIME) && !defined(HPX_COMPUTE_DEVICE_CODE)
11
#include <hpx/actions_base/plain_action.hpp>
12
#include <hpx/async_distributed/continuation.hpp>
13
#include <hpx/future.hpp>
14
#include <hpx/runtime.hpp>
15
#endif
16
#include <hpx/init.hpp>
17
#include <hpx/local/algorithm.hpp>
18
#include <hpx/local/execution.hpp>
19
#include <hpx/local/future.hpp>
20
#include <hpx/local/runtime.hpp>
21
#include <hpx/local/thread.hpp>
22
#include <hpx/modules/format.hpp>
23
#include <hpx/modules/synchronization.hpp>
24
#include <hpx/modules/testing.hpp>
25
#include <hpx/modules/timing.hpp>
26
#include <hpx/threading_base/annotated_function.hpp>
27

28
#include <array>
29
#include <atomic>
30
#include <cstddef>
31
#include <cstdint>
32
#include <iostream>
33
#include <sstream>
34
#include <stdexcept>
35
#include <string>
36
#include <type_traits>
37
#include <vector>
38

39
using hpx::program_options::options_description;
40
using hpx::program_options::value;
41
using hpx::program_options::variables_map;
42

43
using hpx::async;
44
using hpx::future;
45
using hpx::post;
46

47
using hpx::chrono::high_resolution_timer;
48

49
// global vars we stick here to make printouts easy for plotting
50
// Run-description values echoed on every output line by print_stats.
// hpx_main overwrites them from the command line / runtime.
static std::string queuing{"default"};     // scheduler queuing policy name
static std::size_t numa_sensitive{0};      // 1 if --hpx:numa-sensitive given
static std::uint64_t num_threads{1};       // number of HPX worker threads
static std::string info_string{};          // free-form --info text
54

55
///////////////////////////////////////////////////////////////////////////////
56
void print_stats(const char* title, const char* wait, const char* exec,
1✔
57
    std::int64_t count, double duration, bool csv)
58
{
59
    std::ostringstream temp;
1✔
60
    double us = 1e6 * duration / count;
1✔
61
    if (csv)
1✔
62
    {
63
        hpx::util::format_to(temp,
×
64
            "{1}, {:27}, {:15}, {:18}, {:8}, {:8}, {:20}, {:4}, {:4}, "
×
65
            "{:20}",
66
            count, title, wait, exec, duration, us, queuing, numa_sensitive,
67
            num_threads, info_string);
68
    }
×
69
    else
70
    {
71
        hpx::util::format_to(temp,
2✔
72
            "invoked {:1}, futures {:27} {:15} {:18} in {:8} seconds : {:8} "
1✔
73
            "us/future, queue {:20}, numa {:4}, threads {:4}, info {:20}",
74
            count, title, wait, exec, duration, us, queuing, numa_sensitive,
75
            num_threads, info_string);
76
    }
77
    std::cout << temp.str() << std::endl;
1✔
78
    // CDash graph plotting
79
    //hpx::util::print_cdash_timing(title, duration);
80
}
1✔
81

82
// Label printed for benchmarks that run on hpx::execution::parallel_executor.
const char* exec_name(hpx::execution::parallel_executor const& /*exec*/)
{
    static constexpr char const label[] = "parallel_executor";
    return label;
}
86

87
const char* exec_name(hpx::execution::experimental::scheduler_executor<
×
88
    hpx::execution::experimental::thread_pool_scheduler> const&)
89
{
90
    return "scheduler_executor<thread_pool_scheduler>";
×
91
}
92

93
///////////////////////////////////////////////////////////////////////////////
94
// we use globals here to prevent the delay from being optimized away
95
// Sink for future results (written by scratcher).
double global_scratch{0.0};
// Outer iteration count of the artificial delay in null_function;
// set from --delay-iterations, 0 disables the delay.
std::uint64_t num_iterations{0};
97

98
///////////////////////////////////////////////////////////////////////////////
99
double null_function() noexcept
499,997✔
100
{
101
    if (num_iterations > 0)
499,997✔
102
    {
103
        const int array_size = 4096;
×
104
        std::array<double, array_size> dummy;
105
        for (std::uint64_t i = 0; i < num_iterations; ++i)
×
106
        {
107
            for (std::uint64_t j = 0; j < array_size; ++j)
×
108
            {
109
                dummy[j] = 1.0 / (2.0 * i * j + 1.0);
×
110
            }
×
111
        }
×
112
        return dummy[0];
×
113
    }
114
    return 0.0;
499,997✔
115
}
499,997✔
116

117
struct scratcher
118
{
119
    void operator()(future<double> r) const
×
120
    {
121
        global_scratch += r.get();
×
122
    }
×
123
};
124

125
#if defined(HPX_HAVE_DISTRIBUTED_RUNTIME) && !defined(HPX_COMPUTE_DEVICE_CODE)
126
// Expose null_function as the plain action `null_action`, which the action
// benchmarks below invoke via async<null_action>(locality id).
HPX_PLAIN_ACTION(null_function, null_action)
3✔
127

128
// Time async action execution using wait each on futures vector
129
void measure_action_futures_wait_each(std::uint64_t count, bool csv)
×
130
{
131
    const hpx::id_type here = hpx::find_here();
×
132
    std::vector<future<double>> futures;
×
133
    futures.reserve(count);
×
134

135
    // start the clock
136
    high_resolution_timer walltime;
×
137
    for (std::uint64_t i = 0; i < count; ++i)
×
138
        futures.push_back(async<null_action>(here));
×
139
    hpx::wait_each(scratcher(), futures);
×
140

141
    // stop the clock
142
    const double duration = walltime.elapsed();
×
143
    print_stats("action", "WaitEach", "no-executor", count, duration, csv);
×
144
}
×
145

146
// Time async action execution using wait all on futures vector
147
void measure_action_futures_wait_all(std::uint64_t count, bool csv)
×
148
{
149
    const hpx::id_type here = hpx::find_here();
×
150
    std::vector<future<double>> futures;
×
151
    futures.reserve(count);
×
152

153
    // start the clock
154
    high_resolution_timer walltime;
×
155
    for (std::uint64_t i = 0; i < count; ++i)
×
156
        futures.push_back(async<null_action>(here));
×
157
    hpx::wait_all(futures);
×
158

159
    // stop the clock
160
    const double duration = walltime.elapsed();
×
161
    print_stats("action", "WaitAll", "no-executor", count, duration, csv);
×
162
}
×
163
#endif
164

165
// Time async execution using wait each on futures vector
166
template <typename Executor>
167
void measure_function_futures_wait_each(
×
168
    std::uint64_t count, bool csv, Executor& exec)
169
{
170
    std::vector<future<double>> futures;
×
171
    futures.reserve(count);
×
172

173
    // start the clock
174
    high_resolution_timer walltime;
×
175
    for (std::uint64_t i = 0; i < count; ++i)
×
176
        futures.push_back(async(exec, &null_function));
×
177
    hpx::wait_each(scratcher(), futures);
×
178

179
    // stop the clock
180
    const double duration = walltime.elapsed();
×
181
    print_stats("async", "WaitEach", exec_name(exec), count, duration, csv);
×
182
}
×
183

184
template <typename Executor>
185
void measure_function_futures_wait_all(
×
186
    std::uint64_t count, bool csv, Executor& exec)
187
{
188
    std::vector<future<double>> futures;
×
189
    futures.reserve(count);
×
190

191
    // start the clock
192
    high_resolution_timer walltime;
×
193
    for (std::uint64_t i = 0; i < count; ++i)
×
194
        futures.push_back(async(exec, &null_function));
×
195
    hpx::wait_all(futures);
×
196

197
    const double duration = walltime.elapsed();
×
198
    print_stats("async", "WaitAll", exec_name(exec), count, duration, csv);
×
199
}
×
200

201
template <typename Executor>
202
void measure_function_futures_limiting_executor(
×
203
    std::uint64_t count, bool csv, Executor exec)
204
{
205
    std::uint64_t const num_threads = hpx::get_num_worker_threads();
×
206
    std::uint64_t const tasks = num_threads * 2000;
×
207
    std::atomic<std::uint64_t> sanity_check(count);
×
208

209
    auto const sched = hpx::threads::get_self_id_data()->get_scheduler_base();
×
210
    if (std::string("core-shared_priority_queue_scheduler") ==
×
211
        sched->get_description())
×
212
    {
213
        sched->add_remove_scheduler_mode(
×
214
            // add these flags
215
            hpx::threads::policies::scheduler_mode::enable_stealing |
×
216
                hpx::threads::policies::scheduler_mode::
217
                    assign_work_round_robin |
×
218
                hpx::threads::policies::scheduler_mode::steal_after_local,
219
            // remove these flags
220
            hpx::threads::policies::scheduler_mode::enable_stealing_numa |
×
221
                hpx::threads::policies::scheduler_mode::
222
                    assign_work_thread_parent |
×
223
                hpx::threads::policies::scheduler_mode::
224
                    steal_high_priority_first);
225
    }
×
226

227
    // test a parallel algorithm on custom pool with high priority
228
    auto const chunk_size = count / (num_threads * 2);
×
229
    hpx::execution::static_chunk_size fixed(chunk_size);
×
230

231
    // start the clock
232
    high_resolution_timer walltime;
×
233
    {
234
        hpx::execution::experimental::limiting_executor<Executor> signal_exec(
×
235
            exec, tasks, tasks + 1000);
×
236
        hpx::experimental::for_loop(
×
237
            hpx::execution::par.with(fixed), 0, count, [&](std::uint64_t) {
×
238
                hpx::post(signal_exec, [&]() {
×
239
                    null_function();
×
240
                    sanity_check--;
×
241
                });
×
242
            });
×
243
    }
×
244

245
    if (sanity_check != 0)
×
246
    {
247
        throw std::runtime_error(
×
248
            "This test is faulty " + std::to_string(sanity_check));
×
249
    }
250

251
    // stop the clock
252
    const double duration = walltime.elapsed();
×
253
    print_stats(
×
254
        "apply", "limiting-Exec", exec_name(exec), count, duration, csv);
×
255
}
×
256

257
template <typename Executor>
258
void measure_function_futures_sliding_semaphore(
×
259
    std::uint64_t count, bool csv, Executor& exec)
260
{
261
    // start the clock
262
    high_resolution_timer walltime;
×
263
    const int sem_count = 5000;
×
264
    hpx::sliding_semaphore sem(sem_count);
×
265
    for (std::uint64_t i = 0; i < count; ++i)
×
266
    {
267
        hpx::async(exec, [i, &sem]() {
×
268
            null_function();
×
269
            sem.signal(i);
×
270
        });
×
271
        sem.wait(i);
×
272
    }
×
273
    sem.wait(count + sem_count - 1);
×
274

275
    // stop the clock
276
    const double duration = walltime.elapsed();
×
277
    print_stats("apply", "Sliding-Sem", exec_name(exec), count, duration, csv);
×
278
}
×
279

280
/// Executor-parameters object that does not cap the number of chunks:
/// it reports the number of tasks itself as the maximal chunk count.
/// Used together with static_chunk_size(1) in
/// measure_function_futures_for_loop.
struct unlimited_number_of_chunks
{
    template <typename Executor>
    auto maximal_number_of_chunks(Executor&& /*executor*/,
        std::size_t /*cores*/, std::size_t num_tasks) -> std::size_t
    {
        std::size_t const limit = num_tasks;
        return limit;
    }
};
289

290
namespace hpx { namespace parallel { namespace execution {
291
    template <>
292
    struct is_executor_parameters<unlimited_number_of_chunks> : std::true_type
293
    {
294
    };
295
}}}    // namespace hpx::parallel::execution
296

297
template <typename Executor>
298
void measure_function_futures_for_loop(std::uint64_t count, bool csv,
×
299
    Executor& exec, char const* executor_name = nullptr)
300
{
301
    // start the clock
302
    high_resolution_timer walltime;
×
303
    hpx::experimental::for_loop(
×
304
        hpx::execution::par.on(exec).with(
×
305
            hpx::execution::static_chunk_size(1), unlimited_number_of_chunks()),
×
306
        0, count, [](std::uint64_t) { null_function(); });
×
307

308
    // stop the clock
309
    const double duration = walltime.elapsed();
×
310
    print_stats("for_loop", "par",
×
311
        executor_name ? executor_name : exec_name(exec), count, duration, csv);
×
312
}
×
313

314
void measure_function_futures_register_work(std::uint64_t count, bool csv)
×
315
{
316
    hpx::latch l(count);
×
317

318
    // start the clock
319
    high_resolution_timer walltime;
×
320
    for (std::uint64_t i = 0; i < count; ++i)
×
321
    {
322
        hpx::threads::thread_init_data data(
×
323
            hpx::threads::make_thread_function_nullary([&l]() {
×
324
                null_function();
×
325
                l.count_down(1);
×
326
            }),
×
327
            "null_function");
×
328
        hpx::threads::register_work(data);
×
329
    }
×
330
    l.wait();
×
331

332
    // stop the clock
333
    const double duration = walltime.elapsed();
×
334
    print_stats("register_work", "latch", "none", count, duration, csv);
×
335
}
×
336

337
void measure_function_futures_create_thread(std::uint64_t count, bool csv)
×
338
{
339
    hpx::latch l(count);
×
340

341
    auto const sched = hpx::threads::get_self_id_data()->get_scheduler_base();
×
342
    auto func = [&l]() {
×
343
        null_function();
×
344
        l.count_down(1);
×
345
    };
×
346
    auto const thread_func =
347
        hpx::threads::detail::thread_function_nullary<decltype(func)>{func};
×
348
    auto const desc = hpx::util::thread_description();
×
349
    auto const prio = hpx::threads::thread_priority::normal;
×
350
    auto const hint = hpx::threads::thread_schedule_hint();
×
351
    auto const stack_size = hpx::threads::thread_stacksize::small_;
×
352
    hpx::error_code ec;
×
353

354
    // start the clock
355
    high_resolution_timer walltime;
×
356
    for (std::uint64_t i = 0; i < count; ++i)
×
357
    {
358
        auto init = hpx::threads::thread_init_data(
×
359
            hpx::threads::thread_function_type(thread_func), desc, prio, hint,
×
360
            stack_size, hpx::threads::thread_schedule_state::pending, false,
361
            sched);
×
362
        sched->create_thread(init, nullptr, ec);
×
363
    }
×
364
    l.wait();
×
365

366
    // stop the clock
367
    const double duration = walltime.elapsed();
×
368
    print_stats("create_thread", "latch", "none", count, duration, csv);
×
369
}
×
370

371
void measure_function_futures_create_thread_hierarchical_placement(
1✔
372
    std::uint64_t count, bool csv)
373
{
374
    hpx::latch l(count);
1✔
375

376
    auto sched = hpx::threads::get_self_id_data()->get_scheduler_base();
1✔
377

378
    if (std::string("core-shared_priority_queue_scheduler") ==
2✔
379
        sched->get_description())
1✔
380
    {
381
        sched->add_remove_scheduler_mode(
×
382
            hpx::threads::policies::scheduler_mode::assign_work_thread_parent,
383
            hpx::threads::policies::scheduler_mode::enable_stealing |
×
384
                hpx::threads::policies::scheduler_mode::enable_stealing_numa |
×
385
                hpx::threads::policies::scheduler_mode::
386
                    assign_work_round_robin |
×
387
                hpx::threads::policies::scheduler_mode::steal_after_local |
×
388
                hpx::threads::policies::scheduler_mode::
389
                    steal_high_priority_first);
390
    }
×
391
    auto const func = [&l]() {
499,998✔
392
        null_function();
499,996✔
393
        l.count_down(1);
499,996✔
394
    };
499,996✔
395
    auto const thread_func =
396
        hpx::threads::detail::thread_function_nullary<decltype(func)>{func};
1✔
397
    auto const desc = hpx::util::thread_description();
1✔
398
    auto prio = hpx::threads::thread_priority::normal;
1✔
399
    auto const stack_size = hpx::threads::thread_stacksize::small_;
1✔
400
    auto const num_threads = hpx::get_num_worker_threads();
1✔
401
    hpx::error_code ec;
1✔
402

403
    // start the clock
404
    high_resolution_timer walltime;
1✔
405
    for (std::size_t t = 0; t < num_threads; ++t)
5✔
406
    {
407
        auto const hint =
408
            hpx::threads::thread_schedule_hint(static_cast<std::int16_t>(t));
4✔
409
        auto spawn_func = [&thread_func, sched, hint, t, count, num_threads,
12✔
410
                              desc, prio]() {
4✔
411
            std::uint64_t const count_start = t * count / num_threads;
4✔
412
            std::uint64_t const count_end = (t + 1) * count / num_threads;
4✔
413
            hpx::error_code ec;
4✔
414
            for (std::uint64_t i = count_start; i < count_end; ++i)
500,004✔
415
            {
416
                hpx::threads::thread_init_data init(
500,000✔
417
                    hpx::threads::thread_function_type(thread_func), desc, prio,
500,000✔
418
                    hint, stack_size,
500,000✔
419
                    hpx::threads::thread_schedule_state::pending, false, sched);
500,000✔
420
                sched->create_thread(init, nullptr, ec);
500,000✔
421
            }
500,000✔
422
        };
4✔
423
        auto const thread_spawn_func =
424
            hpx::threads::detail::thread_function_nullary<decltype(spawn_func)>{
4✔
425
                spawn_func};
4✔
426

427
        hpx::threads::thread_init_data init(
4✔
428
            hpx::threads::thread_function_type(thread_spawn_func), desc, prio,
4✔
429
            hint, stack_size, hpx::threads::thread_schedule_state::pending,
4✔
430
            false, sched);
4✔
431
        sched->create_thread(init, nullptr, ec);
4✔
432
    }
4✔
433
    l.wait();
1✔
434

435
    // stop the clock
436
    const double duration = walltime.elapsed();
1✔
437
    print_stats(
1✔
438
        "create_thread_hierarchical", "latch", "none", count, duration, csv);
1✔
439
}
1✔
440

441
void measure_function_futures_apply_hierarchical_placement(
×
442
    std::uint64_t count, bool csv)
443
{
444
    hpx::latch l(count);
×
445

446
    auto const func = [&l]() {
×
447
        null_function();
×
448
        l.count_down(1);
×
449
    };
×
450
    auto const num_threads = hpx::get_num_worker_threads();
×
451

452
    // start the clock
453
    high_resolution_timer walltime;
×
454
    for (std::size_t t = 0; t < num_threads; ++t)
×
455
    {
456
        auto const hint =
457
            hpx::threads::thread_schedule_hint(static_cast<std::int16_t>(t));
×
458
        auto spawn_func = [&func, hint, t, count, num_threads]() {
×
459
            auto exec = hpx::execution::parallel_executor(hint);
×
460
            std::uint64_t const count_start = t * count / num_threads;
×
461
            std::uint64_t const count_end = (t + 1) * count / num_threads;
×
462

463
            for (std::uint64_t i = count_start; i < count_end; ++i)
×
464
            {
465
                hpx::post(exec, func);
×
466
            }
×
467
        };
×
468

469
        auto exec = hpx::execution::parallel_executor(hint);
×
470
        hpx::post(exec, spawn_func);
×
471
    }
×
472
    l.wait();
×
473

474
    // stop the clock
475
    const double duration = walltime.elapsed();
×
476
    print_stats("apply_hierarchical", "latch", "parallel_executor", count,
×
477
        duration, csv);
×
478
}
×
479

480
///////////////////////////////////////////////////////////////////////////////
481
/// HPX entry point: records the run configuration in the globals used by
/// print_stats, then runs the benchmarks `repetitions` times. Only the
/// hierarchical create_thread benchmark runs by default; --test-all enables
/// the rest.
int hpx_main(variables_map& vm)
{
    // Inner scope: the executors below are destroyed before hpx::finalize().
    {
        if (vm.count("hpx:queuing") != 0)
        {
            queuing = vm["hpx:queuing"].as<std::string>();
        }

        numa_sensitive = (vm.count("hpx:numa-sensitive") != 0) ? 1 : 0;

        bool const test_all = vm.count("test-all") != 0;
        int const repetitions = vm["repetitions"].as<int>();

        if (vm.count("info") != 0)
        {
            info_string = vm["info"].as<std::string>();
        }

        num_threads = hpx::get_num_worker_threads();

        num_iterations = vm["delay-iterations"].as<std::uint64_t>();

        std::uint64_t const num_futures = vm["futures"].as<std::uint64_t>();
        bool const csv = vm.count("csv") != 0;
        if (HPX_UNLIKELY(num_futures == 0))
        {
            throw std::logic_error("error: count of 0 futures specified\n");
        }

        // executors shared by the benchmarks below
        hpx::execution::parallel_executor par;
        hpx::execution::parallel_executor par_nostack(
            hpx::threads::thread_priority::default_,
            hpx::threads::thread_stacksize::nostack);
        hpx::execution::experimental::scheduler_executor<
            hpx::execution::experimental::thread_pool_scheduler>
            sched_exec_tps;

        for (int rep = 0; rep < repetitions; ++rep)
        {
            measure_function_futures_create_thread_hierarchical_placement(
                num_futures, csv);
            if (!test_all)
            {
                continue;
            }

            measure_function_futures_limiting_executor(num_futures, csv, par);
#if defined(HPX_HAVE_DISTRIBUTED_RUNTIME) && !defined(HPX_COMPUTE_DEVICE_CODE)
            measure_action_futures_wait_each(num_futures, csv);
            measure_action_futures_wait_all(num_futures, csv);
#endif
            measure_function_futures_wait_each(num_futures, csv, par);
            measure_function_futures_wait_all(num_futures, csv, par);
            measure_function_futures_sliding_semaphore(num_futures, csv, par);
            measure_function_futures_for_loop(num_futures, csv, par);
            measure_function_futures_for_loop(
                num_futures, csv, sched_exec_tps);
            measure_function_futures_for_loop(
                num_futures, csv, par_nostack, "parallel_executor_nostack");
            measure_function_futures_register_work(num_futures, csv);
            measure_function_futures_create_thread(num_futures, csv);
            measure_function_futures_apply_hierarchical_placement(
                num_futures, csv);
        }
    }

    return hpx::finalize();
}
543

544
///////////////////////////////////////////////////////////////////////////////
545
int main(int argc, char* argv[])
1✔
546
{
547
    // Configure application-specific options.
548
    options_description cmdline("usage: " HPX_APPLICATION_STRING " [options]");
1✔
549

550
    // clang-format off
551
    cmdline.add_options()("futures",
5✔
552
        value<std::uint64_t>()->default_value(500000),
1✔
553
        "number of futures to invoke")
554

555
        ("delay-iterations", value<std::uint64_t>()->default_value(0),
1✔
556
         "number of iterations in the delay loop")
557

558
        ("csv", "output results as csv (format: count,duration)")
559
        ("test-all", "run all benchmarks")
560
        ("repetitions", value<int>()->default_value(1),
1✔
561
         "number of repetitions of the full benchmark")
562

563
        ("info", value<std::string>()->default_value("no-info"),
1✔
564
         "extra info for plot output (e.g. branch name)");
565
    // clang-format on
566

567
    // Initialize and run HPX.
568
    hpx::init_params init_args;
1✔
569
    init_args.desc_cmdline = cmdline;
1✔
570

571
    return hpx::init(argc, argv, init_args);
1✔
572
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc