• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.21
/libs/full/agas_base/src/server/locality_namespace_server.cpp
1
//  Copyright (c) 2011 Bryce Adelstein-Lelbach
2
//  Copyright (c) 2012-2025 Hartmut Kaiser
3
//  Copyright (c) 2016 Thomas Heller
4
//
5
//  SPDX-License-Identifier: BSL-1.0
6
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
7
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8

9
#include <hpx/config.hpp>
10
#include <hpx/agas_base/server/locality_namespace.hpp>
11
#include <hpx/agas_base/server/primary_namespace.hpp>
12
#include <hpx/assert.hpp>
13
#include <hpx/async_distributed/continuation.hpp>
14
#include <hpx/components_base/component_type.hpp>
15
#include <hpx/modules/errors.hpp>
16
#include <hpx/modules/format.hpp>
17
#include <hpx/modules/logging.hpp>
18
#include <hpx/modules/serialization.hpp>
19
#include <hpx/modules/timing.hpp>
20
#include <hpx/modules/util.hpp>
21
#include <hpx/naming/credit_handling.hpp>
22

23
#include <atomic>
24
#include <cstddef>
25
#include <cstdint>
26
#include <list>
27
#include <map>
28
#include <mutex>
29
#include <string>
30
#include <utility>
31
#include <vector>
32

33
namespace hpx { namespace agas {
34

35
    naming::gid_type bootstrap_locality_namespace_gid()
×
36
    {
37
        return naming::gid_type(agas::primary_ns_msb, agas::locality_ns_lsb);
×
38
    }
39

40
    hpx::id_type bootstrap_locality_namespace_id()
×
41
    {
42
        return hpx::id_type(agas::locality_ns_msb, agas::locality_ns_lsb,
43
            hpx::id_type::management_type::unmanaged);
×
44
    }
45
}}    // namespace hpx::agas
46

47
namespace hpx { namespace agas { namespace server {
48

49
    void locality_namespace::register_server_instance(
29✔
50
        char const* servicename, error_code& ec)
51
    {
52
        // now register this AGAS instance with AGAS :-P
53
        instance_name_ = agas::service_name;
29✔
54
        instance_name_ += servicename;
55
        instance_name_ += agas::server::locality_namespace_service_name;
56

57
        // register a gid (not the id) to avoid AGAS holding a reference to this
58
        // component
59
        agas::register_name(
29✔
60
            launch::sync, instance_name_, get_unmanaged_id().get_gid(), ec);
29✔
61
    }
29✔
62

63
    // Removes the symbolic name stored in instance_name_ and then runs the
    // base class' finalize().
    //
    // ec: forwarded to agas::unregister_name.
    //
    // Note that instance_name_ itself is left untouched by this function.
    void locality_namespace::unregister_server_instance(error_code& ec)
    {
        agas::unregister_name(launch::sync, instance_name_, ec);

        base_type::finalize();
    }
×
68

69
    void locality_namespace::finalize()
×
70
    {
71
        if (!instance_name_.empty())
×
72
        {
73
            error_code ec(throwmode::lightweight);
74
            agas::unregister_name(launch::sync, instance_name_, ec);
×
75
        }
76
    }
×
77

78
    std::uint32_t locality_namespace::allocate(
32✔
79
        parcelset::endpoints_type const& endpoints, std::uint64_t count,
80
        std::uint32_t num_threads, naming::gid_type suggested_prefix)
81
    {    // {{{ allocate implementation
82
        util::scoped_timer<std::atomic<std::int64_t>> update(
83
            counter_data_.allocate_.time_, counter_data_.allocate_.enabled_);
32✔
84
        counter_data_.increment_allocate_count();
32✔
85

86
        using hpx::get;
87

88
        std::unique_lock<mutex_type> l(mutex_);
32✔
89

90
#if defined(HPX_DEBUG)
91
        for (partition_table_type::value_type const& partition : partitions_)
92
        {
93
            HPX_ASSERT(get<0>(partition.second) != endpoints);
94
        }
95
#endif
96
        // Check for address space exhaustion.
97
        if (HPX_UNLIKELY(0xFFFFFFFE < partitions_.size()))    //-V104
32✔
98
        {
99
            l.unlock();
×
100

101
            HPX_THROW_EXCEPTION(hpx::error::internal_server_error,
×
102
                "locality_namespace::allocate",
103
                "primary namespace has been exhausted");
104
        }
105

106
        // Compute the locality's prefix.
107
        std::uint32_t prefix = naming::invalid_locality_id;
32✔
108

109
        // check if the suggested prefix can be used instead of the next
110
        // free one
111
        std::uint32_t suggested_locality_id =
112
            naming::get_locality_id_from_gid(suggested_prefix);
113

114
        partition_table_type::iterator it = partitions_.end();
115
        if (suggested_locality_id != naming::invalid_locality_id)
32✔
116
        {
117
            it = partitions_.find(suggested_locality_id);
118

119
            if (it == partitions_.end())
3✔
120
            {
121
                prefix = suggested_locality_id;
3✔
122
            }
123
            else
124
            {
125
                do
126
                {
127
                    prefix = prefix_counter_++;
×
128
                    it = partitions_.find(prefix);
129
                } while (it != partitions_.end());
×
130
            }
131
        }
132
        else
133
        {
134
            do
135
            {
136
                prefix = prefix_counter_++;
29✔
137
                it = partitions_.find(prefix);
138
            } while (it != partitions_.end());
29✔
139
        }
140

141
        // We need to create an entry in the partition table for this
142
        // locality.
143
        if (HPX_UNLIKELY(!util::insert_checked(
96✔
144
                partitions_.insert(std::make_pair(
145
                    prefix, partition_type(endpoints, num_threads))),
146
                it)))
147
        {
148
            l.unlock();
×
149

150
            HPX_THROW_EXCEPTION(hpx::error::lock_error,
×
151
                "locality_namespace::allocate",
152
                "partition table insertion failed due to a locking "
153
                "error or memory corruption, endpoint({1}), "
154
                "prefix({2})",
155
                endpoints, prefix);
156
        }
157

158
        // Now that we've inserted the locality into the partition table
159
        // successfully, we need to put the locality's GID into the GVA
160
        // table so that parcels can be sent to the memory of a locality.
161
        if (primary_)
32✔
162
        {
163
            naming::gid_type id(naming::get_gid_from_locality_id(prefix));
32✔
164
            gva const g(id,
32✔
165
                to_int(hpx::components::component_enum_type::runtime_support),
166
                count);
32✔
167

168
            if (!primary_->bind_gid(g, id, id))
×
169
            {
170
                HPX_THROW_EXCEPTION(hpx::error::bad_request,
171
                    "locality_namespace::allocate",
172
                    "unable to bind prefix({1}) to a gid", prefix);
32✔
173
            }
174
            return prefix;
175
        }
×
176

177
        LAGAS_(info).format(
178
            "locality_namespace::allocate, ep({1}), count({2}), prefix({3})",
179
            endpoints, count, prefix);
×
180

32✔
181
        return prefix;
182
    }    // }}}
×
183

184
    // Looks up the endpoints stored for the locality id encoded in the given
    // gid.
    //
    // locality: gid from which the locality id is extracted.
    //
    // Returns a copy of the first element of the matching partition entry,
    // or a default constructed endpoints_type if there is no such entry.
    parcelset::endpoints_type locality_namespace::resolve_locality(
        naming::gid_type const& locality)
    {    // {{{ resolve_locality implementation
        util::scoped_timer<std::atomic<std::int64_t>> update(
            counter_data_.resolve_locality_.time_,
            counter_data_.resolve_locality_.enabled_);
        counter_data_.increment_resolve_locality_count();

        using hpx::get;
        std::uint32_t const locality_id =
            naming::get_locality_id_from_gid(locality);

        std::lock_guard<mutex_type> l(mutex_);
        auto const entry = partitions_.find(locality_id);

        // unknown locality: hand back empty endpoints
        if (entry == partitions_.end())
        {
            return parcelset::endpoints_type();
        }

        return get<0>(entry->second);
    }    // }}}
32✔
205

206
    // Removes the partition table entry of the locality encoded in the given
    // gid.
    //
    // locality: gid from which the locality id is extracted.
    //
    // If no entry exists for that locality id nothing happens. Otherwise the
    // entry is erased and, if primary_ is set, mutex_ is released and three
    // gids are unbound through primary_: the primary namespace service gid
    // and the symbol namespace service gid (each with the locality id
    // replaced by the one being freed), and the locality gid itself.
    void locality_namespace::free(naming::gid_type const& locality)
    {    // {{{ free implementation
        util::scoped_timer<std::atomic<std::int64_t>> update(
            counter_data_.free_.time_, counter_data_.free_.enabled_);
        counter_data_.increment_free_count();

        using hpx::get;

        std::uint32_t const locality_id =
            naming::get_locality_id_from_gid(locality);

        std::unique_lock<mutex_type> l(mutex_);

        auto const entry = partitions_.find(locality_id);
        if (entry == partitions_.end())
        {
            // unknown locality, nothing to release
            return;
        }

        // remove the locality from the main partition table
        partitions_.erase(entry);

        if (!primary_)
        {
            return;
        }

        // the remaining work does not touch the partition table anymore
        l.unlock();

        // remove primary namespace
        {
            naming::gid_type primary_service(
                agas::primary_ns_msb, agas::primary_ns_lsb);
            primary_->unbind_gid(
                1, naming::replace_locality_id(primary_service, locality_id));
        }

        // remove symbol namespace
        {
            naming::gid_type symbol_service(
                agas::symbol_ns_msb, agas::symbol_ns_lsb);
            primary_->unbind_gid(
                1, naming::replace_locality_id(symbol_service, locality_id));
        }

        // remove locality itself
        primary_->unbind_gid(0, locality);
    }    // }}}
39✔
268

269
    std::vector<std::uint32_t> locality_namespace::localities()
270
    {    // {{{ localities implementation
39✔
271
        util::scoped_timer<std::atomic<std::int64_t>> update(
39✔
272
            counter_data_.localities_.time_,
39✔
273
            counter_data_.localities_.enabled_);
274
        counter_data_.increment_localities_count();
39✔
275

276
        std::lock_guard<mutex_type> l(mutex_);
39✔
277

278
        std::vector<std::uint32_t> p;
279

280
        partition_table_type::const_iterator it = partitions_.begin(),
281
                                             end = partitions_.end();
84✔
282

45✔
283
        for (/**/; it != end; ++it)
284
            p.push_back(it->first);
39✔
285

×
286
        LAGAS_(info).format(
287
            "locality_namespace::localities, localities({1})", p.size());
39✔
288

39✔
289
        return p;
290
    }    // }}}
1✔
291

292
    std::uint32_t locality_namespace::get_num_localities()
293
    {    // {{{ get_num_localities implementation
1✔
294
        util::scoped_timer<std::atomic<std::int64_t>> update(
1✔
295
            counter_data_.num_localities_.time_,
1✔
296
            counter_data_.num_localities_.enabled_);
1✔
297
        counter_data_.increment_num_localities_count();
298
        std::lock_guard<mutex_type> l(mutex_);
299

1✔
300
        std::uint32_t num_localities =
301
            static_cast<std::uint32_t>(partitions_.size());
1✔
302

303
        LAGAS_(info).format(
304
            "locality_namespace::get_num_localities, localities({1})",
305
            num_localities);
1✔
306

1✔
307
        return num_localities;
308
    }    // }}}
×
309

310
    std::vector<std::uint32_t> locality_namespace::get_num_threads()
×
311
    {    // {{{ get_num_threads implementation
312
        std::lock_guard<mutex_type> l(mutex_);
×
313

314
        std::vector<std::uint32_t> num_threads;
315

×
316
        partition_table_type::iterator end = partitions_.end();
317
        for (partition_table_type::iterator it = partitions_.begin(); it != end;
318
            ++it)
319
        {
×
320
            using hpx::get;
321
            num_threads.push_back(get<1>(it->second));
322
        }
×
323

324
        LAGAS_(info).format(
×
325
            "locality_namespace::get_num_threads, localities({1})",
326
            num_threads.size());
×
327

328
        return num_threads;
329
    }    // }}}
×
330

331
    std::uint32_t locality_namespace::get_num_overall_threads()
×
332
    {
333
        std::lock_guard<mutex_type> l(mutex_);
×
334

335
        std::uint32_t num_threads = 0;
336

×
337
        partition_table_type::iterator end = partitions_.end();
338
        for (partition_table_type::iterator it = partitions_.begin(); it != end;
339
            ++it)
340
        {
×
341
            using hpx::get;
342
            num_threads += get<1>(it->second);
343
        }
×
344

345
        LAGAS_(info).format(
346
            "locality_namespace::get_num_overall_threads, localities({1})",
347
            num_threads);
×
348

349
        return num_threads;
350
    }
351

×
352
    // access current counter values
353
    std::int64_t locality_namespace::counter_data::get_allocate_count(
354
        bool reset)
×
355
    {
356
        return util::get_and_reset_value(allocate_.count_, reset);
357
    }
×
358

359
    std::int64_t locality_namespace::counter_data::get_resolve_locality_count(
360
        bool reset)
×
361
    {
362
        return util::get_and_reset_value(resolve_locality_.count_, reset);
363
    }
×
364

365
    std::int64_t locality_namespace::counter_data::get_free_count(bool reset)
×
366
    {
367
        return util::get_and_reset_value(free_.count_, reset);
368
    }
×
369

370
    std::int64_t locality_namespace::counter_data::get_localities_count(
371
        bool reset)
×
372
    {
373
        return util::get_and_reset_value(localities_.count_, reset);
374
    }
×
375

376
    std::int64_t locality_namespace::counter_data::get_num_localities_count(
377
        bool reset)
×
378
    {
379
        return util::get_and_reset_value(num_localities_.count_, reset);
380
    }
×
381

382
    std::int64_t locality_namespace::counter_data::get_num_threads_count(
383
        bool reset)
×
384
    {
385
        return util::get_and_reset_value(num_threads_.count_, reset);
386
    }
×
387

388
    std::int64_t locality_namespace::counter_data::get_overall_count(bool reset)
×
389
    {
×
390
        return util::get_and_reset_value(allocate_.count_, reset) +
×
391
            util::get_and_reset_value(resolve_locality_.count_, reset) +
×
392
            util::get_and_reset_value(free_.count_, reset) +
×
393
            util::get_and_reset_value(localities_.count_, reset) +
×
394
            util::get_and_reset_value(num_localities_.count_, reset) +
395
            util::get_and_reset_value(num_threads_.count_, reset);
396
    }
×
397

398
    void locality_namespace::counter_data::enable_all()
×
399
    {
×
400
        allocate_.enabled_ = true;
×
401
        resolve_locality_.enabled_ = true;
×
402
        free_.enabled_ = true;
×
403
        localities_.enabled_ = true;
×
404
        num_localities_.enabled_ = true;
×
405
        num_threads_.enabled_ = true;
406
    }
407

×
408
    // access execution time counters
409
    std::int64_t locality_namespace::counter_data::get_allocate_time(bool reset)
×
410
    {
411
        return util::get_and_reset_value(allocate_.time_, reset);
412
    }
×
413

414
    std::int64_t locality_namespace::counter_data::get_resolve_locality_time(
415
        bool reset)
×
416
    {
417
        return util::get_and_reset_value(resolve_locality_.time_, reset);
418
    }
×
419

420
    std::int64_t locality_namespace::counter_data::get_free_time(bool reset)
×
421
    {
422
        return util::get_and_reset_value(free_.time_, reset);
423
    }
×
424

425
    std::int64_t locality_namespace::counter_data::get_localities_time(
426
        bool reset)
×
427
    {
428
        return util::get_and_reset_value(localities_.time_, reset);
429
    }
×
430

431
    std::int64_t locality_namespace::counter_data::get_num_localities_time(
432
        bool reset)
×
433
    {
434
        return util::get_and_reset_value(num_localities_.time_, reset);
435
    }
×
436

437
    std::int64_t locality_namespace::counter_data::get_num_threads_time(
438
        bool reset)
×
439
    {
440
        return util::get_and_reset_value(num_threads_.time_, reset);
441
    }
×
442

443
    std::int64_t locality_namespace::counter_data::get_overall_time(bool reset)
×
444
    {
×
445
        return util::get_and_reset_value(allocate_.time_, reset) +
×
446
            util::get_and_reset_value(resolve_locality_.time_, reset) +
×
447
            util::get_and_reset_value(free_.time_, reset) +
×
448
            util::get_and_reset_value(localities_.time_, reset) +
×
449
            util::get_and_reset_value(num_localities_.time_, reset) +
450
            util::get_and_reset_value(num_threads_.time_, reset);
451
    }
452

32✔
453
    // increment counter values
454
    void locality_namespace::counter_data::increment_allocate_count()
32✔
455
    {
456
        if (allocate_.enabled_)
457
        {
458
            ++allocate_.count_;
32✔
459
        }
460
    }
×
461

462
    void locality_namespace::counter_data::increment_resolve_locality_count()
×
463
    {
464
        if (resolve_locality_.enabled_)
465
        {
466
            ++resolve_locality_.count_;
×
467
        }
468
    }
32✔
469

470
    void locality_namespace::counter_data::increment_free_count()
32✔
471
    {
472
        if (free_.enabled_)
473
        {
474
            ++free_.count_;
32✔
475
        }
476
    }
39✔
477

478
    void locality_namespace::counter_data::increment_localities_count()
39✔
479
    {
480
        if (localities_.enabled_)
481
        {
482
            ++localities_.count_;
39✔
483
        }
484
    }
1✔
485

486
    void locality_namespace::counter_data::increment_num_localities_count()
1✔
487
    {
488
        if (num_localities_.enabled_)
489
        {
490
            ++num_localities_.count_;
1✔
491
        }
492
    }
×
493

494
    void locality_namespace::counter_data::increment_num_threads_count()
×
495
    {
496
        if (num_threads_.enabled_)
497
        {
498
            ++num_threads_.count_;
×
499
        }
500
    }
501
}}}    // namespace hpx::agas::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc