STEllAR-GROUP / hpx / #859

05 Jan 2023 05:50PM UTC coverage: 85.881% (-0.6%) from 86.496%
#859

push

StellarBot
Merge #6116

6116: Add new command line argument --hpx:loopback_network r=hkaiser a=JiakunYan

Option `--hpx:loopback_network=0|1`. If set to 1, the network (at least the initialization phase) will be enabled even if the locality number is 1.

It is useful for debugging networks.

Co-authored-by: Jiakun Yan <jiakunyan1998@gmail.com>

4 of 4 new or added lines in 1 file covered. (100.0%)

173321 of 201815 relevant lines covered (85.88%)

1844045.06 hits per line

Source File
68.92
/libs/full/runtime_distributed/include/hpx/runtime_distributed/server/migrate_component.hpp
//  Copyright (c) 2007-2021 Hartmut Kaiser
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>
#include <hpx/actions_base/plain_action.hpp>
#include <hpx/components/get_ptr.hpp>
#include <hpx/components_base/agas_interface.hpp>
#include <hpx/components_base/traits/component_supports_migration.hpp>
#include <hpx/components_base/traits/is_component.hpp>
#include <hpx/naming_base/id_type.hpp>
#include <hpx/runtime_distributed/find_here.hpp>
#include <hpx/runtime_distributed/stubs/runtime_support.hpp>

#include <cstdint>
#include <memory>
#include <utility>

namespace hpx { namespace components { namespace server {

    ///////////////////////////////////////////////////////////////////////////
    //
    // Migrate a given component instance to the specified target locality (the
    // component instance is called 'object' below).
    //
    // Migration of an object will involve at least two localities, but can
    // touch on up to 4 localities:
    //
    // A) The locality on which the migration operation is triggered. This is
    //    the locality where `hpx::components::migrate()` is invoked.
    // B) The locality where the object to be migrated is currently located.
    // C) The locality where the object should be moved (migrated) to.
    // D) The locality which hosts the AGAS instance responsible for resolving
    //    the global address of the object to be migrated.
    //
    //    The localities B and C will be different, while the localities A and
    //    D could be the same as either of the others.
    //
    // Object migration is performed in several steps:
    //
    // 1) The migration is triggered on locality A by invoking the `migrate()`
    //    function. This invokes the action `perform_migrate_component_action`
    //    on the locality where the object to migrate is currently located
    //    (locality B).
    //
    //    The action `perform_migrate_component_action` executes the following
    //    steps:
    //    a) It will delay the start of the migration operation until no more
    //       actions (threads) are pending or currently running for the object
    //       to be migrated (the object is unpinned).
    //    b) It marks the object which is about to be migrated as 'was
    //       migrated'. This information is stored in the AGAS client side
    //       representation on locality B. It is used to forward all incoming
    //       parcels to the object's new locality. It is also used to force any
    //       locally triggered actions for the object to go through the parcel
    //       layer. For this, the object to be migrated is removed from the
    //       local AGAS cache.
    //    c) It will trigger the actual migration operation (see step 2).
    //
    //    In order to keep track of any pending and currently running actions
    //    (threads) for the object to migrate, any thread which is being
    //    scheduled will pin the object. The object will be unpinned only once
    //    the scheduled thread has executed to completion. The last unpinning
    //    of the object will release any pending migration operation
    //    (see step 1a).
    //
    // 2) The migration is triggered by invoking the action
    //    `trigger_migrate_component_action` on the locality which is
    //    responsible for managing the address resolution for the object which
    //    has to be migrated (locality D).
    //
    //    The action `trigger_migrate_component_action` performs 3 steps:
    //    a) Invoke `agas::begin_migration`, which marks the global id in AGAS,
    //       deferring all address resolution requests until end_migration is
    //       called. Note that the future returned by agas::begin_migration
    //       will become ready only after all currently pending operations on
    //       the target object have finished executing.
    //    b) Invoke the actual migration operation (see step 3)
    //    c) Invoke `agas::end_migration`, which un-marks the global id and
    //       releases all pending address resolution requests. Those requests
    //       now return the new object location (on locality C).
    //
    // 3) The actual migration (`migrate_component_action`) is executed on the
    //    locality where the object is currently located (locality B). This
    //    involves several steps as well:
    //    a) Retrieve the (shared) pointer to the object; this pins the
    //       object. The object is unpinned by the deleter associated with the
    //       shared pointer.
    //    b) Invoke the action `runtime_support::migrate_component` on the
    //       locality where the object has to be moved to. This passes
    //       along the shared pointer to the object, recreates the object
    //       on the target locality, and updates the association of the
    //       object's global id with the new local virtual address in AGAS.
    //    c) Mark the old object (through the original shared pointer) as
    //       migrated, which will delete it once the shared pointer goes out
    //       of scope.
    //
    //    The entry in the AGAS client side representation on locality B which
    //    marks the object as 'was migrated' will be left untouched (for now).
    //    This is necessary so that all parcels which were still resolved to
    //    the old locality are properly forwarded to the new location of the
    //    object. Eventually this entry will have to be cleaned up, no later
    //    than when the object is destroyed.
    //
    ///////////////////////////////////////////////////////////////////////////

    ///////////////////////////////////////////////////////////////////////////
    // This is step 3 of the migration process
    //
    // This will be executed on the locality where the object lives which is
    // to be migrated
    namespace detail {
        // trigger the actual migration
        template <typename Component, typename DistPolicy>
        future<id_type> migrate_component_postproc(
            std::shared_ptr<Component> const& ptr, id_type const& to_migrate,
            DistPolicy const& policy)
        {
            using components::stubs::runtime_support;

            std::uint32_t pin_count = ptr->pin_count();

            if (pin_count == ~0x0u)
            {
                agas::unmark_as_migrated(to_migrate.get_gid());

                return hpx::make_exceptional_future<id_type>(
                    HPX_GET_EXCEPTION(hpx::error::invalid_status,
                        "hpx::components::server::migrate_component",
                        "attempting to migrate an instance of a component "
                        "that was already migrated"));
            }

            if (pin_count > 1)
            {
                agas::unmark_as_migrated(to_migrate.get_gid());

                return hpx::make_exceptional_future<id_type>(
                    HPX_GET_EXCEPTION(hpx::error::migration_needs_retry,
                        "hpx::components::server::migrate_component",
                        "attempting to migrate an instance of a component "
                        "that is currently pinned"));
            }

            return runtime_support::migrate_component_async<Component>(
                policy, ptr, to_migrate)
                .then(launch::sync, [ptr, to_migrate](future<id_type>&& f) {
                    ptr->mark_as_migrated();
                    if (f.has_exception())
                    {
                        agas::unmark_as_migrated(to_migrate.get_gid());
                    }
                    return f.get();
                });
        }
    }    // namespace detail

    template <typename Component, typename DistPolicy>
    future<id_type> migrate_component(id_type const& to_migrate,
        naming::address const& addr, DistPolicy const& policy)
    {
        // 'migration' to same locality as before is a no-op
        if (policy.get_next_target() == hpx::find_here())
        {
            agas::unmark_as_migrated(to_migrate.get_gid());
            return make_ready_future(to_migrate);
        }

        if (!traits::component_supports_migration<Component>::call())
        {
            agas::unmark_as_migrated(to_migrate.get_gid());

            return hpx::make_exceptional_future<hpx::id_type>(
                HPX_GET_EXCEPTION(hpx::error::invalid_status,
                    "hpx::components::server::migrate_component",
                    "attempting to migrate an instance of a component that "
                    "does not support migration"));
        }

        // retrieve pointer to object (must be local)
        std::shared_ptr<Component> ptr =
            hpx::detail::get_ptr_for_migration<Component>(addr, to_migrate);

        // perform actual migration by sending data over to target locality
        return detail::migrate_component_postproc<Component>(
            ptr, to_migrate, policy);
    }

    template <typename Component, typename DistPolicy>
    struct migrate_component_action
      : ::hpx::actions::action<future<id_type> (*)(id_type const&,
                                   naming::address const&, DistPolicy const&),
            &migrate_component<Component, DistPolicy>,
            migrate_component_action<Component, DistPolicy>>
    {
    };

    ///////////////////////////////////////////////////////////////////////////
    // This is step 2 of the migration process
    //
    // This is executed on the locality responsible for managing the address
    // resolution for the given object.
    template <typename Component, typename DistPolicy>
    future<id_type> trigger_migrate_component(id_type const& to_migrate,
        DistPolicy const& policy, hpx::id_type const& id,
        naming::address const& addr)
    {
        if (!traits::component_supports_migration<Component>::call())
        {
            return hpx::make_exceptional_future<id_type>(
                HPX_GET_EXCEPTION(hpx::error::invalid_status,
                    "hpx::components::server::trigger_migrate_component",
                    "attempting to migrate an instance of a component that "
                    "does not support migration"));
        }

        if (naming::get_locality_id_from_id(to_migrate) != get_locality_id())
        {
            return hpx::make_exceptional_future<id_type>(HPX_GET_EXCEPTION(
                hpx::error::invalid_status,
                "hpx::components::server::trigger_migrate_component",
                "this function has to be executed on the locality "
                "responsible for managing the address of the given object"));
        }

        // perform actual object migration
        typedef migrate_component_action<Component, DistPolicy> action_type;

        // force unwrapping outer future
        future<id_type> f = async<action_type>(id, to_migrate, addr, policy);

        return f.then(
            launch::sync, [=](future<id_type>&& f) -> future<id_type> {
                agas::end_migration(to_migrate);
                if (f.has_exception())
                {
                    try
                    {
                        f.get();    // rethrow exception
                    }
                    catch (hpx::exception const& e)
                    {
                        // simply retry if the migration operation detected
                        // a (global) race between scheduled actions waiting
                        // in AGAS and other migration operations
                        if (e.get_error() == hpx::error::migration_needs_retry)
                        {
                            return trigger_migrate_component<Component>(
                                to_migrate, policy, id, addr);
                        }
                        throw;
                    }
                    catch (...)
                    {
                        throw;
                    }
                }
                return HPX_MOVE(f);
            });
    }

    template <typename Component, typename DistPolicy>
    struct trigger_migrate_component_action
      : ::hpx::actions::action<future<id_type> (*)(id_type const&,
                                   DistPolicy const&, hpx::id_type const&,
                                   naming::address const&),
            &trigger_migrate_component<Component, DistPolicy>,
            trigger_migrate_component_action<Component, DistPolicy>>
    {
    };

    ///////////////////////////////////////////////////////////////////////////
    // This is step 1 of the migration process
    //
    // This is executed on the locality where the object to migrate is
    // currently located.
    template <typename Component, typename DistPolicy>
    future<id_type> perform_migrate_component(
        id_type const& to_migrate, DistPolicy const& policy)
    {
        if (!traits::component_supports_migration<Component>::call())
        {
            return hpx::make_exceptional_future<id_type>(
                HPX_GET_EXCEPTION(hpx::error::invalid_status,
                    "hpx::components::server::perform_migrate_component",
                    "attempting to migrate an instance of a component that "
                    "does not support migration"));
        }

        // retrieve pointer to object (must be local)
        return hpx::get_ptr<Component>(to_migrate)
            .then(launch::sync,
                [=](future<std::shared_ptr<Component>>&& f) -> future<id_type> {
                    std::shared_ptr<Component> ptr = f.get();

                    using bm_result = std::pair<hpx::id_type, naming::address>;

                    // mark object in AGAS as being migrated first
                    return agas::begin_migration(to_migrate)
                        .then(launch::sync,
                            [ptr = HPX_MOVE(ptr), to_migrate, policy](
                                hpx::future<bm_result>&& bmf) mutable
                            -> future<id_type> {
                                auto r = bmf.get();    // propagate exceptions

                                // Delay the start of the migration operation until no
                                // more actions (threads) are pending or currently
                                // running for the given object (until the object is
                                // unpinned).
                                future<void> trigger_migration =
                                    ptr->mark_as_migrated(to_migrate);

                                // Unpin the object, will trigger migration if this is
                                // the only pin-count.
                                ptr = std::shared_ptr<Component>{};

                                // Once the migration is possible (object is not pinned
                                // anymore), trigger the necessary actions
                                return trigger_migration.then(
                                    launch::async,    // run on separate thread
                                    [=](future<void>&& f) -> future<id_type> {
                                        // end migration operation in case of error
                                        if (f.has_exception())
                                        {
                                            agas::end_migration(to_migrate);
                                        }

                                        f.get();    // rethrow exception

                                        // now trigger 2nd step of migration
                                        typedef trigger_migrate_component_action<
                                            Component, DistPolicy>
                                            action_type;

                                        return async<action_type>(
                                            naming::get_locality_from_id(
                                                to_migrate),
                                            to_migrate, policy, r.first,
                                            r.second);
                                    });
                            });
                });
    }

    template <typename Component, typename DistPolicy>
    struct perform_migrate_component_action
      : ::hpx::actions::action<future<id_type> (*)(
                                   id_type const&, DistPolicy const&),
            &perform_migrate_component<Component, DistPolicy>,
            perform_migrate_component_action<Component, DistPolicy>>
    {
    };
}}}    // namespace hpx::components::server
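
The header comment in this file describes how a migration is delayed until the object is unpinned, and how the shared pointer's deleter performs the unpinning. The standalone sketch below illustrates that idea using only the C++ standard library. It is not HPX code: `pinned_object`, `get_pinned`, `request_migration` and `run_example` are hypothetical names chosen for illustration, and the sketch leaves out AGAS, parcels and futures entirely.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for a migratable component (not an HPX type).
class pinned_object
{
public:
    void pin()
    {
        std::lock_guard<std::mutex> l(mtx_);
        ++pin_count_;
    }

    // Drop one pin; the last unpin releases a pending migration, if any.
    void unpin()
    {
        std::function<void()> pending;
        {
            std::lock_guard<std::mutex> l(mtx_);
            assert(pin_count_ > 0);
            if (--pin_count_ != 0 || !on_unpinned_)
                return;
            pending.swap(on_unpinned_);    // leaves on_unpinned_ empty
        }
        pending();    // run the deferred migration outside the lock
    }

    // Start the migration now if unpinned, otherwise defer it.
    void request_migration(std::function<void()> start)
    {
        {
            std::lock_guard<std::mutex> l(mtx_);
            if (pin_count_ != 0)
            {
                on_unpinned_ = std::move(start);
                return;
            }
        }
        start();
    }

    std::uint32_t pin_count() const
    {
        std::lock_guard<std::mutex> l(mtx_);
        return pin_count_;
    }

private:
    mutable std::mutex mtx_;
    std::uint32_t pin_count_ = 0;
    std::function<void()> on_unpinned_;
};

// Acquiring the pointer pins the object; the deleter unpins it.
// The pointer does not own the object in this sketch.
inline std::shared_ptr<pinned_object> get_pinned(pinned_object& obj)
{
    obj.pin();
    return std::shared_ptr<pinned_object>(
        &obj, [](pinned_object* p) { p->unpin(); });
}

// Two users pin the object; migration starts only after both let go.
inline bool run_example()
{
    pinned_object obj;
    bool migrated = false;

    auto a = get_pinned(obj);
    auto b = get_pinned(obj);
    if (obj.pin_count() != 2)
        return false;

    obj.request_migration([&migrated] { migrated = true; });
    if (migrated)
        return false;    // still pinned, so the request must be deferred

    a.reset();
    if (migrated)
        return false;    // one pin is still outstanding
    b.reset();           // last unpin releases the pending migration
    return migrated && obj.pin_count() == 0;
}
```

In the sketch, the deferred callback plays the role that the future returned by `mark_as_migrated(to_migrate)` plays in `perform_migrate_component`: it only fires once the last shared pointer has been released. The real implementation additionally has to handle races across localities, which is why `trigger_migrate_component` retries on `migration_needs_retry`.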