• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #856

28 Dec 2022 02:00AM UTC coverage: 86.602% (+0.05%) from 86.55%
#856

push

StellarBot
Merge #6119

6119: Update CMakeLists.txt r=hkaiser a=khuck

updating the default APEX version


Co-authored-by: Kevin Huck <khuck@cs.uoregon.edu>

174566 of 201573 relevant lines covered (86.6%)

1876093.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.76
/libs/full/runtime_distributed/src/big_boot_barrier.cpp
1
//  Copyright (c) 2011 Bryce Lelbach & Katelyn Kufahl
2
//  Copyright (c) 2007-2021 Hartmut Kaiser
3
//  Copyright (c) 2015 Anton Bikineev
4
//
5
//  SPDX-License-Identifier: BSL-1.0
6
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
7
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8

9
#include <hpx/config.hpp>
10

11
#if defined(HPX_HAVE_NETWORKING)
12
#include <hpx/actions_base/actions_base_support.hpp>
13
#include <hpx/actions_base/plain_action.hpp>
14
#include <hpx/agas/addressing_service.hpp>
15
#include <hpx/agas_base/detail/hosted_component_namespace.hpp>
16
#include <hpx/agas_base/detail/hosted_locality_namespace.hpp>
17
#include <hpx/assert.hpp>
18
#include <hpx/async_distributed/put_parcel.hpp>
19
#include <hpx/components_base/agas_interface.hpp>
20
#include <hpx/components_base/server/managed_component_base.hpp>
21
#include <hpx/execution_base/this_thread.hpp>
22
#include <hpx/functional/bind_front.hpp>
23
#include <hpx/modules/agas_base.hpp>
24
#include <hpx/modules/format.hpp>
25
#include <hpx/parcelset/detail/parcel_await.hpp>
26
#include <hpx/parcelset_base/parcel_interface.hpp>
27
#include <hpx/parcelset_base/parcelport.hpp>
28
#include <hpx/runtime_configuration/runtime_configuration.hpp>
29
#include <hpx/runtime_distributed.hpp>
30
#include <hpx/runtime_distributed/big_boot_barrier.hpp>
31
#include <hpx/runtime_distributed/runtime_fwd.hpp>
32
#include <hpx/serialization/detail/polymorphic_id_factory.hpp>
33
#include <hpx/serialization/vector.hpp>
34
#include <hpx/static_reinit/reinitializable_static.hpp>
35
#include <hpx/timing/high_resolution_clock.hpp>
36
#include <hpx/topology/topology.hpp>
37
#include <hpx/util/from_string.hpp>
38

39
#include <cstddef>
40
#include <cstdint>
41
#include <cstdlib>
42
#include <functional>
43
#include <memory>
44
#include <mutex>
45
#include <random>
46
#include <string>
47
#include <type_traits>
48
#include <utility>
49
#include <vector>
50

51
namespace hpx { namespace detail {
52

53
    std::string get_locality_base_name();
54
}}    // namespace hpx::detail
55

56
namespace hpx { namespace parcelset {
57

58
    // shortcut for get_runtime().get_parcel_handler()
59
    parcelhandler& get_parcel_handler();
60
}}    // namespace hpx::parcelset
61

62
namespace hpx { namespace agas { namespace detail {
63

64
    void register_unassigned_typenames()
452✔
65
    {
66
        // supposed to be run on locality 0 before
67
        // before locality communication
68
        hpx::serialization::detail::id_registry& serialization_registry =
452✔
69
            hpx::serialization::detail::id_registry::instance();
452✔
70

71
        serialization_registry.fill_missing_typenames();
452✔
72

73
        hpx::actions::detail::action_registry& action_registry =
452✔
74
            hpx::actions::detail::action_registry::instance();
452✔
75
        action_registry.fill_missing_typenames();
452✔
76
    }
452✔
77

78
    ///////////////////////////////////////////////////////////////////////////
79
    struct unassigned_typename_sequence
856✔
80
    {
81
        unassigned_typename_sequence() = default;
142✔
82

83
        explicit unassigned_typename_sequence(bool /*dummy*/)
142✔
84
          : serialization_typenames(
142✔
85
                hpx::serialization::detail::id_registry::instance()
142✔
86
                    .get_unassigned_typenames())
142✔
87
          , action_typenames(hpx::actions::detail::action_registry::instance()
142✔
88
                                 .get_unassigned_typenames())
142✔
89
        {
90
        }
142✔
91

92
        void save(hpx::serialization::output_archive& ar, unsigned) const
426✔
93
        {
94
            // part running on worker node
95
            HPX_ASSERT(!action_typenames.empty());
426✔
96
            ar << serialization_typenames;
426✔
97
            ar << action_typenames;
426✔
98
        }
426✔
99

100
        void load(hpx::serialization::input_archive& ar, unsigned)
142✔
101
        {
102
            // part running on locality 0
103
            ar >> serialization_typenames;
142✔
104
            ar >> action_typenames;
142✔
105
        }
142✔
106
        HPX_SERIALIZATION_SPLIT_MEMBER();
568✔
107

108
        std::vector<std::string> serialization_typenames;
109
        std::vector<std::string> action_typenames;
110
    };
111

112
    ///////////////////////////////////////////////////////////////////////////
113
    struct assigned_id_sequence
1,698✔
114
    {
115
        assigned_id_sequence() = default;
142✔
116

117
        explicit assigned_id_sequence(
142✔
118
            unassigned_typename_sequence const& typenames)
119
        {
120
            register_ids_on_main_loc(typenames);
142✔
121
        }
142✔
122

123
        void save(hpx::serialization::output_archive& ar, unsigned) const
425✔
124
        {
125
            HPX_ASSERT(!action_ids.empty());
425✔
126
            ar << serialization_ids;    // part running on locality 0
425✔
127
            ar << action_ids;
425✔
128
        }
425✔
129

130
        void load(hpx::serialization::input_archive& ar, unsigned)
142✔
131
        {
132
            ar >> serialization_ids;    // part running on worker node
142✔
133
            ar >> action_ids;
142✔
134
        }
142✔
135
        HPX_SERIALIZATION_SPLIT_MEMBER();
567✔
136

137
    private:
138
        void register_ids_on_main_loc(
142✔
139
            unassigned_typename_sequence const& unassigned_ids)
140
        {
141
            {
142
                hpx::serialization::detail::id_registry& registry =
142✔
143
                    hpx::serialization::detail::id_registry::instance();
142✔
144
                std::uint32_t max_id = registry.get_max_registered_id();
142✔
145

146
                for (const std::string& s :
142✔
147
                    unassigned_ids.serialization_typenames)
142✔
148
                {
149
                    std::uint32_t id = registry.try_get_id(s);
×
150
                    if (id ==
×
151
                        hpx::serialization::detail::id_registry::invalid_id)
152
                    {
153
                        // this id is not registered yet
154
                        id = ++max_id;
×
155
                        registry.register_typename(s, id);
×
156
                    }
×
157
                    serialization_ids.push_back(id);
×
158
                }
159
            }
160
            {
161
                hpx::actions::detail::action_registry& registry =
142✔
162
                    hpx::actions::detail::action_registry::instance();
142✔
163
                std::uint32_t max_id = registry.max_id_;
142✔
164

165
                for (const std::string& s : unassigned_ids.action_typenames)
9,036✔
166
                {
167
                    std::uint32_t id = registry.try_get_id(s);
8,894✔
168
                    if (id == hpx::actions::detail::action_registry::invalid_id)
8,894✔
169
                    {
170
                        // this id is not registered yet
171
                        id = ++max_id;
×
172
                        registry.register_typename(s, id);
×
173
                    }
×
174
                    action_ids.push_back(id);
8,894✔
175
                }
176
            }
177
        }
142✔
178

179
    public:
180
        void register_ids_on_worker_loc() const
142✔
181
        {
182
            {
183
                hpx::serialization::detail::id_registry& registry =
142✔
184
                    hpx::serialization::detail::id_registry::instance();
142✔
185

186
                // Yes, we look up the unassigned typenames twice, but this allows
187
                // to avoid using globals and protects from race conditions during
188
                // de-serialization.
189
                std::vector<std::string> typenames =
190
                    registry.get_unassigned_typenames();
142✔
191

192
                // we should have received as many ids as we have unassigned names
193
                HPX_ASSERT(typenames.size() == serialization_ids.size());
142✔
194

195
                for (std::size_t k = 0; k < serialization_ids.size(); ++k)
142✔
196
                {
197
                    registry.register_typename(
×
198
                        typenames[k], serialization_ids[k]);
×
199
                }
×
200

201
                // fill in holes which might have been caused by initialization
202
                // order problems
203
                registry.fill_missing_typenames();
142✔
204
            }
142✔
205
            {
206
                hpx::actions::detail::action_registry& registry =
142✔
207
                    hpx::actions::detail::action_registry::instance();
142✔
208

209
                // Yes, we look up the unassigned typenames twice, but this allows
210
                // to avoid using globals and protects from race conditions during
211
                // de-serialization.
212
                std::vector<std::string> typenames =
213
                    registry.get_unassigned_typenames();
142✔
214

215
                // we should have received as many ids as we have unassigned names
216
                HPX_ASSERT(typenames.size() == action_ids.size());
142✔
217

218
                for (std::size_t k = 0; k < action_ids.size(); ++k)
9,036✔
219
                {
220
                    registry.register_typename(typenames[k], action_ids[k]);
8,894✔
221
                }
8,894✔
222

223
                // fill in holes which might have been caused by initialization
224
                // order problems
225
                registry.fill_missing_typenames();
142✔
226
            }
142✔
227
        }
142✔
228

229
        std::vector<std::uint32_t> serialization_ids;
230
        std::vector<std::uint32_t> action_ids;
231
    };
232
}}}    // namespace hpx::agas::detail
233

234
namespace hpx { namespace agas {
235
    template <typename Action, typename... Args>
236
    void big_boot_barrier::apply(std::uint32_t source_locality_id,
283✔
237
        std::uint32_t target_locality_id, parcelset::locality dest, Action act,
238
        Args&&... args)
239
    {    // {{{
240
        HPX_ASSERT(pp);
283✔
241
        naming::address addr(
283✔
242
            naming::get_gid_from_locality_id(target_locality_id));
283✔
243
        parcelset::parcel p(parcelset::detail::create_parcel::call(
283✔
244
            naming::get_gid_from_locality_id(target_locality_id),
283✔
245
            HPX_MOVE(addr), act, HPX_FORWARD(Args, args)...));
283✔
246
#if defined(HPX_HAVE_PARCEL_PROFILING)
247
        if (!p.parcel_id())
248
        {
249
            p.parcel_id() =
250
                parcelset::parcel::generate_unique_id(source_locality_id);
251
        }
252
#else
253
        HPX_UNUSED(source_locality_id);
283✔
254
#endif
255

256
        parcelset::detail::parcel_await_apply(HPX_MOVE(p),
283✔
257
            parcelset::write_handler_type(), 0,
283✔
258
            [this, dest](
1,415✔
259
                parcelset::parcel&& p, parcelset::write_handler_type&&) {
260
                pp->send_early_parcel(dest, HPX_MOVE(p));
283✔
261
            });
283✔
262
    }    // }}}
283✔
263

264
    template <typename Action, typename... Args>
265
    void big_boot_barrier::apply_late(std::uint32_t /* source_locality_id */
1✔
266
        ,
267
        std::uint32_t target_locality_id, parcelset::locality const& /* dest */
268
        ,
269
        Action act, Args&&... args)
270
    {    // {{{
271
        naming::address addr(
1✔
272
            naming::get_gid_from_locality_id(target_locality_id));
1✔
273

274
        parcelset::put_parcel(
1✔
275
            hpx::id_type(naming::get_gid_from_locality_id(target_locality_id),
1✔
276
                hpx::id_type::management_type::unmanaged),
277
            HPX_MOVE(addr), act, HPX_FORWARD(Args, args)...);
1✔
278
    }    // }}}
1✔
279

280
    //typedef components::detail::heap_factory<
281
    //    lcos::detail::promise<
282
    //        response
283
    //      , response
284
    //    >
285
    //  , components::managed_component<
286
    //        lcos::detail::promise<
287
    //            response
288
    //          , response
289
    //        >
290
    //    >
291
    //> response_heap_type;
292

293
    // This structure is used when a locality registers with node zero
294
    // (first round trip)
295
    struct registration_header
572✔
296
    {
297
        registration_header()
142✔
298
          : primary_ns_ptr(nullptr)
142✔
299
          , symbol_ns_ptr(nullptr)
142✔
300
          , cores_needed(0)
142✔
301
          , num_threads(0)
142✔
302
        {
303
        }
142✔
304

305
        // TODO: pass head address as a GVA
306
        registration_header(parcelset::endpoints_type const& endpoints_,
142✔
307
            naming::address_type primary_ns_ptr_,
308
            naming::address_type symbol_ns_ptr_, std::uint32_t cores_needed_,
309
            std::uint32_t num_threads_, std::string const& hostname_,
310
            detail::unassigned_typename_sequence const& typenames_,
311
            naming::gid_type prefix_ = naming::gid_type())
312
          : endpoints(endpoints_)
142✔
313
          , primary_ns_ptr(primary_ns_ptr_)
142✔
314
          , symbol_ns_ptr(symbol_ns_ptr_)
142✔
315
          , cores_needed(cores_needed_)
142✔
316
          , num_threads(num_threads_)
142✔
317
          , hostname(hostname_)
142✔
318
          , typenames(typenames_)
142✔
319
          , prefix(prefix_)
142✔
320
        {
321
        }
142✔
322

323
        parcelset::endpoints_type endpoints;
324
        naming::address_type primary_ns_ptr;
325
        naming::address_type symbol_ns_ptr;
326
        std::uint32_t cores_needed;
327
        std::uint32_t num_threads;
328
        std::string hostname;    // hostname of locality
329
        detail::unassigned_typename_sequence typenames;
330
        naming::gid_type prefix;    // suggested prefix (optional)
331

332
        template <typename Archive>
333
        void serialize(Archive& ar, const unsigned int)
568✔
334
        {
335
            // clang-format off
336
            ar & endpoints;
568✔
337

338
            std::size_t address = reinterpret_cast<std::size_t>(primary_ns_ptr);
568✔
339
            ar & address;
568✔
340
            primary_ns_ptr = reinterpret_cast<naming::address_type>(address);
568✔
341

342
            address = reinterpret_cast<std::size_t>(symbol_ns_ptr);
568✔
343
            ar & address;
568✔
344
            symbol_ns_ptr = reinterpret_cast<naming::address_type>(address);
568✔
345

346
            ar & cores_needed;
568✔
347
            ar & num_threads;
568✔
348
            ar & hostname;
568✔
349
            ar & typenames;
568✔
350
            ar & prefix;
568✔
351
            // clang-format on
352
        }
568✔
353
    };
354

355
    // This structure is used in the response from node zero to the locality which
356
    // is trying to register (first roundtrip).
357
    struct notification_header
1,414✔
358
    {
359
        notification_header()
142✔
360
          : num_localities(0)
142✔
361
          , used_cores(0)
142✔
362
        {
363
        }
142✔
364

365
        notification_header(naming::gid_type const& prefix_,
284✔
366
            parcelset::locality const& agas_locality_,
367
            naming::address const& locality_ns_address_,
368
            naming::address const& primary_ns_address_,
369
            naming::address const& component_ns_address_,
370
            naming::address const& symbol_ns_address_,
371
            std::uint32_t num_localities_, std::uint32_t used_cores_,
372
            parcelset::endpoints_type const& agas_endpoints_,
373
            detail::assigned_id_sequence const& ids_)
374
          : prefix(prefix_)
142✔
375
          , agas_locality(agas_locality_)
142✔
376
          , locality_ns_address(locality_ns_address_)
142✔
377
          , primary_ns_address(primary_ns_address_)
142✔
378
          , component_ns_address(component_ns_address_)
142✔
379
          , symbol_ns_address(symbol_ns_address_)
142✔
380
          , num_localities(num_localities_)
142✔
381
          , used_cores(used_cores_)
142✔
382
          , agas_endpoints(agas_endpoints_)
142✔
383
          , ids(ids_)
142✔
384
        {
385
        }
142✔
386

387
        naming::gid_type prefix;
388
        parcelset::locality agas_locality;
389
        naming::address locality_ns_address;
390
        naming::address primary_ns_address;
391
        naming::address component_ns_address;
392
        naming::address symbol_ns_address;
393
        std::uint32_t num_localities;
394
        std::uint32_t used_cores;
395
        parcelset::endpoints_type agas_endpoints;
396
        detail::assigned_id_sequence ids;
397
        std::vector<parcelset::endpoints_type> endpoints;
398

399
        template <typename Archive>
400
        void serialize(Archive& ar, const unsigned int)
567✔
401
        {
402
            // clang-format off
403
            ar & prefix;
567✔
404
            ar & agas_locality;
567✔
405
            ar & locality_ns_address;
567✔
406
            ar & primary_ns_address;
567✔
407
            ar & component_ns_address;
567✔
408
            ar & symbol_ns_address;
567✔
409
            ar & num_localities;
567✔
410
            ar & used_cores;
567✔
411
            ar & agas_endpoints;
567✔
412
            ar & ids;
567✔
413
            ar & endpoints;
567✔
414
            // clang-format on
415
        }
567✔
416
    };
417

418
    // {{{ early action forwards
419
    void register_worker(registration_header const& header);
420
    void notify_worker(notification_header const& header);
421
    // }}}
422

423
    // {{{ early action types
424
    using register_worker_action =
425
        actions::direct_action<void (*)(registration_header const&),
426
            register_worker>;
427

428
    using notify_worker_action =
429
        actions::direct_action<void (*)(notification_header const&),
430
            notify_worker>;
431
    // }}}
432

433
}}    // namespace hpx::agas
434

435
using hpx::agas::notify_worker_action;
436
using hpx::agas::register_worker_action;
437

438
HPX_ACTION_HAS_CRITICAL_PRIORITY(register_worker_action)
439
HPX_ACTION_HAS_CRITICAL_PRIORITY(notify_worker_action)
440

441
HPX_REGISTER_ACTION_ID(register_worker_action, register_worker_action,
5,350✔
442
    hpx::actions::register_worker_action_id)
443
HPX_REGISTER_ACTION_ID(notify_worker_action, notify_worker_action,
5,347✔
444
    hpx::actions::notify_worker_action_id)
445

446
namespace hpx { namespace agas {
447

448
    // remote call to AGAS
449
    // Handles the registration request of a locality. Registers the locality
    // with AGAS, assigns cores and type ids to it, records its endpoints and
    // prepares the notification that is sent back: immediately if AGAS is no
    // longer in the 'starting' state, otherwise deferred through a thunk
    // stored in the big boot barrier.
    //
    // Throws hpx::error::internal_server_error if this locality is in connect
    // mode, is not the bootstrap locality, if the request suggests locality
    // id zero, or if register_locality() reports failure.
    void register_worker(registration_header const& header)
    {
        // This lock acquires the bbb mutex on creation. When it goes out of scope,
        // its dtor calls big_boot_barrier::notify().
        big_boot_barrier::scoped_lock lock(get_big_boot_barrier());

        naming::resolver_client& agas_client = naming::get_agas_client();

        if (HPX_UNLIKELY(agas_client.is_connecting()))
        {
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
                "agas::register_worker",
                "a locality in connect mode cannot be an AGAS server.");
        }

        if (HPX_UNLIKELY(!agas_client.is_bootstrap()))
        {
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
                "agas::register_worker",
                "registration parcel received by non-bootstrap locality.");
        }

        naming::gid_type prefix = header.prefix;
        if (prefix != naming::invalid_gid &&
            naming::get_locality_id_from_gid(prefix) == 0)
        {
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
                "agas::register_worker",
                "worker node ({}) can't suggest locality_id zero, "
                "this is reserved for the console",
                header.endpoints);
        }

        if (!agas_client.register_locality(
                header.endpoints, prefix, header.num_threads))
        {
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
                "agas::register_worker",
                "attempt to register locality {} more than once",
                header.endpoints);
        }

        // addresses of the four AGAS namespaces hosted on this locality
        naming::address locality_addr(agas::get_locality(),
            hpx::components::component_agas_locality_namespace,
            agas_client.locality_ns_->ptr());
        naming::address primary_addr(agas::get_locality(),
            hpx::components::component_agas_primary_namespace,
            agas_client.primary_ns_.ptr());
        naming::address component_addr(agas::get_locality(),
            hpx::components::component_agas_component_namespace,
            agas_client.component_ns_->ptr());
        naming::address symbol_addr(agas::get_locality(),
            hpx::components::component_agas_symbol_namespace,
            agas_client.symbol_ns_.ptr());

        // assign cores to the new locality
        runtime& rt = get_runtime_distributed();
        std::uint32_t first_core =
            rt.assign_cores(header.hostname, header.cores_needed);

        big_boot_barrier& bbb = get_big_boot_barrier();

        // register all ids
        detail::assigned_id_sequence assigned_ids(header.typenames);

        notification_header hdr(prefix, bbb.here(), locality_addr, primary_addr,
            component_addr, symbol_addr, rt.get_config().get_num_localities(),
            first_core, bbb.get_endpoints(), assigned_ids);

        // pick the first endpoint of the registering locality whose type
        // matches the type of our own endpoint
        parcelset::locality dest;
        parcelset::locality here = bbb.here();
        for (parcelset::endpoints_type::value_type const& loc :
            header.endpoints)
        {
            if (loc.second.type() == here.type())
            {
                dest = loc.second;
                break;
            }
        }

        // 'prefix' is not modified past this point, so its locality id can be
        // computed once and reused below
        std::uint32_t const locality_id =
            naming::get_locality_id_from_gid(prefix);

        // collect endpoints from all registering localities
        bbb.add_locality_endpoints(locality_id, header.endpoints);

        // TODO: Handle cases where localities try to connect to AGAS while it's
        // shutting down.
        if (agas_client.get_status() != hpx::state::starting)
        {
            // We can just send the parcel now, the connecting locality isn't a part
            // of startup synchronization.
            bbb.apply_late(
                0, locality_id, dest, notify_worker_action(), HPX_MOVE(hdr));
        }
        else
        {
            // AGAS is starting up; this locality is participating in startup
            // synchronization.

            // delay the final response until the runtime system is up and running
            hpx::move_only_function<void()>* thunk =
                new hpx::move_only_function<void()>(util::one_shot(
                    hpx::bind_front(&big_boot_barrier::apply_notification,
                        &bbb, 0, locality_id, dest, HPX_MOVE(hdr))));
            bbb.add_thunk(thunk);
        }
    }
142✔
562

563
    // AGAS callback to client (first round trip response)
564
    // Handles the reply to this locality's registration request: registers
    // the received type ids, applies the assigned prefix and configuration
    // values, installs the hosted component and locality namespaces, binds
    // the runtime support component and caches the received endpoints.
    //
    // Throws hpx::error::internal_server_error if AGAS is not in the
    // 'starting' state.
    void notify_worker(notification_header const& header)
    {
        // This lock acquires the bbb mutex on creation. When it goes out of scope,
        // its dtor calls big_boot_barrier::notify().
        big_boot_barrier::scoped_lock lock(get_big_boot_barrier());

        // register all ids with this locality
        header.ids.register_ids_on_worker_loc();

        runtime_distributed& rt = get_runtime_distributed();
        naming::resolver_client& agas_client = naming::get_agas_client();

        if (HPX_UNLIKELY(agas_client.get_status() != hpx::state::starting))
        {
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
                "agas::notify_worker", "locality {} has launched early",
                rt.here());
        }

        util::runtime_configuration& cfg = rt.get_config();

        // set our prefix
        agas_client.set_local_locality(header.prefix);
        agas_client.register_console(header.agas_endpoints);
        cfg.parse("assigned locality",
            hpx::util::format("hpx.locality!={1}",
                naming::get_locality_id_from_gid(header.prefix)));

        // store the full addresses of the agas servers in our local service
        agas_client.component_ns_.reset(new detail::hosted_component_namespace(
            header.component_ns_address));
        agas_client.locality_ns_.reset(
            new detail::hosted_locality_namespace(header.locality_ns_address));
        naming::gid_type const& here = agas::get_locality();

        // register runtime support component: once under a gid whose lsb is
        // the component's local address ...
        naming::gid_type runtime_support_gid(
            header.prefix.get_msb(), rt.get_runtime_support_lva());
        naming::address const runtime_support_address(here,
            components::get_component_type<
                components::server::runtime_support>(),
            rt.get_runtime_support_lva());
        agas_client.bind_local(runtime_support_gid, runtime_support_address);

        // ... and once more under the same gid with its lsb reset to zero
        runtime_support_gid.set_lsb(std::uint64_t(0));
        agas_client.bind_local(runtime_support_gid, runtime_support_address);

        // Assign the initial parcel gid range to the parcelport.
        rt.init_id_pool_range();

        // store number of initial localities
        cfg.set_num_localities(header.num_localities);

        // store number of used cores by other localities
        cfg.set_first_used_core(header.used_cores);
        rt.assign_cores();

        // pre-cache all known locality endpoints in local AGAS
        agas_client.pre_cache_endpoints(header.endpoints);
    }
142✔
624
    // }}}
625

626
    void big_boot_barrier::apply_notification(std::uint32_t source_locality_id,
141✔
627
        std::uint32_t target_locality_id, parcelset::locality const& dest,
628
        notification_header&& hdr)
629
    {
630
        hdr.endpoints = localities;
141✔
631
        apply(source_locality_id, target_locality_id, dest,
282✔
632
            notify_worker_action(), HPX_MOVE(hdr));
141✔
633
    }
141✔
634

635
    void big_boot_barrier::add_locality_endpoints(std::uint32_t locality_id,
594✔
636
        parcelset::endpoints_type const& endpoints_data)
637
    {
638
        if (localities.size() < static_cast<std::size_t>(locality_id) + 1)
594✔
639
            localities.resize(static_cast<std::size_t>(locality_id) + 1);
594✔
640

641
        localities[static_cast<std::size_t>(locality_id)] = endpoints_data;
594✔
642
    }
594✔
643

644
    ///////////////////////////////////////////////////////////////////////////////
645
    void big_boot_barrier::spin()
594✔
646
    {
647
        std::unique_lock<std::mutex> lock(mtx);
594✔
648
        while (connected)
876✔
649
        {
650
            cond.wait(lock);
282✔
651
        }
652

653
        // pre-cache all known locality endpoints in local AGAS on locality 0 as well
654
        if (service_mode::bootstrap == service_type)
594✔
655
        {
656
            naming::resolver_client& agas_client = naming::get_agas_client();
452✔
657
            agas_client.pre_cache_endpoints(localities);
452✔
658
        }
452✔
659
    }
594✔
660

661
    inline std::size_t get_number_of_bootstrap_connections(
594✔
662
        util::runtime_configuration const& ini)
663
    {
664
        service_mode service_type = ini.get_agas_service_mode();
594✔
665
        std::size_t result = 1;
594✔
666

667
        if (service_mode::bootstrap == service_type)
594✔
668
        {
669
            std::size_t num_localities =
452✔
670
                static_cast<std::size_t>(ini.get_num_localities());
452✔
671
            result = num_localities ? num_localities - 1 : 0;
452✔
672
        }
452✔
673

674
        return result;
594✔
675
    }
676

677
    // Stores the parcelport and endpoints, reads the AGAS service mode and
    // bootstrap locality from the configuration and initializes the
    // connection counter. In bootstrap mode it additionally registers all
    // unassigned type names and records this locality's endpoints as
    // locality 0.
    big_boot_barrier::big_boot_barrier(parcelset::parcelport* pp_,
        parcelset::endpoints_type const& endpoints_,
        util::runtime_configuration const& ini_)
      : pp(pp_)
      , endpoints(endpoints_)
      , service_type(ini_.get_agas_service_mode())
      , bootstrap_agas(pp_ ? pp_->agas_locality(ini_) : parcelset::locality())
      , cond()
      , mtx()
      , connected(get_number_of_bootstrap_connections(ini_))
      , thunks(32)
    {
        if (service_mode::bootstrap != service_type)
        {
            return;
        }

        // register all not registered typenames
        detail::register_unassigned_typenames();

        // store endpoints of root locality for later
        add_locality_endpoints(0, get_endpoints());
    }
594✔
697

698
    void big_boot_barrier::wait_bootstrap()
452✔
699
    {    // {{{
700
        HPX_ASSERT(service_mode::bootstrap == service_type);
452✔
701

702
        // the root just waits until all localities have connected
703
        spin();
452✔
704
    }    // }}}
452✔
705

706
    namespace detail {
707

708
        std::uint32_t get_number_of_pus_in_cores(std::uint32_t num_cores)
×
709
        {
710
            threads::topology& top = threads::create_topology();
×
711

712
            std::uint32_t num_pus = 0;
×
713
            for (std::uint32_t i = 0; i != num_cores; ++i)
×
714
            {
715
                std::uint32_t num_pus_core = static_cast<std::uint32_t>(
×
716
                    top.get_number_of_core_pus(std::size_t(i)));
×
717
                if (num_pus_core == ~std::uint32_t(0))
×
718
                    return num_cores;    // assume one pu per core
×
719

720
                num_pus += num_pus_core;
×
721
            }
×
722

723
            return num_pus;
×
724
        }
×
725
    }    // namespace detail
726

727
    void big_boot_barrier::wait_hosted(std::string const& locality_name,
142✔
728
        naming::address::address_type primary_ns_server,
729
        naming::address::address_type symbol_ns_server)
730
    {    // {{{
731
        HPX_ASSERT(service_mode::bootstrap != service_type);
142✔
732

733
        // any worker sends a request for registration and waits
734
        HPX_ASSERT(nullptr != primary_ns_server);
142✔
735
        HPX_ASSERT(nullptr != symbol_ns_server);
142✔
736

737
        runtime& rt = get_runtime_distributed();
142✔
738

739
        // get the number of cores we need for our locality. This respects the
740
        // affinity description. Cores that are partially used are counted as well
741
        std::uint32_t cores_needed = rt.assign_cores();
142✔
742
        std::uint32_t num_threads =
142✔
743
            std::uint32_t(rt.get_config().get_os_thread_count());
142✔
744

745
        naming::gid_type suggested_prefix;
142✔
746

747
        std::string locality_str =
748
            rt.get_config().get_entry("hpx.locality", "-1");
142✔
749
        if (locality_str != "-1")
142✔
750
        {
751
            suggested_prefix = naming::get_gid_from_locality_id(
141✔
752
                util::from_string<std::uint32_t>(locality_str, -1));
141✔
753
        }
141✔
754

755
        // pre-load all unassigned ids
756
        detail::unassigned_typename_sequence unassigned(true);
142✔
757

758
        // contact the bootstrap AGAS node
759
        registration_header hdr(parcelset::get_parcel_handler().endpoints(),
142✔
760
            primary_ns_server, symbol_ns_server, cores_needed, num_threads,
142✔
761
            locality_name, unassigned, suggested_prefix);
142✔
762

763
        // random first parcel id
764
        apply(static_cast<std::uint32_t>(std::random_device{}()), 0,
284✔
765
            bootstrap_agas, register_worker_action(), HPX_MOVE(hdr));
142✔
766

767
        // wait for registration to be complete
768
        spin();
142✔
769
    }    // }}}
142✔
770

771
    void big_boot_barrier::notify()
284✔
772
    {
773
        naming::resolver_client& agas_client = naming::get_agas_client();
284✔
774

775
        bool notify = false;
284✔
776
        {
777
            std::lock_guard<std::mutex> lk(mtx, std::adopt_lock);
284✔
778
            if (agas_client.get_status() == hpx::state::starting)
284✔
779
            {
780
                --connected;
283✔
781
                if (connected == 0)
283✔
782
                    notify = true;
282✔
783
            }
283✔
784
        }
284✔
785
        if (notify)
284✔
786
            cond.notify_all();
282✔
787
    }
284✔
788

789
    // This is triggered in runtime_impl::start, after the early action handler
790
    // has been replaced by the parcelhandler. We have to delay the notifications
791
    // until this point so that the AGAS locality can come up.
792
    void big_boot_barrier::trigger()
594✔
793
    {
794
        if (service_mode::bootstrap == service_type)
594✔
795
        {
796
            hpx::move_only_function<void()>* p;
797

798
            while (thunks.pop(p))
593✔
799
            {
800
                try
801
                {
802
                    (*p)();
141✔
803
                }
141✔
804
                catch (...)
805
                {
806
                    delete p;
×
807
                    throw;
×
808
                }
×
809
                delete p;
141✔
810
            }
811
        }
452✔
812
    }
594✔
813

814
    /// Queue a thunk in \a thunks.
    ///
    /// Retries until the push succeeds, yielding between attempts with an
    /// increasing attempt count.
    ///
    /// \param f pointer to the function object to enqueue
    void big_boot_barrier::add_thunk(hpx::move_only_function<void()>* f)
    {
        for (std::size_t attempt = 0; !thunks.push(f); ++attempt)
        {
            // back off, then try pushing again
            hpx::util::detail::yield_k(
                attempt, "hpx::agas::big_boot_barrier::add_thunk");
        }
    }
141✔
825

826
    ///////////////////////////////////////////////////////////////////////////////
827
    // Tag type (declared only) passed as the second template argument of the
    // util::reinitializable_static that holds the big_boot_barrier instance.
    struct bbb_tag;
828

829
    void create_big_boot_barrier(parcelset::parcelport* pp_,
594✔
830
        parcelset::endpoints_type const& endpoints_,
831
        util::runtime_configuration const& ini_)
832
    {
833
        util::reinitializable_static<std::shared_ptr<big_boot_barrier>, bbb_tag>
834
            bbb;
594✔
835
        if (bbb.get())
594✔
836
        {
837
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
×
838
                "create_big_boot_barrier",
839
                "create_big_boot_barrier was called more than once");
840
        }
841
        bbb.get().reset(new big_boot_barrier(pp_, endpoints_, ini_));
594✔
842
    }
594✔
843

844
    void destroy_big_boot_barrier()
593✔
845
    {
846
        util::reinitializable_static<std::shared_ptr<big_boot_barrier>, bbb_tag>
847
            bbb;
593✔
848
        if (!bbb.get())
593✔
849
        {
850
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
×
851
                "destroy_big_boot_barrier",
852
                "big_boot_barrier has not been created yet");
853
        }
854
        bbb.get().reset();
593✔
855
    }
593✔
856

857
    /// Access the big_boot_barrier instance held by the reinitializable
    /// static.
    ///
    /// \returns a reference to the existing instance
    ///
    /// \throws hpx::error::internal_server_error if no instance exists
    big_boot_barrier& get_big_boot_barrier()
    {
        util::reinitializable_static<std::shared_ptr<big_boot_barrier>, bbb_tag>
            bbb;

        std::shared_ptr<big_boot_barrier>& instance = bbb.get();
        if (instance == nullptr)
        {
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
                "get_big_boot_barrier",
                "big_boot_barrier has not been created yet");
        }

        return *instance;
    }
×
869

870
}}    // namespace hpx::agas
871

872
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc