• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.41
/libs/full/runtime_distributed/src/big_boot_barrier.cpp
1
//  Copyright (c) 2011 Bryce Lelbach & Katelyn Kufahl
2
//  Copyright (c) 2007-2025 Hartmut Kaiser
3
//  Copyright (c) 2015 Anton Bikineev
4
//
5
//  SPDX-License-Identifier: BSL-1.0
6
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
7
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8

9
#include <hpx/config.hpp>
10

11
#if defined(HPX_HAVE_NETWORKING)
12
#include <hpx/actions_base/actions_base_support.hpp>
13
#include <hpx/actions_base/plain_action.hpp>
14
#include <hpx/agas/addressing_service.hpp>
15
#include <hpx/agas_base/detail/hosted_component_namespace.hpp>
16
#include <hpx/agas_base/detail/hosted_locality_namespace.hpp>
17
#include <hpx/assert.hpp>
18
#include <hpx/async_distributed/put_parcel.hpp>
19
#include <hpx/components_base/agas_interface.hpp>
20
#include <hpx/components_base/server/managed_component_base.hpp>
21
#include <hpx/modules/agas_base.hpp>
22
#include <hpx/modules/errors.hpp>
23
#include <hpx/modules/execution_base.hpp>
24
#include <hpx/modules/format.hpp>
25
#include <hpx/modules/functional.hpp>
26
#include <hpx/modules/runtime_configuration.hpp>
27
#include <hpx/modules/serialization.hpp>
28
#include <hpx/modules/static_reinit.hpp>
29
#include <hpx/modules/timing.hpp>
30
#include <hpx/modules/topology.hpp>
31
#include <hpx/parcelset/detail/parcel_await.hpp>
32
#include <hpx/parcelset_base/parcel_interface.hpp>
33
#include <hpx/parcelset_base/parcelport.hpp>
34
#include <hpx/runtime_distributed.hpp>
35
#include <hpx/runtime_distributed/big_boot_barrier.hpp>
36
#include <hpx/runtime_distributed/runtime_fwd.hpp>
37

38
#include <cstddef>
39
#include <cstdint>
40
#include <cstdlib>
41
#include <functional>
42
#include <memory>
43
#include <mutex>
44
#include <random>
45
#include <string>
46
#include <type_traits>
47
#include <utility>
48
#include <vector>
49

50
#include <hpx/config/warnings_prefix.hpp>
51

52
namespace hpx::parcelset {
53

54
    // shortcut for get_runtime().get_parcel_handler()
55
    parcelhandler& get_parcel_handler();
56
}    // namespace hpx::parcelset
57

58
namespace hpx::agas::detail {
59

60
    void register_unassigned_typenames()
61
    {
62
        // supposed to be run on locality 0 locality communication
63
        hpx::serialization::detail::id_registry& serialization_registry =
64
            hpx::serialization::detail::id_registry::instance();
29✔
65

66
        serialization_registry.fill_missing_typenames();
67

68
        hpx::actions::detail::action_registry& action_registry =
69
            hpx::actions::detail::action_registry::instance();
29✔
70
        action_registry.fill_missing_typenames();
71
    }
29✔
72

73
    ///////////////////////////////////////////////////////////////////////////
74
    struct unassigned_typename_sequence
29✔
75
    {
29✔
76
        unassigned_typename_sequence() = default;
29✔
77

78
        explicit unassigned_typename_sequence(bool /*dummy*/)
79
          : serialization_typenames(
3✔
80
                hpx::serialization::detail::id_registry::instance()
81
                    .get_unassigned_typenames())
82
          , action_typenames(hpx::actions::detail::action_registry::instance()
83
                    .get_unassigned_typenames())
3✔
84
        {
3✔
85
        }
3✔
86

87
        void save(hpx::serialization::output_archive& ar, unsigned) const
3✔
88
        {
89
            // part running on worker node
90
            HPX_ASSERT(!action_typenames.empty());
3✔
91
            ar << serialization_typenames;
92
            ar << action_typenames;
93
        }
94

95
        void load(hpx::serialization::input_archive& ar, unsigned)
96
        {
9✔
97
            // part running on locality 0
9✔
98
            ar >> serialization_typenames;
99
            ar >> action_typenames;
100
        }
101
        HPX_SERIALIZATION_SPLIT_MEMBER();
102

103
        std::vector<std::string> serialization_typenames;
3✔
104
        std::vector<std::string> action_typenames;
3✔
105
    };
106

107
    ///////////////////////////////////////////////////////////////////////////
108
    struct assigned_id_sequence
109
    {
110
        assigned_id_sequence() = default;
111

112
        explicit assigned_id_sequence(
113
            unassigned_typename_sequence const& typenames)
114
        {
115
            register_ids_on_main_loc(typenames);
116
        }
117

3✔
118
        void save(hpx::serialization::output_archive& ar, unsigned) const
119
        {
3✔
120
            HPX_ASSERT(!action_ids.empty());
3✔
121
            ar << serialization_ids;    // part running on locality 0
3✔
122
            ar << action_ids;
123
        }
124

125
        void load(hpx::serialization::input_archive& ar, unsigned)
126
        {
9✔
127
            ar >> serialization_ids;    // part running on worker node
9✔
128
            ar >> action_ids;
129
        }
130
        HPX_SERIALIZATION_SPLIT_MEMBER();
131

132
    private:
3✔
133
        void register_ids_on_main_loc(
3✔
134
            unassigned_typename_sequence const& unassigned_ids)
135
        {
136
            {
137
                hpx::serialization::detail::id_registry& registry =
138
                    hpx::serialization::detail::id_registry::instance();
3✔
139
                std::uint32_t max_id = registry.get_max_registered_id();
140

141
                for (std::string const& s :
142
                    unassigned_ids.serialization_typenames)
143
                {
3✔
144
                    std::uint32_t id = registry.try_get_id(s);
145
                    if (id ==
146
                        hpx::serialization::detail::id_registry::invalid_id)
3✔
147
                    {
3✔
148
                        // this id is not registered yet
149
                        id = ++max_id;
×
150
                        registry.register_typename(s, id);
×
151
                    }
152
                    serialization_ids.push_back(id);
153
                }
154
            }
×
155
            {
×
156
                hpx::actions::detail::action_registry& registry =
157
                    hpx::actions::detail::action_registry::instance();
×
158
                std::uint32_t max_id = registry.max_id_;
159

160
                for (std::string const& s : unassigned_ids.action_typenames)
161
                {
162
                    std::uint32_t id = registry.try_get_id(s);
3✔
163
                    if (id == hpx::actions::detail::action_registry::invalid_id)
3✔
164
                    {
165
                        // this id is not registered yet
120✔
166
                        id = ++max_id;
167
                        registry.register_typename(s, id);
117✔
168
                    }
117✔
169
                    action_ids.push_back(id);
170
                }
171
            }
×
172
        }
×
173

174
    public:
117✔
175
        void register_ids_on_worker_loc() const
176
        {
177
            {
3✔
178
                hpx::serialization::detail::id_registry& registry =
179
                    hpx::serialization::detail::id_registry::instance();
180

3✔
181
                // Yes, we look up the unassigned typenames twice, but this allows
182
                // to avoid using globals and protects from race conditions during
183
                // de-serialization.
184
                std::vector<std::string> typenames =
3✔
185
                    registry.get_unassigned_typenames();
186

187
                // we should have received as many ids as we have unassigned names
188
                HPX_ASSERT(typenames.size() == serialization_ids.size());
189

190
                for (std::size_t k = 0; k < serialization_ids.size(); ++k)
3✔
191
                {
192
                    registry.register_typename(
193
                        typenames[k], serialization_ids[k]);
194
                }
195

3✔
196
                // fill in holes which might have been caused by initialization
197
                // order problems
×
198
                registry.fill_missing_typenames();
199
            }
200
            {
201
                hpx::actions::detail::action_registry& registry =
202
                    hpx::actions::detail::action_registry::instance();
203

3✔
204
                // Yes, we look up the unassigned typenames twice, but this allows
3✔
205
                // to avoid using globals and protects from race conditions during
206
                // de-serialization.
207
                std::vector<std::string> typenames =
3✔
208
                    registry.get_unassigned_typenames();
209

210
                // we should have received as many ids as we have unassigned names
211
                HPX_ASSERT(typenames.size() == action_ids.size());
212

213
                for (std::size_t k = 0; k < action_ids.size(); ++k)
3✔
214
                {
215
                    registry.register_typename(typenames[k], action_ids[k]);
216
                }
217

218
                // fill in holes which might have been caused by initialization
120✔
219
                // order problems
220
                registry.fill_missing_typenames();
117✔
221
            }
222
        }
223

224
        std::vector<std::uint32_t> serialization_ids;
225
        std::vector<std::uint32_t> action_ids;
3✔
226
    };
3✔
227
}    // namespace hpx::agas::detail
3✔
228

229
namespace hpx::agas {
230

231
    template <typename Action, typename... Args>
232
    void big_boot_barrier::apply(std::uint32_t source_locality_id,
233
        std::uint32_t target_locality_id, parcelset::locality dest, Action act,
234
        Args&&... args)
235
    {    // {{{
236
        HPX_ASSERT(pp);
237
        naming::address addr(
3✔
238
            naming::get_gid_from_locality_id(target_locality_id));
239
        parcelset::parcel p(parcelset::detail::create_parcel::call(
240
            naming::get_gid_from_locality_id(target_locality_id),
241
            HPX_MOVE(addr), act, HPX_FORWARD(Args, args)...));
242
#if defined(HPX_HAVE_PARCEL_PROFILING)
243
        if (!p.parcel_id())
244
        {
3✔
245
            p.parcel_id() =
246
                parcelset::parcel::generate_unique_id(source_locality_id);
247
        }
248
#else
249
        HPX_UNUSED(source_locality_id);
250
#endif
251

252
        parcelset::detail::parcel_await_apply(HPX_MOVE(p),
253
            parcelset::write_handler_type(), 0,
254
            [this, dest](
255
                parcelset::parcel&& p, parcelset::write_handler_type&&) {
256
                pp->send_early_parcel(dest, HPX_MOVE(p));
257
            });
3✔
258
    }    // }}}
3✔
259

12✔
260
    template <typename Action, typename... Args>
261
    void big_boot_barrier::apply_late(std::uint32_t /* source_locality_id */
6✔
262
        ,
263
        std::uint32_t target_locality_id, parcelset::locality const& /* dest */
3✔
264
        ,
265
        Action act, Args&&... args)
266
    {    // {{{
×
267
        naming::address addr(
268
            naming::get_gid_from_locality_id(target_locality_id));
269

270
        parcelset::put_parcel(
271
            hpx::id_type(naming::get_gid_from_locality_id(target_locality_id),
272
                hpx::id_type::management_type::unmanaged),
273
            HPX_MOVE(addr), act, HPX_FORWARD(Args, args)...);
274
    }    // }}}
275

×
276
    //typedef components::detail::heap_factory<
×
277
    //    lcos::detail::promise<
278
    //        response
279
    //      , response
×
280
    //    >
281
    //  , components::managed_component<
282
    //        lcos::detail::promise<
283
    //            response
284
    //          , response
285
    //        >
286
    //    >
287
    //> response_heap_type;
288

289
    // This structure is used when a locality registers with node zero
290
    // (first round trip)
291
    struct registration_header
292
    {
293
        registration_header()
294
          : primary_ns_ptr(nullptr)
295
          , symbol_ns_ptr(nullptr)
296
          , cores_needed(0)
297
          , num_threads(0)
298
        {
299
        }
×
300

3✔
301
        // TODO: pass head address as a GVA
3✔
302
        registration_header(parcelset::endpoints_type const& endpoints_,
×
303
            naming::address_type primary_ns_ptr_,
304
            naming::address_type symbol_ns_ptr_, std::uint32_t cores_needed_,
305
            std::uint32_t num_threads_, std::string const& hostname_,
306
            detail::unassigned_typename_sequence const& typenames_,
307
            naming::gid_type prefix_ = naming::gid_type())
3✔
308
          : endpoints(endpoints_)
309
          , primary_ns_ptr(primary_ns_ptr_)
310
          , symbol_ns_ptr(symbol_ns_ptr_)
311
          , cores_needed(cores_needed_)
312
          , num_threads(num_threads_)
313
          , hostname(hostname_)
3✔
314
          , typenames(typenames_)
3✔
315
          , prefix(prefix_)
3✔
316
        {
3✔
317
        }
3✔
318

3✔
319
        parcelset::endpoints_type endpoints;
3✔
320
        naming::address_type primary_ns_ptr;
321
        naming::address_type symbol_ns_ptr;
322
        std::uint32_t cores_needed;
3✔
323
        std::uint32_t num_threads;
324
        std::string hostname;    // hostname of locality
325
        detail::unassigned_typename_sequence typenames;
326
        naming::gid_type prefix;    // suggested prefix (optional)
327

328
        template <typename Archive>
329
        void serialize(Archive& ar, unsigned int const)
330
        {
331
            // clang-format off
332
            ar & endpoints;
333

334
            std::size_t address = reinterpret_cast<std::size_t>(primary_ns_ptr);
3✔
335
            ar & address;
336
            primary_ns_ptr = reinterpret_cast<naming::address_type>(address);
337

3✔
338
            address = reinterpret_cast<std::size_t>(symbol_ns_ptr);
339
            ar & address;
9✔
340
            symbol_ns_ptr = reinterpret_cast<naming::address_type>(address);
341

3✔
342
            ar & cores_needed;
343
            ar & num_threads;
9✔
344
            ar & hostname;
345
            ar & typenames;
3✔
346
            ar & prefix;
347
            // clang-format on
348
        }
349
    };
3✔
350

3✔
351
    // This structure is used in the response from node zero to the locality which
3✔
352
    // is trying to register (first roundtrip).
353
    struct notification_header
3✔
354
    {
355
        notification_header()
356
          : num_localities(0)
357
          , used_cores(0)
358
        {
359
        }
360

3✔
361
        notification_header(naming::gid_type const& prefix_,
3✔
362
            parcelset::locality const& agas_locality_,
3✔
363
            naming::address const& locality_ns_address_,
364
            naming::address const& primary_ns_address_,
3✔
365
            naming::address const& component_ns_address_,
366
            naming::address const& symbol_ns_address_,
3✔
367
            std::uint32_t num_localities_, std::uint32_t used_cores_,
368
            parcelset::endpoints_type const& agas_endpoints_,
369
            detail::assigned_id_sequence const& ids_)
370
          : prefix(prefix_)
371
          , agas_locality(agas_locality_)
372
          , locality_ns_address(locality_ns_address_)
373
          , primary_ns_address(primary_ns_address_)
374
          , component_ns_address(component_ns_address_)
375
          , symbol_ns_address(symbol_ns_address_)
3✔
376
          , num_localities(num_localities_)
3✔
377
          , used_cores(used_cores_)
378
          , agas_endpoints(agas_endpoints_)
379
          , ids(ids_)
380
        {
381
        }
3✔
382

3✔
383
        naming::gid_type prefix;
384
        parcelset::locality agas_locality;
3✔
385
        naming::address locality_ns_address;
386
        naming::address primary_ns_address;
3✔
387
        naming::address component_ns_address;
388
        naming::address symbol_ns_address;
389
        std::uint32_t num_localities;
390
        std::uint32_t used_cores;
391
        parcelset::endpoints_type agas_endpoints;
392
        detail::assigned_id_sequence ids;
393
        std::vector<parcelset::endpoints_type> endpoints;
394

395
        template <typename Archive>
396
        void serialize(Archive& ar, unsigned int const)
397
        {
398
            // clang-format off
399
            ar & prefix;
400
            ar & agas_locality;
401
            ar & locality_ns_address;
3✔
402
            ar & primary_ns_address;
403
            ar & component_ns_address;
404
            ar & symbol_ns_address;
3✔
405
            ar & num_localities;
3✔
406
            ar & used_cores;
3✔
407
            ar & agas_endpoints;
3✔
408
            ar & ids;
3✔
409
            ar & endpoints;
3✔
410
            // clang-format on
411
        }
412
    };
3✔
413

3✔
414
    // {{{ early action forwards
3✔
415
    void register_worker(registration_header const& header);
416
    void notify_worker(notification_header const& header);
3✔
417
    // }}}
418

419
    // {{{ early action types
420
    using register_worker_action =
421
        actions::direct_action<void (*)(registration_header const&),
422
            register_worker>;
423

424
    using notify_worker_action =
425
        actions::direct_action<void (*)(notification_header const&),
426
            notify_worker>;
427
    // }}}
428
}    // namespace hpx::agas
429

430
using hpx::agas::notify_worker_action;
431
using hpx::agas::register_worker_action;
432

433
HPX_ACTION_HAS_CRITICAL_PRIORITY(register_worker_action)
434
HPX_ACTION_HAS_CRITICAL_PRIORITY(notify_worker_action)
435

436
HPX_REGISTER_ACTION_ID(register_worker_action, register_worker_action,
437
    hpx::actions::register_worker_action_id)
438
HPX_REGISTER_ACTION_ID(notify_worker_action, notify_worker_action,
439
    hpx::actions::notify_worker_action_id)
440

441
namespace hpx::agas {
195✔
442

443
    // remote call to AGAS
195✔
444
    void register_worker(registration_header const& header)
445
    {
446
        // This lock acquires the bbb mutex on creation. When it goes out of scope,
447
        // its dtor calls big_boot_barrier::notify().
448
        big_boot_barrier::scoped_lock lock(get_big_boot_barrier());
449

3✔
450
        agas::addressing_service& agas_client = naming::get_agas_client();
451

452
        if (HPX_UNLIKELY(agas_client.is_connecting()))
453
        {
3✔
454
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
455
                "agas::register_worker",
3✔
456
                "a locality in connect mode cannot be an AGAS server.");
457
        }
3✔
458

459
        if (HPX_UNLIKELY(!agas_client.is_bootstrap()))
×
460
        {
461
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
462
                "agas::register_worker",
463
                "registration parcel received by non-bootstrap locality.");
464
        }
3✔
465

466
        naming::gid_type prefix = header.prefix;
×
467
        if (prefix != naming::invalid_gid &&
468
            naming::get_locality_id_from_gid(prefix) == 0)
469
        {
470
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
471
                "agas::register_worker",
472
                "worker node ({}) can't suggest locality_id zero, "
3✔
473
                "this is reserved for the console",
474
                header.endpoints);
475
            return;
×
476
        }
477

478
        if (!agas_client.register_locality(
479
                header.endpoints, prefix, header.num_threads))
480
        {
481
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
482
                "agas::register_worker",
483
                "attempt to register locality {} more than once",
3✔
484
                header.endpoints);
3✔
485
        }
486

×
487
        naming::address locality_addr(agas::get_locality(),
488
            to_int(components::component_enum_type::agas_locality_namespace),
489
            agas_client.locality_ns_->ptr());
490
        naming::address primary_addr(agas::get_locality(),
491
            to_int(components::component_enum_type::agas_primary_namespace),
492
            agas_client.primary_ns_.ptr());
493
        naming::address component_addr(agas::get_locality(),
494
            to_int(components::component_enum_type::agas_component_namespace),
495
            agas_client.component_ns_->ptr());
3✔
496
        naming::address symbol_addr(agas::get_locality(),
497
            to_int(components::component_enum_type::agas_symbol_namespace),
498
            agas_client.symbol_ns_.ptr());
3✔
499

500
        // assign cores to the new locality
501
        runtime& rt = get_runtime_distributed();
3✔
502
        std::uint32_t first_core =
503
            rt.assign_cores(header.hostname, header.cores_needed);
504

3✔
505
        big_boot_barrier& bbb = get_big_boot_barrier();
506

507
        // register all ids
3✔
508
        detail::assigned_id_sequence assigned_ids(header.typenames);
509

3✔
510
        notification_header hdr(prefix, bbb.here(), locality_addr, primary_addr,
511
            component_addr, symbol_addr, rt.get_config().get_num_localities(),
3✔
512
            first_core, bbb.get_endpoints(), assigned_ids);
513

514
        parcelset::locality dest;
3✔
515
        parcelset::locality here = bbb.here();
516
        for (parcelset::endpoints_type::value_type const& loc :
3✔
517
            header.endpoints)
3✔
518
        {
6✔
519
            if (loc.second.type() == here.type())
520
            {
3✔
521
                dest = loc.second;
522
                break;
3✔
523
            }
3✔
524
        }
525

3✔
526
        // collect endpoints from all registering localities
527
        bbb.add_locality_endpoints(
3✔
528
            naming::get_locality_id_from_gid(prefix), header.endpoints);
529

530
        // TODO: Handle cases where localities try to connect to AGAS while it's
531
        // shutting down.
532
        if (agas_client.get_status() != hpx::state::starting)
533
        {
3✔
534
            // We can just send the parcel now, the connecting locality isn't a part
535
            // of startup synchronization.
536
            get_big_boot_barrier().apply_late(0,
537
                naming::get_locality_id_from_gid(prefix), dest,
538
                notify_worker_action(), HPX_MOVE(hdr));
3✔
539
        }
540

541
        else
542
        {
×
543
            // AGAS is starting up; this locality is participating in startup
544
            // synchronization.
545

546
            // delay the final response until the runtime system is up and running
547
            hpx::move_only_function<void()>* thunk =
548
                new hpx::move_only_function<void()>(util::one_shot(
549
                    hpx::bind_front(&big_boot_barrier::apply_notification,
550
                        &get_big_boot_barrier(), 0,
551
                        naming::get_locality_id_from_gid(prefix), dest,
552
                        HPX_MOVE(hdr))));
553
            get_big_boot_barrier().add_thunk(thunk);
554
        }
3✔
555
    }
3✔
556

3✔
557
    // AGAS callback to client (first round trip response)
558
    void notify_worker(notification_header const& header)
6✔
559
    {
3✔
560
        // This lock acquires the bbb mutex on creation. When it goes out of scope,
561
        // it's dtor calls big_boot_barrier::notify().
3✔
562
        big_boot_barrier::scoped_lock lock(get_big_boot_barrier());
563

564
        // register all ids with this locality
3✔
565
        header.ids.register_ids_on_worker_loc();
566

567
        runtime_distributed& rt = get_runtime_distributed();
568
        agas::addressing_service& agas_client = naming::get_agas_client();
3✔
569

570
        if (HPX_UNLIKELY(agas_client.get_status() != hpx::state::starting))
571
        {
3✔
572
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
573
                "agas::notify_worker", "locality {} has launched early",
3✔
574
                rt.here());
3✔
575
        }
576

3✔
577
        util::runtime_configuration& cfg = rt.get_config();
578

×
579
        // set our prefix
580
        agas_client.set_local_locality(header.prefix);
581
        agas_client.register_console(header.agas_endpoints);
582
        cfg.parse("assigned locality",
583
            hpx::util::format("hpx.locality!={1}",
3✔
584
                naming::get_locality_id_from_gid(header.prefix)));
585

586
        // store the full addresses of the agas servers in our local service
3✔
587
        agas_client.component_ns_.reset(new detail::hosted_component_namespace(
3✔
588
            header.component_ns_address));
9✔
589
        agas_client.locality_ns_.reset(
3✔
590
            new detail::hosted_locality_namespace(header.locality_ns_address));
3✔
591
        naming::gid_type const& here = agas::get_locality();
592

593
        // register runtime support component
594
        naming::gid_type runtime_support_gid(
3✔
595
            header.prefix.get_msb(), rt.get_runtime_support_lva());
596
        naming::address const runtime_support_address(here,
6✔
597
            components::get_component_type<
598
                components::server::runtime_support>(),
599
            rt.get_runtime_support_lva());
600
        agas_client.bind_local(runtime_support_gid, runtime_support_address);
601

3✔
602
        runtime_support_gid.set_lsb(std::uint64_t(0));
603
        agas_client.bind_local(runtime_support_gid, runtime_support_address);
604

605
        // Assign the initial parcel gid range to the parcelport.
3✔
606
        rt.init_id_pool_range();
607

608
        // store number of initial localities
609
        cfg.set_num_localities(header.num_localities);
610

611
        // store number of used cores by other localities
612
        cfg.set_first_used_core(header.used_cores);
3✔
613
        rt.assign_cores();
614

615
        // pre-cache all known locality endpoints in local AGAS
3✔
616
        agas_client.pre_cache_endpoints(header.endpoints);
617
    }
618
    // }}}
3✔
619

3✔
620
    void big_boot_barrier::apply_notification(std::uint32_t source_locality_id,
621
        std::uint32_t target_locality_id, parcelset::locality const& dest,
622
        notification_header&& hdr)
3✔
623
    {
3✔
624
        hdr.endpoints = localities;
625
        apply(source_locality_id, target_locality_id, dest,
626
            notify_worker_action(), HPX_MOVE(hdr));
3✔
627
    }
628

629
    void big_boot_barrier::add_locality_endpoints(std::uint32_t locality_id,
630
        parcelset::endpoints_type const& endpoints_data)
3✔
631
    {
3✔
632
        if (localities.size() < static_cast<std::size_t>(locality_id) + 1)
633
            localities.resize(static_cast<std::size_t>(locality_id) + 1);
3✔
634

635
        localities[static_cast<std::size_t>(locality_id)] = endpoints_data;
32✔
636
    }
637

638
    ///////////////////////////////////////////////////////////////////////////////
32✔
639
    void big_boot_barrier::spin()
32✔
640
    {
641
        std::unique_lock<std::mutex> lock(mtx);
642
        while (connected)
32✔
643
        {
644
            cond.wait(lock);    //-V1089
645
        }
32✔
646

647
        // pre-cache all known locality endpoints in local AGAS on locality 0 as well
32✔
648
        if (service_mode::bootstrap == service_type)
38✔
649
        {
650
            agas::addressing_service& agas_client = naming::get_agas_client();
6✔
651
            agas_client.pre_cache_endpoints(localities);
652
        }
653
    }
654

32✔
655
    inline std::size_t get_number_of_bootstrap_connections(
656
        util::runtime_configuration const& ini)
29✔
657
    {
29✔
658
        service_mode service_type = ini.get_agas_service_mode();
659
        std::size_t result = 1;
32✔
660

661
        if (service_mode::bootstrap == service_type)
32✔
662
        {
663
            std::size_t num_localities =
664
                static_cast<std::size_t>(ini.get_num_localities());
32✔
665
            result = num_localities ? num_localities - 1 : 0;
666
        }
667

32✔
668
        return result;
669
    }
670

29✔
671
    // Sets up the barrier from the parcelport (may be null, in which case the
    // bootstrap AGAS locality is default-constructed), this locality's
    // endpoints and the runtime configuration. On the bootstrap locality it
    // additionally registers the still unassigned typenames and records its
    // own endpoints under locality id 0.
    big_boot_barrier::big_boot_barrier(parcelset::parcelport* pp_,
        parcelset::endpoints_type const& endpoints_,
        util::runtime_configuration const& ini_)
      : pp(pp_)
      , endpoints(endpoints_)
      , service_type(ini_.get_agas_service_mode())
      , bootstrap_agas(pp_ ? pp_->agas_locality(ini_) : parcelset::locality())
      , cond()
      , mtx()
      , connected(get_number_of_bootstrap_connections(ini_))
      , thunks(32)    //-V112
    {
        if (service_type != service_mode::bootstrap)
        {
            return;
        }

        // register all not registered typenames
        detail::register_unassigned_typenames();

        // store endpoints of root locality for later
        add_locality_endpoints(0, get_endpoints());
    }
32✔
691

692
    void big_boot_barrier::wait_bootstrap()
29✔
693
    {    // {{{
694
        HPX_ASSERT(service_mode::bootstrap == service_type);
29✔
695

696
        // the root just waits until all localities have connected
32✔
697
        spin();
698
    }    // }}}
29✔
699

700
    namespace detail {
701

702
        std::uint32_t get_number_of_pus_in_cores(std::uint32_t num_cores)
703
        {
29✔
704
            threads::topology& top = threads::create_topology();
29✔
705

706
            std::uint32_t num_pus = 0;
707
            for (std::uint32_t i = 0; i != num_cores; ++i)
708
            {
×
709
                std::uint32_t num_pus_core = static_cast<std::uint32_t>(
710
                    top.get_number_of_core_pus(std::size_t(i)));
×
711
                if (num_pus_core == ~std::uint32_t(0))
712
                    return num_cores;    // assume one pu per core
713

×
714
                num_pus += num_pus_core;
715
            }
716

×
717
            return num_pus;
×
718
        }
719
    }    // namespace detail
720

×
721
    void big_boot_barrier::wait_hosted(std::string const& locality_name,
722
        naming::address::address_type primary_ns_server,
723
        naming::address::address_type symbol_ns_server)
724
    {    // {{{
725
        HPX_ASSERT(service_mode::bootstrap != service_type);
726

727
        // any worker sends a request for registration and waits
3✔
728
        HPX_ASSERT(nullptr != primary_ns_server);
729
        HPX_ASSERT(nullptr != symbol_ns_server);
730

731
        runtime& rt = get_runtime_distributed();
732

733
        // get the number of cores we need for our locality. This respects the
734
        // affinity description. Cores that are partially used are counted as well
735
        std::uint32_t cores_needed = rt.assign_cores();
736
        std::uint32_t num_threads =
737
            std::uint32_t(rt.get_config().get_os_thread_count());
3✔
738

739
        naming::gid_type suggested_prefix;
740

741
        std::string locality_str =
3✔
742
            rt.get_config().get_entry("hpx.locality", "-1");
743
        if (locality_str != "-1")
3✔
744
        {
745
            suggested_prefix = naming::get_gid_from_locality_id(
746
                util::from_string<std::uint32_t>(locality_str, -1));
747
        }
748

6✔
749
        // pre-load all unassigned ids
3✔
750
        detail::unassigned_typename_sequence unassigned(true);
751

3✔
752
        // contact the bootstrap AGAS node
3✔
753
        registration_header hdr(parcelset::get_parcel_handler().endpoints(),
754
            primary_ns_server, symbol_ns_server, cores_needed, num_threads,
755
            locality_name, unassigned, suggested_prefix);
756

3✔
757
        // random first parcel id
758
        apply(static_cast<std::uint32_t>(std::random_device{}()), 0,
759
            bootstrap_agas, register_worker_action(), HPX_MOVE(hdr));
3✔
760

761
        // wait for registration to be complete
3✔
762
        spin();
763
    }    // }}}
764

9✔
765
    // 26110: Caller failing to hold lock 'this->mtx' before calling function
3✔
766
#if defined(HPX_MSVC)
767
#pragma warning(push)
768
#pragma warning(disable : 26110)
3✔
769
#endif
6✔
770
    void big_boot_barrier::notify()
771
    {
6✔
772
        agas::addressing_service& agas_client = naming::get_agas_client();
773

6✔
774
        bool notify = false;
775
        {
776
            std::lock_guard<std::mutex> lk(mtx, std::adopt_lock);
777
            if (agas_client.get_status() == hpx::state::starting)
6✔
778
            {
6✔
779
                --connected;
780
                if (connected == 0)
6✔
781
                    notify = true;
6✔
782
            }
783
        }
784
        if (notify)
785
            cond.notify_all();
6✔
786
    }
6✔
787
#if defined(HPX_MSVC)
6✔
788
#pragma warning(pop)
789
#endif
790

791
    // This is triggered in runtime_impl::start, after the early action handler
792
    // has been replaced by the parcelhandler. We have to delay the notifications
32✔
793
    // until this point so that the AGAS locality can come up.
794
    void big_boot_barrier::trigger()
32✔
795
    {
796
        if (service_mode::bootstrap == service_type)
797
        {
798
            hpx::move_only_function<void()>* p;
32✔
799

800
            while (thunks.pop(p))
801
            {
802
                try
3✔
803
                {
804
                    (*p)();
×
805
                }
806
                catch (...)
×
807
                {
×
808
                    delete p;
×
809
                    throw;
3✔
810
                }
811
                delete p;
812
            }
32✔
813
        }
814
    }
3✔
815

816
    /// Enqueue a heap-allocated callable on the `thunks` queue, retrying
    /// until the push succeeds.
    ///
    /// \param f  pointer to the callable to enqueue. trigger() deletes the
    ///           pointers it pops from this queue, so the callable is
    ///           presumably expected to be allocated with `new` -- confirm
    ///           against the callers.
    void big_boot_barrier::add_thunk(hpx::move_only_function<void()>* f)
    {
        // yield between failed attempts, passing along the attempt count
        for (std::size_t attempt = 0; !thunks.push(f); ++attempt)
        {
            hpx::util::detail::yield_k(
                attempt, "hpx::agas::big_boot_barrier::add_thunk");
        }
    }
827

828
    ///////////////////////////////////////////////////////////////////////////////
829
    struct bbb_tag;
32✔
830

831
    void create_big_boot_barrier(parcelset::parcelport* pp_,
832
        parcelset::endpoints_type const& endpoints_,
833
        util::runtime_configuration const& ini_)
834
    {
835
        util::reinitializable_static<std::shared_ptr<big_boot_barrier>, bbb_tag>
32✔
836
            bbb;
837
        if (bbb.get())
×
838
        {
839
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
840
                "create_big_boot_barrier",
841
                "create_big_boot_barrier was called more than once");
32✔
842
        }
32✔
843
        bbb.get().reset(new big_boot_barrier(pp_, endpoints_, ini_));
844
    }
32✔
845

846
    void destroy_big_boot_barrier()
847
    {
848
        util::reinitializable_static<std::shared_ptr<big_boot_barrier>, bbb_tag>
32✔
849
            bbb;
850
        if (!bbb.get())
×
851
        {
852
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
853
                "destroy_big_boot_barrier",
854
                "big_boot_barrier has not been created yet");
855
        }
32✔
856
        bbb.get().reset();
857
    }
79✔
858

859
    /// Access the big_boot_barrier instance held in the reinitializable
    /// static slot.
    ///
    /// \returns a reference to the stored instance
    ///
    /// Raises hpx::error::internal_server_error if no instance is currently
    /// stored.
    big_boot_barrier& get_big_boot_barrier()
    {
        util::reinitializable_static<std::shared_ptr<big_boot_barrier>, bbb_tag>
            bbb;

        auto& instance = bbb.get();
        if (!instance)
        {
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
                "get_big_boot_barrier",
                "big_boot_barrier has not been created yet");
        }

        return *instance;
    }
871
}    // namespace hpx::agas
872

873
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc