• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/libs/full/collectives/src/create_communicator.cpp
1
//  Copyright (c) 2020-2025 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/config.hpp>
8

9
#if !defined(HPX_COMPUTE_DEVICE_CODE)
10

11
#include <hpx/assert.hpp>
12
#include <hpx/collectives/create_communicator.hpp>
13
#include <hpx/components/basename_registration.hpp>
14
#include <hpx/components_base/agas_interface.hpp>
15
#include <hpx/components_base/server/component.hpp>
16
#include <hpx/modules/async_base.hpp>
17
#include <hpx/modules/async_distributed.hpp>
18
#include <hpx/modules/errors.hpp>
19
#include <hpx/modules/format.hpp>
20
#include <hpx/modules/lock_registration.hpp>
21
#include <hpx/modules/synchronization.hpp>
22
#include <hpx/modules/type_support.hpp>
23
#include <hpx/runtime_components/component_factory.hpp>
24
#include <hpx/runtime_components/new.hpp>
25
#include <hpx/runtime_distributed/server/runtime_support.hpp>
26

27
#include <cstddef>
28
#include <cstdint>
29
#include <mutex>
30
#include <string>
31
#include <utility>
32

33
#include <hpx/config/warnings_prefix.hpp>
34

35
namespace hpx::util {
×
36

37
    // This is explicitly instantiated to ensure that the id is stable across
38
    // shared libraries.
×
39
    extra_data_id_type
40
    extra_data_helper<collectives::detail::communicator_data>::id() noexcept
41
    {
42
        static std::uint8_t id = 0;
43
        return &id;
44
    }
45
}    // namespace hpx::util
46

×
47
///////////////////////////////////////////////////////////////////////////////
48
using collectives_component =
49
    hpx::components::component<hpx::collectives::detail::communicator_server>;
50

51
HPX_REGISTER_COMPONENT(collectives_component)
52

×
53
namespace hpx::collectives {
×
54

×
55
    namespace detail {
×
56

57
        communicator_server::communicator_server() noexcept    //-V730
58
          : num_sites_(0)
×
59
        {
60
            HPX_ASSERT(false);    // shouldn't ever be called
×
61
        }
62

×
63
        communicator_server::communicator_server(
×
64
            std::size_t num_sites, char const* basename) noexcept
×
65
          : gate_(num_sites)
66
          , num_sites_(num_sites)
67
          , basename_(basename)
×
68
        {
69
            HPX_ASSERT(
70
                num_sites != 0 && num_sites != static_cast<std::size_t>(-1));
71
        }
×
72

73
        communicator_server::~communicator_server() = default;
74
    }    // namespace detail
75

76
    ///////////////////////////////////////////////////////////////////////////
77
    void communicator::set_info(num_sites_arg num_sites,
×
78
        this_site_arg this_site, root_site_arg root_site) noexcept
×
79
    {
×
80
        auto& [num_sites_, this_site_, root_site_] =
81
            get_extra_data<detail::communicator_data>();
×
82

83
        num_sites_ = num_sites;
84
        this_site_ = this_site;
85
        root_site_ = root_site;
86
    }
87

88
    std::pair<num_sites_arg, this_site_arg> communicator::get_info()
89
        const noexcept
90
    {
91
        auto const* client_data =
92
            try_get_extra_data<detail::communicator_data>();
93

×
94
        if (client_data != nullptr)
95
        {
96
            return std::make_pair(
97
                client_data->num_sites_, client_data->this_site_);
×
98
        }
99

100
        return std::make_pair(num_sites_arg{}, this_site_arg{});
101
    }
×
102

103
    std::tuple<num_sites_arg, this_site_arg, root_site_arg>
×
104
    communicator::get_info_ex() const noexcept
105
    {
×
106
        auto const* client_data =
107
            try_get_extra_data<detail::communicator_data>();
×
108

×
109
        if (client_data != nullptr)
110
        {
111
            return std::make_tuple(client_data->num_sites_,
112
                client_data->this_site_, client_data->root_site_);
113
        }
114

115
        return std::make_tuple(
116
            num_sites_arg{}, this_site_arg{}, root_site_arg());
117
    }
118

×
119
    ///////////////////////////////////////////////////////////////////////////
×
120
    communicator create_communicator(char const* basename,
121
        num_sites_arg num_sites, this_site_arg this_site,
×
122
        generation_arg generation, root_site_arg root_site)
123
    {
124
        if (num_sites.is_default())
×
125
        {
126
            num_sites = agas::get_num_localities(hpx::launch::sync);
127
        }
128
        if (this_site.is_default())
129
        {
130
            this_site = agas::get_locality_id();
131
            if (root_site == static_cast<std::size_t>(-1))    //-V1051
132
            {
×
133
                root_site = this_site;
134
            }
×
135
        }
×
136

×
137
        HPX_ASSERT(this_site < num_sites);
138
        HPX_ASSERT(
×
139
            root_site != static_cast<std::size_t>(-1) && root_site < num_sites);
140

141
        std::string name;
142
        if (num_sites != 1)
143
        {
144
            name = basename;
×
145
            if (!generation.is_default())
×
146
            {
147
                name += std::to_string(generation) + "/";
148
            }
149
        }
150

×
151
        if (this_site == root_site)
152
        {
153
            // create a new communicator
154
            auto c = hpx::local_new<communicator>(num_sites, basename);
×
155

156
            // Return communicator object right away if there is only one site
157
            // involved.
158
            if (num_sites == 1)
×
159
            {
160
                c.set_info(num_sites, this_site);
161
                return c;
162
            }
163

164
            // register the communicator's id using the given basename, this
165
            // keeps the communicator alive
166
            auto f = c.register_as(
167
                hpx::detail::name_from_basename(HPX_MOVE(name), this_site));
168

169
            return f.then(hpx::launch::sync,
170
                [=, target = HPX_MOVE(c)](hpx::future<bool>&& fut) mutable {
×
171
                    if (bool const result = fut.get(); !result)
×
172
                    {
×
173
                        HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
174
                            "hpx::collectives::detail::create_communicator",
×
175
                            "the given base name for the communicator "
176
                            "operation was already registered: {}",
177
                            target.registered_name());
×
178
                    }
179
                    target.set_info(num_sites, this_site, root_site);
180
                    return target;
181
                });
182
        }
183

184
        // find existing communicator
×
185
        return hpx::find_from_basename<communicator>(HPX_MOVE(name), root_site)
×
186
            .then(hpx::launch::sync, [=](communicator&& c) {
187
                c.set_info(num_sites, this_site, root_site);
×
188
                return HPX_MOVE(c);
189
            });
×
190
    }
191

192
    communicator create_communicator(hpx::launch::sync_policy policy,
193
        char const* basename, num_sites_arg num_sites, this_site_arg this_site,
194
        generation_arg generation, root_site_arg root_site)
195
    {
196
        if (num_sites.is_default())
×
197
        {
198
            num_sites = agas::get_num_localities(hpx::launch::sync);
199
        }
200
        if (this_site.is_default())
201
        {
×
202
            this_site = agas::get_locality_id();
203
            if (root_site == static_cast<std::size_t>(-1))    //-V1051
204
            {
205
                root_site = this_site;
206
            }
207
        }
208

209
        HPX_ASSERT(this_site < num_sites);
210
        HPX_ASSERT(
211
            root_site != static_cast<std::size_t>(-1) && root_site < num_sites);
212

213
        std::string name(basename);
214
        if (!generation.is_default())
215
        {
216
            name += std::to_string(generation) + "/";
217
        }
218

219
        if (this_site == root_site)
220
        {
221
            // create a new communicator
222
            auto c = hpx::local_new<communicator>(num_sites, basename);
223

224
            // register the communicator's id using the given basename, this
225
            // keeps the communicator alive
226
            auto f = c.register_as(
227
                hpx::detail::name_from_basename(HPX_MOVE(name), this_site));
228

229
            if (bool const result = f.get(); !result)
230
            {
231
                HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
232
                    "hpx::collectives::detail::create_communicator",
233
                    "the given base name for the communicator "
234
                    "operation was already registered: {}",
235
                    c.registered_name());
236
            }
237

238
            c.set_info(num_sites, this_site, root_site);
239
            return c;
240
        }
241

242
        // find existing communicator
243
        auto c = hpx::find_from_basename<communicator>(
244
            policy, HPX_MOVE(name), root_site);
245
        c.set_info(num_sites, this_site, root_site);
246
        return c;
247
    }
248

249
    ///////////////////////////////////////////////////////////////////////////
250
    communicator create_local_communicator(char const* basename,
251
        num_sites_arg num_sites, this_site_arg this_site,
252
        generation_arg generation, root_site_arg root_site)
253
    {
254
        if (root_site == static_cast<std::size_t>(-1))
255
        {
256
            root_site = this_site;
257
        }
258

259
        HPX_ASSERT(this_site < num_sites);
260
        HPX_ASSERT(
261
            root_site != static_cast<std::size_t>(-1) && root_site < num_sites);
262
        HPX_ASSERT(basename != nullptr && basename[0] != '\0');
263

264
        // make sure the communicator will be registered in the local AGAS
265
        // symbol service instance
266
        std::string name;
267
        if (num_sites != 1)
268
        {
269
            name = hpx::util::format("/{}{}{}", agas::get_locality_id(),
270
                basename[0] == '/' ? "" : "/", basename);
271
            if (!generation.is_default())
272
            {
273
                name += std::to_string(generation) + "/";
274
            }
275
        }
276

277
        if (this_site == root_site)
278
        {
279
            // create a new communicator
280
            auto c = hpx::local_new<communicator>(num_sites, basename);
281

282
            // Return communicator object right away if there is only one site
283
            // involved.
284
            if (num_sites == 1)
285
            {
286
                c.set_info(num_sites, this_site);
287
                return c;
288
            }
289

290
            // register the communicator's id using the given basename, this
291
            // keeps the communicator alive
292
            bool const result = c.register_as(hpx::launch::sync,
293
                hpx::detail::name_from_basename(HPX_MOVE(name), this_site));
294

295
            if (!result)
296
            {
297
                HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
298
                    "hpx::collectives::detail::create_local_communicator",
299
                    "the given base name for the communicator operation "
300
                    "was already registered: {}",
301
                    c.registered_name());
302
            }
303

304
            c.set_info(num_sites, this_site, root_site);
305
            return c;
306
        }
307

308
        // find existing communicator
309
        return hpx::find_from_basename<communicator>(HPX_MOVE(name), root_site)
310
            .then(hpx::launch::sync, [=](communicator&& c) {
311
                c.set_info(num_sites, this_site, root_site);
312
                return HPX_MOVE(c);
313
            });
314
    }
315

316
    communicator create_local_communicator(hpx::launch::sync_policy policy,
317
        char const* basename, num_sites_arg num_sites, this_site_arg this_site,
318
        generation_arg generation, root_site_arg root_site)
319
    {
320
        if (root_site == static_cast<std::size_t>(-1))
321
        {
322
            root_site = this_site;
323
        }
324

325
        HPX_ASSERT(this_site < num_sites);
326
        HPX_ASSERT(
327
            root_site != static_cast<std::size_t>(-1) && root_site < num_sites);
328
        HPX_ASSERT(basename != nullptr && basename[0] != '\0');
329

330
        // make sure the communicator will be registered in the local AGAS
331
        // symbol service instance
332
        std::string name = hpx::util::format("/{}{}{}", agas::get_locality_id(),
333
            basename[0] == '/' ? "" : "/", basename);
334
        if (!generation.is_default())
335
        {
336
            name += std::to_string(generation) + "/";
337
        }
338

339
        if (this_site == root_site)
340
        {
341
            // create a new communicator
342
            auto c = hpx::local_new<communicator>(num_sites, basename);
343

344
            // register the communicator's id using the given basename, this
345
            // keeps the communicator alive
346
            bool const result = c.register_as(hpx::launch::sync,
347
                hpx::detail::name_from_basename(HPX_MOVE(name), this_site));
348

349
            if (!result)
350
            {
351
                HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
352
                    "hpx::collectives::detail::create_local_communicator",
353
                    "the given base name for the communicator operation was "
354
                    "already registered: {}",
355
                    c.registered_name());
356
            }
357

358
            c.set_info(num_sites, this_site, root_site);
359
            return c;
360
        }
361

362
        // find existing communicator
363
        auto c = hpx::find_from_basename<communicator>(
364
            policy, HPX_MOVE(name), root_site);
365
        c.set_info(num_sites, this_site, root_site);
366
        return c;
367
    }
368

369
    ///////////////////////////////////////////////////////////////////////////
370
    // Predefined global communicator
371
    namespace {
372

373
        communicator world_communicator;
374
        communicator local_communicator;
375

376
        hpx::mutex local_communicator_mtx;
377
    }    // namespace
378

379
    communicator get_world_communicator()
380
    {
381
        HPX_ASSERT(world_communicator);
382
        return world_communicator;
383
    }
384

385
    namespace detail {
386

387
        void create_global_communicator()
388
        {
389
            HPX_ASSERT(!world_communicator);
390

391
            auto const num_sites =
392
                num_sites_arg(agas::get_num_localities(hpx::launch::sync));
393
            auto const this_site = this_site_arg(agas::get_locality_id());
394

395
            world_communicator =
396
                create_communicator(hpx::launch::sync, "/0/world_communicator",
397
                    num_sites, this_site, generation_arg(), root_site_arg(0));
398
            world_communicator.set_info(num_sites, this_site, root_site_arg(0));
399
        }
400

401
        void reset_global_communicator()
402
        {
403
            if (world_communicator)
404
            {
405
                world_communicator.detach();
406
            }
407
        }
408
    }    // namespace detail
409

410
    communicator get_local_communicator()
411
    {
412
        detail::create_local_communicator();
413
        return local_communicator;
414
    }
415

416
    namespace detail {
417

418
        void create_local_communicator()
419
        {
420
            std::unique_lock<hpx::mutex> l(local_communicator_mtx);
421
            [[maybe_unused]] util::ignore_while_checking il(&l);
422

423
            if (!local_communicator)
424
            {
425
                auto const num_sites =
426
                    num_sites_arg(hpx::get_num_worker_threads());
427
                auto const this_site =
428
                    this_site_arg(hpx::get_worker_thread_num());
429

430
                local_communicator = collectives::create_local_communicator(
431
                    hpx::launch::sync, "local_communicator", num_sites,
432
                    this_site, generation_arg(), root_site_arg(0));
433
                local_communicator.set_info(
434
                    num_sites, this_site, root_site_arg(0));
435
            }
436
        }
437

438
        void reset_local_communicator()
439
        {
440
            if (local_communicator)
441
            {
442
                local_communicator.detach();
443
            }
444
        }
445
    }    // namespace detail
446
}    // namespace hpx::collectives
447

448
#endif    // !HPX_COMPUTE_DEVICE_CODE
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc