• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #846

05 Dec 2022 11:16PM UTC coverage: 86.482% (+0.8%) from 85.664%
#846

push

StellarBot
Merge #6093

6093: Replace boost::string_ref with std::string_view r=hkaiser a=hkaiser

Working towards #5497 and #3440

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

14 of 14 new or added lines in 6 files covered. (100.0%)

172759 of 199764 relevant lines covered (86.48%)

1842530.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.39
/libs/full/agas/src/addressing_service.cpp
1
//  Copyright (c) 2011 Bryce Adelstein-Lelbach
2
//  Copyright (c) 2011-2021 Hartmut Kaiser
3
//  Copyright (c) 2016 Parsa Amini
4
//  Copyright (c) 2016 Thomas Heller
5
//
6
//  SPDX-License-Identifier: BSL-1.0
7
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
8
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9

10
#include <hpx/config.hpp>
11
#include <hpx/actions_base/traits/action_priority.hpp>
12
#include <hpx/actions_base/traits/action_was_object_migrated.hpp>
13
#include <hpx/agas/addressing_service.hpp>
14
#include <hpx/agas_base/detail/bootstrap_component_namespace.hpp>
15
#include <hpx/agas_base/detail/bootstrap_locality_namespace.hpp>
16
#include <hpx/assert.hpp>
17
#include <hpx/async_base/launch_policy.hpp>
18
#include <hpx/async_combinators/when_all.hpp>
19
#include <hpx/components_base/traits/component_supports_migration.hpp>
20
#include <hpx/datastructures/detail/dynamic_bitset.hpp>
21
#include <hpx/functional/bind.hpp>
22
#include <hpx/functional/bind_back.hpp>
23
#include <hpx/functional/bind_front.hpp>
24
#include <hpx/lock_registration/detail/register_locks.hpp>
25
#include <hpx/modules/async_distributed.hpp>
26
#include <hpx/modules/errors.hpp>
27
#include <hpx/modules/execution.hpp>
28
#include <hpx/modules/format.hpp>
29
#include <hpx/modules/logging.hpp>
30
#include <hpx/naming/split_gid.hpp>
31
#include <hpx/runtime_configuration/runtime_configuration.hpp>
32
#include <hpx/runtime_local/runtime_local_fwd.hpp>
33
#include <hpx/serialization/serialize.hpp>
34
#include <hpx/serialization/vector.hpp>
35
#include <hpx/thread_support/unlock_guard.hpp>
36
#include <hpx/type_support/unused.hpp>
37
#include <hpx/util/get_entry_as.hpp>
38
#include <hpx/util/insert_checked.hpp>
39

40
#include <cstddef>
41
#include <cstdint>
42
#include <functional>
43
#include <map>
44
#include <memory>
45
#include <mutex>
46
#include <sstream>
47
#include <string>
48
#include <system_error>
49
#include <utility>
50
#include <vector>
51

52
namespace hpx { namespace agas {
53

54
    struct addressing_service::gva_cache_key
185,318✔
55
    {    // {{{ gva_cache_key implementation
56
    private:
57
        using key_type = std::pair<naming::gid_type, naming::gid_type>;
58

59
        key_type key_;
60

61
    public:
62
        gva_cache_key()
184,295✔
63
          : key_()
184,295✔
64
        {
65
        }
184,295✔
66

67
        explicit gva_cache_key(
186,060✔
68
            naming::gid_type const& id, std::uint64_t count = 1)
69
          : key_(naming::detail::get_stripped_gid(id),
372,120✔
70
                naming::detail::get_stripped_gid(id) + (count - 1))
186,060✔
71
        {
72
            HPX_ASSERT(count);
186,060✔
73
        }
186,060✔
74

75
        naming::gid_type get_gid() const
364,150✔
76
        {
77
            return key_.first;
364,150✔
78
        }
79

80
        std::uint64_t get_count() const
1,026✔
81
        {
82
            naming::gid_type const size = key_.second - key_.first;
1,026✔
83
            HPX_ASSERT(size.get_msb() == 0);
1,026✔
84
            return size.get_lsb();
1,026✔
85
        }
86

87
        friend bool operator<(
606,422✔
88
            gva_cache_key const& lhs, gva_cache_key const& rhs)
89
        {
90
            return lhs.key_.second < rhs.key_.first;
606,422✔
91
        }
92

93
        friend bool operator==(
94
            gva_cache_key const& lhs, gva_cache_key const& rhs)
95
        {
96
            // Direct hit
97
            if (lhs.key_ == rhs.key_)
98
            {
99
                return true;
100
            }
101

102
            // Is lhs in rhs?
103
            if (1 == lhs.get_count() && 1 != rhs.get_count())
104
            {
105
                return rhs.key_.first <= lhs.key_.first &&
106
                    lhs.key_.second <= rhs.key_.second;
107
            }
108

109
            // Is rhs in lhs?
110
            else if (1 != lhs.get_count() && 1 == rhs.get_count())
111
            {
112
                return lhs.key_.first <= rhs.key_.first &&
113
                    rhs.key_.second <= lhs.key_.second;
114
            }
115

116
            return false;
117
        }
118
    };    // }}}
119

120
    /// Constructs the addressing service from the given runtime
    /// configuration. All tunables (pending refcnt request limit, service
    /// mode, caching modes, cache size) are read from \a ini_.
    addressing_service::addressing_service(
        util::runtime_configuration const& ini_)
      : gva_cache_(new gva_cache_type)
      , console_cache_(naming::invalid_locality_id)
      , max_refcnt_requests_(ini_.get_agas_max_pending_refcnt_requests())
      , refcnt_requests_count_(0)
      , enable_refcnt_caching_(true)
      , refcnt_requests_(new refcnt_requests_type)
      , service_type(ini_.get_agas_service_mode())
      , runtime_type(ini_.mode_)
      , caching_(ini_.get_agas_caching_mode())
      // range caching is only ever enabled if caching is enabled at all
      , range_caching_(caching_ && ini_.get_agas_range_caching_mode())
      , action_priority_(threads::thread_priority::boost)
      , rts_lva_(0)
      , state_(hpx::state::starting)
      , locality_()
    {
        // size the GVA cache up front if caching was requested
        if (caching_)
        {
            gva_cache_->reserve(ini_.get_agas_local_cache_size());
        }
    }
594✔
140

141
    void addressing_service::bootstrap(
452✔
142
        parcelset::endpoints_type const& endpoints,
143
        util::runtime_configuration& rtcfg)
144
    {    // {{{
145
        LPROGRESS_;
452✔
146

147
        HPX_ASSERT(is_bootstrap());
452✔
148
        launch_bootstrap(endpoints, rtcfg);
452✔
149
    }    // }}}
452✔
150

151
    void addressing_service::initialize(std::uint64_t rts_lva)
594✔
152
    {    // {{{
153
        rts_lva_ = rts_lva;
594✔
154
        set_status(hpx::state::running);
594✔
155
    }    // }}}
594✔
156

157
    namespace detail {

        // Forward declaration only; no definition is provided in this
        // section of the file.
        std::uint32_t get_number_of_pus_in_cores(std::uint32_t num_cores);
    }    // namespace detail
161

162
    void addressing_service::launch_bootstrap(
452✔
163
        parcelset::endpoints_type const& endpoints,
164
        util::runtime_configuration& rtcfg)
165
    {    // {{{
166
        component_ns_.reset(new detail::bootstrap_component_namespace);
452✔
167
        locality_ns_.reset(new detail::bootstrap_locality_namespace(
452✔
168
            reinterpret_cast<server::primary_namespace*>(primary_ns_.ptr())));
452✔
169

170
        naming::gid_type const here =
171
            naming::get_gid_from_locality_id(agas::booststrap_prefix);
452✔
172
        set_local_locality(here);
452✔
173

174
        rtcfg.parse("assigned locality",
904✔
175
            hpx::util::format(
452✔
176
                "hpx.locality!={1}", naming::get_locality_id_from_gid(here)));
452✔
177

178
        std::uint32_t num_threads =
452✔
179
            hpx::util::get_entry_as<std::uint32_t>(rtcfg, "hpx.os_threads", 1u);
452✔
180
        locality_ns_->allocate(endpoints, 0, num_threads, naming::invalid_gid);
452✔
181

182
        register_name("/0/agas/locality#0", here);
452✔
183
        if (is_console())
452✔
184
        {
185
            register_name("/0/locality#console", here);
452✔
186
        }
452✔
187
    }    // }}}
452✔
188

189
    void addressing_service::adjust_local_cache_size(std::size_t cache_size)
×
190
    {    // {{{
191
        // adjust the local AGAS cache size for the number of worker threads and
192
        // create the hierarchy based on the topology
193
        if (caching_)
×
194
        {
195
            std::size_t previous = gva_cache_->size();
×
196
            gva_cache_->reserve(cache_size);
×
197

198
            LAGAS_(info).format(
×
199
                "addressing_service::adjust_local_cache_size, previous size: "
×
200
                "{1}, new size: {2}",
201
                previous, cache_size);
202
        }
×
203
    }    // }}}
×
204

205
    /// Records \a g as the GID of the local locality, both in this object
    /// and in the primary namespace.
    void addressing_service::set_local_locality(naming::gid_type const& g)
    {
        locality_ = g;

        // keep the primary namespace in sync
        primary_ns_.set_local_locality(g);
    }
594✔
210

211
    bool addressing_service::register_locality(
142✔
212
        parcelset::endpoints_type const& endpoints, naming::gid_type& prefix,
213
        std::uint32_t num_threads, error_code& ec)
214
    {    // {{{
215
        try
216
        {
217
            prefix = naming::get_gid_from_locality_id(
142✔
218
                locality_ns_->allocate(endpoints, 0, num_threads, prefix));
142✔
219

220
            {
221
                std::unique_lock<mutex_type> l(resolved_localities_mtx_);
142✔
222
                std::pair<resolved_localities_type::iterator, bool> res =
223
                    resolved_localities_.insert(
142✔
224
                        std::make_pair(prefix, endpoints));
142✔
225

226
                if (!res.second)
142✔
227
                {
228
                    l.unlock();
×
229
                    HPX_THROWS_IF(ec, bad_parameter,
×
230
                        "addressing_service::register_locality",
231
                        "locality insertion failed because of a duplicate");
232
                    return false;
×
233
                }
234
            }
142✔
235

236
            return true;
142✔
237
        }
×
238
        catch (hpx::exception const& e)
239
        {
240
            HPX_RETHROWS_IF(ec, e, "addressing_service::register_locality");
×
241
            return false;
×
242
        }
×
243
    }    // }}}
142✔
244

245
    void addressing_service::register_console(
142✔
246
        parcelset::endpoints_type const& eps)
247
    {
248
        std::lock_guard<mutex_type> l(resolved_localities_mtx_);
142✔
249
        std::pair<resolved_localities_type::iterator, bool> res =
250
            resolved_localities_.insert(
142✔
251
                std::make_pair(naming::get_gid_from_locality_id(0), eps));
142✔
252
        HPX_ASSERT(res.second);
142✔
253
        HPX_UNUSED(res);
142✔
254
    }
142✔
255

256
    /// Returns whether an entry for \a gid exists in the table of resolved
    /// localities.
    bool addressing_service::has_resolved_locality(naming::gid_type const& gid)
    {    // {{{
        std::unique_lock<mutex_type> l(resolved_localities_mtx_);

        bool const known =
            resolved_localities_.end() != resolved_localities_.find(gid);
        return known;
    }    // }}}
×
261

262
    void addressing_service::pre_cache_endpoints(
594✔
263
        std::vector<parcelset::endpoints_type> const& endpoints)
264
    {    // {{{
265
        std::unique_lock<mutex_type> l(resolved_localities_mtx_);
594✔
266
        std::uint32_t locality_id = 0;
594✔
267
        for (parcelset::endpoints_type const& endpoint : endpoints)
1,471✔
268
        {
269
            resolved_localities_.insert(resolved_localities_type::value_type(
1,754✔
270
                naming::get_gid_from_locality_id(locality_id), endpoint));
877✔
271
            ++locality_id;
877✔
272
        }
273
    }    // }}}
594✔
274

275
    /// Returns the endpoints associated with the locality \a gid.
    ///
    /// If the locality is not known locally yet (or its entry is empty) the
    /// locality namespace is queried and the result is stored in the table
    /// of resolved localities.
    ///
    /// \param gid  GID of the locality to resolve
    /// \param ec   error reporting (throws if this is hpx::throws)
    ///
    /// \returns A reference to the stored endpoints. If the locality could
    ///          not be resolved, or storing the result failed, the error is
    ///          reported through \a ec and a reference to the entry stored
    ///          for naming::invalid_gid is returned.
    parcelset::endpoints_type const& addressing_service::resolve_locality(
        naming::gid_type const& gid, error_code& ec)
    {    // {{{
        std::unique_lock<mutex_type> l(resolved_localities_mtx_);
        resolved_localities_type::iterator it = resolved_localities_.find(gid);
        if (it == resolved_localities_.end() || it->second.empty())
        {
            // The locality hasn't been requested to be resolved yet. Do it now.
            parcelset::endpoints_type endpoints;
            {
                // Do not hold the lock while asking the locality namespace.
                hpx::unlock_guard<std::unique_lock<mutex_type>> ul(l);
                endpoints = locality_ns_->resolve_locality(gid);
            }

            // The lock is held again from here on. The failure check has to
            // happen outside of the unlock_guard scope: inside of it 'l' does
            // not own the mutex, so calling l.unlock() there would throw
            // std::system_error instead of reporting the actual error.
            if (endpoints.empty())
            {
                std::string str = hpx::util::format(
                    "couldn't resolve the given target locality ({})", gid);

                // Access the table only while the lock is still held.
                // NOTE(review): relies on references into
                // resolved_localities_type staying valid across concurrent
                // insertions (true for std::map) -- confirm.
                parcelset::endpoints_type const& invalid =
                    resolved_localities_[naming::invalid_gid];

                // don't hold the lock while reporting the error
                l.unlock();

                HPX_THROWS_IF(ec, bad_parameter,
                    "addressing_service::resolve_locality", str);
                return invalid;
            }

            // Search again ... might have been added by a different thread already
            it = resolved_localities_.find(gid);
            if (it == resolved_localities_.end())
            {
                if (HPX_UNLIKELY(!util::insert_checked(
                        resolved_localities_.insert(
                            std::make_pair(gid, endpoints)),
                        it)))
                {
                    // as above: fetch the fallback entry under the lock
                    parcelset::endpoints_type const& invalid =
                        resolved_localities_[naming::invalid_gid];

                    l.unlock();

                    HPX_THROWS_IF(ec, internal_server_error,
                        "addressing_service::resolve_locality",
                        "resolved locality insertion failed "
                        "due to a locking error or memory corruption");
                    return invalid;
                }
            }
            else if (it->second.empty())
            {
                // An empty placeholder exists: fill it in. 'endpoints' is
                // known to be non-empty at this point.
                it->second = endpoints;
            }
        }
        return it->second;
    }    // }}}
466,929✔
325

326
    // TODO: We need to ensure that the locality isn't unbound while it still holds
327
    // referenced objects.
328
    bool addressing_service::unregister_locality(
594✔
329
        naming::gid_type const& gid, error_code& ec)
330
    {    // {{{
331
        try
332
        {
333
            locality_ns_->free(gid);
594✔
334
            component_ns_->unregister_server_instance(ec);
594✔
335
            symbol_ns_.unregister_server_instance(ec);
594✔
336

337
            remove_resolved_locality(gid);
594✔
338
            return true;
594✔
339
        }
×
340
        catch (hpx::exception const& e)
341
        {
342
            HPX_RETHROWS_IF(ec, e, "addressing_service::unregister_locality");
×
343
            return false;
×
344
        }
×
345
    }    // }}}
594✔
346

347
    void addressing_service::remove_resolved_locality(
595✔
348
        naming::gid_type const& gid)
349
    {
350
        std::lock_guard<mutex_type> l(resolved_localities_mtx_);
595✔
351
        resolved_localities_type::iterator it = resolved_localities_.find(gid);
595✔
352
        if (it != resolved_localities_.end())
595✔
353
            resolved_localities_.erase(it);
594✔
354
    }
595✔
355

356
    bool addressing_service::get_console_locality(
1,415✔
357
        naming::gid_type& prefix, error_code& ec)
358
    {    // {{{
359
        try
360
        {
361
            if (get_status() != hpx::state::running)
1,415✔
362
            {
363
                if (&ec != &throws)
×
364
                    ec = make_success_code();
×
365
                return false;
×
366
            }
367

368
            if (is_console())
1,415✔
369
            {
370
                prefix = get_local_locality();
1,391✔
371
                if (&ec != &throws)
1,391✔
372
                    ec = make_success_code();
×
373
                return true;
1,391✔
374
            }
375

376
            {
377
                std::lock_guard<mutex_type> lock(console_cache_mtx_);
24✔
378

379
                if (console_cache_ != naming::invalid_locality_id)
24✔
380
                {
381
                    prefix = naming::get_gid_from_locality_id(console_cache_);
×
382
                    if (&ec != &throws)
×
383
                        ec = make_success_code();
×
384
                    return true;
×
385
                }
386
            }
24✔
387

388
            std::string key("/0/locality#console");
24✔
389

390
            hpx::id_type resolved_prefix = resolve_name(key);
24✔
391
            if (resolved_prefix != hpx::invalid_id)
24✔
392
            {
393
                std::uint32_t console =
24✔
394
                    naming::get_locality_id_from_id(resolved_prefix);
24✔
395
                prefix = resolved_prefix.get_gid();
24✔
396

397
                {
398
                    std::lock_guard<mutex_type> lock(console_cache_mtx_);
24✔
399
                    if (console_cache_ == naming::invalid_locality_id)
24✔
400
                    {
401
                        console_cache_ = console;
24✔
402
                    }
24✔
403
                    else
404
                    {
405
                        HPX_ASSERT(console_cache_ == console);
×
406
                    }
407
                }
24✔
408

409
                LAGAS_(debug).format(
24✔
410
                    "addressing_server::get_console_locality, caching console "
×
411
                    "locality, prefix({1})",
412
                    console);
413

414
                return true;
24✔
415
            }
416

417
            return false;
×
418
        }
24✔
419
        catch (hpx::exception const& e)
420
        {
421
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_console_locality");
×
422
            return false;
×
423
        }
×
424
    }    // }}}
1,415✔
425

426
    bool addressing_service::get_localities(
1,456✔
427
        std::vector<naming::gid_type>& locality_ids,
428
        components::component_type type, error_code& ec)
429
    {    // {{{ get_locality_ids implementation
430
        try
431
        {
432
            if (type != components::component_invalid)
1,456✔
433
            {
434
                std::vector<std::uint32_t> const p =
435
                    component_ns_->resolve_id(type);
15✔
436

437
                if (p.empty())
15✔
438
                    return false;
×
439

440
                locality_ids.clear();
15✔
441
                for (unsigned int i : p)
45✔
442
                {
443
                    locality_ids.push_back(naming::get_gid_from_locality_id(i));
30✔
444
                }
445

446
                return true;
15✔
447
            }
15✔
448
            else
449
            {
450
                const std::vector<std::uint32_t> p = locality_ns_->localities();
1,441✔
451

452
                if (!p.size())
1,441✔
453
                    return false;
×
454

455
                locality_ids.clear();
1,441✔
456
                for (unsigned int i : p)
3,352✔
457
                {
458
                    locality_ids.push_back(naming::get_gid_from_locality_id(i));
1,911✔
459
                }
460
                return true;
1,441✔
461
            }
1,441✔
462
        }
×
463
        catch (hpx::exception const& e)
464
        {
465
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_locality_ids");
×
466
            return false;
×
467
        }
×
468
    }    // }}}
1,456✔
469

470
    ///////////////////////////////////////////////////////////////////////////
471
    std::uint32_t addressing_service::get_num_localities(
367✔
472
        components::component_type type, error_code& ec) const
473
    {    // {{{ get_num_localities implementation
474
        try
475
        {
476
            if (type == components::component_invalid)
367✔
477
            {
478
                return locality_ns_->get_num_localities();
367✔
479
            }
480

481
            return component_ns_->get_num_localities(type).get();
×
482
        }
×
483
        catch (hpx::exception const& e)
484
        {
485
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_num_localities");
×
486
        }
×
487
        return std::uint32_t(-1);
×
488
    }    // }}}
367✔
489

490
    /// Asynchronous variant of get_num_localities(): returns a future for
    /// the number of localities, obtained from the locality namespace for
    /// components::component_invalid and from the component namespace
    /// otherwise.
    hpx::future<std::uint32_t> addressing_service::get_num_localities_async(
        components::component_type type) const
    {    // {{{ get_num_localities implementation
        if (type != components::component_invalid)
        {
            return component_ns_->get_num_localities(type);
        }

        return locality_ns_->get_num_localities_async();
    }    // }}}
9✔
500

501
    ///////////////////////////////////////////////////////////////////////////
502
    std::uint32_t addressing_service::get_num_overall_threads(
11✔
503
        error_code& ec) const
504
    {    // {{{ get_num_overall_threads implementation
505
        try
506
        {
507
            return locality_ns_->get_num_overall_threads();
11✔
508
        }
×
509
        catch (hpx::exception const& e)
510
        {
511
            HPX_RETHROWS_IF(
×
512
                ec, e, "addressing_service::get_num_overall_threads");
513
        }
×
514
        return std::uint32_t(0);
×
515
    }    // }}}
11✔
516

517
    /// Asynchronous variant of get_num_overall_threads(); simply forwards
    /// to the locality namespace.
    hpx::future<std::uint32_t>
    addressing_service::get_num_overall_threads_async() const
    {    // {{{
        return locality_ns_->get_num_overall_threads_async();
    }    // }}}
522

523
    std::vector<std::uint32_t> addressing_service::get_num_threads(
×
524
        error_code& ec) const
525
    {    // {{{ get_num_threads implementation
526
        try
527
        {
528
            return locality_ns_->get_num_threads();
×
529
        }
×
530
        catch (hpx::exception const& e)
531
        {
532
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_num_threads");
×
533
        }
×
534
        return std::vector<std::uint32_t>();
×
535
    }    // }}}
×
536

537
    /// Asynchronous variant of get_num_threads(); simply forwards to the
    /// locality namespace.
    hpx::future<std::vector<std::uint32_t>>
    addressing_service::get_num_threads_async() const
    {    // {{{
        return locality_ns_->get_num_threads_async();
    }    // }}}
542

543
    ///////////////////////////////////////////////////////////////////////////
544
    /// Returns the component type the component namespace binds to
    /// \a name, or components::component_invalid if an hpx::exception was
    /// reported through \a ec.
    components::component_type addressing_service::get_component_id(
        std::string const& name, error_code& ec)
    {    // {{{
        try
        {
            return component_ns_->bind_name(name);
        }
        catch (hpx::exception const& e)
        {
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_component_id");
        }

        // only reached if the error was stored in 'ec'
        return components::component_invalid;
    }    // }}}
11,246✔
557

558
    void addressing_service::iterate_types(
×
559
        iterate_types_function_type const& f, error_code& ec)
560
    {    // {{{
561
        try
562
        {
563
            return component_ns_->iterate_types(f);
×
564
        }
×
565
        catch (hpx::exception const& e)
566
        {
567
            HPX_RETHROWS_IF(ec, e, "addressing_service::iterate_types");
×
568
        }
×
569
    }    // }}}
×
570

571
    std::string addressing_service::get_component_type_name(
×
572
        components::component_type id, error_code& ec)
573
    {    // {{{
574
        try
575
        {
576
            return component_ns_->get_component_type_name(id);
×
577
        }
×
578
        catch (hpx::exception const& e)
579
        {
580
            HPX_RETHROWS_IF(ec, e, "addressing_service::iterate_types");
×
581
        }
×
582
        return "<unknown>";
×
583
    }    // }}}
×
584

585
    /// Binds the component \a name to the locality \a prefix in the
    /// component namespace and returns the resulting component type, or
    /// components::component_invalid if an hpx::exception was reported
    /// through \a ec.
    components::component_type addressing_service::register_factory(
        std::uint32_t prefix, std::string const& name, error_code& ec)
    {    // {{{
        try
        {
            return component_ns_->bind_prefix(name, prefix);
        }
        catch (hpx::exception const& e)
        {
            HPX_RETHROWS_IF(ec, e, "addressing_service::register_factory");
        }

        // only reached if the error was stored in 'ec'
        return components::component_invalid;
    }    // }}}
13,493✔
598

599
    ///////////////////////////////////////////////////////////////////////////
600
    bool addressing_service::get_id_range(std::uint64_t count,
3,176✔
601
        naming::gid_type& lower_bound, naming::gid_type& upper_bound,
602
        error_code& ec)
603
    {    // {{{ get_id_range implementation
604
        try
605
        {
606
            // parcelset::endpoints_type() is an obsolete, dummy argument
607

608
            std::pair<naming::gid_type, naming::gid_type> rep(
609
                primary_ns_.allocate(count));
3,176✔
610

611
            if (rep.first == naming::invalid_gid ||
3,176✔
612
                rep.second == naming::invalid_gid)
3,176✔
613
                return false;
×
614

615
            lower_bound = rep.first;
3,175✔
616
            upper_bound = rep.second;
3,175✔
617

618
            return true;
3,175✔
619
        }
×
620
        catch (hpx::exception const& e)
621
        {
622
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_id_range");
×
623
            return false;
×
624
        }
×
625
    }    // }}}
3,175✔
626

627
    bool addressing_service::bind_range_local(naming::gid_type const& lower_id,
5,202✔
628
        std::uint64_t count, naming::address const& baseaddr,
629
        std::uint64_t offset, error_code& ec)
630
    {    // {{{ bind_range implementation
631
        try
632
        {
633
            naming::gid_type const& prefix = baseaddr.locality_;
5,202✔
634

635
            // Create a global virtual address from the legacy calling convention
636
            // parameters
637
            gva const g(
5,202✔
638
                prefix, baseaddr.type_, count, baseaddr.address_, offset);
5,202✔
639

640
            primary_ns_.bind_gid(
10,404✔
641
                g, lower_id, naming::get_locality_from_gid(lower_id));
5,202✔
642

643
            if (range_caching_)
5,202✔
644
            {
645
                // Put the range into the cache.
646
                update_cache_entry(lower_id, g, ec);
5,202✔
647
            }
5,202✔
648
            else
649
            {
650
                // Only put the first GID in the range into the cache
651
                gva const first_g = g.resolve(lower_id, lower_id);
×
652
                update_cache_entry(lower_id, first_g, ec);
×
653
            }
654

655
            if (ec)
5,202✔
656
                return false;
×
657

658
            return true;
5,202✔
659
        }
×
660
        catch (hpx::exception const& e)
661
        {
662
            HPX_RETHROWS_IF(ec, e, "addressing_service::bind_range_local");
×
663
            return false;
×
664
        }
×
665
    }    // }}}
5,202✔
666

667
    bool addressing_service::bind_postproc(
2,028✔
668
        naming::gid_type const& lower_id, gva const& g, future<bool> f)
669
    {
670
        f.get();
2,028✔
671

672
        if (range_caching_)
2,028✔
673
        {
674
            // Put the range into the cache.
675
            update_cache_entry(lower_id, g);
2,028✔
676
        }
2,028✔
677
        else
678
        {
679
            // Only put the first GID in the range into the cache
680
            gva const first_g = g.resolve(lower_id, lower_id);
×
681
            update_cache_entry(lower_id, first_g);
×
682
        }
683

684
        return true;
2,028✔
685
    }
686

687
    /// Asynchronously binds \a count GIDs starting at \a lower_id to the
    /// given base address. Once the primary namespace has answered,
    /// bind_postproc() is run synchronously as a continuation to update
    /// the local cache.
    hpx::future<bool> addressing_service::bind_range_async(
        naming::gid_type const& lower_id, std::uint64_t count,
        naming::address const& baseaddr, std::uint64_t offset,
        naming::gid_type const& locality)
    {
        // Assemble the global virtual address from the individual (legacy
        // calling convention) parameters.
        gva const g(baseaddr.locality_, baseaddr.type_, count,
            baseaddr.address_, offset);

        naming::gid_type id(
            naming::detail::get_stripped_gid_except_dont_cache(lower_id));

        // ask server
        future<bool> pending = primary_ns_.bind_gid_async(g, id, locality);

        // copies of 'id' and 'g' are bound into the continuation
        auto postproc = util::one_shot(
            hpx::bind_front(&addressing_service::bind_postproc, this, id, g));

        return pending.then(hpx::launch::sync, std::move(postproc));
    }
2,028✔
708

709
    /// Asynchronously unbinds \a count GIDs starting at \a lower_id by
    /// forwarding the request to the primary namespace.
    hpx::future<naming::address> addressing_service::unbind_range_async(
        naming::gid_type const& lower_id, std::uint64_t count)
    {
        // note the argument order expected by the primary namespace
        return primary_ns_.unbind_gid_async(count, lower_id);
    }
714

715
    bool addressing_service::unbind_range_local(
1,812✔
716
        naming::gid_type const& lower_id, std::uint64_t count,
717
        naming::address& addr, error_code& ec)
718
    {    // {{{ unbind_range implementation
719
        try
720
        {
721
            addr = primary_ns_.unbind_gid(count, lower_id);
1,812✔
722

723
            return true;
1,812✔
724
        }
×
725
        catch (hpx::exception const& e)
726
        {
727
            HPX_RETHROWS_IF(ec, e, "addressing_service::unbind_range_local");
×
728
            return false;
×
729
        }
×
730
    }    // }}}
1,812✔
731

732
    /// This function will test whether the given address refers to an object
733
    /// living on the locality of the caller. We rely completely on the local AGAS
734
    /// cache and local AGAS instance, assuming that everything which is not in
735
    /// the cache is not local.
736

737
    bool addressing_service::is_local_address_cached(
        naming::gid_type const& gid, naming::address& addr, error_code& ec)
    {
        naming::gid_type id(
            naming::detail::get_stripped_gid_except_dont_cache(gid));

#if defined(HPX_HAVE_NETWORKING)
        // An object known to have been migrated is treated as non-local.
        if (naming::detail::is_migratable(gid))
        {
            std::lock_guard<mutex_type> lock(migrated_objects_mtx_);
            if (was_object_migrated_locked(id))
            {
                if (&ec != &throws)
                    ec = make_success_code();
                return false;
            }
        }
#endif

        // First try the local cache. It is perfectly valid for the GID not
        // to be found there, which is why a miss is not an error.
        bool const cache_hit = resolve_cached(id, addr, ec);
        if (ec)
            return false;

        if (!cache_hit)
        {
            // try also the local part of AGAS before giving up
            bool const resolved = resolve_full_local(id, addr, ec);
            if (!resolved || ec)
                return false;
        }

        // The address is known, check whether it refers to this locality.
        return addr.locality_ == get_local_locality();
    }
1,981,688✔
774

775
    // Return true if at least one address is local.
776
    bool addressing_service::is_local_lva_encoded_address(std::uint64_t msb)
2,018,664✔
777
    {
778
        // NOTE: This should still be migration safe.
779
        return naming::detail::strip_internal_bits_and_component_type_from_gid(
4,037,328✔
780
                   msb) == get_local_locality().get_msb();
4,037,328✔
781
    }
782

783
    // Resolve GIDs whose address can be derived without consulting the AGAS
    // cache or the primary namespace tables. Returns true and fills 'addr' if
    // 'id' falls into one of the categories handled below, false otherwise.
    //
    // NOTE: 'addr.locality_' is overwritten with the local locality as soon as
    // the MSB test succeeds, even if none of the LVA-encoded cases applies.
    bool addressing_service::resolve_locally_known_addresses(
        naming::gid_type const& id, naming::address& addr)
    {
        std::uint64_t const low = id.get_lsb();
        std::uint64_t high = id.get_msb();

        // LVA-encoded GIDs (located on this machine)
        if (is_local_lva_encoded_address(high))
        {
            addr.locality_ = get_local_locality();

            // An LSB of 0 references the runtime support component
            bool const is_runtime_support = (0 == low) || (low == rts_lva_);
            if (is_runtime_support)
            {
                HPX_ASSERT(rts_lva_);

                addr.type_ = components::component_runtime_support;
                addr.address_ =
                    reinterpret_cast<naming::address::address_type>(rts_lva_);
                return true;
            }

            // handle (non-migratable) components located on this locality
            // first: the LSB itself is used as the local virtual address
            if (naming::refers_to_local_lva(id))
            {
                addr.type_ = naming::detail::get_component_type_from_gid(high);
                addr.address_ =
                    reinterpret_cast<naming::address::address_type>(low);
                return true;
            }
        }

        // the remaining comparisons are done on the MSB without internal bits
        high = naming::detail::strip_internal_bits_from_gid(high);

        // explicitly resolve localities
        if (naming::is_locality(id))
        {
            addr.locality_ = id;
            addr.type_ = components::component_runtime_support;
            // addr.address_ will be supplied on the target locality
            return true;
        }

        // authoritative AGAS component address resolution
        bool const is_locality_ns =
            agas::locality_ns_msb == high && agas::locality_ns_lsb == low;
        if (is_locality_ns)
        {
            addr = locality_ns_->addr();
            return true;
        }

        bool const is_component_ns =
            agas::component_ns_msb == high && agas::component_ns_lsb == low;
        if (is_component_ns)
        {
            addr = component_ns_->addr();
            return true;
        }

        // primary and symbol namespaces are identified by the LSB only; the
        // locality encoded in the GID selects the instance
        naming::gid_type target = naming::get_locality_from_gid(id);

        if (agas::primary_ns_lsb == low)
        {
            if (target == get_local_locality())
            {
                // primary namespace instance of this locality
                addr = primary_ns_.addr();
            }
            else
            {
                // primary namespace instance of another locality
                addr.locality_ = target;
                addr.type_ = hpx::components::component_agas_primary_namespace;
                // addr.address_ will be supplied on the target locality
            }
            return true;
        }

        if (agas::symbol_ns_lsb == low)
        {
            if (target == get_local_locality())
            {
                // symbol namespace instance of this locality
                addr = symbol_ns_.addr();
            }
            else
            {
                // symbol namespace instance of another locality
                addr.locality_ = target;
                addr.type_ = hpx::components::component_agas_symbol_namespace;
                // addr.address_ will be supplied on the target locality
            }
            return true;
        }

        // not one of the locally known special cases
        return false;
    }    // }}}
2,018,668✔
875

876
    // Resolve a single GID through the local primary namespace and, if the
    // GID may be cached, record the result in the AGAS cache.
    //
    // Returns false if the primary namespace reports an invalid GID in slot 0
    // or 2 of its reply, or if 'ec' is set once the cache step is done.
    // Otherwise 'addr' holds the resolved address and true is returned.
    // hpx::exception instances are routed through HPX_RETHROWS_IF.
    bool addressing_service::resolve_full_local(
        naming::gid_type const& id, naming::address& addr, error_code& ec)
    {    // {{{ resolve implementation
        try
        {
            using hpx::get;

            auto reply = primary_ns_.resolve_gid(id);

            bool const unresolved = get<0>(reply) == naming::invalid_gid ||
                get<2>(reply) == naming::invalid_gid;
            if (unresolved)
                return false;

            // Resolve the gva to the real resolved address (which is just a gva
            // with a fully resolved LVA and an offset of zero).
            naming::gid_type range_base = get<0>(reply);
            gva const range_gva = get<1>(reply);
            gva const resolved = range_gva.resolve(id, range_base);

            addr.locality_ = resolved.prefix;
            addr.type_ = resolved.type;
            addr.address_ = resolved.lva();

            if (naming::detail::store_in_cache(id))
            {
                HPX_ASSERT(addr.address_);

                if (!range_caching_)
                {
                    // Put the fully resolved gva into the cache.
                    update_cache_entry(id, resolved, ec);
                }
                else
                {
                    // Put the range into the cache.
                    update_cache_entry(range_base, range_gva, ec);
                }
            }

            // 'ec' is inspected regardless of whether the cache was touched
            if (ec)
                return false;

            if (&ec != &throws)
                ec = make_success_code();

            return true;
        }
        catch (hpx::exception const& e)
        {
            HPX_RETHROWS_IF(ec, e, "addressing_service::resolve_full_local");
            return false;
        }
    }    // }}}
115,733✔
929

930
    // Try to resolve 'gid' from locally available information only: first the
    // special cases handled by resolve_locally_known_addresses(), then the
    // AGAS cache.
    //
    // Returns true and fills 'addr' on a hit. Every path through this function
    // (hit or miss) stores a success code in 'ec' unless 'ec' is 'throws'.
    bool addressing_service::resolve_cached(
        naming::gid_type const& gid, naming::address& addr, error_code& ec)
    {    // {{{ resolve_cached implementation

        // shared epilogue of all return paths
        auto const mark_success = [&ec]() {
            if (&ec != &throws)
                ec = make_success_code();
        };

        naming::gid_type key =
            naming::detail::get_stripped_gid_except_dont_cache(gid);

        // special cases
        if (resolve_locally_known_addresses(key, addr))
        {
            mark_success();
            return true;
        }

        // If caching is disabled, bail
        if (!caching_)
        {
            mark_success();
            return false;
        }

        // don't look at cache if id is marked as non-cache-able
        if (!naming::detail::store_in_cache(key))
        {
            mark_success();
            return false;
        }

        // don't look at the cache if the id is locally managed
        bool const locally_managed = naming::get_locality_id_from_gid(key) ==
            naming::get_locality_id_from_gid(locality_);
        if (locally_managed)
        {
            mark_success();
            return false;
        }

        // force routing if target object was migrated
        if (naming::detail::is_migratable(key))
        {
            std::lock_guard<mutex_type> guard(migrated_objects_mtx_);
            if (was_object_migrated_locked(key))
            {
                mark_success();
                return false;
            }
        }

        // finally look up the requested item in the cache
        gva cached;
        naming::gid_type cached_base;
        bool const hit = get_cache_entry(key, cached, cached_base, ec);

        if (!hit)
        {
            // a miss is reported as success; only the return value differs
            mark_success();

            LAGAS_(debug).format(
                "addressing_service::resolve_cached, cache miss for address {1}",
                key);

            return false;
        }

        addr.locality_ = cached.prefix;
        addr.type_ = cached.type;
        addr.address_ = cached.lva(key, cached_base);

        mark_success();
        return true;
    }    // }}}
2,018,675✔
1006

1007
    // Asynchronously resolve 'gid' to an address. With caching enabled the
    // local information is tried first; otherwise, or on a miss, the request
    // is forwarded to resolve_full_async(). An invalid 'gid' raises
    // bad_parameter.
    hpx::future<naming::address> addressing_service::resolve_async(
        naming::gid_type const& gid)
    {
        if (!gid)
        {
            HPX_THROW_EXCEPTION(bad_parameter,
                "addressing_service::resolve_async", "invalid reference id");
            return make_ready_future(naming::address());
        }

        // without caching there is nothing to try locally
        if (!caching_)
            return resolve_full_async(gid);

        // Try the cache.
        naming::address cached_addr;
        error_code cache_ec;
        bool const hit = resolve_cached(gid, cached_addr, cache_ec);
        if (hit)
            return make_ready_future(cached_addr);

        // a failed lookup is reported through the returned future
        if (cache_ec)
        {
            return hpx::make_exceptional_future<naming::address>(
                hpx::detail::access_exception(cache_ec));
        }

        // now try the AGAS service
        return resolve_full_async(gid);
    }
39,044✔
1035

1036
    // Asynchronously ask the primary namespace for the colocation id of 'id'
    // (forwards to primary_ns_.colocate()). An invalid 'id' raises
    // bad_parameter.
    hpx::future<hpx::id_type> addressing_service::get_colocation_id_async(
        hpx::id_type const& id)
    {
        bool const valid = static_cast<bool>(id);
        if (!valid)
        {
            HPX_THROW_EXCEPTION(bad_parameter,
                "addressing_service::get_colocation_id_async",
                "invalid reference id");
            return make_ready_future(hpx::invalid_id);
        }

        // the primary namespace operates on the raw GID
        return primary_ns_.colocate(id.get_gid());
    }
1✔
1049

1050
    ///////////////////////////////////////////////////////////////////////////
1051
    // Continuation used by resolve_full_async(): converts the reply of the
    // primary namespace into a naming::address and updates the AGAS cache.
    //
    // 'id' is the GID that was asked for, 'f' the future holding the reply.
    // f.get() rethrows any exception stored in the future. bad_parameter is
    // raised if the reply carries an invalid GID in slot 0 or 2.
    naming::address addressing_service::resolve_full_postproc(
        naming::gid_type const& id, future<primary_namespace::resolved_type> f)
    {
        using hpx::get;

        naming::address addr;

        auto rep = f.get();
        if (get<0>(rep) == naming::invalid_gid ||
            get<2>(rep) == naming::invalid_gid)
        {
            HPX_THROW_EXCEPTION(bad_parameter,
                "addressing_service::resolve_full_postproc",
                "could not resolve global id");
            return addr;
        }

        // Resolve the gva to the real resolved address (which is just a gva
        // with a fully resolved LVA and an offset of zero).
        naming::gid_type base_gid = get<0>(rep);
        gva const base_gva = get<1>(rep);

        gva const g = base_gva.resolve(id, base_gid);

        addr.locality_ = g.prefix;
        addr.type_ = g.type;
        addr.address_ = g.lva();

        if (naming::detail::store_in_cache(id))
        {
            // No error_code is passed here, so update_cache_entry() runs with
            // its default error handling.
            if (range_caching_)
            {
                // Put the range into the cache.
                update_cache_entry(base_gid, base_gva);
            }
            else
            {
                // Put the fully resolved gva into the cache.
                update_cache_entry(id, g);
            }
        }

        return addr;
    }
×
1095

1096
    // Asynchronously resolve 'gid' by asking the primary namespace, bypassing
    // the local cache lookup. The reply is converted by
    // resolve_full_postproc(), attached as a synchronous continuation. An
    // invalid 'gid' raises bad_parameter.
    hpx::future<naming::address> addressing_service::resolve_full_async(
        naming::gid_type const& gid)
    {
        if (!gid)
        {
            HPX_THROW_EXCEPTION(bad_parameter,
                "addressing_service::resolve_full_async",
                "invalid reference id");
            return make_ready_future(naming::address());
        }

        // ask server
        future<primary_namespace::resolved_type> reply =
            primary_ns_.resolve_full(gid);

        // 'gid' is bound by value into the continuation
        auto postprocess = util::one_shot(hpx::bind_front(
            &addressing_service::resolve_full_postproc, this, gid));

        return reply.then(hpx::launch::sync, HPX_MOVE(postprocess));
    }
3,241✔
1115

1116
    ///////////////////////////////////////////////////////////////////////////
1117
    bool addressing_service::resolve_full_local(naming::gid_type const* gids,
×
1118
        naming::address* addrs, std::size_t count,
1119
        hpx::detail::dynamic_bitset<>& locals, error_code& ec)
1120
    {
1121
        locals.resize(count);
×
1122

1123
        try
1124
        {
1125
            using hpx::get;
1126

1127
            // special cases
1128
            for (std::size_t i = 0; i != count; ++i)
×
1129
            {
1130
                if (addrs[i])
×
1131
                {
1132
                    locals.set(i, true);
×
1133
                    continue;
×
1134
                }
1135

1136
                HPX_ASSERT(!locals.test(i));
×
1137

1138
                if (!addrs[i] && !locals.test(i))
×
1139
                {
1140
                    auto rep = primary_ns_.resolve_gid(gids[i]);
×
1141

1142
                    if (get<0>(rep) == naming::invalid_gid ||
×
1143
                        get<2>(rep) == naming::invalid_gid)
×
1144
                        return false;
×
1145
                    // Resolve the gva to the real resolved address (which is
1146
                    // just a gva with as fully resolved LVA and and offset of
1147
                    // zero).
1148
                    naming::gid_type base_gid = get<0>(rep);
×
1149
                    gva const base_gva = get<1>(rep);
×
1150

1151
                    gva const g = base_gva.resolve(gids[i], base_gid);
×
1152

1153
                    naming::address& addr = addrs[i];
×
1154
                    addr.locality_ = g.prefix;
×
1155
                    addr.type_ = g.type;
×
1156
                    addr.address_ = g.lva();
×
1157

1158
                    hpx::error_code ec;
×
1159
                    if (naming::detail::store_in_cache(gids[i]))
×
1160
                    {
1161
                        if (range_caching_)
×
1162
                        {
1163
                            // Put the range into the cache.
1164
                            update_cache_entry(base_gid, base_gva, ec);
×
1165
                        }
×
1166
                        else
1167
                        {
1168
                            // Put the fully resolved gva into the cache.
1169
                            update_cache_entry(gids[i], g, ec);
×
1170
                        }
1171
                    }
×
1172

1173
                    if (ec)
×
1174
                        return false;
×
1175
                }
×
1176
            }
×
1177

1178
            return true;
×
1179
        }
×
1180
        catch (hpx::exception const& e)
1181
        {
1182
            HPX_RETHROWS_IF(ec, e, "addressing_service::resolve_full");
×
1183
            return false;
×
1184
        }
×
1185
    }
×
1186

1187
    bool addressing_service::resolve_cached(naming::gid_type const* gids,
×
1188
        naming::address* addrs, std::size_t count,
1189
        hpx::detail::dynamic_bitset<>& locals, error_code& ec)
1190
    {
1191
        locals.resize(count);
×
1192

1193
        std::size_t resolved = 0;
×
1194
        for (std::size_t i = 0; i != count; ++i)
×
1195
        {
1196
            if (!addrs[i] && !locals.test(i))
×
1197
            {
1198
                bool was_resolved = resolve_cached(gids[i], addrs[i], ec);
×
1199
                if (ec)
×
1200
                    return false;
×
1201
                if (was_resolved)
×
1202
                    ++resolved;
×
1203

1204
                if (addrs[i].locality_ == get_local_locality())
×
1205
                    locals.set(i, true);
×
1206
            }
×
1207

1208
            else if (addrs[i].locality_ == get_local_locality())
×
1209
            {
1210
                ++resolved;
×
1211
                locals.set(i, true);
×
1212
            }
×
1213
        }
×
1214

1215
        return resolved == count;    // returns whether all have been resolved
×
1216
    }
×
1217

1218
#if defined(HPX_HAVE_NETWORKING)
1219
    ///////////////////////////////////////////////////////////////////////////
1220
    void addressing_service::route(parcelset::parcel p,
7,496✔
1221
        hpx::function<void(std::error_code const&, parcelset::parcel const&)>&&
1222
            f,
1223
        threads::thread_priority local_priority)
1224
    {
1225
        if (HPX_UNLIKELY(nullptr == threads::get_self_ptr()))
7,496✔
1226
        {
1227
            // reschedule this call as an HPX thread
1228
            void (addressing_service::*route_ptr)(parcelset::parcel,
×
1229
                hpx::function<void(
1230
                    std::error_code const&, parcelset::parcel const&)>&&,
1231
                threads::thread_priority) = &addressing_service::route;
1232

1233
            threads::thread_init_data data(
×
1234
                threads::make_thread_function_nullary(util::deferred_call(
×
1235
                    route_ptr, this, HPX_MOVE(p), HPX_MOVE(f), local_priority)),
×
1236
                "addressing_service::route", threads::thread_priority::normal,
×
1237
                threads::thread_schedule_hint(),
×
1238
                threads::thread_stacksize::default_,
1239
                threads::thread_schedule_state::pending, true);
1240
            threads::register_thread(data);
×
1241
            return;
1242
        }
×
1243

1244
        primary_ns_.route(HPX_MOVE(p), HPX_MOVE(f));
7,496✔
1245
    }
7,496✔
1246
#endif
1247

1248
    ///////////////////////////////////////////////////////////////////////////
1249
    // The parameter 'compensated_credit' holds the amount of credits to be added
1250
    // to the acknowledged number of credits. The compensated credits are non-zero
1251
    // if there was a pending decref request at the point when the incref was sent.
1252
    // The pending decref was subtracted from the amount of credits to incref.
1253
    std::int64_t addressing_service::synchronize_with_async_incref(
9,567✔
1254
        hpx::future<std::int64_t> fut, hpx::id_type const&,
1255
        std::int64_t compensated_credit)
1256
    {
1257
        return fut.get() + compensated_credit;
9,567✔
1258
    }
1259

1260
    // Asynchronously add 'credit' to the reference count of 'id'.
    //
    // The request is first matched against the table of pending refcount
    // requests (decref() records pending requests there as negative amounts):
    //
    //   case 1: no entry for the GID       -> ask AGAS for the full 'credit'
    //   case 2: entry + credit is negative -> entry stays, AGAS not contacted
    //   case 3: entry + credit is zero     -> entry erased, AGAS not contacted
    //   case 4: entry + credit is positive -> entry erased, AGAS is asked for
    //                                         the positive remainder
    //
    // When AGAS is not contacted, a ready future holding the previously stored
    // table value is returned. Otherwise that value is passed as
    // 'compensated_credit' to synchronize_with_async_incref().
    //
    // A non-positive 'credit' raises bad_parameter. If called outside of an
    // HPX thread the call is re-issued through async().
    hpx::future<std::int64_t> addressing_service::incref_async(
        naming::gid_type const& id, std::int64_t credit,
        hpx::id_type const& keep_alive)
    {    // {{{ incref implementation
        naming::gid_type raw(naming::detail::get_stripped_gid(id));

        if (HPX_UNLIKELY(nullptr == threads::get_self_ptr()))
        {
            // reschedule this call as an HPX thread
            using incref_function_type =
                hpx::future<std::int64_t> (addressing_service::*)(
                    naming::gid_type const&, std::int64_t,
                    hpx::id_type const&);

            incref_function_type self = &addressing_service::incref_async;

            return async(self, this, raw, credit, keep_alive);
        }

        if (HPX_UNLIKELY(0 >= credit))
        {
            HPX_THROW_EXCEPTION(bad_parameter,
                "addressing_service::incref_async",
                "invalid credit count of {1}", credit);
            return hpx::future<std::int64_t>();
        }

        HPX_ASSERT(keep_alive != hpx::invalid_id);

        // what (if anything) has to be requested from AGAS
        naming::gid_type incref_gid;
        std::int64_t incref_credit = 0;
        bool contact_agas = false;

        // value found in the request table before this call modified it
        std::int64_t pending_decrefs = 0;

        {
            std::lock_guard<mutex_type> guard(refcnt_requests_mtx_);

            auto const entry = refcnt_requests_->find(raw);
            if (entry == refcnt_requests_->end())
            {
                // case no. 1: nothing pending, request the full credit
                incref_gid = raw;
                incref_credit = credit;
                contact_agas = true;
            }
            else
            {
                pending_decrefs = entry->second;
                entry->second += credit;

                // Increment requests need to be handled immediately.
                std::int64_t const balance = entry->second;
                if (balance > 0)
                {
                    // credit > decrefs (case no. 4): remember the remaining
                    // incref to be handled below.
                    incref_gid = entry->first;
                    incref_credit = balance;
                    contact_agas = true;
                }

                // cases no. 3 and 4: the table entry is used up; in case
                // no. 2 (balance < 0) it stays in place
                if (balance >= 0)
                {
                    refcnt_requests_->erase(entry);
                }
            }
        }

        if (!contact_agas)
        {
            // no need to talk to AGAS, acknowledge the incref immediately
            return hpx::make_ready_future(pending_decrefs);
        }

        naming::gid_type const target = incref_gid;

        hpx::future<std::int64_t> request =
            primary_ns_.increment_credit(incref_credit, target, target);

        // pass the amount of compensated decrefs to the callback
        using placeholders::_1;
        return request.then(hpx::launch::sync,
            util::one_shot(
                hpx::bind(&addressing_service::synchronize_with_async_incref,
                    this, _1, keep_alive, pending_decrefs)));
    }    // }}}
9,976✔
1364

1365
    ///////////////////////////////////////////////////////////////////////////
1366
    void addressing_service::decref(
99,261✔
1367
        naming::gid_type const& gid, std::int64_t credit, error_code& ec)
1368
    {    // {{{ decref implementation
1369
        naming::gid_type raw(naming::detail::get_stripped_gid(gid));
99,261✔
1370

1371
        if (HPX_UNLIKELY(nullptr == threads::get_self_ptr()))
99,261✔
1372
        {
1373
            // reschedule this call as an HPX thread
1374
            threads::thread_init_data data(
6,866✔
1375
                threads::make_thread_function_nullary(
6,866✔
1376
                    [HPX_CXX20_CAPTURE_THIS(=)]() -> void {
27,464✔
1377
                        return decref(raw, credit, throws);
6,866✔
1378
                    }),
1379
                "addressing_service::decref", threads::thread_priority::normal,
6,866✔
1380
                threads::thread_schedule_hint(),
6,866✔
1381
                threads::thread_stacksize::default_,
1382
                threads::thread_schedule_state::pending, true);
1383
            threads::register_thread(data, ec);
6,866✔
1384
            return;
1385
        }
6,866✔
1386

1387
        if (HPX_UNLIKELY(credit <= 0))
92,395✔
1388
        {
1389
            HPX_THROWS_IF(ec, bad_parameter, "addressing_service::decref",
×
1390
                "invalid credit count of {1}", credit);
1391
            return;
×
1392
        }
1393

1394
        try
1395
        {
1396
            std::unique_lock<mutex_type> l(refcnt_requests_mtx_);
92,396✔
1397

1398
            // Match the decref request with entries in the incref table
1399
            using iterator = refcnt_requests_type::iterator;
1400
            using mapping = refcnt_requests_type::value_type;
1401

1402
            iterator matches = refcnt_requests_->find(raw);
92,396✔
1403
            if (matches != refcnt_requests_->end())
92,396✔
1404
            {
1405
                matches->second -= credit;
46,823✔
1406
            }
46,823✔
1407
            else
1408
            {
1409
                std::pair<iterator, bool> p =
1410
                    refcnt_requests_->insert(mapping(raw, -credit));
45,573✔
1411

1412
                if (HPX_UNLIKELY(!p.second))
45,573✔
1413
                {
1414
                    l.unlock();
×
1415

1416
                    HPX_THROWS_IF(ec, bad_parameter,
×
1417
                        "addressing_service::decref",
1418
                        "couldn't insert decref request for {1} ({2})", raw,
1419
                        credit);
1420
                    return;
×
1421
                }
1422
            }
1423

1424
            send_refcnt_requests(l, ec);
92,396✔
1425
        }
92,396✔
1426
        catch (hpx::exception const& e)
1427
        {
1428
            HPX_RETHROWS_IF(ec, e, "addressing_service::decref");
×
1429
        }
×
1430
    }    // }}}
99,262✔
1431

1432
    ///////////////////////////////////////////////////////////////////////////
1433
    static bool correct_credit_on_failure(future<bool> f, hpx::id_type id,
2,476✔
1434
        std::int64_t mutable_gid_credit, std::int64_t new_gid_credit)
1435
    {
1436
        // Return the credit to the GID if the operation failed
1437
        if ((f.has_exception() && mutable_gid_credit != 0) || !f.get())
2,476✔
1438
        {
1439
            naming::detail::add_credit_to_gid(id.get_gid(), new_gid_credit);
2✔
1440
            return false;
2✔
1441
        }
1442
        return true;
2,474✔
1443
    }
2,476✔
1444

1445
    bool addressing_service::register_name(
3,000✔
1446
        std::string const& name, naming::gid_type const& id, error_code& ec)
1447
    {    // {{{
1448
        try
1449
        {
1450
            return symbol_ns_.bind(name, naming::detail::get_stripped_gid(id));
3,000✔
1451
        }
×
1452
        catch (hpx::exception const& e)
1453
        {
1454
            HPX_RETHROWS_IF(ec, e, "addressing_service::register_name");
×
1455
        }
×
1456
        return false;
×
1457
    }    // }}}
3,000✔
1458

1459
    bool addressing_service::register_name(
1,527✔
1460
        std::string const& name, hpx::id_type const& id, error_code& ec)
1461
    {
1462
        // We need to modify the reference count.
1463
        naming::gid_type& mutable_gid = const_cast<hpx::id_type&>(id).get_gid();
1,527✔
1464
        naming::gid_type new_gid =
1465
            naming::detail::split_gid_if_needed(mutable_gid).get();
1,527✔
1466
        std::int64_t new_credit = naming::detail::get_credit_from_gid(new_gid);
1,527✔
1467

1468
        try
1469
        {
1470
            return symbol_ns_.bind(name, new_gid);
1,527✔
1471
        }
×
1472
        catch (hpx::exception const& e)
1473
        {
1474
            if (new_credit != 0)
×
1475
            {
1476
                naming::detail::add_credit_to_gid(mutable_gid, new_credit);
×
1477
            }
×
1478
            HPX_RETHROWS_IF(ec, e, "addressing_service::register_name");
×
1479
        }
×
1480
        return false;
×
1481
    }
1,527✔
1482

1483
    // Asynchronously bind 'name' to 'id' in the symbol namespace. The GID
    // registered is the one obtained from split_gid_if_needed(), which is
    // applied to the GID stored inside 'id' (hence the const_cast).
    //
    // If the split-off GID carries credit, correct_credit_on_failure() is
    // attached as a synchronous continuation. Otherwise the future returned
    // by symbol_ns_.bind_async() is handed back directly.
    hpx::future<bool> addressing_service::register_name_async(
        std::string const& name, hpx::id_type const& id)
    {    // {{{
        // We need to modify the reference count.
        naming::gid_type& id_gid = const_cast<hpx::id_type&>(id).get_gid();
        naming::gid_type split_gid =
            naming::detail::split_gid_if_needed(id_gid).get();

        future<bool> bound = symbol_ns_.bind_async(name, split_gid);

        std::int64_t split_credit =
            naming::detail::get_credit_from_gid(split_gid);
        if (split_credit == 0)
        {
            // no credit was split off, nothing to correct on failure
            return bound;
        }

        return bound.then(hpx::launch::sync,
            util::one_shot(hpx::bind_back(&correct_credit_on_failure, id,
                static_cast<std::int64_t>(HPX_GLOBALCREDIT_INITIAL),
                split_credit)));
    }    // }}}
3,143✔
1503

1504
    ///////////////////////////////////////////////////////////////////////////
1505
    // Remove the binding for 'name' from the symbol namespace and return the
    // id reported by symbol_ns_.unbind(). hpx::exception instances are routed
    // through HPX_RETHROWS_IF; hpx::invalid_id is returned if that macro does
    // not rethrow.
    hpx::id_type addressing_service::unregister_name(
        std::string const& name, error_code& ec)
    {    // {{{
        hpx::id_type unbound = hpx::invalid_id;

        try
        {
            unbound = symbol_ns_.unbind(name);
        }
        catch (hpx::exception const& e)
        {
            HPX_RETHROWS_IF(ec, e, "addressing_service::unregister_name");
        }

        return unbound;
    }    // }}}
2,747✔
1518

1519
    // Asynchronous counterpart of unregister_name(): forwards to
    // symbol_ns_.unbind_async() and returns its future.
    hpx::future<hpx::id_type> addressing_service::unregister_name_async(
        std::string const& name)
    {    // {{{
        hpx::future<hpx::id_type> unbound = symbol_ns_.unbind_async(name);
        return unbound;
    }    // }}}
×
1524

1525
    ///////////////////////////////////////////////////////////////////////////
1526
    // Look up 'name' in the symbol namespace and return the id reported by
    // symbol_ns_.resolve(). hpx::exception instances are routed through
    // HPX_RETHROWS_IF; hpx::invalid_id is returned if that macro does not
    // rethrow.
    hpx::id_type addressing_service::resolve_name(
        std::string const& name, error_code& ec)
    {    // {{{
        hpx::id_type resolved = hpx::invalid_id;

        try
        {
            resolved = symbol_ns_.resolve(name);
        }
        catch (hpx::exception const& e)
        {
            HPX_RETHROWS_IF(ec, e, "addressing_service::resolve_name");
        }

        return resolved;
    }    // }}}
114✔
1539

1540
    // Asynchronous counterpart of resolve_name(): forwards 'name' to the
    // symbol namespace's resolve_async() and returns the resulting future.
    hpx::future<hpx::id_type> addressing_service::resolve_name_async(
        std::string const& name)
    {    // {{{
        hpx::future<hpx::id_type> pending = symbol_ns_.resolve_async(name);
        return pending;
    }    // }}}
×
1545

1546
    namespace detail {
1547
        hpx::future<hpx::id_type> on_register_event(
61,784✔
1548
            hpx::future<bool> f, hpx::future<hpx::id_type> result_f)
1549
        {
1550
            if (!f.get())
61,785✔
1551
            {
1552
                HPX_THROW_EXCEPTION(bad_request,
×
1553
                    "hpx::agas::detail::on_register_event",
1554
                    "request 'symbol_ns_on_event' failed");
1555
                return hpx::future<hpx::id_type>();
1556
            }
1557

1558
            return result_f;
61,785✔
1559
        }
×
1560
    }    // namespace detail
1561

1562
    // Register interest in events for the symbolic name 'name' with the
    // symbol namespace.
    //
    // A distributed promise is created and its id is passed to the
    // namespace's on_event() together with 'name' and 'call_for_past_events'.
    // The returned future is produced by detail::on_register_event, which
    // yields the promise's future once the registration request succeeded.
    future<hpx::id_type> addressing_service::on_symbol_namespace_event(
        std::string const& name, bool call_for_past_events)
    {
        hpx::distributed::promise<hpx::id_type, naming::gid_type>
            event_promise;
        auto event_result = event_promise.get_future();

        hpx::future<bool> registration = symbol_ns_.on_event(
            name, call_for_past_events, event_promise.get_id());

        // run the continuation synchronously as soon as the registration
        // request has completed
        auto continuation = util::one_shot(hpx::bind_back(
            &detail::on_register_event, HPX_MOVE(event_result)));

        return registration.then(hpx::launch::sync, HPX_MOVE(continuation));
    }
61,785✔
1575

1576
    // Return all matching entries in the symbol namespace
1577
    hpx::future<addressing_service::iterate_names_return_type>
1578
    addressing_service::iterate_ids(std::string const& pattern)
×
1579
    {    // {{{
1580
        return symbol_ns_.iterate_async(pattern);
×
1581
    }    // }}}
1582

1583
    // This function has to return false if the key is already in the cache (true
1584
    // means go ahead with the cache update).
1585
    bool check_for_collisions(addressing_service::gva_cache_key const& new_key,
513✔
1586
        addressing_service::gva_cache_key const& old_key)
1587
    {
1588
        return (new_key.get_gid() != old_key.get_gid()) ||
513✔
1589
            (new_key.get_count() != old_key.get_count());
513✔
1590
    }
1591

1592
    void addressing_service::update_cache_entry(
94,682✔
1593
        naming::gid_type const& id, gva const& g, error_code& ec)
1594
    {    // {{{
1595
        if (!caching_)
94,682✔
1596
        {
1597
            // If caching is disabled, we silently pretend success.
1598
            if (&ec != &throws)
×
1599
                ec = make_success_code();
×
1600
            return;
×
1601
        }
1602

1603
        // don't look at cache if id is marked as non-cache-able
1604
        if (!naming::detail::store_in_cache(id))
94,682✔
1605
        {
1606
            if (&ec != &throws)
2,092✔
1607
                ec = make_success_code();
×
1608
            return;
2,092✔
1609
        }
1610

1611
        naming::gid_type gid = naming::detail::get_stripped_gid(id);
92,590✔
1612

1613
        // don't look at the cache if the id is locally managed
1614
        if (naming::get_locality_id_from_gid(gid) ==
185,180✔
1615
            naming::get_locality_id_from_gid(locality_))
92,590✔
1616
        {
1617
            if (&ec != &throws)
90,825✔
1618
                ec = make_success_code();
×
1619
            return;
90,825✔
1620
        }
1621

1622
        if (hpx::threads::get_self_ptr() == nullptr)
1,765✔
1623
        {
1624
            // Don't update the cache while HPX is starting up ...
1625
            if (hpx::is_starting())
×
1626
            {
1627
                return;
×
1628
            }
1629
            threads::thread_init_data data(
×
1630
                threads::make_thread_function_nullary(
×
1631
                    [HPX_CXX20_CAPTURE_THIS(=)]() -> void {
×
1632
                        return update_cache_entry(id, g, throws);
×
1633
                    }),
1634
                "addressing_service::update_cache_entry",
×
1635
                threads::thread_priority::normal,
1636
                threads::thread_schedule_hint(),
×
1637
                threads::thread_stacksize::default_,
1638
                threads::thread_schedule_state::pending, true);
1639
            threads::register_thread(data, ec);
×
1640
        }
×
1641

1642
        try
1643
        {
1644
            // The entry in AGAS for a locality's RTS component has a count of 0,
1645
            // so we convert it to 1 here so that the cache doesn't break.
1646
            const std::uint64_t count = (g.count ? g.count : 1);
1,765✔
1647

1648
            LAGAS_(debug).format(
1,765✔
1649
                "addressing_service::update_cache_entry, gid({1}), count({2})",
×
1650
                gid, count);
1651

1652
            const gva_cache_key key(gid, count);
1,765✔
1653

1654
            {
1655
                std::unique_lock<mutex_type> lock(gva_cache_mtx_);
1,765✔
1656
                if (!gva_cache_->update_if(key, g, check_for_collisions))
1,765✔
1657
                {
1658
                    if (LAGAS_ENABLED(warning))
×
1659
                    {
1660
                        // Figure out who we collided with.
1661
                        addressing_service::gva_cache_key idbase;
×
1662
                        addressing_service::gva_cache_type::entry_type e;
×
1663

1664
                        if (!gva_cache_->get_entry(key, idbase, e))
×
1665
                        {
1666
                            // This is impossible under sane conditions.
1667
                            lock.unlock();
×
1668
                            HPX_THROWS_IF(ec, invalid_data,
×
1669
                                "addressing_service::update_cache_entry",
1670
                                "data corruption or lock error occurred in "
1671
                                "cache");
1672
                            return;
×
1673
                        }
1674

1675
                        LAGAS_(warning).format(
×
1676
                            "addressing_service::update_cache_entry, aborting "
×
1677
                            "update due to key collision in cache, "
1678
                            "new_gid({1}), new_count({2}), old_gid({3}), "
1679
                            "old_count({4})",
1680
                            gid, count, idbase.get_gid(), idbase.get_count());
×
1681
                    }
×
1682
                }
×
1683
            }
1,765✔
1684

1685
            if (&ec != &throws)
1,765✔
1686
                ec = make_success_code();
×
1687
        }
1,765✔
1688
        catch (hpx::exception const& e)
1689
        {
1690
            HPX_RETHROWS_IF(ec, e, "addressing_service::update_cache_entry");
×
1691
        }
×
1692
    }    // }}}
94,682✔
1693

1694
    bool addressing_service::get_cache_entry(naming::gid_type const& gid,
185,196✔
1695
        gva& gva, naming::gid_type& idbase, error_code& ec)
1696
    {
1697
        // Don't use the cache while HPX is starting up
1698
        if (hpx::is_starting())
185,196✔
1699
        {
1700
            return false;
901✔
1701
        }
1702
        gva_cache_key k(gid);
184,295✔
1703
        gva_cache_key idbase_key;
184,295✔
1704

1705
        std::unique_lock<mutex_type> lock(gva_cache_mtx_);
184,295✔
1706
        if (gva_cache_->get_entry(k, idbase_key, gva))
184,295✔
1707
        {
1708
            const std::uint64_t id_msb =
181,562✔
1709
                naming::detail::strip_internal_bits_from_gid(gid.get_msb());
181,562✔
1710

1711
            if (HPX_UNLIKELY(id_msb != idbase_key.get_gid().get_msb()))
181,562✔
1712
            {
1713
                lock.unlock();
×
1714
                HPX_THROWS_IF(ec, internal_server_error,
×
1715
                    "addressing_service::get_cache_entry",
1716
                    "bad entry in cache, MSBs of GID base and GID do not "
1717
                    "match");
1718
                return false;
×
1719
            }
1720
            idbase = idbase_key.get_gid();
181,562✔
1721
            return true;
181,562✔
1722
        }
1723

1724
        return false;
2,733✔
1725
    }
185,196✔
1726

1727
    // Remove all entries from the local GVA cache.
    //
    // If caching is disabled nothing is cleared and success is reported. An
    // hpx::exception raised while clearing is forwarded through
    // HPX_RETHROWS_IF using 'ec'.
    void addressing_service::clear_cache(error_code& ec)
    {    // {{{
        try
        {
            if (caching_)
            {
                LAGAS_(warning).format(
                    "addressing_service::clear_cache, clearing cache");

                std::lock_guard<mutex_type> guard(gva_cache_mtx_);
                gva_cache_->clear();
            }

            // with caching disabled we silently pretend success
            if (&ec != &throws)
                ec = make_success_code();
        }
        catch (hpx::exception const& e)
        {
            HPX_RETHROWS_IF(ec, e, "addressing_service::clear_cache");
        }
    }    // }}}
×
1754

1755
    void addressing_service::remove_cache_entry(
4,776✔
1756
        naming::gid_type const& id, error_code& ec)
1757
    {
1758
        // If caching is disabled, we silently pretend success.
1759
        if (!caching_)
4,776✔
1760
        {
1761
            if (&ec != &throws)
×
1762
                ec = make_success_code();
×
1763
            return;
×
1764
        }
1765

1766
        // don't look at cache if id is marked as non-cache-able
1767
        if (!naming::detail::store_in_cache(id))
4,776✔
1768
        {
1769
            if (&ec != &throws)
4,776✔
1770
                ec = make_success_code();
×
1771
            return;
4,776✔
1772
        }
1773

1774
        naming::gid_type gid = naming::detail::get_stripped_gid(id);
×
1775

1776
        // don't look at the cache if the id is locally managed
1777
        if (naming::get_locality_id_from_gid(gid) ==
×
1778
            naming::get_locality_id_from_gid(locality_))
×
1779
        {
1780
            if (&ec != &throws)
×
1781
                ec = make_success_code();
×
1782
            return;
×
1783
        }
1784

1785
        try
1786
        {
1787
            LAGAS_(warning).format("addressing_service::remove_cache_entry");
×
1788

1789
            std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1790

1791
            gva_cache_->erase([&gid](std::pair<gva_cache_key, gva> const& p) {
×
1792
                return gid == p.first.get_gid();
×
1793
            });
1794

1795
            if (&ec != &throws)
×
1796
                ec = make_success_code();
×
1797
        }
×
1798
        catch (hpx::exception const& e)
1799
        {
1800
            HPX_RETHROWS_IF(ec, e, "addressing_service::clear_cache");
×
1801
        }
×
1802
    }
4,776✔
1803

1804
    // Disable refcnt caching during shutdown
1805
    void addressing_service::start_shutdown(error_code& ec)
1,576✔
1806
    {
1807
        // If caching is disabled, we silently pretend success.
1808
        if (!caching_)
1,576✔
1809
            return;
×
1810

1811
        std::unique_lock<mutex_type> l(refcnt_requests_mtx_);
1,576✔
1812
        enable_refcnt_caching_ = false;
1,576✔
1813
        send_refcnt_requests_sync(l, ec);
1,576✔
1814
    }
1,576✔
1815

1816
    ///////////////////////////////////////////////////////////////////////////
1817
    // Helper functions to access the current cache statistics
1818
    std::uint64_t addressing_service::get_cache_entries(bool /* reset */)
×
1819
    {
1820
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1821
        return gva_cache_->size();
×
1822
    }
×
1823

1824
    std::uint64_t addressing_service::get_cache_hits(bool reset)
×
1825
    {
1826
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1827
        return gva_cache_->get_statistics().hits(reset);
×
1828
    }
×
1829

1830
    std::uint64_t addressing_service::get_cache_misses(bool reset)
×
1831
    {
1832
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1833
        return gva_cache_->get_statistics().misses(reset);
×
1834
    }
×
1835

1836
    std::uint64_t addressing_service::get_cache_evictions(bool reset)
×
1837
    {
1838
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1839
        return gva_cache_->get_statistics().evictions(reset);
×
1840
    }
×
1841

1842
    std::uint64_t addressing_service::get_cache_insertions(bool reset)
×
1843
    {
1844
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1845
        return gva_cache_->get_statistics().insertions(reset);
×
1846
    }
×
1847

1848
    ///////////////////////////////////////////////////////////////////////////
1849
    std::uint64_t addressing_service::get_cache_get_entry_count(bool reset)
×
1850
    {
1851
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1852
        return gva_cache_->get_statistics().get_get_entry_count(reset);
×
1853
    }
×
1854

1855
    std::uint64_t addressing_service::get_cache_insertion_entry_count(
×
1856
        bool reset)
1857
    {
1858
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1859
        return gva_cache_->get_statistics().get_insert_entry_count(reset);
×
1860
    }
×
1861

1862
    std::uint64_t addressing_service::get_cache_update_entry_count(bool reset)
×
1863
    {
1864
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1865
        return gva_cache_->get_statistics().get_update_entry_count(reset);
×
1866
    }
×
1867

1868
    std::uint64_t addressing_service::get_cache_erase_entry_count(bool reset)
×
1869
    {
1870
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1871
        return gva_cache_->get_statistics().get_erase_entry_count(reset);
×
1872
    }
×
1873

1874
    std::uint64_t addressing_service::get_cache_get_entry_time(bool reset)
×
1875
    {
1876
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1877
        return gva_cache_->get_statistics().get_get_entry_time(reset);
×
1878
    }
×
1879

1880
    std::uint64_t addressing_service::get_cache_insertion_entry_time(bool reset)
×
1881
    {
1882
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1883
        return gva_cache_->get_statistics().get_insert_entry_time(reset);
×
1884
    }
×
1885

1886
    std::uint64_t addressing_service::get_cache_update_entry_time(bool reset)
×
1887
    {
1888
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1889
        return gva_cache_->get_statistics().get_update_entry_time(reset);
×
1890
    }
×
1891

1892
    std::uint64_t addressing_service::get_cache_erase_entry_time(bool reset)
×
1893
    {
1894
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1895
        return gva_cache_->get_statistics().get_erase_entry_time(reset);
×
1896
    }
×
1897

1898
    void addressing_service::register_server_instances()
594✔
1899
    {
1900
        // register root server
1901
        std::uint32_t locality_id =
594✔
1902
            naming::get_locality_id_from_gid(get_local_locality());
594✔
1903
        locality_ns_->register_server_instance(locality_id);
594✔
1904
        primary_ns_.register_server_instance(locality_id);
594✔
1905
        component_ns_->register_server_instance(locality_id);
594✔
1906
        symbol_ns_.register_server_instance(locality_id);
594✔
1907
    }
594✔
1908

1909
    // Try to send out the queued reference count requests without waiting
    // for their results. If the request mutex is currently held elsewhere the
    // call returns immediately without doing anything.
    void addressing_service::garbage_collect_non_blocking(error_code& ec)
    {
        std::unique_lock<mutex_type> requests_lock(
            refcnt_requests_mtx_, std::try_to_lock);

        // no need to compete for garbage collection
        if (requests_lock.owns_lock())
        {
            send_refcnt_requests_non_blocking(requests_lock, ec);
        }
    }
18,316,874✔
1917

1918
    // Try to send out the queued reference count requests and wait for them
    // to complete. If the request mutex is currently held elsewhere the call
    // returns immediately without doing anything.
    void addressing_service::garbage_collect(error_code& ec)
    {
        std::unique_lock<mutex_type> requests_lock(
            refcnt_requests_mtx_, std::try_to_lock);

        // no need to compete for garbage collection
        if (requests_lock.owns_lock())
        {
            send_refcnt_requests_sync(requests_lock, ec);
        }
    }
710✔
1926

1927
    void addressing_service::send_refcnt_requests(
92,396✔
1928
        std::unique_lock<addressing_service::mutex_type>& l, error_code& ec)
1929
    {
1930
        if (!l.owns_lock())
92,396✔
1931
        {
1932
            HPX_THROWS_IF(ec, lock_error,
×
1933
                "addressing_service::send_refcnt_requests",
1934
                "mutex is not locked");
1935
            return;
×
1936
        }
1937

1938
        if (!enable_refcnt_caching_ ||
92,396✔
1939
            max_refcnt_requests_ == ++refcnt_requests_count_)
87,870✔
1940
            send_refcnt_requests_non_blocking(l, ec);
4,529✔
1941

1942
        else if (&ec != &throws)
87,867✔
1943
            ec = make_success_code();
80,567✔
1944
    }
92,396✔
1945

1946
#if defined(HPX_HAVE_AGAS_DUMP_REFCNT_ENTRIES)
    // Write the contents of the client-side refcnt table 'requests' to the
    // AGAS debug log, prefixed with 'func_name' (the name of the calling
    // function).
    //
    // The callers in this file pass their private, already swapped-out table
    // and invoke this function after having released the lock, so the lock
    // 'l' is neither required nor inspected here; the parameter is kept for
    // interface compatibility only.
    void dump_refcnt_requests(
        std::unique_lock<addressing_service::mutex_type>& l,
        addressing_service::refcnt_requests_type const& requests,
        const char* func_name)
    {
        HPX_UNUSED(l);

        std::stringstream ss;
        hpx::util::format_to(ss,
            "{1}, dumping client-side refcnt table, requests({2}):", func_name,
            requests.size());

        for (auto const& e : requests)
        {
            // The [client] tag is in there to make it easier to filter
            // through the logs.
            hpx::util::format_to(
                ss, "\n  [client] gid({1}), credits({2})", e.first, e.second);
        }

        LAGAS_(debug) << ss.str();
    }
#endif
1973

1974
    void addressing_service::send_refcnt_requests_non_blocking(
18,320,101✔
1975
        std::unique_lock<addressing_service::mutex_type>& l, error_code& ec)
1976
    {
1977
#if !defined(HPX_COMPUTE_DEVICE_CODE)
1978
        HPX_ASSERT(l.owns_lock());
18,320,101✔
1979

1980
        try
1981
        {
1982
            if (refcnt_requests_->empty())
18,320,101✔
1983
            {
1984
                l.unlock();
18,297,847✔
1985
                return;
18,297,847✔
1986
            }
1987

1988
            std::shared_ptr<refcnt_requests_type> p(new refcnt_requests_type);
22,254✔
1989

1990
            p.swap(refcnt_requests_);
22,254✔
1991
            refcnt_requests_count_ = 0;
22,254✔
1992

1993
            l.unlock();
22,254✔
1994

1995
            LAGAS_(info).format("addressing_service::send_refcnt_requests_non_"
22,254✔
1996
                                "blocking, requests({1})",
1997
                p->size());
×
1998

1999
#if defined(HPX_HAVE_AGAS_DUMP_REFCNT_ENTRIES)
2000
            if (LAGAS_ENABLED(debug))
2001
                dump_refcnt_requests(l, *p,
2002
                    "addressing_service::send_refcnt_requests_non_blocking");
2003
#endif
2004

2005
            // collect all requests for each locality
2006
            using requests_type = std::map<hpx::id_type,
2007
                std::vector<hpx::tuple<std::int64_t, naming::gid_type,
2008
                    naming::gid_type>>>;
2009
            requests_type requests;
22,254✔
2010

2011
            for (refcnt_requests_type::const_reference e : *p)
64,039✔
2012
            {
2013
                HPX_ASSERT(e.second < 0);
41,785✔
2014

2015
                naming::gid_type raw(e.first);
41,785✔
2016

2017
                hpx::id_type target(
41,785✔
2018
                    primary_namespace::get_service_instance(raw),
41,785✔
2019
                    hpx::id_type::management_type::unmanaged);
2020

2021
                requests[target].push_back(hpx::make_tuple(e.second, raw, raw));
41,785✔
2022
            }
41,785✔
2023

2024
            // send requests to all locality
2025
            requests_type::iterator end = requests.end();
22,254✔
2026
            for (requests_type::iterator it = requests.begin(); it != end; ++it)
45,304✔
2027
            {
2028
                server::primary_namespace::decrement_credit_action action;
2029
                hpx::post(action, HPX_MOVE(it->first), HPX_MOVE(it->second));
23,050✔
2030
            }
23,050✔
2031

2032
            if (&ec != &throws)
22,254✔
2033
                ec = make_success_code();
4,483✔
2034
        }
22,254✔
2035
        catch (hpx::exception const& e)
2036
        {
2037
            l.unlock();
×
2038
            HPX_RETHROWS_IF(
×
2039
                ec, e, "addressing_service::send_refcnt_requests_non_blocking");
2040
        }
×
2041
#else
2042
        HPX_UNUSED(l);
2043
        HPX_UNUSED(ec);
2044
        HPX_ASSERT(false);
2045
#endif
2046
    }
18,320,101✔
2047

2048
    std::vector<hpx::future<std::vector<std::int64_t>>>
2049
    addressing_service::send_refcnt_requests_async(
2,280✔
2050
        std::unique_lock<addressing_service::mutex_type>& l)
2051
    {
2052
#if !defined(HPX_COMPUTE_DEVICE_CODE)
2053
        HPX_ASSERT(l.owns_lock());
2,280✔
2054

2055
        if (refcnt_requests_->empty())
2,280✔
2056
        {
2057
            l.unlock();
2,145✔
2058
            return std::vector<hpx::future<std::vector<std::int64_t>>>();
2,145✔
2059
        }
2060

2061
        std::shared_ptr<refcnt_requests_type> p(new refcnt_requests_type);
135✔
2062

2063
        p.swap(refcnt_requests_);
135✔
2064
        refcnt_requests_count_ = 0;
135✔
2065

2066
        l.unlock();
135✔
2067

2068
        LAGAS_(info).format(
135✔
2069
            "addressing_service::send_refcnt_requests_async, requests({1})",
×
2070
            p->size());
×
2071

2072
#if defined(HPX_HAVE_AGAS_DUMP_REFCNT_ENTRIES)
2073
        if (LAGAS_ENABLED(debug))
2074
            dump_refcnt_requests(
2075
                l, *p, "addressing_service::send_refcnt_requests_sync");
2076
#endif
2077

2078
        // collect all requests for each locality
2079
        using requests_type = std::map<hpx::id_type,
2080
            std::vector<
2081
                hpx::tuple<std::int64_t, naming::gid_type, naming::gid_type>>>;
2082
        requests_type requests;
135✔
2083

2084
        std::vector<hpx::future<std::vector<std::int64_t>>> lazy_results;
135✔
2085
        for (refcnt_requests_type::const_reference e : *p)
3,453✔
2086
        {
2087
            HPX_ASSERT(e.second < 0);
3,318✔
2088

2089
            naming::gid_type raw(e.first);
3,318✔
2090

2091
            hpx::id_type target(primary_namespace::get_service_instance(raw),
3,318✔
2092
                hpx::id_type::management_type::unmanaged);
2093

2094
            requests[target].push_back(hpx::make_tuple(e.second, raw, raw));
3,318✔
2095
        }
3,318✔
2096

2097
        // send requests to all locality
2098
        requests_type::iterator end = requests.end();
135✔
2099
        for (requests_type::iterator it = requests.begin(); it != end; ++it)
276✔
2100
        {
2101
            server::primary_namespace::decrement_credit_action action;
2102
            lazy_results.push_back(
141✔
2103
                hpx::async(action, HPX_MOVE(it->first), HPX_MOVE(it->second)));
141✔
2104
        }
141✔
2105

2106
        return lazy_results;
135✔
2107
#else
2108
        HPX_UNUSED(l);
2109
        HPX_ASSERT(false);
2110
        std::vector<hpx::future<std::vector<std::int64_t>>> lazy_results;
2111
        return lazy_results;
2112
#endif
2113
    }
2,280✔
2114

2115
    void addressing_service::send_refcnt_requests_sync(
2,280✔
2116
        std::unique_lock<addressing_service::mutex_type>& l, error_code& ec)
2117
    {
2118
        std::vector<hpx::future<std::vector<std::int64_t>>> lazy_results =
2119
            send_refcnt_requests_async(l);
2,280✔
2120

2121
        // re throw possible errors
2122
        hpx::when_all(lazy_results).get();
2,280✔
2123

2124
        if (&ec != &throws)
2,280✔
2125
            ec = make_success_code();
×
2126
    }
2,280✔
2127

2128
    ///////////////////////////////////////////////////////////////////////////
2129
    // Invoke the user supplied function 'f' while holding the migrated
    // objects lock and, if 'f' reports true in the first element of its
    // result, record 'gid_' in the table of migrated objects and drop it
    // from the GVA cache.
    //
    // Returns the future delivered by 'f' (second element of its result). An
    // invalid 'gid_' yields an exceptional future (bad_parameter).
    // 'expect_to_be_marked_as_migrating' is only used for assertions: it has
    // to match whether 'gid_' was already present in the table.
    hpx::future<void> addressing_service::mark_as_migrated(
        naming::gid_type const& gid_,
        hpx::move_only_function<std::pair<bool, hpx::future<void>>()>&& f,
        bool expect_to_be_marked_as_migrating)
    {
        HPX_UNUSED(expect_to_be_marked_as_migrating);
        if (!gid_)
        {
            return hpx::make_exceptional_future<void>(HPX_GET_EXCEPTION(
                bad_parameter, "addressing_service::mark_as_migrated",
                "invalid reference gid"));
        }

        HPX_ASSERT(naming::detail::is_migratable(gid_));

        // Lock ordering: the AGAS lock is always taken first, the lock
        // grabbed by the user supplied code second. The AGAS lock has to come
        // first because the object might not exist on this locality anymore
        // (in which case it must not be accessed), and the only way to find
        // that out is to query the migrated objects table in AGAS.
        std::unique_lock<mutex_type> table_lock(migrated_objects_mtx_);
        util::ignore_while_checking ignore_lock(&table_lock);
        HPX_UNUSED(ignore_lock);

        // Call the user code for the component instance to be migrated; the
        // returned future becomes ready whenever the instance can be migrated
        // (no threads are pending/active any more).
        std::pair<bool, hpx::future<void>> outcome = f();

        if (!outcome.first)
        {
            return HPX_MOVE(outcome.second);
        }

        // Mark the gid as 'migrated' right away - the worst what can happen
        // is that a parcel which comes in for this object is bouncing between
        // this locality and the locality managing the address resolution for
        // the object.
        naming::gid_type stripped(naming::detail::get_stripped_gid(gid_));

        bool const already_marked =
            migrated_objects_table_.find(stripped) !=
            migrated_objects_table_.end();

        if (already_marked)
        {
            HPX_ASSERT(expect_to_be_marked_as_migrating);
        }
        else
        {
            // insert the object into the map of migrated objects
            HPX_ASSERT(!expect_to_be_marked_as_migrating);
            migrated_objects_table_.insert(stripped);
        }

        // avoid interactions with the locking in the cache
        table_lock.unlock();

        // remove entry from cache
        remove_cache_entry(gid_);

        return HPX_MOVE(outcome.second);
    }
4,776✔
2192

2193
    // Remove 'gid_' from the table of migrated objects (if it is listed
    // there) and, when the gid is cacheable and caching is enabled, drop the
    // corresponding GVA cache entry as well.
    //
    // Throws bad_parameter if 'gid_' is invalid.
    void addressing_service::unmark_as_migrated(naming::gid_type const& gid_)
    {
        if (!gid_)
        {
            HPX_THROW_EXCEPTION(bad_parameter,
                "addressing_service::unmark_as_migrated",
                "invalid reference gid");
            return;
        }

        HPX_ASSERT(naming::detail::is_migratable(gid_));

        naming::gid_type stripped(naming::detail::get_stripped_gid(gid_));

        std::unique_lock<mutex_type> table_lock(migrated_objects_mtx_);

        migrated_objects_table_type::iterator pos =
            migrated_objects_table_.find(stripped);
        if (pos == migrated_objects_table_.end())
        {
            return;    // the object was not marked as migrated
        }

        // remove the object from the map of migrated objects
        migrated_objects_table_.erase(pos);

        if (caching_ && naming::detail::store_in_cache(gid_))
        {
            // avoid interactions with the locking in the cache
            table_lock.unlock();

            // remove entry from cache
            remove_cache_entry(gid_);
        }
    }
3,236✔
2228

2229
    // Forward a 'begin migration' request for the object 'id' (using its
    // stripped gid) to the primary namespace and return the resulting future.
    //
    // Throws bad_parameter if 'id' is invalid.
    hpx::future<std::pair<hpx::id_type, naming::address>>
    addressing_service::begin_migration(hpx::id_type const& id)
    {
        if (!id)
        {
            HPX_THROW_EXCEPTION(bad_parameter,
                "addressing_service::begin_migration", "invalid reference id");
        }

        auto const& raw_gid = id.get_gid();
        HPX_ASSERT(naming::detail::is_migratable(raw_gid));

        naming::gid_type stripped = naming::detail::get_stripped_gid(raw_gid);
        return primary_ns_.begin_migration(stripped);
    }
×
2244

2245
    // Forward an 'end migration' request for the object 'id' (using its
    // stripped gid) to the primary namespace and return its result.
    //
    // Throws bad_parameter if 'id' is invalid.
    bool addressing_service::end_migration(hpx::id_type const& id)
    {
        if (!id)
        {
            HPX_THROW_EXCEPTION(bad_parameter,
                "addressing_service::end_migration", "invalid reference id");
        }

        auto const& raw_gid = id.get_gid();
        HPX_ASSERT(naming::detail::is_migratable(raw_gid));

        naming::gid_type stripped = naming::detail::get_stripped_gid(raw_gid);
        return primary_ns_.end_migration(stripped);
    }
×
2259

2260
    bool addressing_service::was_object_migrated_locked(
34,963✔
2261
        naming::gid_type const& gid_)
2262
    {
2263
        naming::gid_type gid(naming::detail::get_stripped_gid(gid_));
34,963✔
2264

2265
        return migrated_objects_table_.find(gid) !=
69,926✔
2266
            migrated_objects_table_.end();
34,963✔
2267
    }
2268

2269
    std::pair<bool, components::pinned_ptr>
2270
    addressing_service::was_object_migrated(naming::gid_type const& gid,
5,447✔
2271
        hpx::move_only_function<components::pinned_ptr()>&& f    //-V669
2272
    )
2273
    {
2274
        if (!gid)
5,447✔
2275
        {
2276
            HPX_THROW_EXCEPTION(bad_parameter,
×
2277
                "addressing_service::was_object_migrated",
2278
                "invalid reference gid");
2279
            return std::make_pair(false, components::pinned_ptr());
2280
        }
2281

2282
        if (!naming::detail::is_migratable(gid))
5,448✔
2283
        {
2284
            return std::make_pair(false, f());
36✔
2285
        }
2286

2287
        // Always first grab the AGAS lock before invoking the user supplied
2288
        // function. The user supplied code will grab another lock. Both locks have
2289
        // to be acquired and always in the same sequence.
2290
        // The AGAS lock needs to be acquired first as the migrated object might
2291
        // not exist on this locality, in which case it should not be accessed
2292
        // anymore. The only way to determine whether the object still exists on
2293
        // this locality is to query the migrated objects table in AGAS.
2294
        using lock_type = std::unique_lock<mutex_type>;
2295

2296
        lock_type lock(migrated_objects_mtx_);
5,412✔
2297

2298
        if (was_object_migrated_locked(gid))
5,412✔
2299
            return std::make_pair(true, components::pinned_ptr());
×
2300

2301
        util::ignore_while_checking ignore(&lock);
5,412✔
2302
        HPX_UNUSED(ignore);
5,412✔
2303

2304
        return std::make_pair(false, f());
5,412✔
2305
    }
5,448✔
2306

2307
}}    // namespace hpx::agas
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc