• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #867

15 Jan 2023 08:00PM UTC coverage: 86.487% (+0.5%) from 85.951%
#867

push

StellarBot
Merge #6135

6135: Fixing warnings reported by MSVC analysis r=hkaiser a=hkaiser

- adding MSVC specific #pragma's to suppress the benign warnings


Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

120 of 120 new or added lines in 33 files covered. (100.0%)

174599 of 201880 relevant lines covered (86.49%)

1945607.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.19
/libs/full/agas/src/addressing_service.cpp
1
//  Copyright (c) 2011 Bryce Adelstein-Lelbach
2
//  Copyright (c) 2011-2021 Hartmut Kaiser
3
//  Copyright (c) 2016 Parsa Amini
4
//  Copyright (c) 2016 Thomas Heller
5
//
6
//  SPDX-License-Identifier: BSL-1.0
7
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
8
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9

10
#include <hpx/config.hpp>
11
#include <hpx/actions_base/traits/action_priority.hpp>
12
#include <hpx/actions_base/traits/action_was_object_migrated.hpp>
13
#include <hpx/agas/addressing_service.hpp>
14
#include <hpx/agas_base/detail/bootstrap_component_namespace.hpp>
15
#include <hpx/agas_base/detail/bootstrap_locality_namespace.hpp>
16
#include <hpx/assert.hpp>
17
#include <hpx/async_base/launch_policy.hpp>
18
#include <hpx/async_combinators/when_all.hpp>
19
#include <hpx/components_base/traits/component_supports_migration.hpp>
20
#include <hpx/datastructures/detail/dynamic_bitset.hpp>
21
#include <hpx/functional/bind.hpp>
22
#include <hpx/functional/bind_back.hpp>
23
#include <hpx/functional/bind_front.hpp>
24
#include <hpx/lock_registration/detail/register_locks.hpp>
25
#include <hpx/modules/async_distributed.hpp>
26
#include <hpx/modules/errors.hpp>
27
#include <hpx/modules/execution.hpp>
28
#include <hpx/modules/format.hpp>
29
#include <hpx/modules/logging.hpp>
30
#include <hpx/naming/split_gid.hpp>
31
#include <hpx/runtime_configuration/runtime_configuration.hpp>
32
#include <hpx/runtime_local/runtime_local_fwd.hpp>
33
#include <hpx/serialization/serialize.hpp>
34
#include <hpx/serialization/vector.hpp>
35
#include <hpx/thread_support/assert_owns_lock.hpp>
36
#include <hpx/thread_support/unlock_guard.hpp>
37
#include <hpx/type_support/unused.hpp>
38
#include <hpx/util/get_entry_as.hpp>
39
#include <hpx/util/insert_checked.hpp>
40

41
#include <cstddef>
42
#include <cstdint>
43
#include <functional>
44
#include <map>
45
#include <memory>
46
#include <mutex>
47
#include <sstream>
48
#include <string>
49
#include <system_error>
50
#include <utility>
51
#include <vector>
52

53
namespace hpx { namespace agas {
54

55
    struct addressing_service::gva_cache_key
185,061✔
56
    {    // {{{ gva_cache_key implementation
57
    private:
58
        using key_type = std::pair<naming::gid_type, naming::gid_type>;
59

60
        key_type key_;
61

62
    public:
63
        gva_cache_key()
184,170✔
64
          : key_()
184,170✔
65
        {
66
        }
184,170✔
67

68
        explicit gva_cache_key(
185,936✔
69
            naming::gid_type const& id, std::uint64_t count = 1)
70
          : key_(naming::detail::get_stripped_gid(id),
371,872✔
71
                naming::detail::get_stripped_gid(id) + (count - 1))
185,935✔
72
        {
73
            HPX_ASSERT(count);
185,936✔
74
        }
185,935✔
75

76
        naming::gid_type get_gid() const
363,894✔
77
        {
78
            return key_.first;
363,894✔
79
        }
80

81
        std::uint64_t get_count() const
1,092✔
82
        {
83
            naming::gid_type const size = key_.second - key_.first;
1,092✔
84
            HPX_ASSERT(size.get_msb() == 0);
1,092✔
85
            return size.get_lsb();
1,092✔
86
        }
87

88
        friend bool operator<(
604,037✔
89
            gva_cache_key const& lhs, gva_cache_key const& rhs)
90
        {
91
            return lhs.key_.second < rhs.key_.first;
604,037✔
92
        }
93

94
        friend bool operator==(
95
            gva_cache_key const& lhs, gva_cache_key const& rhs)
96
        {
97
            // Direct hit
98
            if (lhs.key_ == rhs.key_)
99
            {
100
                return true;
101
            }
102

103
            // Is lhs in rhs?
104
            if (1 == lhs.get_count() && 1 != rhs.get_count())
105
            {
106
                return rhs.key_.first <= lhs.key_.first &&
107
                    lhs.key_.second <= rhs.key_.second;
108
            }
109

110
            // Is rhs in lhs?
111
            else if (1 != lhs.get_count() && 1 == rhs.get_count())
112
            {
113
                return lhs.key_.first <= rhs.key_.first &&
114
                    rhs.key_.second <= lhs.key_.second;
115
            }
116

117
            return false;
118
        }
119
    };    // }}}
120

121
    addressing_service::addressing_service(
1,188✔
122
        util::runtime_configuration const& ini_)
123
      : gva_cache_(new gva_cache_type)
594✔
124
      , console_cache_(naming::invalid_locality_id)
594✔
125
      , max_refcnt_requests_(ini_.get_agas_max_pending_refcnt_requests())
594✔
126
      , refcnt_requests_count_(0)
594✔
127
      , enable_refcnt_caching_(true)
594✔
128
      , refcnt_requests_(new refcnt_requests_type)
594✔
129
      , service_type(ini_.get_agas_service_mode())
594✔
130
      , runtime_type(ini_.mode_)
594✔
131
      , caching_(ini_.get_agas_caching_mode())
594✔
132
      , range_caching_(caching_ ? ini_.get_agas_range_caching_mode() : false)
594✔
133
      , action_priority_(threads::thread_priority::boost)
594✔
134
      , rts_lva_(0)
594✔
135
      , state_(hpx::state::starting)
594✔
136
      , locality_()
594✔
137
    {
138
        if (caching_)
594✔
139
            gva_cache_->reserve(ini_.get_agas_local_cache_size());
594✔
140
    }
594✔
141

142
    void addressing_service::bootstrap(
452✔
143
        parcelset::endpoints_type const& endpoints,
144
        util::runtime_configuration& rtcfg)
145
    {    // {{{
146
        LPROGRESS_;
452✔
147

148
        HPX_ASSERT(is_bootstrap());
452✔
149
        launch_bootstrap(endpoints, rtcfg);
452✔
150
    }    // }}}
452✔
151

152
    void addressing_service::initialize(std::uint64_t rts_lva)
594✔
153
    {    // {{{
154
        rts_lva_ = rts_lva;
594✔
155
        set_status(hpx::state::running);
594✔
156
    }    // }}}
594✔
157

158
    namespace detail {
159

160
        std::uint32_t get_number_of_pus_in_cores(std::uint32_t num_cores);
161
    }
162

163
    void addressing_service::launch_bootstrap(
452✔
164
        parcelset::endpoints_type const& endpoints,
165
        util::runtime_configuration& rtcfg)
166
    {    // {{{
167
        component_ns_.reset(new detail::bootstrap_component_namespace);
452✔
168
        locality_ns_.reset(new detail::bootstrap_locality_namespace(
452✔
169
            reinterpret_cast<server::primary_namespace*>(primary_ns_.ptr())));
452✔
170

171
        naming::gid_type const here =
172
            naming::get_gid_from_locality_id(agas::booststrap_prefix);
452✔
173
        set_local_locality(here);
452✔
174

175
        rtcfg.parse("assigned locality",
904✔
176
            hpx::util::format(
452✔
177
                "hpx.locality!={1}", naming::get_locality_id_from_gid(here)));
452✔
178

179
        std::uint32_t num_threads =
452✔
180
            hpx::util::get_entry_as<std::uint32_t>(rtcfg, "hpx.os_threads", 1u);
452✔
181
        locality_ns_->allocate(endpoints, 0, num_threads, naming::invalid_gid);
452✔
182

183
        register_name("/0/agas/locality#0", here);
452✔
184
        if (is_console())
452✔
185
        {
186
            register_name("/0/locality#console", here);
452✔
187
        }
452✔
188
    }    // }}}
452✔
189

190
    void addressing_service::adjust_local_cache_size(std::size_t cache_size)
×
191
    {    // {{{
192
        // adjust the local AGAS cache size for the number of worker threads and
193
        // create the hierarchy based on the topology
194
        if (caching_)
×
195
        {
196
            std::size_t previous = gva_cache_->size();
×
197
            gva_cache_->reserve(cache_size);
×
198

199
            LAGAS_(info).format(
×
200
                "addressing_service::adjust_local_cache_size, previous size: "
×
201
                "{1}, new size: {2}",
202
                previous, cache_size);
203
        }
×
204
    }    // }}}
×
205

206
    void addressing_service::set_local_locality(naming::gid_type const& g)
594✔
207
    {
208
        locality_ = g;
594✔
209
        primary_ns_.set_local_locality(g);
594✔
210
    }
594✔
211

212
    bool addressing_service::register_locality(
142✔
213
        parcelset::endpoints_type const& endpoints, naming::gid_type& prefix,
214
        std::uint32_t num_threads, error_code& ec)
215
    {    // {{{
216
        try
217
        {
218
            prefix = naming::get_gid_from_locality_id(
142✔
219
                locality_ns_->allocate(endpoints, 0, num_threads, prefix));
142✔
220

221
            {
222
                std::unique_lock<mutex_type> l(resolved_localities_mtx_);
142✔
223
                std::pair<resolved_localities_type::iterator, bool> res =
224
                    resolved_localities_.insert(
142✔
225
                        std::make_pair(prefix, endpoints));
142✔
226

227
                if (!res.second)
142✔
228
                {
229
                    l.unlock();
×
230
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
231
                        "addressing_service::register_locality",
232
                        "locality insertion failed because of a duplicate");
233
                    return false;
×
234
                }
235
            }
142✔
236

237
            return true;
142✔
238
        }
×
239
        catch (hpx::exception const& e)
240
        {
241
            HPX_RETHROWS_IF(ec, e, "addressing_service::register_locality");
×
242
            return false;
×
243
        }
×
244
    }    // }}}
142✔
245

246
    void addressing_service::register_console(
142✔
247
        parcelset::endpoints_type const& eps)
248
    {
249
        std::lock_guard<mutex_type> l(resolved_localities_mtx_);
142✔
250
        std::pair<resolved_localities_type::iterator, bool> res =
251
            resolved_localities_.insert(
142✔
252
                std::make_pair(naming::get_gid_from_locality_id(0), eps));
142✔
253
        HPX_ASSERT(res.second);
142✔
254
        HPX_UNUSED(res);
142✔
255
    }
142✔
256

257
    bool addressing_service::has_resolved_locality(naming::gid_type const& gid)
×
258
    {    // {{{
259
        std::unique_lock<mutex_type> l(resolved_localities_mtx_);
×
260
        return resolved_localities_.find(gid) != resolved_localities_.end();
×
261
    }    // }}}
×
262

263
    void addressing_service::pre_cache_endpoints(
594✔
264
        std::vector<parcelset::endpoints_type> const& endpoints)
265
    {    // {{{
266
        std::unique_lock<mutex_type> l(resolved_localities_mtx_);
594✔
267
        std::uint32_t locality_id = 0;
594✔
268
        for (parcelset::endpoints_type const& endpoint : endpoints)
1,471✔
269
        {
270
            resolved_localities_.insert(resolved_localities_type::value_type(
1,754✔
271
                naming::get_gid_from_locality_id(locality_id), endpoint));
877✔
272
            ++locality_id;
877✔
273
        }
274
    }    // }}}
594✔
275

276
    parcelset::endpoints_type const& addressing_service::resolve_locality(
466,459✔
277
        naming::gid_type const& gid, error_code& ec)
278
    {    // {{{
279
        std::unique_lock<mutex_type> l(resolved_localities_mtx_);
466,459✔
280
        resolved_localities_type::iterator it = resolved_localities_.find(gid);
466,459✔
281
        if (it == resolved_localities_.end() || it->second.empty())
466,459✔
282
        {
283
            // The locality hasn't been requested to be resolved yet. Do it now.
284
            parcelset::endpoints_type endpoints;
×
285
            {
286
                hpx::unlock_guard<std::unique_lock<mutex_type>> ul(l);
×
287
                endpoints = locality_ns_->resolve_locality(gid);
×
288
                if (endpoints.empty())
×
289
                {
290
                    std::string str = hpx::util::format(
×
291
                        "couldn't resolve the given target locality ({})", gid);
×
292

293
                    l.unlock();
×
294

295
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
296
                        "addressing_service::resolve_locality", str);
297
                    return resolved_localities_[naming::invalid_gid];
×
298
                }
×
299
            }
×
300

301
            // Search again ... might have been added by a different thread already
302
            it = resolved_localities_.find(gid);
×
303
            if (it == resolved_localities_.end())
×
304
            {
305
                if (HPX_UNLIKELY(!util::insert_checked(
×
306
                        resolved_localities_.insert(
307
                            std::make_pair(gid, endpoints)),
308
                        it)))
309
                {
310
                    l.unlock();
×
311

312
                    HPX_THROWS_IF(ec, hpx::error::internal_server_error,
×
313
                        "addressing_service::resolve_locality",
314
                        "resolved locality insertion failed "
315
                        "due to a locking error or memory corruption");
316
                    return resolved_localities_[naming::invalid_gid];
×
317
                }
318
            }
×
319
            else if (it->second.empty() && !endpoints.empty())
×
320
            {
321
                resolved_localities_[gid] = endpoints;
×
322
            }
×
323
        }
×
324
        return it->second;
466,459✔
325
    }    // }}}
466,459✔
326

327
    // TODO: We need to ensure that the locality isn't unbound while it still holds
328
    // referenced objects.
329
    bool addressing_service::unregister_locality(
594✔
330
        naming::gid_type const& gid, error_code& ec)
331
    {    // {{{
332
        try
333
        {
334
            locality_ns_->free(gid);
594✔
335
            component_ns_->unregister_server_instance(ec);
594✔
336
            symbol_ns_.unregister_server_instance(ec);
594✔
337

338
            remove_resolved_locality(gid);
594✔
339
            return true;
594✔
340
        }
×
341
        catch (hpx::exception const& e)
342
        {
343
            HPX_RETHROWS_IF(ec, e, "addressing_service::unregister_locality");
×
344
            return false;
×
345
        }
×
346
    }    // }}}
594✔
347

348
    void addressing_service::remove_resolved_locality(
595✔
349
        naming::gid_type const& gid)
350
    {
351
        std::lock_guard<mutex_type> l(resolved_localities_mtx_);
595✔
352
        resolved_localities_type::iterator it = resolved_localities_.find(gid);
595✔
353
        if (it != resolved_localities_.end())
595✔
354
            resolved_localities_.erase(it);
594✔
355
    }
595✔
356

357
    bool addressing_service::get_console_locality(
1,415✔
358
        naming::gid_type& prefix, error_code& ec)
359
    {    // {{{
360
        try
361
        {
362
            if (get_status() != hpx::state::running)
1,415✔
363
            {
364
                if (&ec != &throws)
×
365
                    ec = make_success_code();
×
366
                return false;
×
367
            }
368

369
            if (is_console())
1,415✔
370
            {
371
                prefix = get_local_locality();
1,391✔
372
                if (&ec != &throws)
1,391✔
373
                    ec = make_success_code();
×
374
                return true;
1,391✔
375
            }
376

377
            {
378
                std::lock_guard<mutex_type> lock(console_cache_mtx_);
24✔
379

380
                if (console_cache_ != naming::invalid_locality_id)
24✔
381
                {
382
                    prefix = naming::get_gid_from_locality_id(console_cache_);
×
383
                    if (&ec != &throws)
×
384
                        ec = make_success_code();
×
385
                    return true;
×
386
                }
387
            }
24✔
388

389
            std::string key("/0/locality#console");
24✔
390

391
            hpx::id_type resolved_prefix = resolve_name(key);
24✔
392
            if (resolved_prefix != hpx::invalid_id)
24✔
393
            {
394
                std::uint32_t console =
24✔
395
                    naming::get_locality_id_from_id(resolved_prefix);
24✔
396
                prefix = resolved_prefix.get_gid();
24✔
397

398
                {
399
                    std::unique_lock<mutex_type> lock(console_cache_mtx_);
24✔
400
                    if (console_cache_ == naming::invalid_locality_id)
24✔
401
                    {
402
                        console_cache_ = console;
24✔
403
                    }
24✔
404
                    else
405
                    {
406
                        HPX_ASSERT_LOCKED(lock, console_cache_ == console);
×
407
                    }
408
                }
24✔
409

410
                LAGAS_(debug).format(
24✔
411
                    "addressing_server::get_console_locality, caching console "
×
412
                    "locality, prefix({1})",
413
                    console);
414

415
                return true;
24✔
416
            }
417

418
            return false;
×
419
        }
24✔
420
        catch (hpx::exception const& e)
421
        {
422
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_console_locality");
×
423
            return false;
×
424
        }
×
425
    }    // }}}
1,415✔
426

427
    bool addressing_service::get_localities(
1,456✔
428
        std::vector<naming::gid_type>& locality_ids,
429
        components::component_type type, error_code& ec)
430
    {    // {{{ get_locality_ids implementation
431
        try
432
        {
433
            if (type != components::component_invalid)
1,456✔
434
            {
435
                std::vector<std::uint32_t> const p =
436
                    component_ns_->resolve_id(type);
15✔
437

438
                if (p.empty())
15✔
439
                    return false;
×
440

441
                locality_ids.clear();
15✔
442
                for (unsigned int i : p)
45✔
443
                {
444
                    locality_ids.push_back(naming::get_gid_from_locality_id(i));
30✔
445
                }
446

447
                return true;
15✔
448
            }
15✔
449
            else
450
            {
451
                const std::vector<std::uint32_t> p = locality_ns_->localities();
1,441✔
452

453
                if (!p.size())
1,441✔
454
                    return false;
×
455

456
                locality_ids.clear();
1,441✔
457
                for (unsigned int i : p)
3,352✔
458
                {
459
                    locality_ids.push_back(naming::get_gid_from_locality_id(i));
1,911✔
460
                }
461
                return true;
1,441✔
462
            }
1,441✔
463
        }
×
464
        catch (hpx::exception const& e)
465
        {
466
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_locality_ids");
×
467
            return false;
×
468
        }
×
469
    }    // }}}
1,456✔
470

471
    ///////////////////////////////////////////////////////////////////////////
472
    std::uint32_t addressing_service::get_num_localities(
367✔
473
        components::component_type type, error_code& ec) const
474
    {    // {{{ get_num_localities implementation
475
        try
476
        {
477
            if (type == components::component_invalid)
367✔
478
            {
479
                return locality_ns_->get_num_localities();
367✔
480
            }
481

482
            return component_ns_->get_num_localities(type).get();
×
483
        }
×
484
        catch (hpx::exception const& e)
485
        {
486
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_num_localities");
×
487
        }
×
488
        return std::uint32_t(-1);
×
489
    }    // }}}
367✔
490

491
    hpx::future<std::uint32_t> addressing_service::get_num_localities_async(
9✔
492
        components::component_type type) const
493
    {    // {{{ get_num_localities implementation
494
        if (type == components::component_invalid)
9✔
495
        {
496
            return locality_ns_->get_num_localities_async();
9✔
497
        }
498

499
        return component_ns_->get_num_localities(type);
×
500
    }    // }}}
9✔
501

502
    ///////////////////////////////////////////////////////////////////////////
503
    std::uint32_t addressing_service::get_num_overall_threads(
11✔
504
        error_code& ec) const
505
    {    // {{{ get_num_overall_threads implementation
506
        try
507
        {
508
            return locality_ns_->get_num_overall_threads();
11✔
509
        }
×
510
        catch (hpx::exception const& e)
511
        {
512
            HPX_RETHROWS_IF(
×
513
                ec, e, "addressing_service::get_num_overall_threads");
514
        }
×
515
        return std::uint32_t(0);
×
516
    }    // }}}
11✔
517

518
    hpx::future<std::uint32_t>
519
    addressing_service::get_num_overall_threads_async() const
×
520
    {    // {{{
521
        return locality_ns_->get_num_overall_threads_async();
×
522
    }    // }}}
523

524
    std::vector<std::uint32_t> addressing_service::get_num_threads(
×
525
        error_code& ec) const
526
    {    // {{{ get_num_threads implementation
527
        try
528
        {
529
            return locality_ns_->get_num_threads();
×
530
        }
×
531
        catch (hpx::exception const& e)
532
        {
533
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_num_threads");
×
534
        }
×
535
        return std::vector<std::uint32_t>();
×
536
    }    // }}}
×
537

538
    hpx::future<std::vector<std::uint32_t>>
539
    addressing_service::get_num_threads_async() const
×
540
    {    // {{{
541
        return locality_ns_->get_num_threads_async();
×
542
    }    // }}}
543

544
    ///////////////////////////////////////////////////////////////////////////
545
    components::component_type addressing_service::get_component_id(
11,246✔
546
        std::string const& name, error_code& ec)
547
    {    /// {{{
548
        try
549
        {
550
            return component_ns_->bind_name(name);
11,246✔
551
        }
×
552
        catch (hpx::exception const& e)
553
        {
554
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_component_id");
×
555
            return components::component_invalid;
×
556
        }
×
557
    }    // }}}
11,246✔
558

559
    void addressing_service::iterate_types(
×
560
        iterate_types_function_type const& f, error_code& ec)
561
    {    // {{{
562
        try
563
        {
564
            return component_ns_->iterate_types(f);
×
565
        }
×
566
        catch (hpx::exception const& e)
567
        {
568
            HPX_RETHROWS_IF(ec, e, "addressing_service::iterate_types");
×
569
        }
×
570
    }    // }}}
×
571

572
    std::string addressing_service::get_component_type_name(
×
573
        components::component_type id, error_code& ec)
574
    {    // {{{
575
        try
576
        {
577
            return component_ns_->get_component_type_name(id);
×
578
        }
×
579
        catch (hpx::exception const& e)
580
        {
581
            HPX_RETHROWS_IF(ec, e, "addressing_service::iterate_types");
×
582
        }
×
583
        return "<unknown>";
×
584
    }    // }}}
×
585

586
    components::component_type addressing_service::register_factory(
13,493✔
587
        std::uint32_t prefix, std::string const& name, error_code& ec)
588
    {    // {{{
589
        try
590
        {
591
            return component_ns_->bind_prefix(name, prefix);
13,493✔
592
        }
×
593
        catch (hpx::exception const& e)
594
        {
595
            HPX_RETHROWS_IF(ec, e, "addressing_service::register_factory");
×
596
            return components::component_invalid;
×
597
        }
×
598
    }    // }}}
13,493✔
599

600
    ///////////////////////////////////////////////////////////////////////////
601
    bool addressing_service::get_id_range(std::uint64_t count,
3,186✔
602
        naming::gid_type& lower_bound, naming::gid_type& upper_bound,
603
        error_code& ec)
604
    {    // {{{ get_id_range implementation
605
        try
606
        {
607
            // parcelset::endpoints_type() is an obsolete, dummy argument
608

609
            std::pair<naming::gid_type, naming::gid_type> rep(
610
                primary_ns_.allocate(count));
3,186✔
611

612
            if (rep.first == naming::invalid_gid ||
3,186✔
613
                rep.second == naming::invalid_gid)
3,186✔
614
                return false;
×
615

616
            lower_bound = rep.first;
3,186✔
617
            upper_bound = rep.second;
3,186✔
618

619
            return true;
3,186✔
620
        }
×
621
        catch (hpx::exception const& e)
622
        {
623
            HPX_RETHROWS_IF(ec, e, "addressing_service::get_id_range");
×
624
            return false;
×
625
        }
×
626
    }    // }}}
3,186✔
627

628
    bool addressing_service::bind_range_local(naming::gid_type const& lower_id,
5,217✔
629
        std::uint64_t count, naming::address const& baseaddr,
630
        std::uint64_t offset, error_code& ec)
631
    {    // {{{ bind_range implementation
632
        try
633
        {
634
            naming::gid_type const& prefix = baseaddr.locality_;
5,217✔
635

636
            // Create a global virtual address from the legacy calling convention
637
            // parameters
638
            gva const g(
5,217✔
639
                prefix, baseaddr.type_, count, baseaddr.address_, offset);
5,217✔
640

641
            primary_ns_.bind_gid(
10,434✔
642
                g, lower_id, naming::get_locality_from_gid(lower_id));
5,217✔
643

644
            if (range_caching_)
5,217✔
645
            {
646
                // Put the range into the cache.
647
                update_cache_entry(lower_id, g, ec);
5,217✔
648
            }
5,217✔
649
            else
650
            {
651
                // Only put the first GID in the range into the cache
652
                gva const first_g = g.resolve(lower_id, lower_id);
×
653
                update_cache_entry(lower_id, first_g, ec);
×
654
            }
655

656
            if (ec)
5,217✔
657
                return false;
×
658

659
            return true;
5,217✔
660
        }
×
661
        catch (hpx::exception const& e)
662
        {
663
            HPX_RETHROWS_IF(ec, e, "addressing_service::bind_range_local");
×
664
            return false;
×
665
        }
×
666
    }    // }}}
5,217✔
667

668
    bool addressing_service::bind_postproc(
2,028✔
669
        naming::gid_type const& lower_id, gva const& g, future<bool> f)
670
    {
671
        f.get();
2,028✔
672

673
        if (range_caching_)
2,028✔
674
        {
675
            // Put the range into the cache.
676
            update_cache_entry(lower_id, g);
2,028✔
677
        }
2,028✔
678
        else
679
        {
680
            // Only put the first GID in the range into the cache
681
            gva const first_g = g.resolve(lower_id, lower_id);
×
682
            update_cache_entry(lower_id, first_g);
×
683
        }
684

685
        return true;
2,028✔
686
    }
687

688
    hpx::future<bool> addressing_service::bind_range_async(
2,028✔
689
        naming::gid_type const& lower_id, std::uint64_t count,
690
        naming::address const& baseaddr, std::uint64_t offset,
691
        naming::gid_type const& locality)
692
    {
693
        // ask server
694
        naming::gid_type const& prefix = baseaddr.locality_;
2,028✔
695

696
        // Create a global virtual address from the legacy calling convention
697
        // parameters.
698
        gva const g(prefix, baseaddr.type_, count, baseaddr.address_, offset);
2,028✔
699

700
        naming::gid_type id(
701
            naming::detail::get_stripped_gid_except_dont_cache(lower_id));
2,028✔
702

703
        future<bool> f = primary_ns_.bind_gid_async(g, id, locality);
2,028✔
704

705
        return f.then(hpx::launch::sync,
2,028✔
706
            util::one_shot(hpx::bind_front(
2,028✔
707
                &addressing_service::bind_postproc, this, id, g)));
2,028✔
708
    }
2,028✔
709

710
    hpx::future<naming::address> addressing_service::unbind_range_async(
17,250✔
711
        naming::gid_type const& lower_id, std::uint64_t count)
712
    {
713
        return primary_ns_.unbind_gid_async(count, lower_id);
17,250✔
714
    }
715

716
    bool addressing_service::unbind_range_local(
1,814✔
717
        naming::gid_type const& lower_id, std::uint64_t count,
718
        naming::address& addr, error_code& ec)
719
    {    // {{{ unbind_range implementation
720
        try
721
        {
722
            addr = primary_ns_.unbind_gid(count, lower_id);
1,814✔
723

724
            return true;
1,814✔
725
        }
×
726
        catch (hpx::exception const& e)
727
        {
728
            HPX_RETHROWS_IF(ec, e, "addressing_service::unbind_range_local");
×
729
            return false;
×
730
        }
×
731
    }    // }}}
1,814✔
732

733
    /// This function will test whether the given address refers to an object
734
    /// living on the locality of the caller. We rely completely on the local AGAS
735
    /// cache and local AGAS instance, assuming that everything which is not in
736
    /// the cache is not local.
737

738
    bool addressing_service::is_local_address_cached(
1,981,061✔
739
        naming::gid_type const& gid, naming::address& addr, error_code& ec)
740
    {
741
        // Assume non-local operation if the gid is known to have been migrated
742
        naming::gid_type id(
743
            naming::detail::get_stripped_gid_except_dont_cache(gid));
1,981,065✔
744

745
#if defined(HPX_HAVE_NETWORKING)
746
        if (naming::detail::is_migratable(gid))
1,981,065✔
747
        {
748
            std::lock_guard<mutex_type> lock(migrated_objects_mtx_);
29,490✔
749
            if (was_object_migrated_locked(id))
29,490✔
750
            {
751
                if (&ec != &throws)
13,932✔
752
                    ec = make_success_code();
×
753
                return false;
13,932✔
754
            }
755
        }
29,490✔
756
#endif
757

758
        // Try to resolve the address of the GID from the locally available
759
        // information.
760

761
        // NOTE: We do not throw here for a reason; it is perfectly valid for the
762
        // GID to not be found in the cache.
763
        if (!resolve_cached(id, addr, ec) || ec)
1,967,133✔
764
        {
765
            if (ec)
103,951✔
766
                return false;
×
767

768
            // try also the local part of AGAS before giving up
769
            if (!resolve_full_local(id, addr, ec) || ec)
103,951✔
770
                return false;
8,704✔
771
        }
95,250✔
772

773
        return addr.locality_ == get_local_locality();
1,958,430✔
774
    }
1,981,066✔
775

776
    // Return true if at least one address is local.
777
    bool addressing_service::is_local_lva_encoded_address(std::uint64_t msb)
2,017,894✔
778
    {
779
        // NOTE: This should still be migration safe.
780
        return naming::detail::strip_internal_bits_and_component_type_from_gid(
4,035,788✔
781
                   msb) == get_local_locality().get_msb();
4,035,788✔
782
    }
783

784
    bool addressing_service::resolve_locally_known_addresses(
2,017,894✔
785
        naming::gid_type const& id, naming::address& addr)
786
    {
787
        // LVA-encoded GIDs (located on this machine)
788
        std::uint64_t lsb = id.get_lsb();
2,017,894✔
789
        std::uint64_t msb = id.get_msb();
2,017,894✔
790

791
        if (is_local_lva_encoded_address(msb))
2,017,894✔
792
        {
793
            addr.locality_ = get_local_locality();
1,625,605✔
794

795
            // An LSB of 0 references the runtime support component
796
            if (0 == lsb || lsb == rts_lva_)
1,625,605✔
797
            {
798
                HPX_ASSERT(rts_lva_);
1,449,752✔
799

800
                addr.type_ = components::component_runtime_support;
1,449,752✔
801
                addr.address_ =
1,449,752✔
802
                    reinterpret_cast<naming::address::address_type>(rts_lva_);
1,449,752✔
803
                return true;
1,449,752✔
804
            }
805

806
            if (naming::refers_to_local_lva(id))
175,853✔
807
            {
808
                // handle (non-migratable) components located on this locality first
809
                addr.type_ = naming::detail::get_component_type_from_gid(msb);
175,853✔
810
                addr.address_ =
175,853✔
811
                    reinterpret_cast<naming::address::address_type>(lsb);
175,853✔
812
                return true;
175,853✔
813
            }
814
        }
×
815

816
        msb = naming::detail::strip_internal_bits_from_gid(msb);
392,289✔
817

818
        // explicitly resolve localities
819
        if (naming::is_locality(id))
392,289✔
820
        {
821
            addr.locality_ = id;
20,803✔
822
            addr.type_ = components::component_runtime_support;
20,803✔
823
            // addr.address_ will be supplied on the target locality
824
            return true;
20,803✔
825
        }
826

827
        // authoritative AGAS component address resolution
828
        if (agas::locality_ns_msb == msb && agas::locality_ns_lsb == lsb)
371,486✔
829
        {
830
            addr = locality_ns_->addr();
316✔
831
            return true;
316✔
832
        }
833
        if (agas::component_ns_msb == msb && agas::component_ns_lsb == lsb)
371,171✔
834
        {
835
            addr = component_ns_->addr();
7,246✔
836
            return true;
7,246✔
837
        }
838

839
        naming::gid_type dest = naming::get_locality_from_gid(id);
363,926✔
840
        if (agas::primary_ns_lsb == lsb)
363,926✔
841
        {
842
            // primary AGAS service on locality 0?
843
            if (dest == get_local_locality())
46,368✔
844
            {
845
                addr = primary_ns_.addr();
27,621✔
846
            }
27,621✔
847
            // primary AGAS service on any locality
848
            else
849
            {
850
                addr.locality_ = dest;
18,747✔
851
                addr.type_ = hpx::components::component_agas_primary_namespace;
18,747✔
852
                // addr.address_ will be supplied on the target locality
853
            }
854
            return true;
46,368✔
855
        }
856

857
        if (agas::symbol_ns_lsb == lsb)
317,558✔
858
        {
859
            // symbol AGAS service on this locality?
860
            if (dest == get_local_locality())
17,339✔
861
            {
862
                addr = symbol_ns_.addr();
×
863
            }
×
864
            // symbol AGAS service on any locality
865
            else
866
            {
867
                addr.locality_ = dest;
17,339✔
868
                addr.type_ = hpx::components::component_agas_symbol_namespace;
17,339✔
869
                // addr.address_ will be supplied on the target locality
870
            }
871
            return true;
17,339✔
872
        }
873

874
        return false;
300,219✔
875
    }    // }}}
2,017,891✔
876

877
    bool addressing_service::resolve_full_local(
115,527✔
878
        naming::gid_type const& id, naming::address& addr, error_code& ec)
879
    {    // {{{ resolve implementation
880
        try
881
        {
882
            auto rep = primary_ns_.resolve_gid(id);
115,530✔
883

884
            using hpx::get;
885

886
            if (get<0>(rep) == naming::invalid_gid ||
115,530✔
887
                get<2>(rep) == naming::invalid_gid)
99,297✔
888
                return false;
16,233✔
889

890
            // Resolve the gva to the real resolved address (which is just a gva
891
            // with as fully resolved LVA and an offset of zero).
892
            naming::gid_type base_gid = get<0>(rep);
99,297✔
893
            gva const base_gva = get<1>(rep);
99,297✔
894

895
            gva const g = base_gva.resolve(id, base_gid);
99,297✔
896

897
            addr.locality_ = g.prefix;
99,297✔
898
            addr.type_ = g.type;
99,297✔
899
            addr.address_ = g.lva();
99,297✔
900

901
            if (naming::detail::store_in_cache(id))
99,297✔
902
            {
903
                HPX_ASSERT(addr.address_);
86,421✔
904
                if (range_caching_)
86,421✔
905
                {
906
                    // Put the range into the cache.
907
                    update_cache_entry(base_gid, base_gva, ec);
86,421✔
908
                }
86,421✔
909
                else
910
                {
911
                    // Put the fully resolved gva into the cache.
912
                    update_cache_entry(id, g, ec);
×
913
                }
914
            }
86,421✔
915

916
            if (ec)
99,297✔
917
                return false;
×
918

919
            if (&ec != &throws)
99,297✔
920
                ec = make_success_code();
×
921

922
            return true;
99,297✔
923
        }
×
924
        catch (hpx::exception const& e)
925
        {
926
            HPX_RETHROWS_IF(ec, e, "addressing_service::resolve_full_local");
×
927
            return false;
×
928
        }
×
929
    }    // }}}
115,530✔
930

931
    bool addressing_service::resolve_cached(
2,017,896✔
932
        naming::gid_type const& gid, naming::address& addr, error_code& ec)
933
    {    // {{{ resolve_cached implementation
934

935
        naming::gid_type id =
936
            naming::detail::get_stripped_gid_except_dont_cache(gid);
2,017,893✔
937

938
        // special cases
939
        if (resolve_locally_known_addresses(id, addr))
2,017,893✔
940
        {
941
            if (&ec != &throws)
1,717,674✔
942
                ec = make_success_code();
34,325✔
943
            return true;
1,717,680✔
944
        }
945

946
        // If caching is disabled, bail
947
        if (!caching_)
300,220✔
948
        {
949
            if (&ec != &throws)
×
950
                ec = make_success_code();
×
951
            return false;
×
952
        }
953

954
        // don't look at cache if id is marked as non-cache-able
955
        if (!naming::detail::store_in_cache(id))
300,219✔
956
        {
957
            if (&ec != &throws)
29,419✔
958
                ec = make_success_code();
3,232✔
959
            return false;
29,419✔
960
        }
961

962
        // don't look at the cache if the id is locally managed
963
        if (naming::get_locality_id_from_gid(id) ==
541,600✔
964
            naming::get_locality_id_from_gid(locality_))
270,800✔
965
        {
966
            if (&ec != &throws)
85,730✔
967
                ec = make_success_code();
5✔
968
            return false;
85,730✔
969
        }
970

971
        // force routing if target object was migrated
972
        if (naming::detail::is_migratable(id))
185,070✔
973
        {
974
            std::lock_guard<mutex_type> lock(migrated_objects_mtx_);
×
975
            if (was_object_migrated_locked(id))
×
976
            {
977
                if (&ec != &throws)
×
978
                    ec = make_success_code();
×
979
                return false;
×
980
            }
981
        }
×
982

983
        // first look up the requested item in the cache
984
        gva g;
185,071✔
985
        naming::gid_type idbase;
185,071✔
986
        if (get_cache_entry(id, g, idbase, ec))
185,071✔
987
        {
988
            addr.locality_ = g.prefix;
181,401✔
989
            addr.type_ = g.type;
181,401✔
990
            addr.address_ = g.lva(id, idbase);
181,401✔
991

992
            if (&ec != &throws)
181,401✔
993
                ec = make_success_code();
1,476✔
994

995
            return true;
181,401✔
996
        }
997

998
        if (&ec != &throws)
3,670✔
999
            ec = make_success_code();
4✔
1000

1001
        LAGAS_(debug).format(
3,670✔
1002
            "addressing_service::resolve_cached, cache miss for address {1}",
×
1003
            id);
1004

1005
        return false;
3,670✔
1006
    }    // }}}
2,017,900✔
1007

1008
    hpx::future<naming::address> addressing_service::resolve_async(
39,046✔
1009
        naming::gid_type const& gid)
1010
    {
1011
        if (!gid)
39,046✔
1012
        {
1013
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
1014
                "addressing_service::resolve_async", "invalid reference id");
1015
            return make_ready_future(naming::address());
1016
        }
1017

1018
        // Try the cache.
1019
        if (caching_)
39,045✔
1020
        {
1021
            naming::address addr;
39,047✔
1022
            error_code ec;
39,047✔
1023
            if (resolve_cached(gid, addr, ec))
39,045✔
1024
                return make_ready_future(addr);
35,806✔
1025

1026
            if (ec)
3,241✔
1027
            {
1028
                return hpx::make_exceptional_future<naming::address>(
×
1029
                    hpx::detail::access_exception(ec));
×
1030
            }
1031
        }
39,038✔
1032

1033
        // now try the AGAS service
1034
        return resolve_full_async(gid);
3,241✔
1035
    }
39,038✔
1036

1037
    hpx::future<hpx::id_type> addressing_service::get_colocation_id_async(
41✔
1038
        hpx::id_type const& id)
1039
    {
1040
        if (!id)
41✔
1041
        {
1042
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
1✔
1043
                "addressing_service::get_colocation_id_async",
1044
                "invalid reference id");
1045
            return make_ready_future(hpx::invalid_id);
1046
        }
1047

1048
        return primary_ns_.colocate(id.get_gid());
40✔
1049
    }
1✔
1050

1051
    ///////////////////////////////////////////////////////////////////////////
1052
    naming::address addressing_service::resolve_full_postproc(
3,241✔
1053
        naming::gid_type const& id, future<primary_namespace::resolved_type> f)
1054
    {
1055
        using hpx::get;
1056

1057
        naming::address addr;
3,241✔
1058

1059
        auto rep = f.get();
3,241✔
1060
        if (get<0>(rep) == naming::invalid_gid ||
3,241✔
1061
            get<2>(rep) == naming::invalid_gid)
3,241✔
1062
        {
1063
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
1064
                "addressing_service::resolve_full_postproc",
1065
                "could no resolve global id");
1066
            return addr;
1067
        }
1068

1069
        // Resolve the gva to the real resolved address (which is just a gva
1070
        // with as fully resolved LVA and and offset of zero).
1071
        naming::gid_type base_gid = get<0>(rep);
3,241✔
1072
        gva const base_gva = get<1>(rep);
3,241✔
1073

1074
        gva const g = base_gva.resolve(id, base_gid);
3,241✔
1075

1076
        addr.locality_ = g.prefix;
3,241✔
1077
        addr.type_ = g.type;
3,241✔
1078
        addr.address_ = g.lva();
3,241✔
1079

1080
        if (naming::detail::store_in_cache(id))
3,241✔
1081
        {
1082
            if (range_caching_)
9✔
1083
            {
1084
                // Put the range into the cache.
1085
                update_cache_entry(base_gid, base_gva);
9✔
1086
            }
9✔
1087
            else
1088
            {
1089
                // Put the fully resolved gva into the cache.
1090
                update_cache_entry(id, g);
×
1091
            }
1092
        }
9✔
1093

1094
        return addr;
3,241✔
1095
    }
×
1096

1097
    hpx::future<naming::address> addressing_service::resolve_full_async(
3,241✔
1098
        naming::gid_type const& gid)
1099
    {
1100
        if (!gid)
3,241✔
1101
        {
1102
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
1103
                "addressing_service::resolve_full_async",
1104
                "invalid reference id");
1105
            return make_ready_future(naming::address());
1106
        }
1107

1108
        // ask server
1109
        future<primary_namespace::resolved_type> f =
1110
            primary_ns_.resolve_full(gid);
3,241✔
1111

1112
        return f.then(hpx::launch::sync,
3,241✔
1113
            util::one_shot(hpx::bind_front(
3,241✔
1114
                &addressing_service::resolve_full_postproc, this, gid)));
3,241✔
1115
    }
3,241✔
1116

1117
    ///////////////////////////////////////////////////////////////////////////
1118
    bool addressing_service::resolve_full_local(naming::gid_type const* gids,
×
1119
        naming::address* addrs, std::size_t count,
1120
        hpx::detail::dynamic_bitset<>& locals, error_code& ec)
1121
    {
1122
        locals.resize(count);
×
1123

1124
        try
1125
        {
1126
            using hpx::get;
1127

1128
            // special cases
1129
            for (std::size_t i = 0; i != count; ++i)
×
1130
            {
1131
                if (addrs[i])
×
1132
                {
1133
                    locals.set(i, true);
×
1134
                    continue;
×
1135
                }
1136

1137
                HPX_ASSERT(!locals.test(i));
×
1138

1139
                if (!addrs[i] && !locals.test(i))
×
1140
                {
1141
                    auto rep = primary_ns_.resolve_gid(gids[i]);
×
1142

1143
                    if (get<0>(rep) == naming::invalid_gid ||
×
1144
                        get<2>(rep) == naming::invalid_gid)
×
1145
                        return false;
×
1146
                    // Resolve the gva to the real resolved address (which is
1147
                    // just a gva with as fully resolved LVA and and offset of
1148
                    // zero).
1149
                    naming::gid_type base_gid = get<0>(rep);
×
1150
                    gva const base_gva = get<1>(rep);
×
1151

1152
                    gva const g = base_gva.resolve(gids[i], base_gid);
×
1153

1154
                    naming::address& addr = addrs[i];
×
1155
                    addr.locality_ = g.prefix;
×
1156
                    addr.type_ = g.type;
×
1157
                    addr.address_ = g.lva();
×
1158

1159
                    hpx::error_code ec;
×
1160
                    if (naming::detail::store_in_cache(gids[i]))
×
1161
                    {
1162
                        if (range_caching_)
×
1163
                        {
1164
                            // Put the range into the cache.
1165
                            update_cache_entry(base_gid, base_gva, ec);
×
1166
                        }
×
1167
                        else
1168
                        {
1169
                            // Put the fully resolved gva into the cache.
1170
                            update_cache_entry(gids[i], g, ec);
×
1171
                        }
1172
                    }
×
1173

1174
                    if (ec)
×
1175
                        return false;
×
1176
                }
×
1177
            }
×
1178

1179
            return true;
×
1180
        }
×
1181
        catch (hpx::exception const& e)
1182
        {
1183
            HPX_RETHROWS_IF(ec, e, "addressing_service::resolve_full");
×
1184
            return false;
×
1185
        }
×
1186
    }
×
1187

1188
    bool addressing_service::resolve_cached(naming::gid_type const* gids,
×
1189
        naming::address* addrs, std::size_t count,
1190
        hpx::detail::dynamic_bitset<>& locals, error_code& ec)
1191
    {
1192
        locals.resize(count);
×
1193

1194
        std::size_t resolved = 0;
×
1195
        for (std::size_t i = 0; i != count; ++i)
×
1196
        {
1197
            if (!addrs[i] && !locals.test(i))
×
1198
            {
1199
                bool was_resolved = resolve_cached(gids[i], addrs[i], ec);
×
1200
                if (ec)
×
1201
                    return false;
×
1202
                if (was_resolved)
×
1203
                    ++resolved;
×
1204

1205
                if (addrs[i].locality_ == get_local_locality())
×
1206
                    locals.set(i, true);
×
1207
            }
×
1208

1209
            else if (addrs[i].locality_ == get_local_locality())
×
1210
            {
1211
                ++resolved;
×
1212
                locals.set(i, true);
×
1213
            }
×
1214
        }
×
1215

1216
        return resolved == count;    // returns whether all have been resolved
×
1217
    }
×
1218

1219
#if defined(HPX_HAVE_NETWORKING)
1220
    ///////////////////////////////////////////////////////////////////////////
1221
    void addressing_service::route(parcelset::parcel p,
7,531✔
1222
        hpx::function<void(std::error_code const&, parcelset::parcel const&)>&&
1223
            f,
1224
        threads::thread_priority local_priority)
1225
    {
1226
        if (HPX_UNLIKELY(nullptr == threads::get_self_ptr()))
7,531✔
1227
        {
1228
            // reschedule this call as an HPX thread
1229
            void (addressing_service::*route_ptr)(parcelset::parcel,
1✔
1230
                hpx::function<void(
1231
                    std::error_code const&, parcelset::parcel const&)>&&,
1232
                threads::thread_priority) = &addressing_service::route;
1233

1234
            threads::thread_init_data data(
1✔
1235
                threads::make_thread_function_nullary(util::deferred_call(
1✔
1236
                    route_ptr, this, HPX_MOVE(p), HPX_MOVE(f), local_priority)),
1✔
1237
                "addressing_service::route", threads::thread_priority::normal,
1✔
1238
                threads::thread_schedule_hint(),
1✔
1239
                threads::thread_stacksize::default_,
1240
                threads::thread_schedule_state::pending, true);
1241
            threads::register_thread(data);
1✔
1242
            return;
1243
        }
1✔
1244

1245
        primary_ns_.route(HPX_MOVE(p), HPX_MOVE(f));
7,530✔
1246
    }
7,531✔
1247
#endif
1248

1249
    ///////////////////////////////////////////////////////////////////////////
1250
    // The parameter 'compensated_credit' holds the amount of credits to be added
1251
    // to the acknowledged number of credits. The compensated credits are non-zero
1252
    // if there was a pending decref request at the point when the incref was sent.
1253
    // The pending decref was subtracted from the amount of credits to incref.
1254
    std::int64_t addressing_service::synchronize_with_async_incref(
9,366✔
1255
        hpx::future<std::int64_t> fut, hpx::id_type const&,
1256
        std::int64_t compensated_credit)
1257
    {
1258
        return fut.get() + compensated_credit;
9,366✔
1259
    }
1260

1261
    hpx::future<std::int64_t> addressing_service::incref_async(
9,983✔
1262
        naming::gid_type const& id, std::int64_t credit,
1263
        hpx::id_type const& keep_alive)
1264
    {    // {{{ incref implementation
1265
        naming::gid_type raw(naming::detail::get_stripped_gid(id));
9,983✔
1266

1267
        if (HPX_UNLIKELY(nullptr == threads::get_self_ptr()))
9,983✔
1268
        {
1269
            // reschedule this call as an HPX thread
1270
            hpx::future<std::int64_t> (addressing_service::*incref_async_ptr)(
×
1271
                naming::gid_type const&, std::int64_t, hpx::id_type const&) =
1272
                &addressing_service::incref_async;
1273

1274
            return async(incref_async_ptr, this, raw, credit, keep_alive);
×
1275
        }
1276

1277
        if (HPX_UNLIKELY(0 >= credit))
9,983✔
1278
        {
1279
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
1280
                "addressing_service::incref_async",
1281
                "invalid credit count of {1}", credit);
1282
            return hpx::future<std::int64_t>();
1283
        }
1284

1285
        HPX_ASSERT(keep_alive != hpx::invalid_id);
9,983✔
1286

1287
        using mapping = refcnt_requests_type::value_type;
1288

1289
        // Some examples of calculating the compensated credits below
1290
        //
1291
        //  case   pending   credits   remaining   sent to   compensated
1292
        //  no     decref              decrefs     AGAS      credits
1293
        // ------+---------+---------+------------+--------+-------------
1294
        //   1         0        10        0           0        10
1295
        //   2        10         9        1           0        10
1296
        //   3        10        10        0           0        10
1297
        //   4        10        11        0           1        10
1298

1299
        std::pair<naming::gid_type, std::int64_t> pending_incref;
9,983✔
1300
        bool has_pending_incref = false;
9,983✔
1301
        std::int64_t pending_decrefs = 0;
9,983✔
1302

1303
        {
1304
            std::lock_guard<mutex_type> l(refcnt_requests_mtx_);
9,983✔
1305

1306
            using iterator = refcnt_requests_type::iterator;
1307

1308
            iterator matches = refcnt_requests_->find(raw);
9,983✔
1309
            if (matches != refcnt_requests_->end())
9,983✔
1310
            {
1311
                pending_decrefs = matches->second;
773✔
1312
                matches->second += credit;
773✔
1313

1314
                // Increment requests need to be handled immediately.
1315

1316
                // If the given incref was fully compensated by a pending decref
1317
                // (i.e. match_data is less than 0) then there is no need
1318
                // to do anything more.
1319
                if (matches->second > 0)
773✔
1320
                {
1321
                    // credit > decrefs (case no 4): store the remaining incref to
1322
                    // be handled below.
1323
                    pending_incref = mapping(matches->first, matches->second);
156✔
1324
                    has_pending_incref = true;
156✔
1325

1326
                    refcnt_requests_->erase(matches);
156✔
1327
                }
156✔
1328
                else if (matches->second == 0)
617✔
1329
                {
1330
                    // credit == decref (case no. 3): if the incref offsets any
1331
                    // pending decref, just remove the pending decref request.
1332
                    refcnt_requests_->erase(matches);
516✔
1333
                }
516✔
1334
                else
1335
                {
1336
                    // credit < decref (case no. 2): do nothing
1337
                }
1338
            }
773✔
1339
            else
1340
            {
1341
                // case no. 1
1342
                pending_incref = mapping(raw, credit);
9,210✔
1343
                has_pending_incref = true;
9,210✔
1344
            }
1345
        }
9,983✔
1346

1347
        if (!has_pending_incref)
9,983✔
1348
        {
1349
            // no need to talk to AGAS, acknowledge the incref immediately
1350
            return hpx::make_ready_future(pending_decrefs);
617✔
1351
        }
1352

1353
        naming::gid_type const e_lower = pending_incref.first;
9,366✔
1354

1355
        hpx::future<std::int64_t> f = primary_ns_.increment_credit(
18,732✔
1356
            pending_incref.second, e_lower, e_lower);
9,366✔
1357

1358
        // pass the amount of compensated decrefs to the callback
1359
        using placeholders::_1;
1360
        return f.then(hpx::launch::sync,
9,366✔
1361
            util::one_shot(
9,366✔
1362
                hpx::bind(&addressing_service::synchronize_with_async_incref,
18,732✔
1363
                    this, _1, keep_alive, pending_decrefs)));
9,366✔
1364
    }    // }}}
9,983✔
1365

1366
    ///////////////////////////////////////////////////////////////////////////
1367
    void addressing_service::decref(
99,278✔
1368
        naming::gid_type const& gid, std::int64_t credit, error_code& ec)
1369
    {    // {{{ decref implementation
1370
        naming::gid_type raw(naming::detail::get_stripped_gid(gid));
99,278✔
1371

1372
        if (HPX_UNLIKELY(nullptr == threads::get_self_ptr()))
99,278✔
1373
        {
1374
            // reschedule this call as an HPX thread
1375
            threads::thread_init_data data(
6,870✔
1376
                threads::make_thread_function_nullary(
6,870✔
1377
                    [HPX_CXX20_CAPTURE_THIS(=)]() -> void {
27,480✔
1378
                        return decref(raw, credit, throws);
6,870✔
1379
                    }),
1380
                "addressing_service::decref", threads::thread_priority::normal,
6,870✔
1381
                threads::thread_schedule_hint(),
6,870✔
1382
                threads::thread_stacksize::default_,
1383
                threads::thread_schedule_state::pending, true);
1384
            threads::register_thread(data, ec);
6,870✔
1385
            return;
1386
        }
6,870✔
1387

1388
        if (HPX_UNLIKELY(credit <= 0))
92,408✔
1389
        {
1390
            HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
1391
                "addressing_service::decref", "invalid credit count of {1}",
1392
                credit);
1393
            return;
×
1394
        }
1395

1396
        try
1397
        {
1398
            std::unique_lock<mutex_type> l(refcnt_requests_mtx_);
92,408✔
1399

1400
            // Match the decref request with entries in the incref table
1401
            using iterator = refcnt_requests_type::iterator;
1402
            using mapping = refcnt_requests_type::value_type;
1403

1404
            iterator matches = refcnt_requests_->find(raw);
92,408✔
1405
            if (matches != refcnt_requests_->end())
92,408✔
1406
            {
1407
                matches->second -= credit;
46,637✔
1408
            }
46,637✔
1409
            else
1410
            {
1411
                std::pair<iterator, bool> p =
1412
                    refcnt_requests_->insert(mapping(raw, -credit));
45,771✔
1413

1414
                if (HPX_UNLIKELY(!p.second))
45,771✔
1415
                {
1416
                    l.unlock();
×
1417

1418
                    HPX_THROWS_IF(ec, hpx::error::bad_parameter,
×
1419
                        "addressing_service::decref",
1420
                        "couldn't insert decref request for {1} ({2})", raw,
1421
                        credit);
1422
                    return;
×
1423
                }
1424
            }
1425

1426
            send_refcnt_requests(l, ec);
92,408✔
1427
        }
92,408✔
1428
        catch (hpx::exception const& e)
1429
        {
1430
            HPX_RETHROWS_IF(ec, e, "addressing_service::decref");
×
1431
        }
×
1432
    }    // }}}
99,278✔
1433

1434
    ///////////////////////////////////////////////////////////////////////////
1435
    static bool correct_credit_on_failure(future<bool> f, hpx::id_type id,
2,476✔
1436
        std::int64_t mutable_gid_credit, std::int64_t new_gid_credit)
1437
    {
1438
        // Return the credit to the GID if the operation failed
1439
        if ((f.has_exception() && mutable_gid_credit != 0) || !f.get())
2,476✔
1440
        {
1441
            naming::detail::add_credit_to_gid(id.get_gid(), new_gid_credit);
2✔
1442
            return false;
2✔
1443
        }
1444
        return true;
2,474✔
1445
    }
2,476✔
1446

1447
    bool addressing_service::register_name(
3,000✔
1448
        std::string const& name, naming::gid_type const& id, error_code& ec)
1449
    {    // {{{
1450
        try
1451
        {
1452
            return symbol_ns_.bind(name, naming::detail::get_stripped_gid(id));
3,000✔
1453
        }
×
1454
        catch (hpx::exception const& e)
1455
        {
1456
            HPX_RETHROWS_IF(ec, e, "addressing_service::register_name");
×
1457
        }
×
1458
        return false;
×
1459
    }    // }}}
3,000✔
1460

1461
    bool addressing_service::register_name(
1,527✔
1462
        std::string const& name, hpx::id_type const& id, error_code& ec)
1463
    {
1464
        // We need to modify the reference count.
1465
        naming::gid_type& mutable_gid = const_cast<hpx::id_type&>(id).get_gid();
1,527✔
1466
        naming::gid_type new_gid =
1467
            naming::detail::split_gid_if_needed(mutable_gid).get();
1,527✔
1468
        std::int64_t new_credit = naming::detail::get_credit_from_gid(new_gid);
1,527✔
1469

1470
        try
1471
        {
1472
            return symbol_ns_.bind(name, new_gid);
1,527✔
1473
        }
×
1474
        catch (hpx::exception const& e)
1475
        {
1476
            if (new_credit != 0)
×
1477
            {
1478
                naming::detail::add_credit_to_gid(mutable_gid, new_credit);
×
1479
            }
×
1480
            HPX_RETHROWS_IF(ec, e, "addressing_service::register_name");
×
1481
        }
×
1482
        return false;
×
1483
    }
1,527✔
1484

1485
    hpx::future<bool> addressing_service::register_name_async(
3,143✔
1486
        std::string const& name, hpx::id_type const& id)
1487
    {    // {{{
1488
        // We need to modify the reference count.
1489
        naming::gid_type& mutable_gid = const_cast<hpx::id_type&>(id).get_gid();
3,143✔
1490
        naming::gid_type new_gid =
1491
            naming::detail::split_gid_if_needed(mutable_gid).get();
3,143✔
1492

1493
        future<bool> f = symbol_ns_.bind_async(name, new_gid);
3,143✔
1494

1495
        std::int64_t new_credit = naming::detail::get_credit_from_gid(new_gid);
3,143✔
1496
        if (new_credit != 0)
3,143✔
1497
        {
1498
            return f.then(hpx::launch::sync,
2,476✔
1499
                util::one_shot(hpx::bind_back(&correct_credit_on_failure, id,
4,952✔
1500
                    std::int64_t(HPX_GLOBALCREDIT_INITIAL), new_credit)));
2,476✔
1501
        }
1502

1503
        return f;
667✔
1504
    }    // }}}
3,143✔
1505

1506
    ///////////////////////////////////////////////////////////////////////////
1507
    hpx::id_type addressing_service::unregister_name(
2,747✔
1508
        std::string const& name, error_code& ec)
1509
    {    // {{{
1510
        try
1511
        {
1512
            return symbol_ns_.unbind(name);
2,747✔
1513
        }
×
1514
        catch (hpx::exception const& e)
1515
        {
1516
            HPX_RETHROWS_IF(ec, e, "addressing_service::unregister_name");
×
1517
            return hpx::invalid_id;
×
1518
        }
×
1519
    }    // }}}
2,747✔
1520

1521
    hpx::future<hpx::id_type> addressing_service::unregister_name_async(
680✔
1522
        std::string const& name)
1523
    {    // {{{
1524
        return symbol_ns_.unbind_async(name);
680✔
1525
    }    // }}}
×
1526

1527
    ///////////////////////////////////////////////////////////////////////////
1528
    hpx::id_type addressing_service::resolve_name(
114✔
1529
        std::string const& name, error_code& ec)
1530
    {    // {{{
1531
        try
1532
        {
1533
            return symbol_ns_.resolve(name);
114✔
1534
        }
×
1535
        catch (hpx::exception const& e)
1536
        {
1537
            HPX_RETHROWS_IF(ec, e, "addressing_service::resolve_name");
×
1538
            return hpx::invalid_id;
×
1539
        }
×
1540
    }    // }}}
114✔
1541

1542
    hpx::future<hpx::id_type> addressing_service::resolve_name_async(
×
1543
        std::string const& name)
1544
    {    // {{{
1545
        return symbol_ns_.resolve_async(name);
×
1546
    }    // }}}
×
1547

1548
    namespace detail {
1549
        hpx::future<hpx::id_type> on_register_event(
61,783✔
1550
            hpx::future<bool> f, hpx::future<hpx::id_type> result_f)
1551
        {
1552
            if (!f.get())
61,784✔
1553
            {
1554
                HPX_THROW_EXCEPTION(hpx::error::bad_request,
×
1555
                    "hpx::agas::detail::on_register_event",
1556
                    "request 'symbol_ns_on_event' failed");
1557
                return hpx::future<hpx::id_type>();
1558
            }
1559

1560
            return result_f;
61,784✔
1561
        }
×
1562
    }    // namespace detail
1563

1564
    future<hpx::id_type> addressing_service::on_symbol_namespace_event(
61,785✔
1565
        std::string const& name, bool call_for_past_events)
1566
    {
1567
        hpx::distributed::promise<hpx::id_type, naming::gid_type> p;
61,785✔
1568
        auto result_f = p.get_future();
61,785✔
1569

1570
        hpx::future<bool> f =
1571
            symbol_ns_.on_event(name, call_for_past_events, p.get_id());
61,785✔
1572

1573
        return f.then(hpx::launch::sync,
61,785✔
1574
            util::one_shot(hpx::bind_back(
61,785✔
1575
                &detail::on_register_event, HPX_MOVE(result_f))));
61,785✔
1576
    }
61,785✔
1577

1578
    // Return all matching entries in the symbol namespace
1579
    hpx::future<addressing_service::iterate_names_return_type>
1580
    addressing_service::iterate_ids(std::string const& pattern)
×
1581
    {    // {{{
1582
        return symbol_ns_.iterate_async(pattern);
×
1583
    }    // }}}
1584

1585
    // This function has to return false if the key is already in the cache (true
1586
    // means go ahead with the cache update).
1587
    bool check_for_collisions(addressing_service::gva_cache_key const& new_key,
546✔
1588
        addressing_service::gva_cache_key const& old_key)
1589
    {
1590
        return (new_key.get_gid() != old_key.get_gid()) ||
546✔
1591
            (new_key.get_count() != old_key.get_count());
546✔
1592
    }
1593

1594
    void addressing_service::update_cache_entry(
94,698✔
1595
        naming::gid_type const& id, gva const& g, error_code& ec)
1596
    {    // {{{
1597
        if (!caching_)
94,698✔
1598
        {
1599
            // If caching is disabled, we silently pretend success.
1600
            if (&ec != &throws)
×
1601
                ec = make_success_code();
×
1602
            return;
×
1603
        }
1604

1605
        // don't look at cache if id is marked as non-cache-able
1606
        if (!naming::detail::store_in_cache(id))
94,698✔
1607
        {
1608
            if (&ec != &throws)
2,092✔
1609
                ec = make_success_code();
×
1610
            return;
2,092✔
1611
        }
1612

1613
        naming::gid_type gid = naming::detail::get_stripped_gid(id);
92,606✔
1614

1615
        // don't look at the cache if the id is locally managed
1616
        if (naming::get_locality_id_from_gid(gid) ==
185,212✔
1617
            naming::get_locality_id_from_gid(locality_))
92,606✔
1618
        {
1619
            if (&ec != &throws)
90,840✔
1620
                ec = make_success_code();
×
1621
            return;
90,840✔
1622
        }
1623

1624
        if (hpx::threads::get_self_ptr() == nullptr)
1,766✔
1625
        {
1626
            // Don't update the cache while HPX is starting up ...
1627
            if (hpx::is_starting())
×
1628
            {
1629
                return;
×
1630
            }
1631
            threads::thread_init_data data(
×
1632
                threads::make_thread_function_nullary(
×
1633
                    [HPX_CXX20_CAPTURE_THIS(=)]() -> void {
×
1634
                        return update_cache_entry(id, g, throws);
×
1635
                    }),
1636
                "addressing_service::update_cache_entry",
×
1637
                threads::thread_priority::normal,
1638
                threads::thread_schedule_hint(),
×
1639
                threads::thread_stacksize::default_,
1640
                threads::thread_schedule_state::pending, true);
1641
            threads::register_thread(data, ec);
×
1642
        }
×
1643

1644
        try
1645
        {
1646
            // The entry in AGAS for a locality's RTS component has a count of 0,
1647
            // so we convert it to 1 here so that the cache doesn't break.
1648
            const std::uint64_t count = (g.count ? g.count : 1);
1,766✔
1649

1650
            LAGAS_(debug).format(
1,766✔
1651
                "addressing_service::update_cache_entry, gid({1}), count({2})",
×
1652
                gid, count);
1653

1654
            const gva_cache_key key(gid, count);
1,766✔
1655

1656
            {
1657
                std::unique_lock<mutex_type> lock(gva_cache_mtx_);
1,766✔
1658
                if (!gva_cache_->update_if(key, g, check_for_collisions))
1,766✔
1659
                {
1660
                    if (LAGAS_ENABLED(warning))
×
1661
                    {
1662
                        // Figure out who we collided with.
1663
                        addressing_service::gva_cache_key idbase;
×
1664
                        addressing_service::gva_cache_type::entry_type e;
×
1665

1666
                        if (!gva_cache_->get_entry(key, idbase, e))
×
1667
                        {
1668
                            // This is impossible under sane conditions.
1669
                            lock.unlock();
×
1670
                            HPX_THROWS_IF(ec, hpx::error::invalid_data,
×
1671
                                "addressing_service::update_cache_entry",
1672
                                "data corruption or lock error occurred in "
1673
                                "cache");
1674
                            return;
×
1675
                        }
1676

1677
                        LAGAS_(warning).format(
×
1678
                            "addressing_service::update_cache_entry, aborting "
×
1679
                            "update due to key collision in cache, "
1680
                            "new_gid({1}), new_count({2}), old_gid({3}), "
1681
                            "old_count({4})",
1682
                            gid, count, idbase.get_gid(), idbase.get_count());
×
1683
                    }
×
1684
                }
×
1685
            }
1,766✔
1686

1687
            if (&ec != &throws)
1,766✔
1688
                ec = make_success_code();
×
1689
        }
1,766✔
1690
        catch (hpx::exception const& e)
1691
        {
1692
            HPX_RETHROWS_IF(ec, e, "addressing_service::update_cache_entry");
×
1693
        }
×
1694
    }    // }}}
94,698✔
1695

1696
    bool addressing_service::get_cache_entry(naming::gid_type const& gid,
185,070✔
1697
        gva& gva, naming::gid_type& idbase, error_code& ec)
1698
    {
1699
        // Don't use the cache while HPX is starting up
1700
        if (hpx::is_starting())
185,070✔
1701
        {
1702
            return false;
901✔
1703
        }
1704
        gva_cache_key k(gid);
184,170✔
1705
        gva_cache_key idbase_key;
184,170✔
1706

1707
        std::unique_lock<mutex_type> lock(gva_cache_mtx_);
184,170✔
1708
        if (gva_cache_->get_entry(k, idbase_key, gva))
184,169✔
1709
        {
1710
            const std::uint64_t id_msb =
181,401✔
1711
                naming::detail::strip_internal_bits_from_gid(gid.get_msb());
181,401✔
1712

1713
            if (HPX_UNLIKELY(id_msb != idbase_key.get_gid().get_msb()))
181,401✔
1714
            {
1715
                lock.unlock();
×
1716
                HPX_THROWS_IF(ec, hpx::error::internal_server_error,
×
1717
                    "addressing_service::get_cache_entry",
1718
                    "bad entry in cache, MSBs of GID base and GID do not "
1719
                    "match");
1720
                return false;
×
1721
            }
1722
            idbase = idbase_key.get_gid();
181,401✔
1723
            return true;
181,401✔
1724
        }
1725

1726
        return false;
2,769✔
1727
    }
185,071✔
1728

1729
    void addressing_service::clear_cache(error_code& ec)
×
1730
    {    // {{{
1731
        if (!caching_)
×
1732
        {
1733
            // If caching is disabled, we silently pretend success.
1734
            if (&ec != &throws)
×
1735
                ec = make_success_code();
×
1736
            return;
×
1737
        }
1738

1739
        try
1740
        {
1741
            LAGAS_(warning).format(
×
1742
                "addressing_service::clear_cache, clearing cache");
×
1743

1744
            std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1745

1746
            gva_cache_->clear();
×
1747

1748
            if (&ec != &throws)
×
1749
                ec = make_success_code();
×
1750
        }
×
1751
        catch (hpx::exception const& e)
1752
        {
1753
            HPX_RETHROWS_IF(ec, e, "addressing_service::clear_cache");
×
1754
        }
×
1755
    }    // }}}
×
1756

1757
    void addressing_service::remove_cache_entry(
4,842✔
1758
        naming::gid_type const& id, error_code& ec)
1759
    {
1760
        // If caching is disabled, we silently pretend success.
1761
        if (!caching_)
4,842✔
1762
        {
1763
            if (&ec != &throws)
×
1764
                ec = make_success_code();
×
1765
            return;
×
1766
        }
1767

1768
        // don't look at cache if id is marked as non-cache-able
1769
        if (!naming::detail::store_in_cache(id))
4,842✔
1770
        {
1771
            if (&ec != &throws)
4,842✔
1772
                ec = make_success_code();
×
1773
            return;
4,842✔
1774
        }
1775

1776
        naming::gid_type gid = naming::detail::get_stripped_gid(id);
×
1777

1778
        // don't look at the cache if the id is locally managed
1779
        if (naming::get_locality_id_from_gid(gid) ==
×
1780
            naming::get_locality_id_from_gid(locality_))
×
1781
        {
1782
            if (&ec != &throws)
×
1783
                ec = make_success_code();
×
1784
            return;
×
1785
        }
1786

1787
        try
1788
        {
1789
            LAGAS_(warning).format("addressing_service::remove_cache_entry");
×
1790

1791
            std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1792

1793
            gva_cache_->erase([&gid](std::pair<gva_cache_key, gva> const& p) {
×
1794
                return gid == p.first.get_gid();
×
1795
            });
1796

1797
            if (&ec != &throws)
×
1798
                ec = make_success_code();
×
1799
        }
×
1800
        catch (hpx::exception const& e)
1801
        {
1802
            HPX_RETHROWS_IF(ec, e, "addressing_service::clear_cache");
×
1803
        }
×
1804
    }
4,842✔
1805

1806
    // Disable refcnt caching during shutdown
1807
    void addressing_service::start_shutdown(error_code& ec)
1,576✔
1808
    {
1809
        // If caching is disabled, we silently pretend success.
1810
        if (!caching_)
1,576✔
1811
            return;
×
1812

1813
        std::unique_lock<mutex_type> l(refcnt_requests_mtx_);
1,576✔
1814
        enable_refcnt_caching_ = false;
1,576✔
1815
        send_refcnt_requests_sync(l, ec);
1,576✔
1816
    }
1,576✔
1817

1818
    ///////////////////////////////////////////////////////////////////////////
1819
    // Helper functions to access the current cache statistics
1820
    std::uint64_t addressing_service::get_cache_entries(bool /* reset */)
×
1821
    {
1822
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1823
        return gva_cache_->size();
×
1824
    }
×
1825

1826
    std::uint64_t addressing_service::get_cache_hits(bool reset)
×
1827
    {
1828
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1829
        return gva_cache_->get_statistics().hits(reset);
×
1830
    }
×
1831

1832
    std::uint64_t addressing_service::get_cache_misses(bool reset)
×
1833
    {
1834
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1835
        return gva_cache_->get_statistics().misses(reset);
×
1836
    }
×
1837

1838
    std::uint64_t addressing_service::get_cache_evictions(bool reset)
×
1839
    {
1840
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1841
        return gva_cache_->get_statistics().evictions(reset);
×
1842
    }
×
1843

1844
    std::uint64_t addressing_service::get_cache_insertions(bool reset)
×
1845
    {
1846
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1847
        return gva_cache_->get_statistics().insertions(reset);
×
1848
    }
×
1849

1850
    ///////////////////////////////////////////////////////////////////////////
1851
    std::uint64_t addressing_service::get_cache_get_entry_count(bool reset)
×
1852
    {
1853
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1854
        return gva_cache_->get_statistics().get_get_entry_count(reset);
×
1855
    }
×
1856

1857
    std::uint64_t addressing_service::get_cache_insertion_entry_count(
×
1858
        bool reset)
1859
    {
1860
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1861
        return gva_cache_->get_statistics().get_insert_entry_count(reset);
×
1862
    }
×
1863

1864
    std::uint64_t addressing_service::get_cache_update_entry_count(bool reset)
×
1865
    {
1866
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1867
        return gva_cache_->get_statistics().get_update_entry_count(reset);
×
1868
    }
×
1869

1870
    std::uint64_t addressing_service::get_cache_erase_entry_count(bool reset)
×
1871
    {
1872
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1873
        return gva_cache_->get_statistics().get_erase_entry_count(reset);
×
1874
    }
×
1875

1876
    std::uint64_t addressing_service::get_cache_get_entry_time(bool reset)
×
1877
    {
1878
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1879
        return gva_cache_->get_statistics().get_get_entry_time(reset);
×
1880
    }
×
1881

1882
    std::uint64_t addressing_service::get_cache_insertion_entry_time(bool reset)
×
1883
    {
1884
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1885
        return gva_cache_->get_statistics().get_insert_entry_time(reset);
×
1886
    }
×
1887

1888
    std::uint64_t addressing_service::get_cache_update_entry_time(bool reset)
×
1889
    {
1890
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1891
        return gva_cache_->get_statistics().get_update_entry_time(reset);
×
1892
    }
×
1893

1894
    std::uint64_t addressing_service::get_cache_erase_entry_time(bool reset)
×
1895
    {
1896
        std::lock_guard<mutex_type> lock(gva_cache_mtx_);
×
1897
        return gva_cache_->get_statistics().get_erase_entry_time(reset);
×
1898
    }
×
1899

1900
    void addressing_service::register_server_instances()
594✔
1901
    {
1902
        // register root server
1903
        std::uint32_t locality_id =
594✔
1904
            naming::get_locality_id_from_gid(get_local_locality());
594✔
1905
        locality_ns_->register_server_instance(locality_id);
594✔
1906
        primary_ns_.register_server_instance(locality_id);
594✔
1907
        component_ns_->register_server_instance(locality_id);
594✔
1908
        symbol_ns_.register_server_instance(locality_id);
594✔
1909
    }
594✔
1910

1911
    void addressing_service::garbage_collect_non_blocking(error_code& ec)
21,162,128✔
1912
    {
1913
        std::unique_lock<mutex_type> l(refcnt_requests_mtx_, std::try_to_lock);
21,162,130✔
1914
        if (!l.owns_lock())
21,162,130✔
1915
            return;    // no need to compete for garbage collection
998✔
1916

1917
        send_refcnt_requests_non_blocking(l, ec);
21,161,132✔
1918
    }
21,162,130✔
1919

1920
    void addressing_service::garbage_collect(error_code& ec)
710✔
1921
    {
1922
        std::unique_lock<mutex_type> l(refcnt_requests_mtx_, std::try_to_lock);
710✔
1923
        if (!l.owns_lock())
710✔
1924
            return;    // no need to compete for garbage collection
5✔
1925

1926
        send_refcnt_requests_sync(l, ec);
705✔
1927
    }
710✔
1928

1929
    void addressing_service::send_refcnt_requests(
92,408✔
1930
        std::unique_lock<addressing_service::mutex_type>& l, error_code& ec)
1931
    {
1932
        if (!l.owns_lock())
92,408✔
1933
        {
1934
            HPX_THROWS_IF(ec, hpx::error::lock_error,
×
1935
                "addressing_service::send_refcnt_requests",
1936
                "mutex is not locked");
1937
            return;
×
1938
        }
1939

1940
        if (!enable_refcnt_caching_ ||
92,408✔
1941
            max_refcnt_requests_ == ++refcnt_requests_count_)
87,879✔
1942
            send_refcnt_requests_non_blocking(l, ec);
4,532✔
1943

1944
        else if (&ec != &throws)
87,876✔
1945
            ec = make_success_code();
80,576✔
1946
    }
92,408✔
1947

1948
#if defined(HPX_HAVE_AGAS_DUMP_REFCNT_ENTRIES)
1949
    void dump_refcnt_requests(
1950
        std::unique_lock<addressing_service::mutex_type>& l,
1951
        addressing_service::refcnt_requests_type const& requests,
1952
        char const* func_name)
1953
    {
1954
        HPX_ASSERT(l.owns_lock());
1955

1956
        std::stringstream ss;
1957
        hpx::util::format_to(ss,
1958
            "{1}, dumping client-side refcnt table, requests({2}):", func_name,
1959
            requests.size());
1960

1961
        typedef addressing_service::refcnt_requests_type::const_reference
1962
            const_reference;
1963

1964
        for (const_reference e : requests)
1965
        {
1966
            // The [client] tag is in there to make it easier to filter
1967
            // through the logs.
1968
            hpx::util::format_to(
1969
                ss, "\n  [client] gid({1}), credits({2})", e.first, e.second);
1970
        }
1971

1972
        LAGAS_(debug) << ss.str();
1973
    }
1974
#endif
1975

1976
    void addressing_service::send_refcnt_requests_non_blocking(
21,165,664✔
1977
        std::unique_lock<addressing_service::mutex_type>& l, error_code& ec)
1978
    {
1979
#if !defined(HPX_COMPUTE_DEVICE_CODE)
1980
        HPX_ASSERT_OWNS_LOCK(l);
21,165,664✔
1981

1982
        try
1983
        {
1984
            if (refcnt_requests_->empty())
21,165,664✔
1985
            {
1986
                l.unlock();
21,144,016✔
1987
                return;
21,144,016✔
1988
            }
1989

1990
            std::shared_ptr<refcnt_requests_type> p(new refcnt_requests_type);
21,648✔
1991

1992
            p.swap(refcnt_requests_);
21,648✔
1993
            refcnt_requests_count_ = 0;
21,648✔
1994

1995
            l.unlock();
21,648✔
1996

1997
            LAGAS_(info).format("addressing_service::send_refcnt_requests_non_"
21,648✔
1998
                                "blocking, requests({1})",
1999
                p->size());
×
2000

2001
#if defined(HPX_HAVE_AGAS_DUMP_REFCNT_ENTRIES)
2002
            if (LAGAS_ENABLED(debug))
2003
                dump_refcnt_requests(l, *p,
2004
                    "addressing_service::send_refcnt_requests_non_blocking");
2005
#endif
2006

2007
            // collect all requests for each locality
2008
            using requests_type = std::map<hpx::id_type,
2009
                std::vector<hpx::tuple<std::int64_t, naming::gid_type,
2010
                    naming::gid_type>>>;
2011
            requests_type requests;
21,648✔
2012

2013
            for (refcnt_requests_type::const_reference e : *p)
63,397✔
2014
            {
2015
                HPX_ASSERT(e.second < 0);
41,749✔
2016

2017
                naming::gid_type raw(e.first);
41,749✔
2018

2019
                hpx::id_type target(
41,749✔
2020
                    primary_namespace::get_service_instance(raw),
41,749✔
2021
                    hpx::id_type::management_type::unmanaged);
2022

2023
                requests[target].push_back(hpx::make_tuple(e.second, raw, raw));
41,749✔
2024
            }
41,749✔
2025

2026
            // send requests to all locality
2027
            requests_type::iterator end = requests.end();
21,648✔
2028
            for (requests_type::iterator it = requests.begin(); it != end; ++it)
44,193✔
2029
            {
2030
                server::primary_namespace::decrement_credit_action action;
2031
                hpx::post(action, it->first, HPX_MOVE(it->second));
22,545✔
2032
            }
22,545✔
2033

2034
            if (&ec != &throws)
21,648✔
2035
                ec = make_success_code();
4,481✔
2036
        }
21,648✔
2037
        catch (hpx::exception const& e)
2038
        {
2039
            l.unlock();
×
2040
            HPX_RETHROWS_IF(
×
2041
                ec, e, "addressing_service::send_refcnt_requests_non_blocking");
2042
        }
×
2043
#else
2044
        HPX_UNUSED(l);
2045
        HPX_UNUSED(ec);
2046
        HPX_ASSERT(false);
2047
#endif
2048
    }
21,165,664✔
2049

2050
    std::vector<hpx::future<std::vector<std::int64_t>>>
2051
    addressing_service::send_refcnt_requests_async(
2,281✔
2052
        std::unique_lock<addressing_service::mutex_type>& l)
2053
    {
2054
#if !defined(HPX_COMPUTE_DEVICE_CODE)
2055
        HPX_ASSERT_OWNS_LOCK(l);
2,281✔
2056

2057
        if (refcnt_requests_->empty())
2,281✔
2058
        {
2059
            l.unlock();
2,133✔
2060
            return std::vector<hpx::future<std::vector<std::int64_t>>>();
2,133✔
2061
        }
2062

2063
        std::shared_ptr<refcnt_requests_type> p(new refcnt_requests_type);
148✔
2064

2065
        p.swap(refcnt_requests_);
148✔
2066
        refcnt_requests_count_ = 0;
148✔
2067

2068
        l.unlock();
148✔
2069

2070
        LAGAS_(info).format(
148✔
2071
            "addressing_service::send_refcnt_requests_async, requests({1})",
×
2072
            p->size());
×
2073

2074
#if defined(HPX_HAVE_AGAS_DUMP_REFCNT_ENTRIES)
2075
        if (LAGAS_ENABLED(debug))
2076
            dump_refcnt_requests(
2077
                l, *p, "addressing_service::send_refcnt_requests_sync");
2078
#endif
2079

2080
        // collect all requests for each locality
2081
        using requests_type = std::map<hpx::id_type,
2082
            std::vector<
2083
                hpx::tuple<std::int64_t, naming::gid_type, naming::gid_type>>>;
2084
        requests_type requests;
148✔
2085

2086
        std::vector<hpx::future<std::vector<std::int64_t>>> lazy_results;
148✔
2087
        for (refcnt_requests_type::const_reference e : *p)
3,498✔
2088
        {
2089
            HPX_ASSERT(e.second < 0);
3,350✔
2090

2091
            naming::gid_type raw(e.first);
3,350✔
2092

2093
            hpx::id_type target(primary_namespace::get_service_instance(raw),
3,350✔
2094
                hpx::id_type::management_type::unmanaged);
2095

2096
            requests[target].push_back(hpx::make_tuple(e.second, raw, raw));
3,350✔
2097
        }
3,350✔
2098

2099
        // send requests to all locality
2100
        requests_type::iterator end = requests.end();
148✔
2101
        for (requests_type::iterator it = requests.begin(); it != end; ++it)
315✔
2102
        {
2103
            server::primary_namespace::decrement_credit_action action;
2104
            lazy_results.push_back(
167✔
2105
                hpx::async(action, it->first, HPX_MOVE(it->second)));
167✔
2106
        }
167✔
2107

2108
        return lazy_results;
148✔
2109
#else
2110
        HPX_UNUSED(l);
2111
        HPX_ASSERT(false);
2112
        std::vector<hpx::future<std::vector<std::int64_t>>> lazy_results;
2113
        return lazy_results;
2114
#endif
2115
    }
2,281✔
2116

2117
    void addressing_service::send_refcnt_requests_sync(
2,281✔
2118
        std::unique_lock<addressing_service::mutex_type>& l, error_code& ec)
2119
    {
2120
        std::vector<hpx::future<std::vector<std::int64_t>>> lazy_results =
2121
            send_refcnt_requests_async(l);
2,280✔
2122

2123
        // re throw possible errors
2124
        hpx::when_all(lazy_results).get();
2,281✔
2125

2126
        if (&ec != &throws)
2,281✔
2127
            ec = make_success_code();
×
2128
    }
2,281✔
2129

2130
    ///////////////////////////////////////////////////////////////////////////
2131
    hpx::future<void> addressing_service::mark_as_migrated(
4,842✔
2132
        naming::gid_type const& gid_,
2133
        hpx::move_only_function<std::pair<bool, hpx::future<void>>()>&& f,
2134
        bool expect_to_be_marked_as_migrating)
2135
    {
2136
        HPX_UNUSED(expect_to_be_marked_as_migrating);
4,842✔
2137
        if (!gid_)
4,842✔
2138
        {
2139
            return hpx::make_exceptional_future<void>(
×
2140
                HPX_GET_EXCEPTION(hpx::error::bad_parameter,
×
2141
                    "addressing_service::mark_as_migrated",
2142
                    "invalid reference gid"));
2143
        }
2144

2145
        HPX_ASSERT(naming::detail::is_migratable(gid_));
4,842✔
2146

2147
        // Always first grab the AGAS lock before invoking the user supplied
2148
        // function. The user supplied code will grab another lock. Both locks have
2149
        // to be acquired and always in the same sequence.
2150
        // The AGAS lock needs to be acquired first as the migrated object might
2151
        // not exist on this locality, in which case it should not be accessed
2152
        // anymore. The only way to determine whether the object still exists on
2153
        // this locality is to query the migrated objects table in AGAS.
2154
        using lock_type = std::unique_lock<mutex_type>;
2155

2156
        lock_type lock(migrated_objects_mtx_);
4,842✔
2157
        util::ignore_while_checking ignore(&lock);
4,842✔
2158
        HPX_UNUSED(ignore);
4,842✔
2159

2160
        // call the user code for the component instance to be migrated, the
2161
        // returned future becomes ready whenever the component instance can be
2162
        // migrated (no threads are pending/active any more)
2163
        std::pair<bool, hpx::future<void>> result = f();
4,842✔
2164

2165
        // mark the gid as 'migrated' right away - the worst what can happen is
2166
        // that a parcel which comes in for this object is bouncing between this
2167
        // locality and the locality managing the address resolution for the object
2168
        if (result.first)
4,842✔
2169
        {
2170
            naming::gid_type gid(naming::detail::get_stripped_gid(gid_));
4,842✔
2171

2172
            migrated_objects_table_type::iterator it =
2173
                migrated_objects_table_.find(gid);
4,842✔
2174

2175
            // insert the object into the map of migrated objects
2176
            if (it == migrated_objects_table_.end())
4,842✔
2177
            {
2178
                HPX_ASSERT(!expect_to_be_marked_as_migrating);
3,232✔
2179
                migrated_objects_table_.insert(gid);
3,232✔
2180
            }
3,232✔
2181
            else
2182
            {
2183
                HPX_ASSERT(expect_to_be_marked_as_migrating);
1,610✔
2184
            }
2185

2186
            // avoid interactions with the locking in the cache
2187
            lock.unlock();
4,842✔
2188

2189
            // remove entry from cache
2190
            remove_cache_entry(gid_);
4,842✔
2191
        }
4,842✔
2192

2193
        return HPX_MOVE(result.second);
4,842✔
2194
    }
4,842✔
2195

2196
    void addressing_service::unmark_as_migrated(naming::gid_type const& gid_)
3,238✔
2197
    {
2198
        if (!gid_)
3,238✔
2199
        {
2200
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
2201
                "addressing_service::unmark_as_migrated",
2202
                "invalid reference gid");
2203
            return;
2204
        }
2205

2206
        HPX_ASSERT(naming::detail::is_migratable(gid_));
3,238✔
2207

2208
        naming::gid_type gid(naming::detail::get_stripped_gid(gid_));
3,238✔
2209

2210
        std::unique_lock<mutex_type> lock(migrated_objects_mtx_);
3,238✔
2211

2212
        migrated_objects_table_type::iterator it =
2213
            migrated_objects_table_.find(gid);
3,238✔
2214

2215
        // remove the object from the map of migrated objects
2216
        if (it != migrated_objects_table_.end())
3,238✔
2217
        {
2218
            migrated_objects_table_.erase(it);
3,192✔
2219

2220
            // remove entry from cache
2221
            if (caching_ && naming::detail::store_in_cache(gid_))
3,192✔
2222
            {
2223
                // avoid interactions with the locking in the cache
2224
                lock.unlock();
×
2225

2226
                // remove entry from cache
2227
                remove_cache_entry(gid_);
×
2228
            }
×
2229
        }
3,192✔
2230
    }
3,238✔
2231

2232
    hpx::future<std::pair<hpx::id_type, naming::address>>
2233
    addressing_service::begin_migration(hpx::id_type const& id)
3,240✔
2234
    {
2235
        if (!id)
3,240✔
2236
        {
2237
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
2238
                "addressing_service::begin_migration", "invalid reference id");
2239
        }
2240

2241
        HPX_ASSERT(naming::detail::is_migratable(id.get_gid()));
3,240✔
2242

2243
        naming::gid_type gid(naming::detail::get_stripped_gid(id.get_gid()));
3,240✔
2244

2245
        return primary_ns_.begin_migration(gid);
3,240✔
2246
    }
×
2247

2248
    bool addressing_service::end_migration(hpx::id_type const& id)
3,242✔
2249
    {
2250
        if (!id)
3,242✔
2251
        {
2252
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
2253
                "addressing_service::end_migration", "invalid reference id");
2254
        }
2255

2256
        HPX_ASSERT(naming::detail::is_migratable(id.get_gid()));
3,242✔
2257

2258
        naming::gid_type gid(naming::detail::get_stripped_gid(id.get_gid()));
3,242✔
2259

2260
        return primary_ns_.end_migration(gid);
3,242✔
2261
    }
×
2262

2263
    bool addressing_service::was_object_migrated_locked(
34,901✔
2264
        naming::gid_type const& gid_)
2265
    {
2266
        naming::gid_type gid(naming::detail::get_stripped_gid(gid_));
34,901✔
2267

2268
        return migrated_objects_table_.find(gid) !=
69,802✔
2269
            migrated_objects_table_.end();
34,901✔
2270
    }
2271

2272
    std::pair<bool, components::pinned_ptr>
2273
    addressing_service::was_object_migrated(naming::gid_type const& gid,
5,447✔
2274
        hpx::move_only_function<components::pinned_ptr()>&& f    //-V669
2275
    )
2276
    {
2277
        if (!gid)
5,447✔
2278
        {
2279
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
2280
                "addressing_service::was_object_migrated",
2281
                "invalid reference gid");
2282
            return std::make_pair(false, components::pinned_ptr());
2283
        }
2284

2285
        if (!naming::detail::is_migratable(gid))
5,447✔
2286
        {
2287
            return std::make_pair(false, f());
36✔
2288
        }
2289

2290
        // Always first grab the AGAS lock before invoking the user supplied
2291
        // function. The user supplied code will grab another lock. Both locks have
2292
        // to be acquired and always in the same sequence.
2293
        // The AGAS lock needs to be acquired first as the migrated object might
2294
        // not exist on this locality, in which case it should not be accessed
2295
        // anymore. The only way to determine whether the object still exists on
2296
        // this locality is to query the migrated objects table in AGAS.
2297
        using lock_type = std::unique_lock<mutex_type>;
2298

2299
        lock_type lock(migrated_objects_mtx_);
5,411✔
2300

2301
        if (was_object_migrated_locked(gid))
5,411✔
2302
            return std::make_pair(true, components::pinned_ptr());
1✔
2303

2304
        util::ignore_while_checking ignore(&lock);
5,410✔
2305
        HPX_UNUSED(ignore);
5,410✔
2306

2307
        return std::make_pair(false, f());
5,410✔
2308
    }
5,447✔
2309

2310
}}    // namespace hpx::agas
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc