• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/libs/full/agas/src/route.cpp
1
//  Copyright (c) 2011 Vinay C Amatya
2
//  Copyright (c) 2007-2025 Hartmut Kaiser
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#include <hpx/config.hpp>
9

10
#if defined(HPX_HAVE_NETWORKING)
11
#include <hpx/actions_base/plain_action.hpp>
12
#include <hpx/agas/addressing_service.hpp>
13
#include <hpx/agas_base/route.hpp>
14
#include <hpx/agas_base/server/primary_namespace.hpp>
15
#include <hpx/assert.hpp>
16
#include <hpx/async_distributed/continuation.hpp>
17
#include <hpx/async_distributed/detail/post.hpp>
18
#include <hpx/components_base/agas_interface.hpp>
19
#include <hpx/modules/errors.hpp>
20
#include <hpx/modules/runtime_local.hpp>
21
#include <hpx/modules/timing.hpp>
22
#include <hpx/parcelset/parcel.hpp>
23
#include <hpx/parcelset_base/detail/parcel_route_handler.hpp>
24

25
#include <atomic>
26
#include <cstddef>
27
#include <cstdint>
28
#include <mutex>
29
#include <utility>
30

31
#include <hpx/config/warnings_prefix.hpp>
32

×
33
namespace hpx::detail {
34

35
    void update_agas_cache(hpx::naming::gid_type const& gid,
36
        hpx::naming::address const& addr, std::uint64_t count,
×
37
        std::uint64_t offset)
×
38
    {
39
        hpx::agas::update_cache_entry(gid, addr, count, offset);
40
    }
×
41
}    // namespace hpx::detail
42

43
HPX_PLAIN_ACTION_ID(hpx::detail::update_agas_cache, update_agas_cache_action,
44
    hpx::actions::update_agas_cache_action_id)
45

×
46
namespace hpx::agas::server {
47

×
48
    void route_impl(primary_namespace& server, parcelset::parcel&& p)
49
    {
×
50
        LPT_(debug).format("agas::server::route_impl: {}", p.parcel_id());
×
51

×
52
        naming::gid_type const& gid = p.destination();
53
        naming::address& addr = p.addr();
54
        primary_namespace::resolved_type cache_address;
55

56
        // resolve destination addresses, we should be able to resolve all of
57
        // them, otherwise it's an error
58
        {
59
            std::unique_lock<primary_namespace::mutex_type> l(server.mutex());
60

61
            error_code& ec = throws;
×
62

63
            // wait for any migration to be completed
×
64
            if (naming::detail::is_migratable(gid))
65
            {
66
                server.wait_for_migration_locked(l, gid, ec);
×
67
            }
68

×
69
            cache_address = server.resolve_gid_locked(l, gid, ec);
70

×
71
            if (ec || hpx::get<0>(cache_address) == naming::invalid_gid)
72
            {
×
73
                l.unlock();
74

75
                HPX_THROWS_IF(ec, hpx::error::no_success,
76
                    "primary_namespace::route",
77
                    "can't route parcel to unknown gid: {}", gid);
78

79
                return;
80
            }
×
81

82
            // retain don't store in cache flag
83
            if (!naming::detail::store_in_cache(gid) &&
84
                !naming::is_locality(gid))
85
            {
86
                naming::detail::set_dont_store_in_cache(
87
                    hpx::get<0>(cache_address));
88
            }
89

90
            gva const g = hpx::get<1>(cache_address)
×
91
                              .resolve(gid, hpx::get<0>(cache_address));
×
92

93
            addr.locality_ = g.prefix;
94
            addr.type_ = g.type;
×
95
            addr.address_ = g.lva();
96
        }
97

×
98
        hpx::id_type const source = p.source_id();
×
99

100
        // either send the parcel on its way or execute actions locally
101
        if (naming::get_locality_id_from_gid(addr.locality_) ==
×
102
            agas::get_locality_id())
103
        {
104
            // destination is local
×
105
            if (p.schedule_action())
×
106
            {
107
                // object was migrated, route again
108
                agas::route(HPX_MOVE(p),
109
                    &hpx::parcelset::detail::parcel_route_handler,
110
                    threads::thread_priority::normal);
111
            }
112
        }
×
113
        else
114
        {
115
            // destination is remote
×
116
            hpx::parcelset::put_parcel(HPX_MOVE(p));
×
117
        }
118

119
        runtime const& rt = get_runtime();
120
        if (rt.get_state() < hpx::state::pre_shutdown)
121
        {
×
122
            // asynchronously update cache on source locality update remote
123
            // cache if the id is not flagged otherwise
124
            naming::gid_type const& id = hpx::get<0>(cache_address);
×
125
            if (id && naming::detail::store_in_cache(id))
126
            {
127
                gva const& g = hpx::get<1>(cache_address);
128
                naming::address address(g.prefix, g.type, g.lva());
129

130
                HPX_ASSERT(naming::is_locality(source));
131

132
                hpx::post<update_agas_cache_action>(
133
                    source, id, address, g.count, g.offset);
134
            }
135
        }
136
    }
137

138
    ///////////////////////////////////////////////////////////////////////////
139
    struct init_route_function
140
    {
141
        init_route_function()
142
        {
143
            server::route = &route_impl;
144
        }
145
    };
146

147
    init_route_function init;
148
}    // namespace hpx::agas::server
149

150
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc