• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.42
/libs/full/runtime_distributed/src/runtime_distributed.cpp
1
//  Copyright (c) 2007-2025 Hartmut Kaiser
2
//  Copyright (c)      2011 Bryce Lelbach
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#include <hpx/config.hpp>
9

10
#include <hpx/agas/addressing_service.hpp>
11
#include <hpx/assert.hpp>
12
#include <hpx/async_distributed/post.hpp>
13
#include <hpx/components_base/agas_interface.hpp>
14
#include <hpx/components_base/server/component.hpp>
15
#include <hpx/components_base/server/component_base.hpp>
16
#include <hpx/modules/async_base.hpp>
17
#include <hpx/modules/datastructures.hpp>
18
#include <hpx/modules/errors.hpp>
19
#include <hpx/modules/execution_base.hpp>
20
#include <hpx/modules/format.hpp>
21
#include <hpx/modules/functional.hpp>
22
#include <hpx/modules/io_service.hpp>
23
#include <hpx/modules/itt_notify.hpp>
24
#include <hpx/modules/logging.hpp>
25
#include <hpx/modules/runtime_configuration.hpp>
26
#include <hpx/modules/runtime_local.hpp>
27
#include <hpx/modules/static_reinit.hpp>
28
#include <hpx/modules/thread_pools.hpp>
29
#include <hpx/modules/thread_support.hpp>
30
#include <hpx/modules/threading_base.hpp>
31
#include <hpx/modules/threadmanager.hpp>
32
#include <hpx/modules/timing.hpp>
33
#include <hpx/modules/topology.hpp>
34
#include <hpx/modules/type_support.hpp>
35
#include <hpx/naming_base/id_type.hpp>
36
#include <hpx/parcelset/parcelhandler.hpp>
37
#include <hpx/parcelset/parcelset_fwd.hpp>
38
#include <hpx/performance_counters/counter_creators.hpp>
39
#include <hpx/performance_counters/counters.hpp>
40
#include <hpx/performance_counters/manage_counter_type.hpp>
41
#include <hpx/performance_counters/query_counters.hpp>
42
#include <hpx/performance_counters/registry.hpp>
43
#include <hpx/runtime_components/components_fwd.hpp>
44
#include <hpx/runtime_components/console_error_sink.hpp>
45
#include <hpx/runtime_components/console_logging.hpp>
46
#include <hpx/runtime_components/server/console_error_sink.hpp>
47
#include <hpx/runtime_distributed.hpp>
48
#include <hpx/runtime_distributed/applier.hpp>
49
#include <hpx/runtime_distributed/big_boot_barrier.hpp>
50
#include <hpx/runtime_distributed/find_localities.hpp>
51
#include <hpx/runtime_distributed/get_num_localities.hpp>
52
#include <hpx/runtime_distributed/runtime_fwd.hpp>
53
#include <hpx/runtime_distributed/runtime_support.hpp>
54
#include <hpx/runtime_distributed/server/runtime_support.hpp>
55
#include <hpx/version.hpp>
56

57
#include <atomic>
58
#include <condition_variable>
59
#include <cstddef>
60
#include <cstdint>
61
#include <cstring>
62
#include <exception>
63
#include <functional>
64
#include <iostream>
65
#include <list>
66
#include <memory>
67
#include <mutex>
68
#include <sstream>
69
#include <string>
70
#include <thread>
71
#include <utility>
72
#include <vector>
73

74
#if defined(_WIN64) && defined(HPX_DEBUG) &&                                   \
75
    !defined(HPX_HAVE_FIBER_BASED_COROUTINES)
76
#include <io.h>
77
#endif
78

79
#include <hpx/config/warnings_prefix.hpp>
80

81
///////////////////////////////////////////////////////////////////////////////
82
namespace hpx {
83

84
    namespace detail {
85
        /// Request \a count ids from the distributed runtime.
        ///
        /// \returns the value of runtime_distributed::get_next_id(count), or
        ///          naming::invalid_gid if no runtime is currently available.
        naming::gid_type get_next_id(std::size_t count)
        {
            if (get_runtime_ptr() != nullptr)
            {
                return get_runtime_distributed().get_next_id(count);
            }

            // no runtime instance, nothing to allocate ids from
            return naming::invalid_gid;
        }
92

93
        ///////////////////////////////////////////////////////////////////////
94
#if defined(HPX_HAVE_NETWORKING)
95
        void dijkstra_make_black()
96
        {
×
97
            if (auto* rtp = get_runtime_support_ptr(); rtp != nullptr)
98
            {
×
99
                rtp->dijkstra_make_black();
100
            }
101
        }
×
102
#endif
103

104
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
105
    defined(HPX_HAVE_THREAD_IDLE_RATES)
106
        bool network_background_callback(
768✔
107
            [[maybe_unused]] runtime_distributed* rt, std::size_t num_thread,
108
            std::int64_t& background_work_exec_time_send,
768✔
109
            std::int64_t& background_work_exec_time_receive)
110
        {
768✔
111
            bool result = false;
112

768✔
113
#if defined(HPX_HAVE_NETWORKING)
114
            // count background work duration
115
            {
116
                threads::detail::background_work_duration_counter
117
                    bg_send_duration(background_work_exec_time_send);
118
                threads::detail::background_exec_time_wrapper bg_exec_time(
119
                    bg_send_duration);
120

121
                if (rt->get_parcel_handler().do_background_work(num_thread,
122
                        false, parcelset::parcelport_background_mode::send))
123
                {
124
                    result = true;
125
                }
126
            }
127

128
            {
129
                threads::detail::background_work_duration_counter
130
                    bg_receive_duration(background_work_exec_time_receive);
131
                threads::detail::background_exec_time_wrapper bg_exec_time(
132
                    bg_receive_duration);
133

134
                if (rt->get_parcel_handler().do_background_work(num_thread,
135
                        false, parcelset::parcelport_background_mode::receive))
136
                {
137
                    result = true;
138
                }
139
            }
140
#endif
141
            if (0 == num_thread)
142
            {
143
                rt->get_agas_client().garbage_collect_non_blocking();
144
            }
145

146
            return result;
147
        }
148
#else
149
        bool network_background_callback(
150
            [[maybe_unused]] runtime_distributed* rt, std::size_t num_thread)
151
        {
152
            bool result = false;
153

154
#if defined(HPX_HAVE_NETWORKING)
155
            if (rt->get_parcel_handler().do_background_work(num_thread, false,
156
                    parcelset::parcelport_background_mode::all))
157
            {
686,019✔
158
                result = true;
159
            }
160
#endif
161
            if (0 == num_thread)
162
            {
686,019✔
163
                rt->get_agas_client().garbage_collect_non_blocking();
164
            }
165

166
            return result;
167
        }
168
#endif
169
    }    // namespace detail
686,019✔
170

686,019✔
171
    ///////////////////////////////////////////////////////////////////////////
686,019✔
172
    /// Return the runtime_support instance of the active distributed
    /// runtime, or nullptr if no distributed runtime is available.
    components::server::runtime_support* get_runtime_support_ptr()
    {
        auto const* const rt = get_runtime_distributed_ptr();
        if (rt == nullptr)
        {
            return nullptr;
        }

        // the runtime hands out the address untyped, convert it back
        return static_cast<components::server::runtime_support*>(
            rt->get_runtime_support_lva());
    }
181

182
    ///////////////////////////////////////////////////////////////////////////
768✔
183
    /// Construct the distributed runtime.
    ///
    /// \param rtcfg      runtime configuration, forwarded to the base runtime
    /// \param pre_main   optional function invoked by run_helper() before the
    ///                   main function is launched (may be nullptr)
    /// \param post_main  optional function invoked by run_helper() after the
    ///                   main function has returned (may be nullptr)
    runtime_distributed::runtime_distributed(util::runtime_configuration& rtcfg,
        int (*pre_main)(runtime_mode), void (*post_main)())
      : runtime(rtcfg)
      , mode_(rtcfg_.mode_)
#if defined(HPX_HAVE_NETWORKING)
      , parcel_handler_(rtcfg_)
#endif
      , agas_client_(rtcfg_)
      , pre_main_(pre_main)
      , post_main_(post_main)
    {
        // The notification policies refer to this object, so they are
        // installed here in the body, once all members have been constructed.
        runtime::set_notification_policies(
            runtime_distributed::get_notification_policy(
                "worker-thread", runtime_local::os_thread_type::worker_thread),
#ifdef HPX_HAVE_IO_POOL
            runtime_distributed::get_notification_policy(
                "io-thread", runtime_local::os_thread_type::io_thread),
#endif
#ifdef HPX_HAVE_TIMER_POOL
            runtime_distributed::get_notification_policy(
                "timer-thread", runtime_local::os_thread_type::timer_thread),
#endif
            threads::detail::network_background_callback_type(
                hpx::bind_front(&detail::network_background_callback, this)));

#if defined(HPX_HAVE_NETWORKING)
        // the parcel handler gets its own notification policy
        parcel_handler_notifier_ = runtime_distributed::get_notification_policy(
            "parcel-thread", runtime_local::os_thread_type::parcel_thread);
        parcel_handler_.set_notification_policies(
            rtcfg_, thread_manager_.get(), parcel_handler_notifier_);

        applier_.init(parcel_handler_, *thread_manager_);
#else
        applier_.init(*thread_manager_);
#endif
        runtime_support_.reset(new components::server::runtime_support(rtcfg_));

        // base class initialization has to precede everything below
        runtime::init();

        init_global_data();
        util::reinit_construct();

        LPROGRESS_;

        components::server::get_error_dispatcher().set_error_sink(
            &runtime_distributed::default_errorsink);

        // bring up AGAS and register all nodes, then launch the remaining
        // components
        initialize_agas();

        // the applier is handed the address of the runtime_support instance
        // as an integer
        auto const runtime_support_lva =
            reinterpret_cast<std::uint64_t>(runtime_support_.get());
        applier_.initialize(runtime_support_lva);
    }
32✔
239

32✔
240
    void runtime_distributed::initialize_agas()
241
    {
242
#if defined(HPX_HAVE_NETWORKING)
32✔
243
        std::shared_ptr<parcelset::parcelport> const pp =
244
            parcel_handler_.get_bootstrap_parcelport();
32✔
245

32✔
246
        agas::create_big_boot_barrier(
247
            pp ? pp.get() : nullptr, parcel_handler_.endpoints(), rtcfg_);
32✔
248

249
        if (agas_client_.is_bootstrap())
250
        {
251
            // store number of cores used by other processes
32✔
252
            std::uint32_t const cores_needed =
253
                runtime_distributed::assign_cores();
32✔
254
            std::uint32_t const first_used_core =
32✔
255
                runtime_distributed::assign_cores(
256
                    pp ? pp->get_locality_name() : "<console>", cores_needed);
32✔
257

258
            rtcfg_.set_first_used_core(first_used_core);
259
            HPX_ASSERT(pp ? pp->here() == pp->agas_locality(rtcfg_) : true);
29✔
260

29✔
261
            agas_client_.bootstrap(parcel_handler_.endpoints(), rtcfg_);
29✔
262

263
            init_id_pool_range();
29✔
264

265
            hpx::detail::try_catch_exception_ptr(
266
                [&]() {
29✔
267
                    if (pp)
268
                        pp->run(false);
29✔
269
                },
270
                [&](std::exception_ptr const& e) {
271
                    std::cerr << hpx::util::format(
272
                        "the bootstrap parcelport ({}) has failed to "
29✔
273
                        "initialize on locality {}:\n{},\n"
3✔
274
                        "bailing out\n",
275
                        pp->type(), hpx::get_locality_id(),
29✔
276
                        hpx::get_error_what(e));
×
277
                    std::terminate();
278
                });
279

×
280
            agas::get_big_boot_barrier().wait_bootstrap();
×
281
        }
×
282
        else
283
        {
284
            hpx::detail::try_catch_exception_ptr(
29✔
285
                [&]() {
286
                    if (pp)
287
                        pp->run(false);
288
                },
289
                [&](std::exception_ptr const& e) {
290
                    std::cerr << hpx::util::format(
3✔
291
                        "the bootstrap parcelport ({}) has failed to "
3✔
292
                        "initialize on locality {}:\n{},\n"
293
                        "bailing out\n",
3✔
294
                        pp->type(), hpx::get_locality_id(),
×
295
                        hpx::get_error_what(e));
296
                    std::terminate();
297
                });
×
298

×
299
            agas::get_big_boot_barrier().wait_hosted(
×
300
                pp ? pp->get_locality_name() : "<console>",
301
                agas_client_.get_primary_ns_lva(),
302
                agas_client_.get_symbol_ns_lva());
6✔
303
        }
6✔
304

305
        agas_client_.initialize(
306
            reinterpret_cast<std::uint64_t>(runtime_support_.get()));
307
        parcel_handler_.initialize();
308
#else
32✔
309
        if (agas_client_.is_bootstrap())
32✔
310
        {
311
            parcelset::endpoints_type endpoints;
312
            endpoints.insert(parcelset::endpoints_type::value_type(
313
                "local-loopback", parcelset::locality{}));
314
            agas_client_.bootstrap(endpoints, rtcfg_);
315
        }
316
        agas_client_.initialize(std::uint64_t(runtime_support_.get()));
317
#endif
318
    }
319

320
    ///////////////////////////////////////////////////////////////////////////
32✔
321
    /// Tear down the distributed runtime: clear the counter registry, drop
    /// the registered function lists, stop the parcel handler (if networking
    /// is enabled) and let runtime_support tidy up.
    runtime_distributed::~runtime_distributed()
    {
        LRT_(debug).format("~runtime_distributed(entering)");

        // drop all registered performance counters
        get_counter_registry().clear();

        runtime_support_->delete_function_lists();

#if defined(HPX_HAVE_NETWORKING)
        // shut down the services; this stops the parcel pools as well
        parcel_handler_.stop();
#endif
        // unload libraries
        runtime_support_->tidy();

        LRT_(debug).format("~runtime_distributed(finished)");

        LPROGRESS_;
    }
341

32✔
342
    /// Body of the "run_helper" HPX thread: finishes the bootstrap by calling
    /// pre_main (if set), switches the runtime to hpx::state::running,
    /// delegates to runtime::run_helper() to execute \a func and finally
    /// calls post_main (if set).
    ///
    /// \param func    main function forwarded to runtime::run_helper()
    /// \param result  receives the result of pre_main, or -1 if the bootstrap
    ///                phase threw; also forwarded to runtime::run_helper()
    ///
    /// \returns a terminated thread result if pre_main returned non-zero,
    ///          otherwise whatever runtime::run_helper() returned.
    threads::thread_result_type runtime_distributed::run_helper(
        hpx::function<runtime::hpx_main_function_type> const& func, int& result)
    {
        bool bootstrap_threw = false;
        try
        {
            lbt_ << "(2nd stage) runtime_distributed::run_helper: launching "
                    "pre_main";

            // pre_main is about to run, rename this thread accordingly
            threads::set_thread_description(threads::get_self_id(), "pre_main");

            // complete the bootstrap
            result = 0;
            if (pre_main_ != nullptr)
            {
                result = pre_main_(mode_);
            }

            if (result != 0)
            {
                lbt_ << "runtime_distributed::run_helper: bootstrap "
                        "aborted, bailing out";
                return threads::thread_result_type(
                    threads::thread_schedule_state::terminated,
                    threads::invalid_thread_id);
            }

            lbt_ << "(4th stage) runtime_distributed::run_helper: bootstrap "
                    "complete";
            set_state(hpx::state::running);

#if defined(HPX_HAVE_NETWORKING)
            parcel_handler_.enable_alternative_parcelports();
#endif
            // if requested, evaluate (and possibly reset) all counters right
            // before main is launched
            if (get_config_entry("hpx.print_counter.startup", "0") == "1")
            {
                bool const reset =
                    get_config_entry("hpx.print_counter.reset", "0") == "1";

                error_code ec(throwmode::lightweight);    // ignore errors
                evaluate_active_counters(reset, "startup", ec);
            }
        }
        catch (...)
        {
            // remember any exception raised during the bootstrap phase so
            // that it does not get lost
            {
                std::lock_guard<std::mutex> l(mtx_);
                exception_ = std::current_exception();
            }
            result = -1;
            bootstrap_threw = true;
        }

        // the error is reported outside of the handler above
        if (bootstrap_threw)
        {
            HPX_ASSERT(exception_);
            report_error(exception_, false);
            finalize(-1.0);    // make sure the application exits
        }

        auto result_value = runtime::run_helper(func, result, false);

        if (post_main_ != nullptr)
        {
            post_main_();
        }

        return result_value;
    }
416

417
    int runtime_distributed::start(
418
        hpx::function<hpx_main_function_type> const& func, bool blocking)
419
    {
32✔
420
#if defined(_WIN64) && defined(HPX_DEBUG) &&                                   \
421
    !defined(HPX_HAVE_FIBER_BASED_COROUTINES)
422
        // needs to be called to avoid problems at system startup
423
        // see: http://connect.microsoft.com/VisualStudio/feedback/ViewFeedback.aspx?FeedbackID=100319
424
        _isatty(0);
425
#endif
426
        // {{{ early startup code - local
427

428
        // initialize instrumentation system
429
#ifdef HPX_HAVE_APEX
430
        util::external_timer::init(
431
            nullptr, hpx::get_locality_id(), hpx::get_initial_num_localities());
432
#endif
433

434
        LRT_(info).format("cmd_line: {}", get_config().get_cmd_line());
435

436
        lbt_ << "(1st stage) runtime_distributed::start: booting locality "
32✔
437
             << here();
438

439
        // Register this thread with the runtime system to allow calling
32✔
440
        // certain HPX functionality from the main thread. Also calls
441
        // registered startup callbacks.
442
        init_tss_helper("main-thread",
443
            runtime_local::os_thread_type::main_thread, 0, 0, "", "", false);
444

32✔
445
        // start runtime_support services
446
        runtime_support_->run();
447
        lbt_ << "(1st stage) runtime_distributed::start: started "
448
                "runtime_support component";
32✔
449

450
#ifdef HPX_HAVE_IO_POOL
32✔
451
        // start the io pool
452
        io_pool_->run(false);
453
        lbt_ << "(1st stage) runtime_distributed::start: started the "
454
                "application I/O service pool";
32✔
455
#endif
456
        // start the thread manager
457
        thread_manager_->run();
32✔
458
        lbt_ << "(1st stage) runtime_distributed::start: started "
459
                "threadmanager";
460
        // }}}
32✔
461

462
        // invoke the AGAS v2 notifications
32✔
463
#if defined(HPX_HAVE_NETWORKING)
464
        agas::get_big_boot_barrier().trigger();
465
#endif
466

467
        // {{{ launch main
32✔
468
        // register the given main function with the thread manager
469
        lbt_ << "(1st stage) runtime_distributed::start: launching "
470
                "run_helper HPX thread";
471

472
        threads::thread_function_type thread_func =
473
            threads::make_thread_function(
32✔
474
                hpx::bind(&runtime_distributed::run_helper, this, func,
475
                    std::ref(result_)));
476

477
        threads::thread_init_data data(HPX_MOVE(thread_func), "run_helper",
32✔
478
            threads::thread_priority::normal, threads::thread_schedule_hint(0),
32✔
479
            threads::thread_stacksize::large);
480

481
        this->runtime::starting();
482
        threads::thread_id_ref_type id = threads::invalid_thread_id;
483
        thread_manager_->register_thread(data, id);
484
        // }}}
32✔
485

486
        // block if required
32✔
487
        if (blocking)
488
        {
489
            return wait();    // wait for the shutdown_action to be executed
490
        }
32✔
491
        else
492
        {
×
493
            // wait for at least hpx::state::running
494
            util::yield_while(
495
                [this]() {
496
                    return !exception_ && get_state() < hpx::state::running;
497
                },
498
                "runtime_impl::start");
27,214✔
499
        }
500

501
        return 0;    // return zero as we don't know the outcome of hpx_main yet
502
    }
32✔
503

504
    int runtime_distributed::start(bool blocking)
505
    {
3✔
506
        hpx::function<hpx_main_function_type> const empty_main;
507
        return start(empty_main, blocking);
3✔
508
    }
6✔
509

510
    ///////////////////////////////////////////////////////////////////////////
511
    std::string locality_prefix(util::runtime_configuration const& cfg)
512
    {
284✔
513
        std::string const localities = cfg.get_entry("hpx.localities", "1");
514
        std::size_t const num_localities =
284✔
515
            util::from_string<std::size_t>(localities, 1);
516
        if (num_localities > 1)
284✔
517
        {
284✔
518
            std::string locality = cfg.get_entry("hpx.locality", "");
519
            if (!locality.empty())
124✔
520
            {
62✔
521
                locality = "locality#" + locality;
522
            }
124✔
523
            return locality;
524
        }
62✔
525
        return "";
526
    }
222✔
527

62✔
528
    ///////////////////////////////////////////////////////////////////////////
529
    void runtime_distributed::wait_helper(
530
        std::mutex& mtx, std::condition_variable& cond, bool& running)
32✔
531
    {
532
        // signal successful initialization
533
        {
534
            std::lock_guard<std::mutex> lk(mtx);
535
            running = true;
536
            cond.notify_all();
32✔
537
        }
32✔
538

539
        // prefix thread name with locality number, if needed
540
        std::string const locality = locality_prefix(get_config());
541

32✔
542
        // register this thread with any possibly active Intel tool
543
        std::string const thread_name(locality + "main-thread#wait_helper");
544
        HPX_ITT_THREAD_SET_NAME(thread_name.c_str());
32✔
545

546
        // set thread name as shown in Visual Studio
547
        util::set_thread_name(thread_name.c_str());
548

32✔
549
#if defined(HPX_HAVE_APEX)
550
        // not registering helper threads - for now
551
        //util::external_timer::register_thread(thread_name.c_str());
552
#endif
553

554
        // wait for termination
555
        runtime_support_->wait();
556

32✔
557
        // stop main thread pool
558
        main_pool_->stop();
559
    }
32✔
560

32✔
561
    int runtime_distributed::wait()
562
    {
32✔
563
        LRT_(info).format("runtime_distributed: about to enter wait state");
564

32✔
565
        // start the wait_helper in a separate thread
566
        std::mutex mtx;
567
        std::condition_variable cond;
32✔
568
        bool running = false;
32✔
569

32✔
570
        std::thread t(hpx::bind(&runtime_distributed::wait_helper, this,
571
            std::ref(mtx), std::ref(cond), std::ref(running)));
32✔
572

32✔
573
        // wait for the thread to run
574
        {
575
            std::unique_lock<std::mutex> lk(mtx);
576
            // NOLINTNEXTLINE(bugprone-infinite-loop)
577
            while (!running)      // -V776 // -V1044
578
                cond.wait(lk);    //-V1089
64✔
579
        }
32✔
580

581
        // use main thread to drive main thread pool
582
        main_pool_->thread_run(0);
583

32✔
584
        // block main thread
585
        t.join();
586

32✔
587
        LRT_(info).format("runtime_distributed: exiting wait state");
588
        return result_;
32✔
589
    }
64✔
590

32✔
591
    ///////////////////////////////////////////////////////////////////////////
592
    // First half of termination process: stop thread manager,
593
    // schedule a task managed by timer_pool to initiate second part
594
    void runtime_distributed::stop(bool blocking)
595
    {
32✔
596
        LRT_(warning).format("runtime_distributed: about to stop services");
597

32✔
598
        // flush all parcel buffers, stop buffering parcels at this point
599
        //parcel_handler_.do_background_work(true, parcelport_background_mode::all);
600

601
        // execute all on_exit functions whenever the first thread calls this
602
        this->runtime::stopping();
603

32✔
604
        // stop runtime_distributed services (threads)
605
        thread_manager_->stop(false);    // just initiate shutdown
606

32✔
607
#ifdef HPX_HAVE_APEX
608
        util::external_timer::finalize();
609
#endif
610

611
        if (threads::get_self_ptr())
612
        {
32✔
613
            // schedule task on separate thread to execute stop_helper() below
614
            // this is necessary as this function (stop()) might have been called
615
            // from a HPX thread, so it would deadlock by waiting for the thread
616
            // manager
617
            std::mutex mtx;
618
            std::condition_variable cond;
×
619
            std::unique_lock<std::mutex> l(mtx);
×
620

621
            std::thread t(hpx::bind(&runtime_distributed::stop_helper, this,
622
                blocking, std::ref(cond), std::ref(mtx)));
×
623
            cond.wait(l);    //-V1089
×
624

×
625
            t.join();
626
        }
×
627
        else
×
628
        {
629
            runtime_support_->stopped();    // re-activate shutdown HPX-thread
630
            thread_manager_->stop(blocking);    // wait for thread manager
32✔
631

32✔
632
            deinit_global_data();
633

32✔
634
            // this disables all logging from the main thread
635
            deinit_tss_helper("main-thread", 0);
636

32✔
637
            LRT_(info).format("runtime_distributed: stopped all services");
638
        }
32✔
639

640
        // stop the rest of the system
641
#if defined(HPX_HAVE_NETWORKING)
642
        parcel_handler_.stop(blocking);
643
#endif
32✔
644
#ifdef HPX_HAVE_TIMER_POOL
645
        LTM_(info).format("stop: stopping timer pool");
646
        timer_pool_->stop();
32✔
647
        if (blocking)
32✔
648
        {
32✔
649
            timer_pool_->join();
650
            timer_pool_->clear();
32✔
651
        }
32✔
652
#endif
653
#ifdef HPX_HAVE_IO_POOL
654
        LTM_(info).format("stop: stopping io pool");
655
        io_pool_->stop();
32✔
656
        if (blocking)
32✔
657
        {
32✔
658
            io_pool_->join();
659
            io_pool_->clear();
32✔
660
        }
32✔
661
#endif
662
    }
663

32✔
664
    int runtime_distributed::finalize(double shutdown_timeout)
665
    {
29✔
666
#if !defined(HPX_COMPUTE_DEVICE_CODE)
667
        //   tell main locality to start application exit, duplicated requests
668
        // will be ignored
669
        hpx::post<components::server::runtime_support::shutdown_all_action>(
670
            hpx::find_root_locality(), shutdown_timeout);
671
#else
29✔
672
        HPX_ASSERT(false);
673
        HPX_UNUSED(shutdown_timeout);
674
#endif
675
        return 0;
676
    }
29✔
677

678
    // Second step in termination: shut down all services.
679
    // This gets executed as a task in the timer_pool io_service and not as
680
    // a HPX thread!
681
    void runtime_distributed::stop_helper(
682
        bool blocking, std::condition_variable& cond, std::mutex& mtx)
×
683
    {
684
        // wait for thread manager to exit
685
        runtime_support_->stopped();        // re-activate shutdown HPX-thread
686
        thread_manager_->stop(blocking);    // wait for thread manager
×
687

×
688
        deinit_global_data();
689

×
690
        // this disables all logging from the main thread
691
        deinit_tss_helper("main-thread", 0);
692

×
693
        LRT_(info).format("runtime_distributed: stopped all services");
694

×
695
        std::lock_guard<std::mutex> l(mtx);
696
        cond.notify_all();    // we're done now
697
    }
×
698

×
699
    int runtime_distributed::suspend()
700
    {
×
701
        return runtime::suspend();
702
    }
×
703

704
    int runtime_distributed::resume()
705
    {
×
706
        return runtime::resume();
707
    }
×
708

709
    ///////////////////////////////////////////////////////////////////////////
710
    bool runtime_distributed::report_error(
711
        std::size_t num_thread, std::exception_ptr const& e, bool terminate_all)
×
712
    {
713
        // call thread-specific user-supplied on_error handler
714
        bool report_exception = true;
715
        if (on_error_func_)
716
        {
×
717
            report_exception = on_error_func_(num_thread, e);
718
        }
×
719

720
        // Early and late exceptions, errors outside HPX-threads
721
        if (!threads::get_self_ptr() ||
722
            !threads::threadmanager_is(hpx::state::running))
×
723
        {
×
724
            // report the error to the local console
725
            if (report_exception)
726
            {
×
727
                detail::report_exception_and_continue(e);
728
            }
×
729

730
            // store the exception to be able to rethrow it later
731
            {
732
                std::lock_guard<std::mutex> l(mtx_);
733
                exception_ = e;
×
734
            }
×
735

736
            // initiate stopping the runtime system
737
            runtime_support_->notify_waiting_main();
738
            stop(false);
×
739

×
740
            return report_exception;
741
        }
×
742

743
        // The components::console_error_sink is only applied at the console,
744
        // so the default error sink never gets called on the locality, meaning
745
        // that the user never sees errors that kill the system before the
746
        // error parcel gets sent out. So, before we try to send the error
747
        // parcel (which might cause a double fault), print local diagnostics.
748
        components::server::console_error_sink(e);
749

×
750
        // Report this error to the console.
751
        naming::gid_type console_id;
752
        if (agas_client_.get_console_locality(console_id))
×
753
        {
×
754
            if (agas_client_.get_local_locality() != console_id)
755
            {
756
                components::console_error_sink(
757
                    hpx::id_type(
×
758
                        console_id, hpx::id_type::management_type::unmanaged),
×
759
                    e);
760
            }
761
        }
762

763
        if (terminate_all)
764
        {
×
765
            components::stubs::runtime_support::terminate_all(
766
                naming::get_id_from_locality_id(agas::booststrap_prefix));
×
767
        }
×
768

769
        return report_exception;
770
    }
771

772
    bool runtime_distributed::report_error(
773
        std::exception_ptr const& e, bool terminate_all)
×
774
    {
775
        return report_error(hpx::get_worker_thread_num(), e, terminate_all);
776
    }
×
777

778
    ///////////////////////////////////////////////////////////////////////////
779
    int runtime_distributed::run(
780
        hpx::function<hpx_main_function_type> const& func)
28✔
781
    {
782
        // start the main thread function
783
        start(func);
784

28✔
785
        // now wait for everything to finish
786
        wait();
787
        stop();
28✔
788

28✔
789
#if defined(HPX_HAVE_NETWORKING)
790
        parcel_handler_.stop();    // stops parcelport for sure
791
#endif
28✔
792

793
        rethrow_exception();
794
        return result_;
28✔
795
    }
28✔
796

797
    ///////////////////////////////////////////////////////////////////////////
798
    int runtime_distributed::run()
799
    {
3✔
800
        // start the main thread function
801
        start();
802

3✔
803
        // now wait for everything to finish
804
        int const result = wait();
805
        stop();
3✔
806

3✔
807
#if defined(HPX_HAVE_NETWORKING)
808
        parcel_handler_.stop();    // stops parcelport for sure
809
#endif
3✔
810

811
        rethrow_exception();
812
        return result;
3✔
813
    }
3✔
814

815
    /// Tell whether networking is enabled.
    ///
    /// Always \c false when HPX_HAVE_NETWORKING is not defined; otherwise the
    /// answer comes from the runtime configuration.
    bool runtime_distributed::is_networking_enabled()
    {
#if !defined(HPX_HAVE_NETWORKING)
        return false;
#else
        return get_config().enable_networking();
#endif
    }
823

824
    /// Access the performance counter registry.
    ///
    /// \returns the object returned by performance_counters::registry::instance()
    performance_counters::registry& runtime_distributed::get_counter_registry()
    {
        return performance_counters::registry::instance();
    }
32✔
828

829
    /// Access the performance counter registry (read-only overload).
    ///
    /// \returns the object returned by performance_counters::registry::instance()
    performance_counters::registry const&
    runtime_distributed::get_counter_registry() const
    {
        return performance_counters::registry::instance();
    }
×
834

835
    ///////////////////////////////////////////////////////////////////////////
836
    void runtime_distributed::register_query_counters(
837
        std::shared_ptr<util::query_counters> const& active_counters)
×
838
    {
839
        active_counters_ = active_counters;
840
    }
841

×
842
    /// Start the registered query counters; does nothing if none have been
    /// registered.
    ///
    /// \param ec  forwarded to start_counters()
    void runtime_distributed::start_active_counters(error_code& ec) const
    {
        if (!active_counters_)
            return;

        active_counters_->start_counters(ec);
    }
×
847

×
848
    /// Stop the registered query counters; does nothing if none have been
    /// registered.
    ///
    /// \param ec  forwarded to stop_counters()
    void runtime_distributed::stop_active_counters(error_code& ec) const
    {
        if (!active_counters_)
            return;

        active_counters_->stop_counters(ec);
    }
×
853

×
854
    /// Reset the registered query counters; does nothing if none have been
    /// registered.
    ///
    /// \param ec  forwarded to reset_counters()
    void runtime_distributed::reset_active_counters(error_code& ec) const
    {
        if (!active_counters_)
            return;

        active_counters_->reset_counters(ec);
    }
×
859

×
860
    void runtime_distributed::reinit_active_counters(
861
        bool reset, error_code& ec) const
×
862
    {
863
        if (active_counters_)
×
864
            active_counters_->reinit_counters(reset, ec);
×
865
    }
×
866

867
    void runtime_distributed::evaluate_active_counters(
×
868
        bool reset, char const* description, error_code& ec) const
869
    {
870
        if (active_counters_)
×
871
            active_counters_->evaluate_counters(reset, description, true, ec);
×
872
    }
×
873

874
    void runtime_distributed::stop_evaluating_counters(bool terminate) const
29✔
875
    {
876
        if (active_counters_)
29✔
877
            active_counters_->stop_evaluating_counters(terminate);
×
878
    }
29✔
879

880
    /// \returns a reference to the AGAS addressing service member
    ///          (agas_client_) of this runtime instance
    agas::addressing_service& runtime_distributed::get_agas_client()
    {
        return agas_client_;
    }
884

885
#if defined(HPX_HAVE_NETWORKING)
886
    /// \returns a read-only reference to the parcel handler member
    ///          (parcel_handler_) of this runtime instance
    parcelset::parcelhandler const& runtime_distributed::get_parcel_handler()
        const
    {
        return parcel_handler_;
    }
891

892
    /// \returns a reference to the parcel handler member (parcel_handler_)
    ///          of this runtime instance
    parcelset::parcelhandler& runtime_distributed::get_parcel_handler()
    {
        return parcel_handler_;
    }
896
#endif
897

898
    /// \returns a reference to the thread manager that thread_manager_
    ///          points to
    hpx::threads::threadmanager& runtime_distributed::get_thread_manager()
    {
        return *thread_manager_;
    }
902

903
    /// \returns a reference to the applier member (applier_) of this
    ///          runtime instance
    applier::applier& runtime_distributed::get_applier()
    {
        return applier_;
    }
907

908
#if defined(HPX_HAVE_NETWORKING)
909
    /// \returns the endpoints reported by the parcel handler
    parcelset::endpoints_type const& runtime_distributed::endpoints() const
    {
        return parcel_handler_.endpoints();
    }
913
#endif
914

915
    std::string runtime_distributed::here() const
34✔
916
    {
917
#if defined(HPX_HAVE_NETWORKING)
918
        std::ostringstream strm;
34✔
919
        strm << endpoints();
34✔
920
        return strm.str();
34✔
921
#else
922
        return "console";
923
#endif
924
    }
34✔
925

926
    /// \returns the raw pointer held by runtime_support_, converted to
    ///          naming::address_type
    naming::address_type runtime_distributed::get_runtime_support_lva() const
    {
        return runtime_support_.get();
    }
930

931
    // Forward declaration of the free function get_next_id(); \a count
    // defaults to 1 when omitted.
    naming::gid_type get_next_id(std::size_t count = 1);
932

933
    void runtime_distributed::init_id_pool_range()
32✔
934
    {
935
        naming::gid_type lower, upper;
32✔
936
        naming::get_agas_client().get_id_range(
32✔
937
            HPX_INITIAL_GID_RANGE, lower, upper);
938
        return id_pool_.set_range(lower, upper);
32✔
939
    }
940

941
    /// \returns a reference to the id pool member (id_pool_) of this
    ///          runtime instance
    util::unique_id_ranges& runtime_distributed::get_id_pool()
    {
        return id_pool_;
    }
945

946
    /// \brief Register all performance counter types related to this runtime
947
    ///        instance
948
    void runtime_distributed::register_counter_types()
32✔
949
    {
950
        // clang-format off
951
        performance_counters::generic_counter_type_data const
952
            statistic_counter_types[] =
953
        {    // averaging counter
954
            {"/statistics/average",
955
                performance_counters::counter_type::aggregating,
956
                "returns the averaged value of its base counter over "
957
                "an arbitrary time line; pass required base counter as the "
958
                "instance "
959
                "name: /statistics{<base_counter_name>}/average",
960
                HPX_PERFORMANCE_COUNTER_V1,
32✔
961
                &performance_counters::detail::statistics_counter_creator,
32✔
962
                &performance_counters::default_counter_discoverer, ""},
963

964
            // stddev counter
965
            {"/statistics/stddev",
966
                performance_counters::counter_type::aggregating,
967
                "returns the standard deviation value of its base counter "
968
                "over "
969
                "an arbitrary time line; pass required base counter as the "
970
                "instance "
971
                "name: /statistics{<base_counter_name>}/stddev",
972
                HPX_PERFORMANCE_COUNTER_V1,
32✔
973
                &performance_counters::detail::statistics_counter_creator,
32✔
974
                &performance_counters::default_counter_discoverer, ""},
975

976
            // rolling_averaging counter
977
            {"/statistics/rolling_average",
978
                performance_counters::counter_type::aggregating,
979
                "returns the rolling average value of its base counter "
980
                "over "
981
                "an arbitrary time line; pass required base counter as the "
982
                "instance "
983
                "name: /statistics{<base_counter_name>}/rolling_averaging",
984
                HPX_PERFORMANCE_COUNTER_V1,
32✔
985
                &performance_counters::detail::statistics_counter_creator,
32✔
986
                &performance_counters::default_counter_discoverer, ""},
987

988
            // rolling stddev counter
989
            {"/statistics/rolling_stddev",
990
                performance_counters::counter_type::aggregating,
991
                "returns the rolling standard deviation value of its base "
992
                "counter over "
993
                "an arbitrary time line; pass required base counter as the "
994
                "instance "
995
                "name: /statistics{<base_counter_name>}/rolling_stddev",
996
                HPX_PERFORMANCE_COUNTER_V1,
32✔
997
                &performance_counters::detail::statistics_counter_creator,
32✔
998
                &performance_counters::default_counter_discoverer, ""},
999

1000
            // median counter
1001
            {"/statistics/median",
1002
                performance_counters::counter_type::aggregating,
1003
                "returns the median value of its base counter over "
1004
                "an arbitrary time line; pass required base counter as the "
1005
                "instance "
1006
                "name: /statistics{<base_counter_name>}/median",
1007
                HPX_PERFORMANCE_COUNTER_V1,
32✔
1008
                &performance_counters::detail::statistics_counter_creator,
32✔
1009
                &performance_counters::default_counter_discoverer, ""},
1010

1011
            // max counter
1012
            {"/statistics/max", performance_counters::counter_type::aggregating,
1013
                "returns the maximum value of its base counter over "
1014
                "an arbitrary time line; pass required base counter as the "
1015
                "instance "
1016
                "name: /statistics{<base_counter_name>}/max",
1017
                HPX_PERFORMANCE_COUNTER_V1,
32✔
1018
                &performance_counters::detail::statistics_counter_creator,
32✔
1019
                &performance_counters::default_counter_discoverer, ""},
1020

1021
            // min counter
1022
            {"/statistics/min", performance_counters::counter_type::aggregating,
1023
                "returns the minimum value of its base counter over "
1024
                "an arbitrary time line; pass required base counter as the "
1025
                "instance "
1026
                "name: /statistics{<base_counter_name>}/min",
1027
                HPX_PERFORMANCE_COUNTER_V1,
32✔
1028
                &performance_counters::detail::statistics_counter_creator,
32✔
1029
                &performance_counters::default_counter_discoverer, ""},
1030

1031
            // rolling max counter
1032
            {"/statistics/rolling_max",
1033
                performance_counters::counter_type::aggregating,
1034
                "returns the rolling maximum value of its base counter "
1035
                "over "
1036
                "an arbitrary time line; pass required base counter as the "
1037
                "instance "
1038
                "name: /statistics{<base_counter_name>}/rolling_max",
1039
                HPX_PERFORMANCE_COUNTER_V1,
32✔
1040
                &performance_counters::detail::statistics_counter_creator,
32✔
1041
                &performance_counters::default_counter_discoverer, ""},
1042

1043
            // rolling min counter
1044
            {"/statistics/rolling_min",
1045
                performance_counters::counter_type::aggregating,
1046
                "returns the rolling minimum value of its base counter "
1047
                "over "
1048
                "an arbitrary time line; pass required base counter as the "
1049
                "instance "
1050
                "name: /statistics{<base_counter_name>}/rolling_min",
1051
                HPX_PERFORMANCE_COUNTER_V1,
32✔
1052
                &performance_counters::detail::statistics_counter_creator,
32✔
1053
                &performance_counters::default_counter_discoverer, ""},
1054

1055
            // uptime counters
1056
            {
1057
                "/runtime/uptime",
1058
                performance_counters::counter_type::elapsed_time,
1059
                "returns the up time of the runtime instance for the "
1060
                "referenced "
1061
                "locality",
1062
                HPX_PERFORMANCE_COUNTER_V1,
32✔
1063
                &performance_counters::detail::uptime_counter_creator,
32✔
1064
                &performance_counters::locality_counter_discoverer,
1065
                "s"    // unit of measure is seconds
1066
            },
1067

1068
            // component instance counters
1069
            {"/runtime/count/component",
1070
                performance_counters::counter_type::raw,
1071
                "returns the number of component instances currently alive "
1072
                "on "
1073
                "this locality (the component type has to be specified as "
1074
                "the "
1075
                "counter parameter)",
1076
                HPX_PERFORMANCE_COUNTER_V1,
32✔
1077
                &performance_counters::detail::
1078
                    component_instance_counter_creator,
32✔
1079
                &performance_counters::locality_counter_discoverer, ""},
1080

1081
            // action invocation counters
1082
            {"/runtime/count/action-invocation",
1083
                performance_counters::counter_type::raw,
1084
                "returns the number of (local) invocations of a specific "
1085
                "action "
1086
                "on this locality (the action type has to be specified as "
1087
                "the "
1088
                "counter parameter)",
1089
                HPX_PERFORMANCE_COUNTER_V1,
32✔
1090
                &performance_counters::local_action_invocation_counter_creator,
32✔
1091
                &performance_counters::
1092
                    local_action_invocation_counter_discoverer,
1093
                ""},
1094

1095
#if defined(HPX_HAVE_NETWORKING)
1096
            {"/runtime/count/remote-action-invocation",
1097
                performance_counters::counter_type::raw,
1098
                "returns the number of (remote) invocations of a specific "
1099
                "action "
1100
                "on this locality (the action type has to be specified as "
1101
                "the "
1102
                "counter parameter)",
1103
                HPX_PERFORMANCE_COUNTER_V1,
32✔
1104
                &performance_counters::remote_action_invocation_counter_creator,
32✔
1105
                &performance_counters::
1106
                    remote_action_invocation_counter_discoverer,
1107
                ""}
1108
#endif
416✔
1109
        };
32✔
1110
        // clang-format on
1111

1112
        performance_counters::install_counter_types(
1113
            statistic_counter_types, std::size(statistic_counter_types));
1114

1115
        performance_counters::generic_counter_type_data const
1116
            arithmetic_counter_types[] = {
1117
                // adding counter
1118
                {"/arithmetics/add",
1119
                    performance_counters::counter_type::aggregating,
1120
                    "returns the sum of the values of the specified base "
1121
                    "counters; "
1122
                    "pass required base counters as the parameters: "
1123
                    "/arithmetics/"
1124
                    "add@<base_counter_name1>,<base_counter_name2>",
32✔
1125
                    HPX_PERFORMANCE_COUNTER_V1,
32✔
1126
                    &performance_counters::detail::arithmetics_counter_creator,
1127
                    &performance_counters::default_counter_discoverer, ""},
1128
                // minus counter
1129
                {"/arithmetics/subtract",
1130
                    performance_counters::counter_type::aggregating,
1131
                    "returns the difference of the values of the specified "
1132
                    "base counters; "
1133
                    "pass the required base counters as the parameters: "
1134
                    "/arithmetics/"
1135
                    "subtract@<base_counter_name1>,<base_counter_name2>",
32✔
1136
                    HPX_PERFORMANCE_COUNTER_V1,
32✔
1137
                    &performance_counters::detail::arithmetics_counter_creator,
1138
                    &performance_counters::default_counter_discoverer, ""},
1139
                // multiply counter
1140
                {"/arithmetics/multiply",
1141
                    performance_counters::counter_type::aggregating,
1142
                    "returns the product of the values of the specified "
1143
                    "base "
1144
                    "counters; "
1145
                    "pass the required base counters as the parameters: "
1146
                    "/arithmetics/"
1147
                    "multiply@<base_counter_name1>,<base_counter_name2>",
32✔
1148
                    HPX_PERFORMANCE_COUNTER_V1,
32✔
1149
                    &performance_counters::detail::arithmetics_counter_creator,
1150
                    &performance_counters::default_counter_discoverer, ""},
1151
                // divide counter
1152
                {"/arithmetics/divide",
1153
                    performance_counters::counter_type::aggregating,
1154
                    "returns the result of division of the values of the "
1155
                    "specified "
1156
                    "base counters; pass the required base counters as the "
1157
                    "parameters: "
1158
                    "/arithmetics/"
1159
                    "divide@<base_counter_name1>,<base_counter_name2>",
32✔
1160
                    HPX_PERFORMANCE_COUNTER_V1,
32✔
1161
                    &performance_counters::detail::arithmetics_counter_creator,
1162
                    &performance_counters::default_counter_discoverer, ""},
1163

1164
                // arithmetics mean counter
1165
                {"/arithmetics/mean",
1166
                    performance_counters::counter_type::aggregating,
1167
                    "returns the average value of all values of the "
1168
                    "specified "
1169
                    "base counters; pass the required base counters as the "
1170
                    "parameters: "
1171
                    "/arithmetics/"
1172
                    "mean@<base_counter_name1>,<base_counter_name2>",
32✔
1173
                    HPX_PERFORMANCE_COUNTER_V1,
1174
                    &performance_counters::detail::
32✔
1175
                        arithmetics_counter_extended_creator,
1176
                    &performance_counters::default_counter_discoverer, ""},
1177
                // arithmetics variance counter
1178
                {"/arithmetics/variance",
1179
                    performance_counters::counter_type::aggregating,
1180
                    "returns the standard deviation of all values of the "
1181
                    "specified "
1182
                    "base counters; pass the required base counters as the "
1183
                    "parameters: "
1184
                    "/arithmetics/"
1185
                    "variance@<base_counter_name1>,<base_counter_name2>",
32✔
1186
                    HPX_PERFORMANCE_COUNTER_V1,
1187
                    &performance_counters::detail::
32✔
1188
                        arithmetics_counter_extended_creator,
1189
                    &performance_counters::default_counter_discoverer, ""},
1190
                // arithmetics median counter
1191
                {"/arithmetics/median",
1192
                    performance_counters::counter_type::aggregating,
1193
                    "returns the median of all values of the specified "
1194
                    "base counters; pass the required base counters as the "
1195
                    "parameters: "
1196
                    "/arithmetics/"
1197
                    "median@<base_counter_name1>,<base_counter_name2>",
32✔
1198
                    HPX_PERFORMANCE_COUNTER_V1,
1199
                    &performance_counters::detail::
32✔
1200
                        arithmetics_counter_extended_creator,
1201
                    &performance_counters::default_counter_discoverer, ""},
1202
                // arithmetics min counter
1203
                {"/arithmetics/min",
1204
                    performance_counters::counter_type::aggregating,
1205
                    "returns the minimum value of all values of the "
1206
                    "specified "
1207
                    "base counters; pass the required base counters as the "
1208
                    "parameters: "
1209
                    "/arithmetics/"
1210
                    "min@<base_counter_name1>,<base_counter_name2>",
32✔
1211
                    HPX_PERFORMANCE_COUNTER_V1,
1212
                    &performance_counters::detail::
32✔
1213
                        arithmetics_counter_extended_creator,
1214
                    &performance_counters::default_counter_discoverer, ""},
1215
                // arithmetics max counter
1216
                {"/arithmetics/max",
1217
                    performance_counters::counter_type::aggregating,
1218
                    "returns the maximum value of all values of the "
1219
                    "specified "
1220
                    "base counters; pass the required base counters as the "
1221
                    "parameters: "
1222
                    "/arithmetics/"
1223
                    "max@<base_counter_name1>,<base_counter_name2>",
32✔
1224
                    HPX_PERFORMANCE_COUNTER_V1,
1225
                    &performance_counters::detail::
32✔
1226
                        arithmetics_counter_extended_creator,
1227
                    &performance_counters::default_counter_discoverer, ""},
1228
                // arithmetics count counter
1229
                {"/arithmetics/count",
1230
                    performance_counters::counter_type::aggregating,
1231
                    "returns the count value of all values of the "
1232
                    "specified "
1233
                    "base counters; pass the required base counters as the "
1234
                    "parameters: "
1235
                    "/arithmetics/"
1236
                    "count@<base_counter_name1>,<base_counter_name2>",
32✔
1237
                    HPX_PERFORMANCE_COUNTER_V1,
1238
                    &performance_counters::detail::
32✔
1239
                        arithmetics_counter_extended_creator,
320✔
1240
                    &performance_counters::default_counter_discoverer, ""},
32✔
1241
            };
1242
        performance_counters::install_counter_types(
1243
            arithmetic_counter_types, std::size(arithmetic_counter_types));
1,600✔
1244
    }
1245

1246
    ///////////////////////////////////////////////////////////////////////////
×
1247
    /// Start the active counters of the current distributed runtime.
    ///
    /// If get_runtime_distributed_ptr() yields no runtime, an
    /// hpx::error::invalid_status error is reported through \a ec by
    /// HPX_THROWS_IF.
    void start_active_counters(error_code& ec)
    {
        runtime_distributed const* rtd = get_runtime_distributed_ptr();
        if (nullptr == rtd)
        {
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
                "start_active_counters",
                "the runtime system is not available at this time");
            return;
        }

        rtd->start_active_counters(ec);
    }
1261

×
1262
    /// Stop the active counters of the current distributed runtime.
    ///
    /// If get_runtime_distributed_ptr() yields no runtime, an
    /// hpx::error::invalid_status error is reported through \a ec by
    /// HPX_THROWS_IF.
    void stop_active_counters(error_code& ec)
    {
        runtime_distributed const* rtd = get_runtime_distributed_ptr();
        if (nullptr == rtd)
        {
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
                "stop_active_counters",
                "the runtime system is not available at this time");
            return;
        }

        rtd->stop_active_counters(ec);
    }
1276

×
1277
    /// Reset the active counters of the current distributed runtime.
    ///
    /// If get_runtime_distributed_ptr() yields no runtime, an
    /// hpx::error::invalid_status error is reported through \a ec by
    /// HPX_THROWS_IF.
    void reset_active_counters(error_code& ec)
    {
        runtime_distributed const* rtd = get_runtime_distributed_ptr();
        if (nullptr == rtd)
        {
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
                "reset_active_counters",
                "the runtime system is not available at this time");
            return;
        }

        rtd->reset_active_counters(ec);
    }
1291

×
1292
    void reinit_active_counters(bool reset, error_code& ec)
1293
    {
×
1294
        if (runtime_distributed const* rtd = get_runtime_distributed_ptr();
×
1295
            nullptr != rtd)
1296
        {
×
1297
            rtd->reinit_active_counters(reset, ec);
1298
        }
1299
        else
1300
        {
×
1301
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
1302
                "reinit_active_counters",
1303
                "the runtime system is not available at this time");
1304
        }
×
1305
    }
1306

×
1307
    void evaluate_active_counters(
1308
        bool reset, char const* description, error_code& ec)
1309
    {
×
1310
        if (runtime_distributed const* rtd = get_runtime_distributed_ptr();
×
1311
            nullptr != rtd)
1312
        {
×
1313
            rtd->evaluate_active_counters(reset, description, ec);
1314
        }
1315
        else
1316
        {
×
1317
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
1318
                "evaluate_active_counters",
1319
                "the runtime system is not available at this time");
1320
        }
×
1321
    }
1322

1323
    // helper function to stop evaluating counters during shutdown
29✔
1324
    void stop_evaluating_counters(bool terminate)
1325
    {
29✔
1326
        if (runtime_distributed const* rtd = get_runtime_distributed_ptr();
29✔
1327
            nullptr != rtd)
29✔
1328
        {
29✔
1329
            rtd->stop_evaluating_counters(terminate);
1330
        }
1331
    }
1332

128✔
1333
    ///////////////////////////////////////////////////////////////////////////
1334
    /// Build the callback notifier for threads created with \a prefix and
    /// \a type.
    ///
    /// The notifier is wired up as follows:
    ///   - on thread start: init_tss_helper(prefix, type, <4 callback
    ///     arguments>, service_thread = false)
    ///   - on thread stop:  deinit_tss_helper(prefix, <1st callback argument>)
    ///   - on error:        report_error(<1st>, <2nd callback argument>,
    ///     terminate_all = true)
    threads::policies::callback_notifier
    runtime_distributed::get_notification_policy(
        char const* prefix, runtime_local::os_thread_type type)
    {
        // selects the three-argument report_error overload
        using report_error_t = bool (runtime_distributed::*)(
            std::size_t, std::exception_ptr const&, bool);

        notification_policy_type policy;

        policy.add_on_start_thread_callback(
            hpx::bind(&runtime_distributed::init_tss_helper, this, prefix, type,
                placeholders::_1, placeholders::_2, placeholders::_3,
                placeholders::_4, false));

        policy.add_on_stop_thread_callback(
            hpx::bind(&runtime_distributed::deinit_tss_helper, this, prefix,
                placeholders::_1));

        policy.set_on_error_callback(hpx::bind(
            static_cast<report_error_t>(&runtime_distributed::report_error),
            this, placeholders::_1, placeholders::_2, true));

        return policy;
    }
1359

1360
    void runtime_distributed::init_tss_helper(char const* context,
1361
        runtime_local::os_thread_type type, std::size_t local_thread_num,
1362
        std::size_t global_thread_num, char const* pool_name,
1363
        char const* postfix, bool service_thread)
251✔
1364
    {
1365
        // prefix thread name with locality number, if needed
1366
        std::string const locality = locality_prefix(get_config());
251✔
1367

251✔
1368
        error_code ec(throwmode::lightweight);
1369
        return init_tss_ex(locality, context, type, local_thread_num,
1370
            global_thread_num, pool_name, postfix, service_thread, ec);
252✔
1371
    }
1372

1373
    void runtime_distributed::init_tss_ex(std::string const& locality,
1374
        char const* context, runtime_local::os_thread_type type,
1375
        std::size_t local_thread_num, std::size_t global_thread_num,
1376
        char const* pool_name, char const* postfix, bool service_thread,
1377
        error_code& ec) const
1378
    {
1379
        // set the thread's name, if it's not already set
1380
        HPX_ASSERT(detail::thread_name().empty());
252✔
1381

1382
        std::string fullname = std::string(locality);
1383
        if (!locality.empty())
252✔
1384
            fullname += "/";
1385
        fullname += context;
1386
        if (postfix && *postfix)
1387
            fullname += postfix;
1388

1389
#if defined(HPX_GCC_VERSION) && HPX_GCC_VERSION >= 110000
1390
#pragma GCC diagnostic push
252✔
1391
#pragma GCC diagnostic ignored "-Wrestrict"
1392
#endif
1393
        fullname += "#" + std::to_string(global_thread_num);
1394
#if defined(HPX_GCC_VERSION) && HPX_GCC_VERSION >= 110000
1395
#pragma GCC diagnostic pop
252✔
1396
#endif
1397

252✔
1398
        detail::thread_name() = HPX_MOVE(fullname);
1399

1400
        char const* name = detail::thread_name().c_str();
252✔
1401

1402
        // initialize thread mapping for external libraries (i.e. PAPI)
1403
        thread_support_->register_thread(name, type);
1404

1405
        // register this thread with any possibly active Intel tool
1406
        HPX_ITT_THREAD_SET_NAME(name);
252✔
1407

1408
        // set thread name as shown in Visual Studio
1409
        util::set_thread_name(name);
1410

1411
#if defined(HPX_HAVE_APEX)
1412
        if (std::strstr(name, "worker") != nullptr)
1413
            util::external_timer::register_thread(name);
1414
#endif
252✔
1415

1416
        // call thread-specific user-supplied on_start handler
×
1417
        if (on_start_func_)
1418
        {
1419
            on_start_func_(
1420
                local_thread_num, global_thread_num, pool_name, context);
1421
        }
252✔
1422

1423
        // if this is a service thread, set its service affinity
1424
        if (service_thread)
1425
        {
1426
            // FIXME: We don't set the affinity of the service threads on BG/Q,
1427
            // as this is causing a hang (needs to be investigated)
1✔
1428
#if !defined(__bgq__)
1429
            threads::mask_cref_type used_processing_units =
1430
                thread_manager_->get_used_processing_units();
1✔
1431

1432
            // --hpx:bind=none  should disable all affinity definitions
1✔
1433
            if (threads::any(used_processing_units))
2✔
1434
            {
1435
                this->topology_.set_thread_affinity_mask(
1436
                    this->topology_.get_service_affinity_mask(
1437
                        used_processing_units),
1438
                    ec);
1439

1440
                // comment this out for now as on CircleCI this is causing unending grief
1441
                //if (ec)
1442
                //{
1443
                //    HPX_THROW_EXCEPTION(hpx::error::kernel_error,
1444
                //        "runtime_distributed::init_tss_ex",
1445
                //        "failed to set thread affinity mask ({}) for service "
1446
                //        "thread: {}",
1447
                //        hpx::threads::to_string(used_processing_units),
1448
                //        detail::thread_name());
1449
                //}
1450
            }
252✔
1451
#endif
1452
        }
251✔
1453
    }
1454

1455
    void runtime_distributed::deinit_tss_helper(
251✔
1456
        char const* context, std::size_t global_thread_num) const
1457
    {
1458
        threads::reset_continuation_recursion_count();
251✔
1459

1460
        // call thread-specific user-supplied on_stop handler
×
1461
        if (on_stop_func_)
1462
        {
1463
            on_stop_func_(global_thread_num, global_thread_num, "", context);
1464
        }
251✔
1465

1466
        // reset PAPI support
1467
        thread_support_->unregister_thread();
251✔
1468

251✔
1469
        // reset thread local storage
1470
        detail::thread_name().clear();
×
1471
    }
1472

×
1473
    /// Obtain an id from id_pool_.
    ///
    /// \param count  forwarded to id_pool_.get_id()
    /// \returns the gid returned by id_pool_.get_id()
    naming::gid_type runtime_distributed::get_next_id(std::size_t count)
    {
        return id_pool_.get_id(count);
    }
1477

×
1478
    /// Hand the pre-startup function \a f over to runtime_support_
    /// (\a f is moved from).
    void runtime_distributed::add_pre_startup_function(startup_function_type f)
    {
        runtime_support_->add_pre_startup_function(HPX_MOVE(f));
    }
1482

33✔
1483
    /// Hand the startup function \a f over to runtime_support_
    /// (\a f is moved from).
    void runtime_distributed::add_startup_function(startup_function_type f)
    {
        runtime_support_->add_startup_function(HPX_MOVE(f));
    }
1487

1488
    void runtime_distributed::add_pre_shutdown_function(
38✔
1489
        shutdown_function_type f)
38✔
1490
    {
1491
        runtime_support_->add_pre_shutdown_function(HPX_MOVE(f));
4,062✔
1492
    }
1493

4,062✔
1494
    /// Hand the shutdown function \a f over to runtime_support_
    /// (\a f is moved from).
    void runtime_distributed::add_shutdown_function(shutdown_function_type f)
    {
        runtime_support_->add_shutdown_function(HPX_MOVE(f));
    }
1498

1499
    /// Look up one of the runtime's I/O service pools by name.
    ///
    /// Only a prefix of \a name is compared, in this order: "io" (if
    /// HPX_HAVE_IO_POOL), "parcel" (if HPX_HAVE_NETWORKING), "timer" (if
    /// HPX_HAVE_TIMER_POOL) and "main".
    ///
    /// \param name  pool name; asserted to be non-null
    /// \returns pointer to the matching pool
    ///
    /// Raises hpx::error::bad_parameter through HPX_THROW_EXCEPTION if no
    /// pool matches.
    hpx::util::io_service_pool* runtime_distributed::get_thread_pool(
        char const* name)
    {
        HPX_ASSERT(name != nullptr);

        // prefix comparison that also tolerates a null name
        auto const starts_with = [name](char const* prefix, std::size_t len) {
            return name && 0 == std::strncmp(name, prefix, len);
        };

#ifdef HPX_HAVE_IO_POOL
        if (starts_with("io", 2))
            return io_pool_.get();
#endif
#if defined(HPX_HAVE_NETWORKING)
        if (starts_with("parcel", 6))
            return parcel_handler_.get_thread_pool(name);
#endif
#ifdef HPX_HAVE_TIMER_POOL
        if (starts_with("timer", 5))
            return timer_pool_.get();
#endif
        if (starts_with("main", 4))    //-V112
            return main_pool_.get();

        HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
            "runtime_distributed::get_thread_pool",
            "unknown thread pool requested: {}", name ? name : "<unknown>");
    }
1✔
1522

1523
    /// Register an external OS-thread with HPX
1524
    bool runtime_distributed::register_thread(char const* name,
1525
        std::size_t global_thread_num, bool service_thread, error_code& ec)
1✔
1526
    {
1527
        // prefix thread name with locality number, if needed
1✔
1528
        std::string const locality = locality_prefix(get_config());
1529

1530
        std::string thread_name(name);
1✔
1531
        thread_name += "-thread";
1532

1533
        init_tss_ex(locality, thread_name.c_str(),
1534
            runtime_local::os_thread_type::custom_thread, global_thread_num,
2✔
1535
            global_thread_num, "", nullptr, service_thread, ec);
1536

1537
        return !ec ? true : false;
1538
    }
1,088✔
1539

1540
#if defined(HPX_HAVE_NETWORKING)
1541
    void runtime_distributed::register_message_handler(
1,088✔
1542
        char const* message_handler_type, char const* action,
1,088✔
1543
        error_code& ec) const
1544
    {
1545
        return runtime_support_->register_message_handler(
1546
            message_handler_type, action, ec);
6✔
1547
    }
1548

1549
    /// Ask the runtime_support object to create a message handler.
    ///
    /// All arguments are passed through unchanged.
    ///
    /// \returns the pointer produced by runtime_support's
    ///          create_message_handler()
    parcelset::policies::message_handler*
    runtime_distributed::create_message_handler(
        char const* message_handler_type, char const* action,
        parcelset::parcelport* pp, std::size_t num_messages,
        std::size_t interval, error_code& ec) const
    {
        parcelset::policies::message_handler* const handler =
            runtime_support_->create_message_handler(
                message_handler_type, action, pp, num_messages, interval, ec);
        return handler;
    }
1558

1559
    /// Ask the runtime_support object to create a binary filter.
    ///
    /// All arguments are passed through unchanged.
    ///
    /// \returns the pointer produced by runtime_support's
    ///          create_binary_filter()
    serialization::binary_filter* runtime_distributed::create_binary_filter(
        char const* binary_filter_type, bool compress,
        serialization::binary_filter* next_filter, error_code& ec) const
    {
        serialization::binary_filter* const filter =
            runtime_support_->create_binary_filter(
                binary_filter_type, compress, next_filter, ec);
        return filter;
    }
1566
#endif
250✔
1567

1568
    /// Return the locality id as reported by agas::get_locality_id().
    ///
    /// \param ec  error reporting channel, passed through to AGAS
    std::uint32_t runtime_distributed::get_locality_id(error_code& ec) const
    {
        std::uint32_t const id = agas::get_locality_id(ec);
        return id;
    }
1572

1573
    std::size_t runtime_distributed::get_num_worker_threads() const
×
1574
    {
1575
        error_code ec(throwmode::lightweight);
1576
        return static_cast<std::size_t>(
1✔
1577
            agas_client_.get_num_overall_threads(ec));
1578
    }
1579

1✔
1580
    std::uint32_t runtime_distributed::get_num_localities(
1581
        hpx::launch::sync_policy, error_code& ec) const
1582
    {
465✔
1583
        return agas_client_.get_num_localities(ec);
1584
    }
465✔
1585

1586
    std::uint32_t runtime_distributed::get_initial_num_localities() const
1587
    {
×
1588
        return get_config().get_num_localities();
1589
    }
×
1590

1591
    /// Asynchronously query the number of localities from the AGAS client.
    ///
    /// \returns the future produced by get_num_localities_async()
    hpx::future<std::uint32_t> runtime_distributed::get_num_localities() const
    {
        return this->agas_client_.get_num_localities_async();
    }
1595

×
1596
    std::string runtime_distributed::get_locality_name() const
1597
    {
1598
#if defined(HPX_HAVE_NETWORKING)
1599
        return get_parcel_handler().get_locality_name();
1600
#else
1601
        return "<unknown>";
×
1602
#endif
1603
    }
1604

1605
    std::uint32_t runtime_distributed::get_num_localities(
×
1606
        hpx::launch::sync_policy, components::component_type type,
1607
        error_code& ec) const
1608
    {
×
1609
        return agas_client_.get_num_localities(type, ec);
1610
    }
1611

×
1612
    /// Asynchronously query the number of localities for component type
    /// \a type from the AGAS client.
    ///
    /// \param type  component type, passed through to AGAS
    /// \returns     the future produced by get_num_localities_async()
    hpx::future<std::uint32_t> runtime_distributed::get_num_localities(
        components::component_type type) const
    {
        return this->agas_client_.get_num_localities_async(type);
    }
1617

32✔
1618
    std::uint32_t runtime_distributed::assign_cores(
1619
        std::string const& locality_basename, std::uint32_t cores_needed)
1620
    {
1621
        std::lock_guard<std::mutex> l(mtx_);
32✔
1622

1623
        used_cores_map_type::iterator const it =
30✔
1624
            used_cores_map_.find(locality_basename);
1625
        if (it == used_cores_map_.end())
1626
        {
1627
            used_cores_map_.emplace(locality_basename, cores_needed);
2✔
1628
            return 0;
2✔
1629
        }
1630

2✔
1631
        std::uint32_t const current = it->second;
1632
        it->second += cores_needed;
1633

35✔
1634
        return current;
1635
    }
1636

1637
    std::uint32_t runtime_distributed::assign_cores()
1638
    {
35✔
1639
        // adjust thread assignments to allow for more than one locality per
1640
        // node
35✔
1641
        std::size_t const first_core =
1642
            static_cast<std::size_t>(this->get_config().get_first_used_core());
35✔
1643
        std::size_t const cores_needed =
1644
            hpx::resource::get_partitioner().assign_cores(first_core);
1645

1646
        return static_cast<std::uint32_t>(cores_needed);
×
1647
    }
1648

1649
    ///////////////////////////////////////////////////////////////////////////
×
1650
    void runtime_distributed::default_errorsink(std::string const& msg)
1651
    {
1652
        // log the exception information in any case
×
1653
        LERR_(always).format("default_errorsink: unhandled exception: {}", msg);
1654

1655
        std::cerr << msg << std::endl;
1656
    }
1657

×
1658
#if defined(HPX_HAVE_NETWORKING)
1659
    ///////////////////////////////////////////////////////////////////////////
1660
    // Create an instance of a binary filter plugin
1661
    serialization::binary_filter* create_binary_filter(
×
1662
        char const* binary_filter_type, bool compress,
×
1663
        serialization::binary_filter* next_filter, error_code& ec)
×
1664
    {
×
1665
        runtime_distributed* rtd = get_runtime_distributed_ptr();
1666
        if (nullptr != rtd)
×
1667
            return rtd->create_binary_filter(
1668
                binary_filter_type, compress, next_filter, ec);
×
1669

1670
        HPX_THROWS_IF(ec, hpx::error::invalid_status, "create_binary_filter",
1671
            "the runtime system is not available at this time");
1672
        return nullptr;
1,549,717✔
1673
    }
1674
#endif
1675

1,549,717✔
1676
    /// Return a reference to the registered distributed runtime.
    ///
    /// The registered pointer is asserted to be non-null and dereferenced
    /// without any further check.
    runtime_distributed& get_runtime_distributed()
    {
        runtime_distributed* const instance = get_runtime_distributed_ptr();
        HPX_ASSERT(instance != nullptr);
        return *instance;
    }
1681

1,550,835✔
1682
    /// Give access to the slot holding the pointer to the distributed
    /// runtime.
    ///
    /// \returns a reference to a function-local static pointer that starts
    ///          out as nullptr; callers may read or overwrite it
    runtime_distributed*& get_runtime_distributed_ptr()
    {
        static runtime_distributed* instance = nullptr;
        return instance;
    }
1687

32✔
1688
    void runtime_distributed::init_global_data()
1689
    {
1690
        runtime_distributed*& runtime_distributed_ =
32✔
1691
            get_runtime_distributed_ptr();
32✔
1692
        HPX_ASSERT(nullptr == threads::thread_self::get_self());
1693
        runtime_distributed_ = this;
32✔
1694
    }
1695

1696
    void runtime_distributed::deinit_global_data()
32✔
1697
    {
1698
        runtime_distributed*& runtime_distributed_ =
32✔
1699
            get_runtime_distributed_ptr();
1700
        HPX_ASSERT(runtime_distributed_);
32✔
1701
        runtime_distributed_ = nullptr;
32✔
1702

1703
        runtime::deinit_global_data();
×
1704
    }
1705

×
1706
    /// Return the gid of the local locality as stored in the AGAS client of
    /// the registered distributed runtime.
    naming::gid_type const& get_locality()
    {
        auto& agas = get_runtime_distributed().get_agas_client();
        return agas.get_local_locality();
    }
1710

545✔
1711
    ///////////////////////////////////////////////////////////////////////////
1712
    // Helpers
545✔
1713
    /// Return the id of the locality this code runs on.
    ///
    /// If no runtime is registered, an invalid_status error is reported
    /// through \a ec and hpx::invalid_id is returned.
    ///
    /// The id is computed on the first successful pass through this function
    /// and kept in a function-local static; later calls return a copy of that
    /// value and do not touch \a ec.
    ///
    /// NOTE(review): because of the static, a value computed while the
    /// locality id was not yet final would presumably stay cached; confirm
    /// that this cannot happen for callers of this function.
    hpx::id_type find_here(error_code& ec)
    {
        runtime const* const rt_ptr = get_runtime_ptr();
        if (rt_ptr == nullptr)
        {
            HPX_THROWS_IF(ec, hpx::error::invalid_status, "hpx::find_here",
                "the runtime system is not available at this time");
            return hpx::invalid_id;
        }

        static hpx::id_type cached_here =
            naming::get_id_from_locality_id(rt_ptr->get_locality_id(ec));
        return cached_here;
    }
1727

59✔
1728
    /// Return the id of the console (root) locality.
    ///
    /// An invalid_status error is reported through \a ec, and
    /// hpx::invalid_id is returned, if either no distributed runtime is
    /// registered or the AGAS client cannot provide the console locality.
    ///
    /// \returns an unmanaged id wrapping the console locality's gid
    hpx::id_type find_root_locality(error_code& ec)
    {
        runtime_distributed* const runtime = hpx::get_runtime_distributed_ptr();
        if (runtime == nullptr)
        {
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
                "hpx::find_root_locality",
                "the runtime system is not available at this time");
            return hpx::invalid_id;
        }

        naming::gid_type root_gid;
        bool const have_console =
            runtime->get_agas_client().get_console_locality(root_gid);
        if (!have_console)
        {
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
                "hpx::find_root_locality",
                "the root locality is not available at this time");
            return hpx::invalid_id;
        }

        // report success unless the caller asked for exceptions
        if (&ec != &throws)
        {
            ec = make_success_code();
        }

        return hpx::id_type(
            root_gid, hpx::id_type::management_type::unmanaged);
    }
×
1753

1754
    std::vector<hpx::id_type> find_all_localities(
1755
        components::component_type type, error_code& ec)
×
1756
    {
×
1757
        std::vector<hpx::id_type> locality_ids;
1758
        if (nullptr == hpx::applier::get_applier_ptr())
×
1759
        {
1760
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
1761
                "hpx::find_all_localities",
×
1762
                "the runtime system is not available at this time");
1763
            return locality_ids;
1764
        }
×
1765

1766
        hpx::applier::get_applier().get_localities(locality_ids, type, ec);
×
1767
        return locality_ids;
1768
    }
35✔
1769

1770
    /// Collect the ids of all localities the applier reports.
    ///
    /// If no applier is available, an invalid_status error is reported
    /// through \a ec and an empty vector is returned.
    std::vector<hpx::id_type> find_all_localities(error_code& ec)
    {
        if (hpx::applier::get_applier_ptr() == nullptr)
        {
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
                "hpx::find_all_localities",
                "the runtime system is not available at this time");
            return {};
        }

        std::vector<hpx::id_type> result;
        hpx::applier::get_applier().get_localities(result, ec);
        return result;
    }
×
1784

1785
    std::vector<hpx::id_type> find_remote_localities(
1786
        components::component_type type, error_code& ec)
×
1787
    {
×
1788
        std::vector<hpx::id_type> locality_ids;
1789
        if (nullptr == hpx::applier::get_applier_ptr())
×
1790
        {
1791
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
1792
                "hpx::find_remote_localities",
×
1793
                "the runtime system is not available at this time");
1794
            return locality_ids;
1795
        }
×
1796

1797
        hpx::applier::get_applier().get_remote_localities(
1798
            locality_ids, type, ec);
×
1799
        return locality_ids;
1800
    }
4✔
1801

1802
    /// Collect the ids of the remote localities the applier reports.
    ///
    /// The query is issued with the 'invalid' component type. If no applier
    /// is available, an invalid_status error is reported through \a ec and
    /// an empty vector is returned.
    std::vector<hpx::id_type> find_remote_localities(error_code& ec)
    {
        if (hpx::applier::get_applier_ptr() == nullptr)
        {
            HPX_THROWS_IF(ec, hpx::error::invalid_status,
                "hpx::find_remote_localities",
                "the runtime system is not available at this time");
            return {};
        }

        std::vector<hpx::id_type> result;
        hpx::applier::get_applier().get_remote_localities(result,
            to_int(hpx::components::component_enum_type::invalid), ec);
        return result;
    }
1818

×
1819
    // find a locality supporting the given component
1820
    hpx::id_type find_locality(components::component_type type, error_code& ec)
×
1821
    {
1822
        if (nullptr == hpx::applier::get_applier_ptr())
×
1823
        {
1824
            HPX_THROWS_IF(ec, hpx::error::invalid_status, "hpx::find_locality",
1825
                "the runtime system is not available at this time");
1826
            return hpx::invalid_id;
1827
        }
×
1828

×
1829
        std::vector<hpx::id_type> locality_ids;
1830
        hpx::applier::get_applier().get_localities(locality_ids, type, ec);
×
1831

1832
        if (ec || locality_ids.empty())
1833
            return hpx::invalid_id;
1834

1835
        // chose first locality to host the object
×
1836
        return locality_ids.front();
1837
    }
×
1838

1839
    std::uint32_t get_num_localities(hpx::launch::sync_policy,
1840
        components::component_type type, error_code& ec)
×
1841
    {
×
1842
        runtime_distributed const* rt = get_runtime_distributed_ptr();
1843
        if (nullptr == rt)
×
1844
        {
1845
            HPX_THROW_EXCEPTION(hpx::error::invalid_status,
1846
                "hpx::get_num_localities",
1847
                "the runtime system has not been initialized yet");
1848
        }
×
1849

1850
        return rt->get_num_localities(hpx::launch::sync, type, ec);
1851
    }
×
1852

1853
    /// Asynchronously query the number of localities for component type
    /// \a type through the registered distributed runtime.
    ///
    /// \param type  component type, passed through to the runtime
    /// \returns     the future produced by
    ///              runtime_distributed::get_num_localities()
    /// \throws      hpx::error::invalid_status if no distributed runtime is
    ///              registered
    hpx::future<std::uint32_t> get_num_localities(
        components::component_type type)
    {
        runtime_distributed const* const runtime =
            get_runtime_distributed_ptr();
        if (runtime == nullptr)
        {
            HPX_THROW_EXCEPTION(hpx::error::invalid_status,
                "hpx::get_num_localities",
                "the runtime system has not been initialized yet");
        }

        return runtime->get_num_localities(type);
    }
1866
}    // namespace hpx
1867

1868
///////////////////////////////////////////////////////////////////////////////
1869
namespace hpx::naming {
1870

862,725✔
1871
    // shortcut for get_runtime().get_agas_client()
1872
    agas::addressing_service& get_agas_client()
862,725✔
1873
    {
1874
        return get_runtime_distributed().get_agas_client();
1875
    }
1876

192✔
1877
    // shortcut for get_runtime_ptr()->get_agas_client()
1878
    agas::addressing_service* get_agas_client_ptr()
192✔
1879
    {
192✔
1880
        auto* rtd = get_runtime_distributed_ptr();
1881
        return rtd ? &rtd->get_agas_client() : nullptr;
1882
    }
1883
}    // namespace hpx::naming
1884

1885
///////////////////////////////////////////////////////////////////////////////
1886
#if defined(HPX_HAVE_NETWORKING)
1887
namespace hpx::parcelset {
686,019✔
1888

1889
    bool do_background_work(
1890
        std::size_t num_thread, parcelport_background_mode mode)
686,019✔
1891
    {
686,019✔
1892
        return get_runtime_distributed()
686,019✔
1893
            .get_parcel_handler()
1894
            .do_background_work(num_thread, false, mode);
1895
    }
1896

3✔
1897
    // shortcut for get_runtime().get_parcel_handler()
1898
    parcelhandler& get_parcel_handler()
3✔
1899
    {
1900
        return get_runtime_distributed().get_parcel_handler();
1901
    }
1902

×
1903
    // shortcut for get_runtime_ptr()->get_parcel_handler()
1904
    parcelhandler* get_parcel_handler_ptr()
×
1905
    {
×
1906
        auto* rtd = get_runtime_distributed_ptr();
1907
        return rtd ? &rtd->get_parcel_handler() : nullptr;
1908
    }
1909
}    // namespace hpx::parcelset
1910
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc