• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #853

19 Dec 2022 01:01AM UTC coverage: 86.287% (+0.4%) from 85.912%
#853

push

StellarBot
Merge #6109

6109: Modernize serialization module r=hkaiser a=hkaiser

- flyby separate serialization of Boost types

working towards https://github.com/STEllAR-GROUP/hpx/issues/5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

53 of 53 new or added lines in 6 files covered. (100.0%)

173939 of 201582 relevant lines covered (86.29%)

1931657.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.99
/libs/full/parcelport_tcp/src/connection_handler_tcp.cpp
1
//  Copyright (c) 2007-2021 Hartmut Kaiser
2
//  Copyright (c) 2007 Richard D Guidry Jr
3
//  Copyright (c) 2011 Bryce Lelbach
4
//  Copyright (c) 2011 Katelyn Kufahl
5
//  Copyright (c) 2011-2014 Thomas Heller
6
//
7
//  SPDX-License-Identifier: BSL-1.0
8
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
9
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
10

11
#include <hpx/config.hpp>
12

13
#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_TCP)
14
#include <hpx/assert.hpp>
15
#include <hpx/modules/asio.hpp>
16
#include <hpx/modules/errors.hpp>
17
#include <hpx/modules/functional.hpp>
18
#include <hpx/modules/runtime_configuration.hpp>
19
#include <hpx/modules/util.hpp>
20

21
#include <hpx/parcelport_tcp/connection_handler.hpp>
22
#include <hpx/parcelport_tcp/locality.hpp>
23
#include <hpx/parcelport_tcp/receiver.hpp>
24
#include <hpx/parcelport_tcp/sender.hpp>
25
#include <hpx/parcelset_base/locality.hpp>
26

27
#include <asio/io_context.hpp>
28
#include <asio/ip/tcp.hpp>
29

30
#include <chrono>
31
#include <cstddef>
32
#include <cstdint>
33
#include <exception>
34
#include <memory>
35
#include <mutex>
36
#include <string>
37
#include <system_error>
38
#include <thread>
39

40
namespace hpx::parcelset::policies::tcp {
41

42
    parcelset::locality parcelport_address(
283✔
43
        util::runtime_configuration const& ini)
44
    {
45
        // load all components as described in the configuration information
46
        if (ini.has_section("hpx.parcel"))
283✔
47
        {
48
            util::section const* sec = ini.get_section("hpx.parcel");
283✔
49
            if (nullptr != sec)
283✔
50
            {
51
                return parcelset::locality(
283✔
52
                    locality(sec->get_entry("address", HPX_INITIAL_IP_ADDRESS),
566✔
53
                        hpx::util::get_entry_as<std::uint16_t>(
283✔
54
                            *sec, "port", HPX_INITIAL_IP_PORT)));
283✔
55
            }
56
        }
×
57

58
        return parcelset::locality(
×
59
            locality(HPX_INITIAL_IP_ADDRESS, HPX_INITIAL_IP_PORT));
×
60
    }
283✔
61

62
    connection_handler::connection_handler(
566✔
63
        util::runtime_configuration const& ini,
64
        threads::policies::callback_notifier const& notifier)
65
      : base_type(ini, parcelport_address(ini), notifier)
283✔
66
      , acceptor_(nullptr)
283✔
67
    {
283✔
68
        if (here_.type() != std::string("tcp"))
283✔
69
        {
70
            HPX_THROW_EXCEPTION(hpx::error::network_error,
×
71
                "tcp::parcelport::parcelport",
72
                "this parcelport was instantiated to represent an unexpected "
73
                "locality type: {}",
74
                here_.type());
75
        }
76
    }
283✔
77

78
    connection_handler::~connection_handler()
564✔
79
    {
564✔
80
        HPX_ASSERT(acceptor_ == nullptr);
282✔
81
    }
564✔
82

83
    bool connection_handler::do_run()
283✔
84
    {
85
        using asio::ip::tcp;
86
        asio::io_context& io_service = io_service_pool_.get_io_service();
283✔
87
        if (nullptr == acceptor_)
283✔
88
            acceptor_ = new tcp::acceptor(io_service);
283✔
89

90
        // initialize network
91
        std::size_t tried = 0;
283✔
92
        exception_list errors;
283✔
93
        util::endpoint_iterator_type end = util::accept_end();
283✔
94
        for (util::endpoint_iterator_type it =
1,132✔
95
                 util::accept_begin(here_.get<locality>(), io_service);
283✔
96
             it != end; ++it, ++tried)
566✔
97
        {
98
            try
99
            {
100
                std::shared_ptr<receiver> receiver_conn(new receiver(
283✔
101
                    io_service, get_max_inbound_message_size(), *this));
283✔
102

103
                tcp::endpoint ep = *it;
283✔
104
                acceptor_->open(ep.protocol());
283✔
105
                acceptor_->set_option(tcp::acceptor::reuse_address(true));
283✔
106
                acceptor_->bind(ep);
283✔
107
                acceptor_->listen();
283✔
108
                acceptor_->async_accept(receiver_conn->socket(),
283✔
109
                    hpx::bind(&connection_handler::handle_accept, this,
283✔
110
                        placeholders::_1, receiver_conn));
111
            }
283✔
112
            catch (std::system_error const&)
113
            {
114
                errors.add(std::current_exception());
×
115
                continue;
116
            }
×
117
        }
283✔
118

119
        if (errors.size() == tried)
283✔
120
        {
121
            // all attempts failed
122
            HPX_THROW_EXCEPTION(hpx::error::network_error,
×
123
                "tcp::parcelport::run", errors.get_message());
124
            return false;
125
        }
126
        return true;
127
    }
283✔
128

129
    void connection_handler::do_stop()
846✔
130
    {
131
        {
132
            // cancel all pending read operations, close those sockets
133
            std::lock_guard<hpx::spinlock> l(connections_mtx_);
846✔
134
            for (std::shared_ptr<receiver> const& c : accepted_connections_)
1,494✔
135
            {
136
                c->shutdown();
648✔
137
            }
138

139
            accepted_connections_.clear();
846✔
140
#if defined(HPX_HOLDON_TO_OUTGOING_CONNECTIONS)
141
            write_connections_.clear();
142
#endif
143
        }
846✔
144
        if (acceptor_ != nullptr)
846✔
145
        {
146
            std::error_code ec;
282✔
147
            acceptor_->close(ec);
282✔
148
            delete acceptor_;
282✔
149
            acceptor_ = nullptr;
282✔
150
        }
282✔
151
    }
846✔
152

153
    std::shared_ptr<sender> connection_handler::create_connection(
823✔
154
        parcelset::locality const& l, error_code& ec)
155
    {
156
        asio::io_context& io_service = io_service_pool_.get_io_service();
823✔
157

158
        // The parcel gets serialized inside the connection constructor, no
159
        // need to keep the original parcel alive after this call returned.
160
        std::shared_ptr<sender> sender_connection(
823✔
161
            new sender(io_service, l, this));
823✔
162

163
        // Connect to the target locality, retry if needed
164
        std::error_code error = asio::error::try_again;
823✔
165
        for (std::size_t i = 0; i < HPX_MAX_NETWORK_RETRIES; ++i)
888✔
166
        {
167
            // The acceptor is only nullptr when the parcelport has been stopped.
168
            // An exit here, avoids hangs when late parcels are in flight (those are
169
            // mainly decref requests).
170
            if (acceptor_ == nullptr)
888✔
171
                return std::shared_ptr<sender>();
×
172
            try
173
            {
174
                util::endpoint_iterator_type end = util::connect_end();
888✔
175
                for (util::endpoint_iterator_type it =
2,711✔
176
                         util::connect_begin(l.get<locality>(), io_service);
888✔
177
                     it != end; ++it)
1,016✔
178
                {
179
                    asio::ip::tcp::socket& s = sender_connection->socket();
882✔
180
                    s.close();
882✔
181
                    s.connect(*it, error);
882✔
182
                    if (!error)
882✔
183
                        break;
817✔
184
                }
65✔
185
                if (!error)
875✔
186
                    break;
810✔
187

188
                // wait for a really short amount of time
189
                if (hpx::threads::get_self_ptr())
65✔
190
                {
191
                    this_thread::suspend(
×
192
                        hpx::threads::thread_schedule_state::pending,
193
                        "connection_handler(tcp)::create_connection");
×
194
                }
×
195
                else
196
                {
197
                    std::this_thread::sleep_for(
65✔
198
                        std::chrono::milliseconds(HPX_NETWORK_RETRIES_SLEEP));
65✔
199
                }
200
            }
875✔
201
            catch (std::system_error const& e)
202
            {
203
                sender_connection->socket().close();
×
204
                sender_connection.reset();
×
205

206
                HPX_THROWS_IF(ec, hpx::error::network_error,
×
207
                    "tcp::connection_handler::get_connection", e.what());
208
                return sender_connection;
×
209
            }
×
210
        }
65✔
211

212
        if (error)
809✔
213
        {
214
            sender_connection->socket().close();
×
215
            sender_connection.reset();
×
216

217
            if (tolerate_node_faults())
×
218
                return sender_connection;
×
219

220
            HPX_THROWS_IF(ec, hpx::error::network_error,
×
221
                "tcp::connection_handler::get_connection",
222
                "{} (while trying to connect to: {})", error.message(), l);
223
            return sender_connection;
×
224
        }
225

226
        // make sure the Nagle algorithm is disabled for this socket,
227
        // disable lingering on close
228
        asio::ip::tcp::socket& s = sender_connection->socket();
812✔
229

230
        s.set_option(asio::ip::tcp::no_delay(true));
809✔
231
        s.set_option(asio::socket_base::linger(true, 0));
815✔
232

233
#if defined(HPX_HOLDON_TO_OUTGOING_CONNECTIONS)
234
        {
235
            std::lock_guard<hpx::spinlock> lock(connections_mtx_);
236
            write_connections_.insert(sender_connection);
237
        }
238
#endif
239
#if defined(HPX_DEBUG)
240
        HPX_ASSERT(l == sender_connection->destination());
810✔
241

242
        std::string connection_addr = s.remote_endpoint().address().to_string();
814✔
243
        std::uint16_t connection_port = s.remote_endpoint().port();
802✔
244
        HPX_ASSERT(hpx::util::cleanup_ip_address(l.get<locality>().address()) ==
814✔
245
            hpx::util::cleanup_ip_address(connection_addr));
246
        HPX_ASSERT(l.get<locality>().port() == connection_port);
806✔
247
#endif
248

249
        if (&ec != &throws)
805✔
250
            ec = make_success_code();
805✔
251

252
        return sender_connection;
815✔
253
    }
817✔
254

255
    parcelset::locality connection_handler::agas_locality(
424✔
256
        util::runtime_configuration const& ini) const
257
    {
258
        // load all components as described in the configuration information
259
        if (ini.has_section("hpx.agas"))
424✔
260
        {
261
            util::section const* sec = ini.get_section("hpx.agas");
424✔
262
            if (nullptr != sec)
424✔
263
            {
264
                return parcelset::locality(
424✔
265
                    locality(sec->get_entry("address", HPX_INITIAL_IP_ADDRESS),
848✔
266
                        hpx::util::get_entry_as<std::uint16_t>(
424✔
267
                            *sec, "port", HPX_INITIAL_IP_PORT)));
424✔
268
            }
269
        }
×
270

271
        return parcelset::locality(
×
272
            locality(HPX_INITIAL_IP_ADDRESS, HPX_INITIAL_IP_PORT));
×
273
    }
424✔
274

275
    parcelset::locality connection_handler::create_locality() const
710✔
276
    {
277
        return parcelset::locality(locality());
710✔
278
    }
×
279

280
    // accepted new incoming connection
281
    void connection_handler::handle_accept(
1,105✔
282
        std::error_code const& e, std::shared_ptr<receiver> receiver_conn)
283
    {
284
        if (!e)
1,105✔
285
        {
286
            // handle this incoming connection
287
            std::shared_ptr<receiver> c(receiver_conn);
823✔
288

289
            asio::io_context& io_service = io_service_pool_.get_io_service();
823✔
290
            receiver_conn.reset(new receiver(
823✔
291
                io_service, get_max_inbound_message_size(), *this));
823✔
292
            acceptor_->async_accept(receiver_conn->socket(),
823✔
293
                hpx::bind(&connection_handler::handle_accept, this,
823✔
294
                    placeholders::_1, receiver_conn));
295

296
            {
297
                // keep track of all accepted connections
298
                std::lock_guard<hpx::spinlock> l(connections_mtx_);
823✔
299
                accepted_connections_.insert(c);
823✔
300
            }
823✔
301

302
            // disable Nagle algorithm, disable lingering on close
303
            asio::ip::tcp::socket& s = c->socket();
823✔
304
            s.set_option(asio::ip::tcp::no_delay(true));
823✔
305
            s.set_option(asio::socket_base::linger(true, 0));
823✔
306

307
            // now accept the incoming connection by starting to read from the
308
            // socket
309
            c->async_read(hpx::bind(&connection_handler::handle_read_completion,
1,646✔
310
                this, placeholders::_1, c));
823✔
311
        }
823✔
312
        else
313
        {
314
            // remove this connection from the list of known connections
315
            std::lock_guard<hpx::spinlock> l(mtx_);
282✔
316
            accepted_connections_.erase(receiver_conn);
282✔
317
        }
282✔
318
    }
1,105✔
319

320
    // Handle completion of a read operation.
321
    void connection_handler::handle_read_completion(
437,635✔
322
        std::error_code const& e, std::shared_ptr<receiver> receiver_conn)
323
    {
324
        if (!e)
437,638✔
325
        {
326
            return;
436,816✔
327
        }
328

329
        if (e != asio::error::operation_aborted && e != asio::error::eof)
818✔
330
        {
331
            LPT_(error).format(
×
332
                "handle read operation completion: error: {}", e.message());
×
333
        }
×
334

335
        {
336
            // remove this connection from the list of known connections
337
            std::lock_guard<hpx::spinlock> l(connections_mtx_);
819✔
338
            accepted_connections_.erase(receiver_conn);
819✔
339
        }
819✔
340
    }
437,635✔
341
}    // namespace hpx::parcelset::policies::tcp
342

343
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc