• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #853

19 Dec 2022 01:01AM UTC coverage: 86.287% (+0.4%) from 85.912%
#853

push

StellarBot
Merge #6109

6109: Modernize serialization module r=hkaiser a=hkaiser

- flyby separate serialization of Boost types

working towards https://github.com/STEllAR-GROUP/hpx/issues/5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

53 of 53 new or added lines in 6 files covered. (100.0%)

173939 of 201582 relevant lines covered (86.29%)

1931657.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.76
/libs/full/runtime_distributed/src/big_boot_barrier.cpp
1
//  Copyright (c) 2011 Bryce Lelbach & Katelyn Kufahl
2
//  Copyright (c) 2007-2021 Hartmut Kaiser
3
//  Copyright (c) 2015 Anton Bikineev
4
//
5
//  SPDX-License-Identifier: BSL-1.0
6
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
7
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8

9
#include <hpx/config.hpp>
10

11
#if defined(HPX_HAVE_NETWORKING)
12
#include <hpx/actions_base/actions_base_support.hpp>
13
#include <hpx/actions_base/plain_action.hpp>
14
#include <hpx/agas/addressing_service.hpp>
15
#include <hpx/agas_base/detail/hosted_component_namespace.hpp>
16
#include <hpx/agas_base/detail/hosted_locality_namespace.hpp>
17
#include <hpx/assert.hpp>
18
#include <hpx/async_distributed/put_parcel.hpp>
19
#include <hpx/components_base/agas_interface.hpp>
20
#include <hpx/components_base/server/managed_component_base.hpp>
21
#include <hpx/execution_base/this_thread.hpp>
22
#include <hpx/functional/bind_front.hpp>
23
#include <hpx/modules/agas_base.hpp>
24
#include <hpx/modules/format.hpp>
25
#include <hpx/parcelset/detail/parcel_await.hpp>
26
#include <hpx/parcelset_base/parcel_interface.hpp>
27
#include <hpx/parcelset_base/parcelport.hpp>
28
#include <hpx/runtime_configuration/runtime_configuration.hpp>
29
#include <hpx/runtime_distributed.hpp>
30
#include <hpx/runtime_distributed/big_boot_barrier.hpp>
31
#include <hpx/runtime_distributed/runtime_fwd.hpp>
32
#include <hpx/serialization/detail/polymorphic_id_factory.hpp>
33
#include <hpx/serialization/vector.hpp>
34
#include <hpx/static_reinit/reinitializable_static.hpp>
35
#include <hpx/timing/high_resolution_clock.hpp>
36
#include <hpx/topology/topology.hpp>
37
#include <hpx/util/from_string.hpp>
38

39
#include <cstddef>
40
#include <cstdint>
41
#include <cstdlib>
42
#include <functional>
43
#include <memory>
44
#include <mutex>
45
#include <random>
46
#include <string>
47
#include <type_traits>
48
#include <utility>
49
#include <vector>
50

51
namespace hpx { namespace detail {
52

53
    std::string get_locality_base_name();
54
}}    // namespace hpx::detail
55

56
namespace hpx { namespace parcelset {
57

58
    // shortcut for get_runtime().get_parcel_handler()
59
    parcelhandler& get_parcel_handler();
60
}}    // namespace hpx::parcelset
61

62
namespace hpx { namespace agas { namespace detail {
63

64
    void register_unassigned_typenames()
451✔
65
    {
66
        // supposed to be run on locality 0 before
67
        // before locality communication
68
        hpx::serialization::detail::id_registry& serialization_registry =
451✔
69
            hpx::serialization::detail::id_registry::instance();
451✔
70

71
        serialization_registry.fill_missing_typenames();
451✔
72

73
        hpx::actions::detail::action_registry& action_registry =
451✔
74
            hpx::actions::detail::action_registry::instance();
451✔
75
        action_registry.fill_missing_typenames();
451✔
76
    }
451✔
77

78
    ///////////////////////////////////////////////////////////////////////////
79
    struct unassigned_typename_sequence
855✔
80
    {
81
        unassigned_typename_sequence() = default;
141✔
82

83
        explicit unassigned_typename_sequence(bool /*dummy*/)
142✔
84
          : serialization_typenames(
142✔
85
                hpx::serialization::detail::id_registry::instance()
142✔
86
                    .get_unassigned_typenames())
142✔
87
          , action_typenames(hpx::actions::detail::action_registry::instance()
142✔
88
                                 .get_unassigned_typenames())
142✔
89
        {
90
        }
142✔
91

92
        void save(hpx::serialization::output_archive& ar, unsigned) const
426✔
93
        {
94
            // part running on worker node
95
            HPX_ASSERT(!action_typenames.empty());
426✔
96
            ar << serialization_typenames;
426✔
97
            ar << action_typenames;
426✔
98
        }
426✔
99

100
        void load(hpx::serialization::input_archive& ar, unsigned)
141✔
101
        {
102
            // part running on locality 0
103
            ar >> serialization_typenames;
141✔
104
            ar >> action_typenames;
141✔
105
        }
141✔
106
        HPX_SERIALIZATION_SPLIT_MEMBER();
567✔
107

108
        std::vector<std::string> serialization_typenames;
109
        std::vector<std::string> action_typenames;
110
    };
111

112
    ///////////////////////////////////////////////////////////////////////////
113
    struct assigned_id_sequence
1,687✔
114
    {
115
        assigned_id_sequence() = default;
142✔
116

117
        explicit assigned_id_sequence(
141✔
118
            unassigned_typename_sequence const& typenames)
119
        {
120
            register_ids_on_main_loc(typenames);
141✔
121
        }
141✔
122

123
        void save(hpx::serialization::output_archive& ar, unsigned) const
422✔
124
        {
125
            HPX_ASSERT(!action_ids.empty());
422✔
126
            ar << serialization_ids;    // part running on locality 0
422✔
127
            ar << action_ids;
422✔
128
        }
422✔
129

130
        void load(hpx::serialization::input_archive& ar, unsigned)
142✔
131
        {
132
            ar >> serialization_ids;    // part running on worker node
142✔
133
            ar >> action_ids;
142✔
134
        }
142✔
135
        HPX_SERIALIZATION_SPLIT_MEMBER();
564✔
136

137
    private:
138
        void register_ids_on_main_loc(
141✔
139
            unassigned_typename_sequence const& unassigned_ids)
140
        {
141
            {
142
                hpx::serialization::detail::id_registry& registry =
141✔
143
                    hpx::serialization::detail::id_registry::instance();
141✔
144
                std::uint32_t max_id = registry.get_max_registered_id();
141✔
145

146
                for (const std::string& s :
141✔
147
                    unassigned_ids.serialization_typenames)
141✔
148
                {
149
                    std::uint32_t id = registry.try_get_id(s);
×
150
                    if (id ==
×
151
                        hpx::serialization::detail::id_registry::invalid_id)
152
                    {
153
                        // this id is not registered yet
154
                        id = ++max_id;
×
155
                        registry.register_typename(s, id);
×
156
                    }
×
157
                    serialization_ids.push_back(id);
×
158
                }
159
            }
160
            {
161
                hpx::actions::detail::action_registry& registry =
141✔
162
                    hpx::actions::detail::action_registry::instance();
141✔
163
                std::uint32_t max_id = registry.max_id_;
141✔
164

165
                for (const std::string& s : unassigned_ids.action_typenames)
8,979✔
166
                {
167
                    std::uint32_t id = registry.try_get_id(s);
8,838✔
168
                    if (id == hpx::actions::detail::action_registry::invalid_id)
8,838✔
169
                    {
170
                        // this id is not registered yet
171
                        id = ++max_id;
×
172
                        registry.register_typename(s, id);
×
173
                    }
×
174
                    action_ids.push_back(id);
8,838✔
175
                }
176
            }
177
        }
141✔
178

179
    public:
180
        void register_ids_on_worker_loc() const
142✔
181
        {
182
            {
183
                hpx::serialization::detail::id_registry& registry =
142✔
184
                    hpx::serialization::detail::id_registry::instance();
142✔
185

186
                // Yes, we look up the unassigned typenames twice, but this allows
187
                // to avoid using globals and protects from race conditions during
188
                // de-serialization.
189
                std::vector<std::string> typenames =
190
                    registry.get_unassigned_typenames();
142✔
191

192
                // we should have received as many ids as we have unassigned names
193
                HPX_ASSERT(typenames.size() == serialization_ids.size());
142✔
194

195
                for (std::size_t k = 0; k < serialization_ids.size(); ++k)
142✔
196
                {
197
                    registry.register_typename(
×
198
                        typenames[k], serialization_ids[k]);
×
199
                }
×
200

201
                // fill in holes which might have been caused by initialization
202
                // order problems
203
                registry.fill_missing_typenames();
142✔
204
            }
142✔
205
            {
206
                hpx::actions::detail::action_registry& registry =
142✔
207
                    hpx::actions::detail::action_registry::instance();
142✔
208

209
                // Yes, we look up the unassigned typenames twice, but this allows
210
                // to avoid using globals and protects from race conditions during
211
                // de-serialization.
212
                std::vector<std::string> typenames =
213
                    registry.get_unassigned_typenames();
142✔
214

215
                // we should have received as many ids as we have unassigned names
216
                HPX_ASSERT(typenames.size() == action_ids.size());
142✔
217

218
                for (std::size_t k = 0; k < action_ids.size(); ++k)
9,036✔
219
                {
220
                    registry.register_typename(typenames[k], action_ids[k]);
8,894✔
221
                }
8,894✔
222

223
                // fill in holes which might have been caused by initialization
224
                // order problems
225
                registry.fill_missing_typenames();
142✔
226
            }
142✔
227
        }
142✔
228

229
        std::vector<std::uint32_t> serialization_ids;
230
        std::vector<std::uint32_t> action_ids;
231
    };
232
}}}    // namespace hpx::agas::detail
233

234
namespace hpx { namespace agas {
235
    template <typename Action, typename... Args>
236
    void big_boot_barrier::apply(std::uint32_t source_locality_id,
282✔
237
        std::uint32_t target_locality_id, parcelset::locality dest, Action act,
238
        Args&&... args)
239
    {    // {{{
240
        HPX_ASSERT(pp);
282✔
241
        naming::address addr(
282✔
242
            naming::get_gid_from_locality_id(target_locality_id));
282✔
243
        parcelset::parcel p(parcelset::detail::create_parcel::call(
282✔
244
            naming::get_gid_from_locality_id(target_locality_id),
282✔
245
            HPX_MOVE(addr), act, HPX_FORWARD(Args, args)...));
282✔
246
#if defined(HPX_HAVE_PARCEL_PROFILING)
247
        if (!p.parcel_id())
248
        {
249
            p.parcel_id() =
250
                parcelset::parcel::generate_unique_id(source_locality_id);
251
        }
252
#else
253
        HPX_UNUSED(source_locality_id);
282✔
254
#endif
255

256
        parcelset::detail::parcel_await_apply(HPX_MOVE(p),
282✔
257
            parcelset::write_handler_type(), 0,
282✔
258
            [this, dest](
1,410✔
259
                parcelset::parcel&& p, parcelset::write_handler_type&&) {
260
                pp->send_early_parcel(dest, HPX_MOVE(p));
282✔
261
            });
282✔
262
    }    // }}}
282✔
263

264
    template <typename Action, typename... Args>
265
    void big_boot_barrier::apply_late(std::uint32_t /* source_locality_id */
1✔
266
        ,
267
        std::uint32_t target_locality_id, parcelset::locality const& /* dest */
268
        ,
269
        Action act, Args&&... args)
270
    {    // {{{
271
        naming::address addr(
1✔
272
            naming::get_gid_from_locality_id(target_locality_id));
1✔
273

274
        parcelset::put_parcel(
1✔
275
            hpx::id_type(naming::get_gid_from_locality_id(target_locality_id),
1✔
276
                hpx::id_type::management_type::unmanaged),
277
            HPX_MOVE(addr), act, HPX_FORWARD(Args, args)...);
1✔
278
    }    // }}}
1✔
279

280
    //typedef components::detail::heap_factory<
281
    //    lcos::detail::promise<
282
    //        response
283
    //      , response
284
    //    >
285
    //  , components::managed_component<
286
    //        lcos::detail::promise<
287
    //            response
288
    //          , response
289
    //        >
290
    //    >
291
    //> response_heap_type;
292

293
    // This structure is used when a locality registers with node zero
294
    // (first round trip)
295
    struct registration_header
571✔
296
    {
297
        registration_header()
141✔
298
          : primary_ns_ptr(nullptr)
141✔
299
          , symbol_ns_ptr(nullptr)
141✔
300
          , cores_needed(0)
141✔
301
          , num_threads(0)
141✔
302
        {
303
        }
141✔
304

305
        // TODO: pass head address as a GVA
306
        registration_header(parcelset::endpoints_type const& endpoints_,
142✔
307
            naming::address_type primary_ns_ptr_,
308
            naming::address_type symbol_ns_ptr_, std::uint32_t cores_needed_,
309
            std::uint32_t num_threads_, std::string const& hostname_,
310
            detail::unassigned_typename_sequence const& typenames_,
311
            naming::gid_type prefix_ = naming::gid_type())
312
          : endpoints(endpoints_)
142✔
313
          , primary_ns_ptr(primary_ns_ptr_)
142✔
314
          , symbol_ns_ptr(symbol_ns_ptr_)
142✔
315
          , cores_needed(cores_needed_)
142✔
316
          , num_threads(num_threads_)
142✔
317
          , hostname(hostname_)
142✔
318
          , typenames(typenames_)
142✔
319
          , prefix(prefix_)
142✔
320
        {
321
        }
142✔
322

323
        parcelset::endpoints_type endpoints;
324
        naming::address_type primary_ns_ptr;
325
        naming::address_type symbol_ns_ptr;
326
        std::uint32_t cores_needed;
327
        std::uint32_t num_threads;
328
        std::string hostname;    // hostname of locality
329
        detail::unassigned_typename_sequence typenames;
330
        naming::gid_type prefix;    // suggested prefix (optional)
331

332
        template <typename Archive>
333
        void serialize(Archive& ar, const unsigned int)
567✔
334
        {
335
            // clang-format off
336
            ar & endpoints;
567✔
337

338
            std::size_t address = reinterpret_cast<std::size_t>(primary_ns_ptr);
567✔
339
            ar & address;
567✔
340
            primary_ns_ptr = reinterpret_cast<naming::address_type>(address);
567✔
341

342
            address = reinterpret_cast<std::size_t>(symbol_ns_ptr);
567✔
343
            ar & address;
567✔
344
            symbol_ns_ptr = reinterpret_cast<naming::address_type>(address);
567✔
345

346
            ar & cores_needed;
567✔
347
            ar & num_threads;
567✔
348
            ar & hostname;
567✔
349
            ar & typenames;
567✔
350
            ar & prefix;
567✔
351
            // clang-format on
352
        }
567✔
353
    };
354

355
    // This structure is used in the response from node zero to the locality which
356
    // is trying to register (first roundtrip).
357
    struct notification_header
1,405✔
358
    {
359
        notification_header()
142✔
360
          : num_localities(0)
142✔
361
          , used_cores(0)
142✔
362
        {
363
        }
142✔
364

365
        notification_header(naming::gid_type const& prefix_,
282✔
366
            parcelset::locality const& agas_locality_,
367
            naming::address const& locality_ns_address_,
368
            naming::address const& primary_ns_address_,
369
            naming::address const& component_ns_address_,
370
            naming::address const& symbol_ns_address_,
371
            std::uint32_t num_localities_, std::uint32_t used_cores_,
372
            parcelset::endpoints_type const& agas_endpoints_,
373
            detail::assigned_id_sequence const& ids_)
374
          : prefix(prefix_)
141✔
375
          , agas_locality(agas_locality_)
141✔
376
          , locality_ns_address(locality_ns_address_)
141✔
377
          , primary_ns_address(primary_ns_address_)
141✔
378
          , component_ns_address(component_ns_address_)
141✔
379
          , symbol_ns_address(symbol_ns_address_)
141✔
380
          , num_localities(num_localities_)
141✔
381
          , used_cores(used_cores_)
141✔
382
          , agas_endpoints(agas_endpoints_)
141✔
383
          , ids(ids_)
141✔
384
        {
385
        }
141✔
386

387
        naming::gid_type prefix;
388
        parcelset::locality agas_locality;
389
        naming::address locality_ns_address;
390
        naming::address primary_ns_address;
391
        naming::address component_ns_address;
392
        naming::address symbol_ns_address;
393
        std::uint32_t num_localities;
394
        std::uint32_t used_cores;
395
        parcelset::endpoints_type agas_endpoints;
396
        detail::assigned_id_sequence ids;
397
        std::vector<parcelset::endpoints_type> endpoints;
398

399
        template <typename Archive>
400
        void serialize(Archive& ar, const unsigned int)
564✔
401
        {
402
            // clang-format off
403
            ar & prefix;
564✔
404
            ar & agas_locality;
564✔
405
            ar & locality_ns_address;
564✔
406
            ar & primary_ns_address;
564✔
407
            ar & component_ns_address;
564✔
408
            ar & symbol_ns_address;
564✔
409
            ar & num_localities;
564✔
410
            ar & used_cores;
564✔
411
            ar & agas_endpoints;
564✔
412
            ar & ids;
564✔
413
            ar & endpoints;
564✔
414
            // clang-format on
415
        }
564✔
416
    };
417

418
    // {{{ early action forwards
419
    void register_worker(registration_header const& header);
420
    void notify_worker(notification_header const& header);
421
    // }}}
422

423
    // {{{ early action types
424
    using register_worker_action =
425
        actions::direct_action<void (*)(registration_header const&),
426
            register_worker>;
427

428
    using notify_worker_action =
429
        actions::direct_action<void (*)(notification_header const&),
430
            notify_worker>;
431
    // }}}
432

433
}}    // namespace hpx::agas
434

435
using hpx::agas::notify_worker_action;
436
using hpx::agas::register_worker_action;
437

438
HPX_ACTION_HAS_CRITICAL_PRIORITY(register_worker_action)
439
HPX_ACTION_HAS_CRITICAL_PRIORITY(notify_worker_action)
440

441
HPX_REGISTER_ACTION_ID(register_worker_action, register_worker_action,
5,345✔
442
    hpx::actions::register_worker_action_id)
443
HPX_REGISTER_ACTION_ID(notify_worker_action, notify_worker_action,
5,339✔
444
    hpx::actions::notify_worker_action_id)
445

446
namespace hpx { namespace agas {
447

448
    // remote call to AGAS
449
    void register_worker(registration_header const& header)
141✔
450
    {
451
        // This lock acquires the bbb mutex on creation. When it goes out of scope,
452
        // its dtor calls big_boot_barrier::notify().
453
        big_boot_barrier::scoped_lock lock(get_big_boot_barrier());
141✔
454

455
        naming::resolver_client& agas_client = naming::get_agas_client();
141✔
456

457
        if (HPX_UNLIKELY(agas_client.is_connecting()))
141✔
458
        {
459
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
×
460
                "agas::register_worker",
461
                "a locality in connect mode cannot be an AGAS server.");
462
        }
463

464
        if (HPX_UNLIKELY(!agas_client.is_bootstrap()))
141✔
465
        {
466
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
×
467
                "agas::register_worker",
468
                "registration parcel received by non-bootstrap locality.");
469
        }
470

471
        naming::gid_type prefix = header.prefix;
141✔
472
        if (prefix != naming::invalid_gid &&
141✔
473
            naming::get_locality_id_from_gid(prefix) == 0)
140✔
474
        {
475
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
×
476
                "agas::register_worker",
477
                "worker node ({}) can't suggest locality_id zero, "
478
                "this is reserved for the console",
479
                header.endpoints);
480
            return;
481
        }
482

483
        if (!agas_client.register_locality(
282✔
484
                header.endpoints, prefix, header.num_threads))
141✔
485
        {
486
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
×
487
                "agas::register_worker",
488
                "attempt to register locality {} more than once",
489
                header.endpoints);
490
            return;
491
        }
492

493
        naming::address locality_addr(agas::get_locality(),
282✔
494
            hpx::components::component_agas_locality_namespace,
495
            agas_client.locality_ns_->ptr());
141✔
496
        naming::address primary_addr(agas::get_locality(),
282✔
497
            hpx::components::component_agas_primary_namespace,
498
            agas_client.primary_ns_.ptr());
141✔
499
        naming::address component_addr(agas::get_locality(),
282✔
500
            hpx::components::component_agas_component_namespace,
501
            agas_client.component_ns_->ptr());
141✔
502
        naming::address symbol_addr(agas::get_locality(),
282✔
503
            hpx::components::component_agas_symbol_namespace,
504
            agas_client.symbol_ns_.ptr());
141✔
505

506
        // assign cores to the new locality
507
        runtime& rt = get_runtime_distributed();
141✔
508
        std::uint32_t first_core =
141✔
509
            rt.assign_cores(header.hostname, header.cores_needed);
141✔
510

511
        big_boot_barrier& bbb = get_big_boot_barrier();
141✔
512

513
        // register all ids
514
        detail::assigned_id_sequence assigned_ids(header.typenames);
141✔
515

516
        notification_header hdr(prefix, bbb.here(), locality_addr, primary_addr,
282✔
517
            component_addr, symbol_addr, rt.get_config().get_num_localities(),
141✔
518
            first_core, bbb.get_endpoints(), assigned_ids);
141✔
519

520
        parcelset::locality dest;
141✔
521
        parcelset::locality here = bbb.here();
141✔
522
        for (parcelset::endpoints_type::value_type const& loc :
141✔
523
            header.endpoints)
141✔
524
        {
525
            if (loc.second.type() == here.type())
141✔
526
            {
527
                dest = loc.second;
141✔
528
                break;
141✔
529
            }
530
        }
531

532
        // collect endpoints from all registering localities
533
        bbb.add_locality_endpoints(
282✔
534
            naming::get_locality_id_from_gid(prefix), header.endpoints);
141✔
535

536
        // TODO: Handle cases where localities try to connect to AGAS while it's
537
        // shutting down.
538
        if (agas_client.get_status() != hpx::state::starting)
141✔
539
        {
540
            // We can just send the parcel now, the connecting locality isn't a part
541
            // of startup synchronization.
542
            get_big_boot_barrier().apply_late(0,
1✔
543
                naming::get_locality_id_from_gid(prefix), dest,
1✔
544
                notify_worker_action(), HPX_MOVE(hdr));
545
        }
1✔
546

547
        else
548
        {
549
            // AGAS is starting up; this locality is participating in startup
550
            // synchronization.
551

552
            // delay the final response until the runtime system is up and running
553
            hpx::move_only_function<void()>* thunk =
140✔
554
                new hpx::move_only_function<void()>(util::one_shot(
280✔
555
                    hpx::bind_front(&big_boot_barrier::apply_notification,
140✔
556
                        &get_big_boot_barrier(), 0,
140✔
557
                        naming::get_locality_id_from_gid(prefix), dest,
140✔
558
                        HPX_MOVE(hdr))));
559
            get_big_boot_barrier().add_thunk(thunk);
140✔
560
        }
561
    }
141✔
562

563
    // AGAS callback to client (first round trip response)
564
    void notify_worker(notification_header const& header)
142✔
565
    {
566
        // This lock acquires the bbb mutex on creation. When it goes out of scope,
567
        // it's dtor calls big_boot_barrier::notify().
568
        big_boot_barrier::scoped_lock lock(get_big_boot_barrier());
142✔
569

570
        // register all ids with this locality
571
        header.ids.register_ids_on_worker_loc();
142✔
572

573
        runtime_distributed& rt = get_runtime_distributed();
142✔
574
        naming::resolver_client& agas_client = naming::get_agas_client();
142✔
575

576
        if (HPX_UNLIKELY(agas_client.get_status() != hpx::state::starting))
142✔
577
        {
578
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
×
579
                "agas::notify_worker", "locality {} has launched early",
580
                rt.here());
581
        }
582

583
        util::runtime_configuration& cfg = rt.get_config();
142✔
584

585
        // set our prefix
586
        agas_client.set_local_locality(header.prefix);
142✔
587
        agas_client.register_console(header.agas_endpoints);
142✔
588
        cfg.parse("assigned locality",
284✔
589
            hpx::util::format("hpx.locality!={1}",
284✔
590
                naming::get_locality_id_from_gid(header.prefix)));
142✔
591

592
        // store the full addresses of the agas servers in our local service
593
        agas_client.component_ns_.reset(new detail::hosted_component_namespace(
142✔
594
            header.component_ns_address));
142✔
595
        agas_client.locality_ns_.reset(
284✔
596
            new detail::hosted_locality_namespace(header.locality_ns_address));
142✔
597
        naming::gid_type const& here = agas::get_locality();
142✔
598

599
        // register runtime support component
600
        naming::gid_type runtime_support_gid(
142✔
601
            header.prefix.get_msb(), rt.get_runtime_support_lva());
142✔
602
        naming::address const runtime_support_address(here,
142✔
603
            components::get_component_type<
142✔
604
                components::server::runtime_support>(),
605
            rt.get_runtime_support_lva());
142✔
606
        agas_client.bind_local(runtime_support_gid, runtime_support_address);
142✔
607

608
        runtime_support_gid.set_lsb(std::uint64_t(0));
142✔
609
        agas_client.bind_local(runtime_support_gid, runtime_support_address);
142✔
610

611
        // Assign the initial parcel gid range to the parcelport.
612
        rt.init_id_pool_range();
142✔
613

614
        // store number of initial localities
615
        cfg.set_num_localities(header.num_localities);
142✔
616

617
        // store number of used cores by other localities
618
        cfg.set_first_used_core(header.used_cores);
142✔
619
        rt.assign_cores();
142✔
620

621
        // pre-cache all known locality endpoints in local AGAS
622
        agas_client.pre_cache_endpoints(header.endpoints);
142✔
623
    }
142✔
624
    // }}}
625

626
    void big_boot_barrier::apply_notification(std::uint32_t source_locality_id,
140✔
627
        std::uint32_t target_locality_id, parcelset::locality const& dest,
628
        notification_header&& hdr)
629
    {
630
        hdr.endpoints = localities;
140✔
631
        apply(source_locality_id, target_locality_id, dest,
280✔
632
            notify_worker_action(), HPX_MOVE(hdr));
140✔
633
    }
140✔
634

635
    void big_boot_barrier::add_locality_endpoints(std::uint32_t locality_id,
592✔
636
        parcelset::endpoints_type const& endpoints_data)
637
    {
638
        if (localities.size() < static_cast<std::size_t>(locality_id) + 1)
592✔
639
            localities.resize(static_cast<std::size_t>(locality_id) + 1);
591✔
640

641
        localities[static_cast<std::size_t>(locality_id)] = endpoints_data;
592✔
642
    }
592✔
643

644
    ///////////////////////////////////////////////////////////////////////////////
645
    void big_boot_barrier::spin()
593✔
646
    {
647
        std::unique_lock<std::mutex> lock(mtx);
593✔
648
        while (connected)
873✔
649
        {
650
            cond.wait(lock);
280✔
651
        }
652
        // pre-cache all known locality endpoints in local AGAS on locality 0 as well
653
        if (service_mode_bootstrap == service_type)
593✔
654
        {
655
            naming::resolver_client& agas_client = naming::get_agas_client();
451✔
656
            agas_client.pre_cache_endpoints(localities);
451✔
657
        }
451✔
658
    }
593✔
659

660
    inline std::size_t get_number_of_bootstrap_connections(
593✔
661
        util::runtime_configuration const& ini)
662
    {
663
        service_mode service_type = ini.get_agas_service_mode();
593✔
664
        std::size_t result = 1;
593✔
665

666
        if (service_mode_bootstrap == service_type)
593✔
667
        {
668
            std::size_t num_localities =
451✔
669
                static_cast<std::size_t>(ini.get_num_localities());
451✔
670
            result = num_localities ? num_localities - 1 : 0;
451✔
671
        }
451✔
672

673
        return result;
593✔
674
    }
675

676
    big_boot_barrier::big_boot_barrier(parcelset::parcelport* pp_,
1,186✔
677
        parcelset::endpoints_type const& endpoints_,
678
        util::runtime_configuration const& ini_)
679
      : pp(pp_)
593✔
680
      , endpoints(endpoints_)
593✔
681
      , service_type(ini_.get_agas_service_mode())
593✔
682
      , bootstrap_agas(pp_ ? pp_->agas_locality(ini_) : parcelset::locality())
593✔
683
      , cond()
593✔
684
      , mtx()
593✔
685
      , connected(get_number_of_bootstrap_connections(ini_))
593✔
686
      , thunks(32)
593✔
687
    {
688
        // register all not registered typenames
689
        if (service_type == service_mode_bootstrap)
593✔
690
        {
691
            detail::register_unassigned_typenames();
451✔
692
            // store endpoints of root locality for later
693
            add_locality_endpoints(0, get_endpoints());
451✔
694
        }
451✔
695
    }
593✔
696

697
    void big_boot_barrier::wait_bootstrap()
451✔
698
    {    // {{{
699
        HPX_ASSERT(service_mode_bootstrap == service_type);
451✔
700

701
        // the root just waits until all localities have connected
702
        spin();
451✔
703
    }    // }}}
451✔
704

705
    namespace detail {
706

707
        std::uint32_t get_number_of_pus_in_cores(std::uint32_t num_cores)
×
708
        {
709
            threads::topology& top = threads::create_topology();
×
710

711
            std::uint32_t num_pus = 0;
×
712
            for (std::uint32_t i = 0; i != num_cores; ++i)
×
713
            {
714
                std::uint32_t num_pus_core = static_cast<std::uint32_t>(
×
715
                    top.get_number_of_core_pus(std::size_t(i)));
×
716
                if (num_pus_core == ~std::uint32_t(0))
×
717
                    return num_cores;    // assume one pu per core
×
718

719
                num_pus += num_pus_core;
×
720
            }
×
721

722
            return num_pus;
×
723
        }
×
724
    }    // namespace detail
725

726
    void big_boot_barrier::wait_hosted(std::string const& locality_name,
142✔
727
        naming::address::address_type primary_ns_server,
728
        naming::address::address_type symbol_ns_server)
729
    {    // {{{
730
        HPX_ASSERT(service_mode_bootstrap != service_type);
142✔
731

732
        // any worker sends a request for registration and waits
733
        HPX_ASSERT(nullptr != primary_ns_server);
142✔
734
        HPX_ASSERT(nullptr != symbol_ns_server);
142✔
735

736
        runtime& rt = get_runtime_distributed();
142✔
737

738
        // get the number of cores we need for our locality. This respects the
739
        // affinity description. Cores that are partially used are counted as well
740
        std::uint32_t cores_needed = rt.assign_cores();
142✔
741
        std::uint32_t num_threads =
142✔
742
            std::uint32_t(rt.get_config().get_os_thread_count());
142✔
743

744
        naming::gid_type suggested_prefix;
142✔
745

746
        std::string locality_str =
747
            rt.get_config().get_entry("hpx.locality", "-1");
142✔
748
        if (locality_str != "-1")
142✔
749
        {
750
            suggested_prefix = naming::get_gid_from_locality_id(
141✔
751
                util::from_string<std::uint32_t>(locality_str, -1));
141✔
752
        }
141✔
753

754
        // pre-load all unassigned ids
755
        detail::unassigned_typename_sequence unassigned(true);
142✔
756

757
        // contact the bootstrap AGAS node
758
        registration_header hdr(parcelset::get_parcel_handler().endpoints(),
142✔
759
            primary_ns_server, symbol_ns_server, cores_needed, num_threads,
142✔
760
            locality_name, unassigned, suggested_prefix);
142✔
761

762
        // random first parcel id
763
        apply(static_cast<std::uint32_t>(std::random_device{}()), 0,
284✔
764
            bootstrap_agas, register_worker_action(), HPX_MOVE(hdr));
142✔
765

766
        // wait for registration to be complete
767
        spin();
142✔
768
    }    // }}}
142✔
769

770
    void big_boot_barrier::notify()
283✔
771
    {
772
        naming::resolver_client& agas_client = naming::get_agas_client();
283✔
773

774
        bool notify = false;
283✔
775
        {
776
            std::lock_guard<std::mutex> lk(mtx, std::adopt_lock);
283✔
777
            if (agas_client.get_status() == hpx::state::starting)
283✔
778
            {
779
                --connected;
282✔
780
                if (connected == 0)
282✔
781
                    notify = true;
281✔
782
            }
282✔
783
        }
283✔
784
        if (notify)
283✔
785
            cond.notify_all();
281✔
786
    }
283✔
787

788
    // This is triggered in runtime_impl::start, after the early action handler
789
    // has been replaced by the parcelhandler. We have to delay the notifications
790
    // until this point so that the AGAS locality can come up.
791
    void big_boot_barrier::trigger()
593✔
792
    {
793
        if (service_mode_bootstrap == service_type)
593✔
794
        {
795
            hpx::move_only_function<void()>* p;
796

797
            while (thunks.pop(p))
591✔
798
            {
799
                try
800
                {
801
                    (*p)();
140✔
802
                }
140✔
803
                catch (...)
804
                {
805
                    delete p;
×
806
                    throw;
×
807
                }
×
808
                delete p;
140✔
809
            }
810
        }
451✔
811
    }
593✔
812

813
    void big_boot_barrier::add_thunk(hpx::move_only_function<void()>* f)
140✔
814
    {
815
        std::size_t k = 0;
140✔
816
        while (!thunks.push(f))
140✔
817
        {
818
            // Wait until successfully pushed ...
819
            hpx::util::detail::yield_k(
×
820
                k, "hpx::agas::big_boot_barrier::add_thunk");
×
821
            ++k;
×
822
        }
823
    }
140✔
824

825
    ///////////////////////////////////////////////////////////////////////////////
826
    struct bbb_tag;
827

828
    void create_big_boot_barrier(parcelset::parcelport* pp_,
593✔
829
        parcelset::endpoints_type const& endpoints_,
830
        util::runtime_configuration const& ini_)
831
    {
832
        util::reinitializable_static<std::shared_ptr<big_boot_barrier>, bbb_tag>
833
            bbb;
593✔
834
        if (bbb.get())
593✔
835
        {
836
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
×
837
                "create_big_boot_barrier",
838
                "create_big_boot_barrier was called more than once");
839
        }
840
        bbb.get().reset(new big_boot_barrier(pp_, endpoints_, ini_));
593✔
841
    }
593✔
842

843
    void destroy_big_boot_barrier()
591✔
844
    {
845
        util::reinitializable_static<std::shared_ptr<big_boot_barrier>, bbb_tag>
846
            bbb;
591✔
847
        if (!bbb.get())
591✔
848
        {
849
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
×
850
                "destroy_big_boot_barrier",
851
                "big_boot_barrier has not been created yet");
852
        }
853
        bbb.get().reset();
591✔
854
    }
591✔
855

856
    big_boot_barrier& get_big_boot_barrier()
1,891✔
857
    {
858
        util::reinitializable_static<std::shared_ptr<big_boot_barrier>, bbb_tag>
859
            bbb;
1,891✔
860
        if (!bbb.get())
1,891✔
861
        {
862
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
×
863
                "get_big_boot_barrier",
864
                "big_boot_barrier has not been created yet");
865
        }
866
        return *(bbb.get());
1,891✔
867
    }
×
868

869
}}    // namespace hpx::agas
870

871
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc