• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #853

19 Dec 2022 01:01AM UTC coverage: 86.287% (+0.4%) from 85.912%
#853

push

StellarBot
Merge #6109

6109: Modernize serialization module r=hkaiser a=hkaiser

- flyby separate serialization of Boost types

working towards https://github.com/STEllAR-GROUP/hpx/issues/5497

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

53 of 53 new or added lines in 6 files covered. (100.0%)

173939 of 201582 relevant lines covered (86.29%)

1931657.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.99
/libs/full/parcelset/src/parcelhandler.cpp
1
//  Copyright (c) 2007-2022 Hartmut Kaiser
2
//  Copyright (c) 2013-2014 Thomas Heller
3
//  Copyright (c) 2007      Richard D Guidry Jr
4
//  Copyright (c) 2011      Bryce Lelbach & Katelyn Kufahl
5
//
6
//  SPDX-License-Identifier: BSL-1.0
7
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
8
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9

10
#include <hpx/config.hpp>
11

12
#if defined(HPX_HAVE_NETWORKING)
13
#include <hpx/assert.hpp>
14
#include <hpx/modules/errors.hpp>
15
#include <hpx/modules/format.hpp>
16
#include <hpx/modules/functional.hpp>
17
#include <hpx/modules/futures.hpp>
18
#include <hpx/modules/io_service.hpp>
19
#include <hpx/modules/itt_notify.hpp>
20
#include <hpx/modules/logging.hpp>
21
#include <hpx/modules/preprocessor.hpp>
22
#include <hpx/modules/resource_partitioner.hpp>
23
#include <hpx/modules/runtime_configuration.hpp>
24
#include <hpx/modules/runtime_local.hpp>
25
#include <hpx/modules/string_util.hpp>
26
#include <hpx/modules/synchronization.hpp>
27
#include <hpx/modules/thread_support.hpp>
28
#include <hpx/modules/threading_base.hpp>
29
#include <hpx/modules/threadmanager.hpp>
30
#include <hpx/modules/type_support.hpp>
31
#include <hpx/modules/util.hpp>
32
#include <hpx/util/from_string.hpp>
33

34
#include <hpx/components_base/agas_interface.hpp>
35
#include <hpx/naming_base/gid_type.hpp>
36
#include <hpx/parcelset/message_handler_fwd.hpp>
37
#include <hpx/parcelset/parcelhandler.hpp>
38
#include <hpx/parcelset/static_parcelports.hpp>
39
#include <hpx/parcelset_base/policies/message_handler.hpp>
40
#include <hpx/plugin_factories/parcelport_factory_base.hpp>
41

42
#include <asio/error.hpp>
43

44
#include <algorithm>
45
#include <cstddef>
46
#include <cstdint>
47
#include <exception>
48
#include <iostream>
49
#include <map>
50
#include <memory>
51
#include <mutex>
52
#include <sstream>
53
#include <string>
54
#include <system_error>
55
#include <utility>
56
#include <vector>
57
#if defined(HPX_HAVE_PARCEL_PROFILING)
58
#include <chrono>
59
#endif
60

61
///////////////////////////////////////////////////////////////////////////////
62
// Forward declaration only; called by detail::parcel_sent_handler in this
// file.
namespace hpx::detail {

    void dijkstra_make_black();
}    // namespace hpx::detail
65

66
///////////////////////////////////////////////////////////////////////////////
67
namespace hpx::parcelset {
68

69
    ///////////////////////////////////////////////////////////////////////////
70
    // A parcel is submitted for transport at the source locality site to
71
    // the parcel set of the locality with the put-parcel command
72
    // This function is synchronous.
73
    void parcelhandler::sync_put_parcel(parcel p)    //-V669
140✔
74
    {
75
        hpx::promise<void> promise;
140✔
76
        future<void> sent_future = promise.get_future();
140✔
77
        put_parcel(
140✔
78
            HPX_MOVE(p), [&promise](std::error_code const&, parcel const&) {
280✔
79
                promise.set_value();
140✔
80
            });               // schedule parcel send
140✔
81
        sent_future.get();    // wait for the parcel to be sent
140✔
82
    }
140✔
83

84
    // Construct the parcel handler from the runtime configuration. No
    // parcelports are attached here.
    //
    // \param cfg  runtime configuration; consulted for the entry
    //             "hpx.parcel.message_handlers" (message handlers are loaded
    //             if it is non-zero, default 0) and for whether networking is
    //             enabled.
    parcelhandler::parcelhandler(util::runtime_configuration& cfg)
      : tm_(nullptr)
      , use_alternative_parcelports_(false)
      , enable_parcel_handling_(true)
      , load_message_handlers_(
            util::get_entry_as<int>(cfg, "hpx.parcel.message_handlers", 0) != 0)
      , count_routed_(0)
      , write_handler_(&default_write_handler)
      // This file is compiled only if HPX_HAVE_NETWORKING is defined (see the
      // guard following the inclusion of <hpx/config.hpp>), hence no
      // alternative initializer for the disabled case is needed here.
      , is_networking_enabled_(cfg.enable_networking())
    {
        LPROGRESS_;
    }
100

101
    // Defaulted destructor: no explicit teardown is performed here.
    parcelhandler::~parcelhandler() = default;
591✔
102

103
    void parcelhandler::set_notification_policies(
593✔
104
        util::runtime_configuration& cfg, threads::threadmanager* tm,
105
        threads::policies::callback_notifier const& notifier)
106
    {
107
        is_networking_enabled_ = hpx::is_networking_enabled();
593✔
108
        tm_ = tm;
593✔
109

110
        if (is_networking_enabled_ &&
876✔
111
            cfg.get_entry("hpx.parcel.enable", "1") != "0")
283✔
112
        {
113
            for (plugins::parcelport_factory_base* factory :
566✔
114
                get_parcelport_factories())
283✔
115
            {
116
                std::shared_ptr<parcelport> pp(factory->create(cfg, notifier));
283✔
117
                attach_parcelport(pp);
283✔
118
            }
283✔
119
        }
283✔
120
    }
593✔
121

122
    std::shared_ptr<parcelport> parcelhandler::get_bootstrap_parcelport() const
876✔
123
    {
124
        std::string cfgkey("hpx.parcel.bootstrap");
876✔
125
        if (!pports_.empty())
876✔
126
        {
127
            auto it =
128
                pports_.find(get_priority(get_config_entry(cfgkey, "tcp")));
566✔
129
            if (it != pports_.end() && it->first > 0)
566✔
130
            {
131
                return it->second;
566✔
132
            }
133
        }
×
134

135
        for (auto const& pp : pports_)
310✔
136
        {
137
            if (pp.first > 0 && pp.second->can_bootstrap())
×
138
            {
139
                return pp.second;
×
140
            }
141
        }
142

143
        if (hpx::is_networking_enabled())
310✔
144
        {
145
            std::cerr << "Could not find usable bootstrap parcelport.\n";
×
146
            std::cerr << hpx::util::format(
×
147
                "Preconfigured bootstrap parcelport: '{}'\n",
×
148
                get_config_entry(cfgkey, "tcp"));
×
149

150
            if (pports_.empty())
×
151
            {
152
                std::cerr << "No available parcelports\n";
×
153
            }
×
154
            else
155
            {
156
                std::cerr << "List of available parcelports:\n";
×
157
                for (auto const& pp : pports_)
×
158
                {
159
                    std::cerr << hpx::util::format(
×
160
                        "  {}, priority: {}, can bootstrap: {}\n",
×
161
                        pp.second->type(), pp.first,
×
162
                        pp.second->can_bootstrap());
×
163
                }
164
                std::cerr << "\n";
×
165
            }
166

167
            // terminate this locality as there is nothing else we can do
168
            std::terminate();
×
169
        }
170

171
        return std::shared_ptr<parcelport>();
310✔
172
    }
876✔
173

174
    void parcelhandler::initialize()
593✔
175
    {
176
        exception_list exceptions;
593✔
177
        std::vector<int> failed_pps;
593✔
178
        for (pports_type::value_type& pp : pports_)
876✔
179
        {
180
            // protect against exceptions thrown by a parcelport during
181
            // initialization
182
            hpx::detail::try_catch_exception_ptr(
283✔
183
                [&]() {
566✔
184
                    if (pp.second != get_bootstrap_parcelport())
283✔
185
                    {
186
                        if (pp.first > 0)
×
187
                            pp.second->run(false);
×
188
                    }
×
189
                },
283✔
190
                [&](std::exception_ptr&& e) {
283✔
191
                    exceptions.add(HPX_MOVE(e));
×
192
                    failed_pps.push_back(pp.first);
×
193
                });
×
194
        }
195

196
        // handle exceptions
197
        if (exceptions.size() != 0)
593✔
198
        {
199
            if (failed_pps.size() == pports_.size())
×
200
            {
201
                std::cerr << hpx::util::format(
×
202
                    "all parcelports failed initializing on locality {}, "
×
203
                    "exiting:\n{}\n",
204
                    agas::get_locality_id(), exceptions.get_message());
×
205
                std::terminate();
×
206
            }
207
            else
208
            {
209
                std::cerr << hpx::util::format(
×
210
                    "warning: the following errors were detected while "
×
211
                    "initializing parcelports on locality {}:\n{}\n",
212
                    agas::get_locality_id(), exceptions.get_message());
×
213
            }
214

215
            // clean up parcelports that have failed initializtion
216
            std::cerr << "the following parcelports will be disabled:\n";
×
217
            for (int pp : failed_pps)
×
218
            {
219
                auto it = pports_.find(pp);
×
220
                if (it != pports_.end())
×
221
                {
222
                    std::cerr << "  " << (*it).second->type() << "\n";
×
223
                    (*it).second->stop();
×
224
                    pports_.erase(it);
×
225
                }
×
226
            }
227
            std::cerr << "\n";
×
228
        }
×
229
    }
593✔
230

231
    void parcelhandler::list_parcelport(std::ostringstream& strm,
×
232
        std::string const& ppname, int priority, bool bootstrap) const
233
    {
234
        hpx::util::format_to(strm, "parcel port: {}", ppname);
×
235

236
        std::string const cfgkey("hpx.parcel." + ppname + ".enable");
×
237
        bool const enabled =
×
238
            hpx::util::from_string<int>(get_config_entry(cfgkey, "0"), 0);
×
239
        strm << (enabled ? ", enabled" : ", not enabled");
×
240

241
        if (bootstrap)
×
242
            strm << ", bootstrap";
×
243

244
        hpx::util::format_to(strm, ", priority {}\n", priority);
×
245
    }
×
246

247
    // list available parcel ports
248
    void parcelhandler::list_parcelports(std::ostringstream& strm) const
×
249
    {
250
        for (pports_type::value_type const& pp : pports_)
×
251
        {
252
            list_parcelport(strm, pp.second->type(), pp.second->priority(),
×
253
                pp.second == get_bootstrap_parcelport());
×
254
        }
255
        strm << '\n';
×
256
    }
×
257

258
    // Install f as the new write handler and return the handler that was
    // installed before. The exchange is performed while holding mtx_.
    write_handler_type parcelhandler::set_write_handler(write_handler_type f)
    {
        std::lock_guard<mutex_type> guard(mtx_);
        return std::exchange(write_handler_, HPX_MOVE(f));
    }
264

265
    bool parcelhandler::enum_parcelports(
593✔
266
        hpx::move_only_function<bool(std::string const&)> const& f) const
267
    {
268
        for (pports_type::value_type const& pp : pports_)
876✔
269
        {
270
            if (!f(pp.second->type()))
283✔
271
            {
272
                return false;
×
273
            }
274
        }
275
        return true;
593✔
276
    }
593✔
277

278
    int parcelhandler::get_priority(std::string const& name) const
1,561✔
279
    {
280
        std::map<std::string, int>::const_iterator it = priority_.find(name);
1,561✔
281
        if (it == priority_.end())
1,561✔
282
            return 0;
×
283
        return it->second;
1,561✔
284
    }
1,561✔
285

286
    // Look up an enabled parcelport by its type name.
    //
    // \param type  parcelport type name (e.g. as returned by
    //              parcelport::type())
    //
    // The error_code parameter is not used by this implementation.
    //
    // \returns a non-owning pointer to the parcelport, or nullptr if no
    //          parcelport of that type is known, if it is registered with a
    //          non-positive priority (i.e. not enabled), or if it is no longer
    //          contained in pports_.
    parcelport* parcelhandler::find_parcelport(
        std::string const& type, error_code&) const
    {
        int const priority = get_priority(type);
        if (priority <= 0)
        {
            return nullptr;
        }

        // A single lookup is sufficient. The entry may legitimately be
        // missing: initialize() erases parcelports that failed to start from
        // pports_ while their name stays in priority_. Dereferencing the end
        // iterator in that case would be undefined behavior, so report
        // "not found" instead.
        auto const found = pports_.find(priority);
        if (found == pports_.end())
        {
            return nullptr;
        }
        return found->second.get();
    }
295

296
    void parcelhandler::attach_parcelport(std::shared_ptr<parcelport> const& pp)
283✔
297
    {
298
        if (!hpx::is_networking_enabled() || !pp)
283✔
299
        {
300
            return;
×
301
        }
302

303
        // add the new parcelport to the list of parcel-ports we care about
304
        int priority = pp->priority();
283✔
305
        std::string cfgkey(std::string("hpx.parcel.") + pp->type() + ".enable");
283✔
306
        if (get_config_entry(cfgkey, "0") != "1")
283✔
307
        {
308
            priority = -priority;
×
309
        }
×
310
        pports_[priority] = pp;
283✔
311
        priority_[pp->type()] = priority;
283✔
312

313
        // add the endpoint of the new parcelport
314
        HPX_ASSERT(pp->type() == pp->here().type());
283✔
315
        if (priority > 0)
283✔
316
            endpoints_[pp->type()] = pp->here();
283✔
317
    }
283✔
318

319
    ///////////////////////////////////////////////////////////////////////////
320
    /// \brief Make sure the specified locality is not held by any
321
    /// connection caches anymore
322
    void parcelhandler::remove_from_connection_cache(
1✔
323
        naming::gid_type const& gid, endpoints_type const& endpoints)
324
    {
325
        for (endpoints_type::value_type const& loc : endpoints)
2✔
326
        {
327
            for (pports_type::value_type& pp : pports_)
2✔
328
            {
329
                if (std::string(pp.second->type()) == loc.second.type())
1✔
330
                {
331
                    pp.second->remove_from_connection_cache(loc.second);
1✔
332
                }
1✔
333
            }
334
        }
335

336
        agas::remove_resolved_locality(gid);
1✔
337
    }
1✔
338

339
    ///////////////////////////////////////////////////////////////////////////
340
    bool parcelhandler::do_background_work(std::size_t num_thread,
79,750,926✔
341
        bool stop_buffering, parcelport_background_mode mode)
342
    {
343
        bool did_some_work = false;
79,788,276✔
344
        if (!is_networking_enabled_)
79,788,276✔
345
        {
346
            return did_some_work;
×
347
        }
348

349
        // flush all parcel buffers
350
        if (0 == num_thread &&
79,788,276✔
351
            (mode & parcelport_background_mode_flush_buffers))
22,101,491✔
352
        {
353
            std::unique_lock<mutex_type> l(handlers_mtx_, std::try_to_lock);
22,101,491✔
354

355
            if (l.owns_lock())
22,101,491✔
356
            {
357
                using parcelset::policies::message_handler;
358
                message_handler::flush_mode flush_mode =
22,088,048✔
359
                    message_handler::flush_mode_background_work;
360

361
                message_handler_map::iterator end = handlers_.end();
22,088,048✔
362
                for (message_handler_map::iterator it = handlers_.begin();
42,947,365✔
363
                     it != end; ++it)
42,947,365✔
364
                {
365
                    if ((*it).second)
20,859,317✔
366
                    {
367
                        std::shared_ptr<policies::message_handler> p(
20,859,317✔
368
                            (*it).second);
20,859,317✔
369
                        unlock_guard<std::unique_lock<mutex_type>> ul(l);
20,859,317✔
370
                        did_some_work = p->flush(flush_mode, stop_buffering) ||
20,859,317✔
371
                            did_some_work;
20,859,317✔
372
                    }
20,859,317✔
373
                }
20,859,317✔
374
            }
22,088,048✔
375
        }
22,101,491✔
376

377
        // make sure all pending parcels are being handled
378
        for (pports_type::value_type& pp : pports_)
161,639,247✔
379
        {
380
            if (pp.first > 0)
80,656,623✔
381
            {
382
                did_some_work =
80,899,677✔
383
                    pp.second->do_background_work(num_thread, mode) ||
80,656,623✔
384
                    did_some_work;
80,883,998✔
385
            }
80,899,677✔
386
        }
387

388
        return did_some_work;
80,852,703✔
389
    }
80,852,703✔
390

391
    void parcelhandler::flush_parcels()
1,116✔
392
    {
393
        if (is_networking_enabled_)
1,116✔
394
        {
395
            // now flush all parcel ports to be shut down
396
            for (pports_type::value_type& pp : pports_)
2,232✔
397
            {
398
                if (pp.first > 0)
1,116✔
399
                {
400
                    pp.second->flush_parcels();
1,116✔
401
                }
1,116✔
402
            }
403
        }
1,116✔
404
    }
1,116✔
405

406
    void parcelhandler::stop(bool blocking)
1,669✔
407
    {
408
        // now stop all parcel ports
409
        for (pports_type::value_type& pp : pports_)
2,515✔
410
        {
411
            if (pp.first > 0)
846✔
412
            {
413
                pp.second->stop(blocking);
846✔
414
            }
846✔
415
        }
416

417
        // release all message handlers
418
        handlers_.clear();
1,669✔
419
    }
1,669✔
420

421
    bool parcelhandler::get_raw_remote_localities(
56✔
422
        std::vector<naming::gid_type>& locality_ids,
423
        components::component_type type, error_code& ec) const
424
    {
425
        std::vector<naming::gid_type> allprefixes;
56✔
426
        bool result = get_raw_localities(allprefixes, type, ec);
56✔
427
        if (ec || !result)
56✔
428
            return false;
×
429

430
        std::remove_copy(allprefixes.begin(), allprefixes.end(),
112✔
431
            std::back_inserter(locality_ids), agas::get_locality());
56✔
432

433
        return !locality_ids.empty();
56✔
434
    }
56✔
435

436
    bool parcelhandler::get_raw_localities(
1,447✔
437
        std::vector<naming::gid_type>& locality_ids,
438
        components::component_type type, error_code&) const
439
    {
440
        std::vector<std::uint32_t> ids = agas::get_all_locality_ids(type);
1,447✔
441

442
        locality_ids.clear();
1,447✔
443
        locality_ids.reserve(ids.size());
1,447✔
444
        for (auto id : ids)
3,376✔
445
        {
446
            locality_ids.push_back(naming::get_gid_from_locality_id(id));
1,929✔
447
        }
448

449
        return !locality_ids.empty();
1,447✔
450
    }
1,447✔
451

452
    std::pair<std::shared_ptr<parcelport>, locality>
453
    parcelhandler::find_appropriate_destination(
455,170✔
454
        naming::gid_type const& dest_gid)
455
    {
456
        endpoints_type const& dest_endpoints = agas::resolve_locality(dest_gid);
455,174✔
457

458
        for (pports_type::value_type& pp : pports_)
455,170✔
459
        {
460
            if (pp.first > 0)
455,174✔
461
            {
462
                locality const& dest =
455,174✔
463
                    find_endpoint(dest_endpoints, pp.second->type());
455,174✔
464
                if (dest &&
910,347✔
465
                    pp.second->can_connect(dest, use_alternative_parcelports_))
455,173✔
466
                    return std::make_pair(pp.second, dest);
455,173✔
467
            }
455,173✔
468
        }
469

470
        std::ostringstream strm;
×
471
        strm << "target locality: " << dest_gid << "\n";
×
472
        strm << "available destination endpoints:\n" << dest_endpoints << "\n";
×
473
        strm << "available partcelports:\n";
×
474
        for (auto const& pp : pports_)
×
475
        {
476
            list_parcelport(strm, pp.second->type(), pp.second->priority(),
×
477
                pp.second == get_bootstrap_parcelport());
×
478
            strm << "\t [" << pp.second->here() << "]\n";
×
479
        }
480

481
        HPX_THROW_EXCEPTION(hpx::error::network_error,
×
482
            "parcelhandler::find_appropriate_destination",
483
            "The locality gid cannot be resolved to a valid endpoint. "
484
            "No valid parcelport configured. Detailed information:\n{}",
485
            strm.str());
486
        return std::pair<std::shared_ptr<parcelport>, locality>();
487
    }
455,173✔
488

489
    // Return the endpoint stored in eps under the parcelport type `name`, or
    // a default-constructed locality if there is none.
    locality parcelhandler::find_endpoint(
        endpoints_type const& eps, std::string const& name)
    {
        auto const found = eps.find(name);
        if (found == eps.end())
        {
            return locality();
        }
        return found->second;
    }
497

498
    // Return the reference to an existing io_service
499
    util::io_service_pool* parcelhandler::get_thread_pool(char const* name)
×
500
    {
501
        util::io_service_pool* result = nullptr;
×
502
        for (pports_type::value_type& pp : pports_)
×
503
        {
504
            result = pp.second->get_thread_pool(name);
×
505
            if (result)
×
506
                return result;
×
507
        }
508
        return result;
×
509
    }
×
510

511
    namespace detail {
512
        void parcel_sent_handler(
459,385✔
513
            parcelhandler::write_handler_type& f,    //-V669
514
            std::error_code const& ec, parcelset::parcel const& p)
515
        {
516
            // inform termination detection of a sent message
517
            if (!p.does_termination_detection())
459,384✔
518
            {
519
                hpx::detail::dijkstra_make_black();
458,280✔
520
            }
458,280✔
521

522
            // invoke the original handler
523
            f(ec, p);
459,396✔
524

525
#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
526
            static util::itt::event parcel_send("send_parcel");
527
            util::itt::event_tick(parcel_send);
528
#endif
529

530
#if defined(HPX_HAVE_APEX) && defined(HPX_HAVE_PARCEL_PROFILING)
531
            // tell APEX about the sent parcel
532
            util::external_timer::send(
533
                p.parcel_id().get_lsb(), p.size(), p.destination_locality_id());
534
#endif
535
        }
459,396✔
536
    }    // namespace detail
537

538
    // Send a parcel asynchronously. On completion only the currently
    // installed write handler is notified (through invoke_write_handler).
    void parcelhandler::put_parcel(parcelset::parcel p)
    {
        put_parcel_impl(HPX_MOVE(p),
            [this](std::error_code const& ec,
                parcelset::parcel const& sent) -> void {
                invoke_write_handler(ec, sent);
            });
    }
547

548
    // Send a parcel asynchronously. On completion the currently installed
    // write handler is notified first (through invoke_write_handler), then f
    // is invoked with the same arguments.
    void parcelhandler::put_parcel(parcelset::parcel p, write_handler_type f)
    {
        put_parcel_impl(HPX_MOVE(p),
            [this, f = HPX_MOVE(f)](std::error_code const& ec,
                parcelset::parcel const& sent) -> void {
                invoke_write_handler(ec, sent);
                f(ec, sent);
            });
    }
558

559
    // Common implementation of the put_parcel() overloads.
    //
    // \param p  the parcel to send
    // \param f  completion handler; it is wrapped into
    //           detail::parcel_sent_handler before being passed on
    //
    // If the destination address is known or can be resolved locally, the
    // parcel is handed to the message handler associated with it (if message
    // handlers are enabled and one exists) or directly to the selected
    // parcelport. Otherwise the parcel is routed through AGAS.
    void parcelhandler::put_parcel_impl(parcel&& p, write_handler_type&& f)
    {
        HPX_ASSERT(is_networking_enabled_);

        naming::gid_type const& dest_gid = p.destination();
        naming::address& dest_addr = p.addr();

        // During bootstrap this is handled separately (see
        // addressing_service::resolve_locality).

        // if this isn't an HPX thread, the stack space check will return false
        bool const must_reschedule =
            !this_thread::has_sufficient_stack_space() &&
            hpx::threads::threadmanager_is(hpx::state::running);
        if (must_reschedule)
        {
            // reschedule request as an HPX thread to avoid hangs
            //
            // NOTE(review): the new thread calls the public
            // put_parcel(parcel, write_handler_type) overload, which wraps the
            // handler together with invoke_write_handler once more - confirm
            // that notifying the write handler twice on this path is intended.
            void (parcelhandler::*reschedule_target)(parcel p,
                write_handler_type f) = &parcelhandler::put_parcel;

            threads::thread_init_data data(
                threads::make_thread_function_nullary(util::deferred_call(
                    reschedule_target, this, HPX_MOVE(p), HPX_MOVE(f))),
                "parcelhandler::put_parcel", threads::thread_priority::boost,
                threads::thread_schedule_hint(),
                threads::thread_stacksize::medium,
                threads::thread_schedule_state::pending, true);
            threads::register_thread(data);
            return;
        }

        // properly initialize parcel
        init_parcel(p);

        // an address that is set already counts as locally resolved
        bool const resolved_locally =
            !dest_addr ? agas::resolve_local(dest_gid, dest_addr) : true;

        write_handler_type on_sent =
            hpx::bind_front(&detail::parcel_sent_handler, HPX_MOVE(f));

        if (!resolved_locally)
        {
            // At least one of the addresses is locally unknown, route the
            // parcel to the AGAS managing the destination.
            ++count_routed_;

            agas::route(HPX_MOVE(p), HPX_MOVE(on_sent));
            return;
        }

        // We were able to resolve the address(es) locally, send the parcel
        // directly to the destination.
        using destination_pair =
            std::pair<std::shared_ptr<parcelport>, locality>;
        destination_pair dest =
            find_appropriate_destination(dest_addr.locality_);

        // dispatch to the message handler which is associated with the
        // encapsulated action
        if (load_message_handlers_ && !hpx::is_stopped_or_shutting_down())
        {
            policies::message_handler* handler =
                p.get_message_handler(dest.second);
            if (handler)
            {
                handler->put_parcel(
                    dest.second, HPX_MOVE(p), HPX_MOVE(on_sent));
                return;
            }
        }

        // no message handler available, use the parcelport directly
        dest.first->put_parcel(dest.second, HPX_MOVE(p), HPX_MOVE(on_sent));
    }
639

640
    void parcelhandler::put_parcels(std::vector<parcel> parcels)
7✔
641
    {
642
        std::vector<write_handler_type> handlers(parcels.size(),
14✔
643
            [this](std::error_code const& ec, parcel const& p) -> void {
77✔
644
                return invoke_write_handler(ec, p);
70✔
645
            });
646

647
        put_parcels_impl(HPX_MOVE(parcels), HPX_MOVE(handlers));
7✔
648
    }
7✔
649

650
    void parcelhandler::put_parcels(
×
651
        std::vector<parcel> parcels, std::vector<write_handler_type> funcs)
652
    {
653
        std::vector<write_handler_type> handlers;
×
654

655
        handlers.reserve(parcels.size());
×
656
        for (std::size_t i = 0; i != parcels.size(); ++i)
×
657
        {
658
            handlers.emplace_back(
×
659
                [this, f = HPX_MOVE(funcs[i])](
×
660
                    std::error_code const& ec, parcel const& p) -> void {
661
                    invoke_write_handler(ec, p);
×
662
                    f(ec, p);
×
663
                });
×
664
        }
×
665

666
        put_parcels_impl(HPX_MOVE(parcels), HPX_MOVE(handlers));
×
667
    }
×
668

669
    void parcelhandler::put_parcels_impl(std::vector<parcel>&& parcels,
7✔
670
        std::vector<write_handler_type>&& handlers)
671
    {
672
        HPX_ASSERT(is_networking_enabled_);
7✔
673

674
        if (parcels.size() != handlers.size())
7✔
675
        {
676
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
677
                "parcelhandler::put_parcels",
678
                "mismatched number of parcels and handlers");
679
            return;
680
        }
681

682
        // if this isn't an HPX thread, the stack space check will return false
683
        if (!this_thread::has_sufficient_stack_space() &&
7✔
684
            hpx::threads::threadmanager_is(hpx::state::running))
×
685
        {
686
            // reschedule request as an HPX thread to avoid hangs
687
            void (parcelhandler::*put_parcels_ptr)(std::vector<parcel>,
×
688
                std::vector<write_handler_type>) = &parcelhandler::put_parcels;
689

690
            threads::thread_init_data data(
×
691
                threads::make_thread_function_nullary(
×
692
                    util::deferred_call(put_parcels_ptr, this,
×
693
                        HPX_MOVE(parcels), HPX_MOVE(handlers))),
×
694
                "parcelhandler::put_parcels", threads::thread_priority::boost,
×
695
                threads::thread_schedule_hint(),
×
696
                threads::thread_stacksize::medium,
697
                threads::thread_schedule_state::pending, true);
698
            threads::register_thread(data);
×
699
            return;
700
        }
×
701

702
        // partition parcels depending on whether their destination can be
703
        // resolved locally
704
        std::size_t num_parcels = parcels.size();
7✔
705

706
        std::vector<parcel> resolved_parcels;
7✔
707
        resolved_parcels.reserve(num_parcels);
7✔
708
        std::vector<write_handler_type> resolved_handlers;
7✔
709
        resolved_handlers.reserve(num_parcels);
7✔
710

711
        using destination_pair =
712
            std::pair<std::shared_ptr<parcelport>, locality>;
713

714
        destination_pair resolved_dest;
7✔
715

716
        std::vector<parcel> nonresolved_parcels;
7✔
717
        nonresolved_parcels.reserve(num_parcels);
7✔
718
        std::vector<write_handler_type> nonresolved_handlers;
7✔
719
        nonresolved_handlers.reserve(num_parcels);
7✔
720

721
        for (std::size_t i = 0; i != num_parcels; ++i)
77✔
722
        {
723
            parcel& p = parcels[i];
70✔
724

725
            // properly initialize parcel
726
            init_parcel(p);
70✔
727

728
            bool resolved_locally = true;
70✔
729
            naming::address& addr = p.addr();
70✔
730

731
            if (!addr)
70✔
732
            {
733
                resolved_locally = agas::resolve_local(p.destination(), addr);
70✔
734
            }
70✔
735

736
            write_handler_type f = hpx::bind_front(
70✔
737
                &detail::parcel_sent_handler, HPX_MOVE(handlers[i]));
70✔
738

739
            // make sure all parcels go to the same locality
740
            if (parcels[0].destination_locality() != p.destination_locality())
70✔
741
            {
742
                HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
743
                    "parcelhandler::put_parcels",
744
                    "mismatched destinations, all parcels are expected to "
745
                    "target the same locality");
746
                return;
747
            }
748

749
            // If we were able to resolve the address(es) locally we would send
750
            // the parcel directly to the destination.
751
            if (resolved_locally)
70✔
752
            {
753
                // dispatch to the message handler which is associated with the
754
                // encapsulated action
755
                destination_pair dest =
756
                    find_appropriate_destination(addr.locality_);
70✔
757

758
                if (load_message_handlers_)
70✔
759
                {
760
                    policies::message_handler* mh =
30✔
761
                        p.get_message_handler(dest.second);
30✔
762

763
                    if (mh)
30✔
764
                    {
765
                        mh->put_parcel(dest.second, HPX_MOVE(p), HPX_MOVE(f));
30✔
766
                        continue;
30✔
767
                    }
768
                }
×
769

770
                resolved_parcels.push_back(HPX_MOVE(p));
40✔
771
                resolved_handlers.push_back(HPX_MOVE(f));
40✔
772
                if (!resolved_dest.second)
40✔
773
                {
774
                    resolved_dest = dest;
4✔
775
                }
4✔
776
                else
777
                {
778
                    HPX_ASSERT(resolved_dest == dest);
36✔
779
                }
780
            }
70✔
781
            else
782
            {
783
                nonresolved_parcels.push_back(HPX_MOVE(p));
×
784
                nonresolved_handlers.push_back(HPX_MOVE(f));
×
785
            }
786
        }
70✔
787

788
        // handle parcel which have been locally resolved
789
        if (!resolved_parcels.empty())
7✔
790
        {
791
            HPX_ASSERT(!!resolved_dest.first && !!resolved_dest.second);
4✔
792
            resolved_dest.first->put_parcels(resolved_dest.second,
8✔
793
                HPX_MOVE(resolved_parcels), HPX_MOVE(resolved_handlers));
4✔
794
        }
4✔
795

796
        // At least one of the addresses is locally unknown, route the
797
        // parcel to the AGAS managing the destination.
798
        for (std::size_t i = 0; i != nonresolved_parcels.size(); ++i)
7✔
799
        {
800
            ++count_routed_;
×
801
            agas::route(HPX_MOVE(nonresolved_parcels[i]),
×
802
                HPX_MOVE(nonresolved_handlers[i]));
×
803
        }
×
804
    }
7✔
805

806
    void parcelhandler::invoke_write_handler(
459,405✔
807
        std::error_code const& ec, parcel const& p) const
808
    {
809
        write_handler_type f;
459,406✔
810
        {
811
            std::lock_guard<mutex_type> l(mtx_);
459,406✔
812
            f = write_handler_;
459,406✔
813
        }
459,406✔
814
        f(ec, p);
459,405✔
815
    }
459,405✔
816

817
    ///////////////////////////////////////////////////////////////////////////
818
    std::int64_t parcelhandler::get_outgoing_queue_length(bool reset) const
×
819
    {
820
        std::int64_t parcel_count = 0;
×
821
        for (pports_type::value_type const& pp : pports_)
×
822
        {
823
            parcel_count += pp.second->get_pending_parcels_count(reset);
×
824
        }
825
        return parcel_count;
×
826
    }
827

828
    ///////////////////////////////////////////////////////////////////////////
829
    // default callback for put_parcel
830
    void default_write_handler(std::error_code const& ec, parcel const& p)
701,678✔
831
    {
832
        if (ec)
701,677✔
833
        {
834
            // If we are in a stopped state, ignore some errors
835
            if (hpx::is_stopped_or_shutting_down())
×
836
            {
837
                using asio::error::make_error_code;
838
                if (ec == make_error_code(asio::error::connection_aborted) ||
×
839
                    ec == make_error_code(asio::error::connection_reset) ||
×
840
                    ec == make_error_code(asio::error::broken_pipe) ||
×
841
                    ec == make_error_code(asio::error::not_connected) ||
×
842
                    ec == make_error_code(asio::error::eof))
×
843
                {
844
                    return;
×
845
                }
846
            }
×
847
            else if (hpx::tolerate_node_faults())
×
848
            {
849
                if (ec ==
×
850
                    asio::error::make_error_code(asio::error::connection_reset))
×
851
                {
852
                    return;
×
853
                }
854
            }
×
855

856
            // all unhandled exceptions terminate the whole application
857
            std::exception_ptr exception = hpx::detail::get_exception(
×
858
                hpx::exception(ec), "default_write_handler", __FILE__, __LINE__,
×
859
                parcelset::dump_parcel(p));
×
860

861
            hpx::report_error(exception);
×
862
        }
×
863
    }
701,677✔
864

865
    ///////////////////////////////////////////////////////////////////////////
866
    // Return the message handler registered for the (locality, action) pair,
    // creating and caching it in handlers_ on first use.
    //
    // Returns nullptr if networking is disabled, if no handler could be
    // created (an empty entry is cached in that case so creation is not
    // attempted again), or if storing the entry failed. Errors are reported
    // through 'ec' unless 'ec' is hpx::throws, in which case the error paths
    // below throw instead.
    policies::message_handler* parcelhandler::get_message_handler(
        char const* action, char const* message_handler_type,
        std::size_t num_messages, std::size_t interval, locality const& loc,
        error_code& ec)
    {
        if (!is_networking_enabled_)
        {
            return nullptr;
        }

        std::unique_lock<mutex_type> lock(handlers_mtx_);
        handler_key_type key(loc, action);
        auto pos = handlers_.find(key);

        if (pos == handlers_.end())
        {
            std::shared_ptr<policies::message_handler> created;

            {
                // handlers_mtx_ stays locked while the handler is created
                // (this avoids the multiple registrations that currently
                // happen in the parcel coalescing plugin); the lock is merely
                // excluded from the held-lock checking for this scope.
                hpx::util::ignore_while_checking il(&lock);
                HPX_UNUSED(il);

                created.reset(hpx::create_message_handler(message_handler_type,
                    action, find_parcelport(loc.type()), num_messages, interval,
                    ec));
            }

            pos = handlers_.find(key);
            if (pos != handlers_.end())
            {
                // some other thread has created the entry in the meantime
                lock.unlock();
                if (&ec != &throws)
                {
                    if (pos->second)
                    {
                        ec = make_success_code();
                    }
                    else
                    {
                        ec = make_error_code(
                            hpx::error::bad_parameter, throwmode::lightweight);
                    }
                }
                return pos->second.get();
            }

            if (ec || !created)
            {
                // insert an empty entry into the map to avoid trying to
                // create this handler again
                created.reset();
                auto const stored = handlers_.insert(
                    message_handler_map::value_type(key, created));

                lock.unlock();
                if (!stored.second)
                {
                    HPX_THROWS_IF(ec, hpx::error::internal_server_error,
                        "parcelhandler::get_message_handler",
                        "could not store empty message handler");
                }
                return nullptr;    // no message handler available
            }

            auto const stored =
                handlers_.insert(message_handler_map::value_type(key, created));

            lock.unlock();
            if (!stored.second)
            {
                HPX_THROWS_IF(ec, hpx::error::internal_server_error,
                    "parcelhandler::get_message_handler",
                    "could not store newly created message handler");
                return nullptr;
            }
            pos = stored.first;
        }
        else if (!pos->second)
        {
            // an empty entry was cached earlier for this key
            lock.unlock();
            if (&ec != &throws)
            {
                ec = make_error_code(
                    hpx::error::bad_parameter, throwmode::lightweight);
            }
            else
            {
                HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
                    "parcelhandler::get_message_handler",
                    "couldn't find an appropriate message handler");
            }
            return nullptr;    // no message handler available
        }

        if (&ec != &throws)
        {
            ec = make_success_code();
        }

        return pos->second.get();
    }
158,305✔
966

967
    ///////////////////////////////////////////////////////////////////////////
968
    std::string parcelhandler::get_locality_name() const
×
969
    {
970
        for (pports_type::value_type const& pp : pports_)
×
971
        {
972
            if (pp.first > 0)
×
973
            {
974
                std::string name = pp.second->get_locality_name();
×
975
                if (!name.empty())
×
976
                    return name;
×
977
            }
×
978
        }
979
        return "<unknown>";
×
980
    }
×
981

982
    ///////////////////////////////////////////////////////////////////////////
983
    // Performance counter data
984

985
    // number of parcels routed
986
    std::int64_t parcelhandler::get_parcel_routed_count(bool reset)
×
987
    {
988
        return util::get_and_reset_value(count_routed_, reset);
×
989
    }
990

991
#if defined(HPX_HAVE_PARCELPORT_COUNTERS)
992
    // number of parcels sent
993
    std::int64_t parcelhandler::get_parcel_send_count(
994
        std::string const& pp_type, bool reset) const
995
    {
996
        error_code ec(throwmode::lightweight);
997
        parcelport* pp = find_parcelport(pp_type, ec);
998
        return pp ? pp->get_parcel_send_count(reset) : 0;
999
    }
1000

1001
    // number of messages sent
1002
    std::int64_t parcelhandler::get_message_send_count(
1003
        std::string const& pp_type, bool reset) const
1004
    {
1005
        error_code ec(throwmode::lightweight);
1006
        parcelport* pp = find_parcelport(pp_type, ec);
1007
        return pp ? pp->get_message_send_count(reset) : 0;
1008
    }
1009

1010
    // number of parcels received
1011
    std::int64_t parcelhandler::get_parcel_receive_count(
1012
        std::string const& pp_type, bool reset) const
1013
    {
1014
        error_code ec(throwmode::lightweight);
1015
        parcelport* pp = find_parcelport(pp_type, ec);
1016
        return pp ? pp->get_parcel_receive_count(reset) : 0;
1017
    }
1018

1019
    // number of messages received
1020
    std::int64_t parcelhandler::get_message_receive_count(
1021
        std::string const& pp_type, bool reset) const
1022
    {
1023
        error_code ec(throwmode::lightweight);
1024
        parcelport* pp = find_parcelport(pp_type, ec);
1025
        return pp ? pp->get_message_receive_count(reset) : 0;
1026
    }
1027

1028
    // the total time it took for all sends, from async_write to the
1029
    // completion handler (nanoseconds)
1030
    std::int64_t parcelhandler::get_sending_time(
1031
        std::string const& pp_type, bool reset) const
1032
    {
1033
        error_code ec(throwmode::lightweight);
1034
        parcelport* pp = find_parcelport(pp_type, ec);
1035
        return pp ? pp->get_sending_time(reset) : 0;
1036
    }
1037

1038
    // the total time it took for all receives, from async_read to the
1039
    // completion handler (nanoseconds)
1040
    std::int64_t parcelhandler::get_receiving_time(
1041
        std::string const& pp_type, bool reset) const
1042
    {
1043
        error_code ec(throwmode::lightweight);
1044
        parcelport* pp = find_parcelport(pp_type, ec);
1045
        return pp ? pp->get_receiving_time(reset) : 0;
1046
    }
1047

1048
    // the total time it took for all sender-side serialization operations
1049
    // (nanoseconds)
1050
    std::int64_t parcelhandler::get_sending_serialization_time(
1051
        std::string const& pp_type, bool reset) const
1052
    {
1053
        error_code ec(throwmode::lightweight);
1054
        parcelport* pp = find_parcelport(pp_type, ec);
1055
        return pp ? pp->get_sending_serialization_time(reset) : 0;
1056
    }
1057

1058
    // the total time it took for all receiver-side serialization
1059
    // operations (nanoseconds)
1060
    std::int64_t parcelhandler::get_receiving_serialization_time(
1061
        std::string const& pp_type, bool reset) const
1062
    {
1063
        error_code ec(throwmode::lightweight);
1064
        parcelport* pp = find_parcelport(pp_type, ec);
1065
        return pp ? pp->get_receiving_serialization_time(reset) : 0;
1066
    }
1067

1068
    // total data sent (bytes)
1069
    std::int64_t parcelhandler::get_data_sent(
1070
        std::string const& pp_type, bool reset) const
1071
    {
1072
        error_code ec(throwmode::lightweight);
1073
        parcelport* pp = find_parcelport(pp_type, ec);
1074
        return pp ? pp->get_data_sent(reset) : 0;
1075
    }
1076

1077
    // total data (uncompressed) sent (bytes)
1078
    std::int64_t parcelhandler::get_raw_data_sent(
1079
        std::string const& pp_type, bool reset) const
1080
    {
1081
        error_code ec(throwmode::lightweight);
1082
        parcelport* pp = find_parcelport(pp_type, ec);
1083
        return pp ? pp->get_raw_data_sent(reset) : 0;
1084
    }
1085

1086
    // total data received (bytes)
1087
    std::int64_t parcelhandler::get_data_received(
1088
        std::string const& pp_type, bool reset) const
1089
    {
1090
        error_code ec(throwmode::lightweight);
1091
        parcelport* pp = find_parcelport(pp_type, ec);
1092
        return pp ? pp->get_data_received(reset) : 0;
1093
    }
1094

1095
    // total data (uncompressed) received (bytes)
1096
    std::int64_t parcelhandler::get_raw_data_received(
1097
        std::string const& pp_type, bool reset) const
1098
    {
1099
        error_code ec(throwmode::lightweight);
1100
        parcelport* pp = find_parcelport(pp_type, ec);
1101
        return pp ? pp->get_raw_data_received(reset) : 0;
1102
    }
1103

1104
    std::int64_t parcelhandler::get_buffer_allocate_time_sent(
1105
        std::string const& pp_type, bool reset) const
1106
    {
1107
        error_code ec(throwmode::lightweight);
1108
        parcelport* pp = find_parcelport(pp_type, ec);
1109
        return pp ? pp->get_buffer_allocate_time_sent(reset) : 0;
1110
    }
1111
    std::int64_t parcelhandler::get_buffer_allocate_time_received(
1112
        std::string const& pp_type, bool reset) const
1113
    {
1114
        error_code ec(throwmode::lightweight);
1115
        parcelport* pp = find_parcelport(pp_type, ec);
1116
        return pp ? pp->get_buffer_allocate_time_received(reset) : 0;
1117
    }
1118

1119
    // total zero-copy chunks sent
1120
    std::int64_t parcelhandler::get_zchunks_send_count(
1121
        std::string const& pp_type, bool reset) const
1122
    {
1123
        error_code ec(throwmode::lightweight);
1124
        parcelport* pp = find_parcelport(pp_type, ec);
1125
        return pp ? pp->get_zchunks_send_count(reset) : 0;
1126
    }
1127

1128
    // total zero-copy chunks received
1129
    std::int64_t parcelhandler::get_zchunks_recv_count(
1130
        std::string const& pp_type, bool reset) const
1131
    {
1132
        error_code ec(throwmode::lightweight);
1133
        parcelport* pp = find_parcelport(pp_type, ec);
1134
        return pp ? pp->get_zchunks_recv_count(reset) : 0;
1135
    }
1136

1137
    // the maximum number of zero-copy chunks per message sent
1138
    std::int64_t parcelhandler::get_zchunks_send_per_msg_count_max(
1139
        std::string const& pp_type, bool reset) const
1140
    {
1141
        error_code ec(throwmode::lightweight);
1142
        parcelport* pp = find_parcelport(pp_type, ec);
1143
        return pp ? pp->get_zchunks_send_per_msg_count_max(reset) : 0;
1144
    }
1145

1146
    // the maximum number of zero-copy chunks per message received
1147
    std::int64_t parcelhandler::get_zchunks_recv_per_msg_count_max(
1148
        std::string const& pp_type, bool reset) const
1149
    {
1150
        error_code ec(throwmode::lightweight);
1151
        parcelport* pp = find_parcelport(pp_type, ec);
1152
        return pp ? pp->get_zchunks_recv_per_msg_count_max(reset) : 0;
1153
    }
1154

1155
    // the size of zero-copy chunks per message sent
1156
    std::int64_t parcelhandler::get_zchunks_send_size(
1157
        std::string const& pp_type, bool reset) const
1158
    {
1159
        error_code ec(throwmode::lightweight);
1160
        parcelport* pp = find_parcelport(pp_type, ec);
1161
        return pp ? pp->get_zchunks_send_size(reset) : 0;
1162
    }
1163

1164
    // the size of zero-copy chunks per message received
1165
    std::int64_t parcelhandler::get_zchunks_recv_size(
1166
        std::string const& pp_type, bool reset) const
1167
    {
1168
        error_code ec(throwmode::lightweight);
1169
        parcelport* pp = find_parcelport(pp_type, ec);
1170
        return pp ? pp->get_zchunks_recv_size(reset) : 0;
1171
    }
1172

1173
    // the maximum size of zero-copy chunks per message sent
1174
    std::int64_t parcelhandler::get_zchunks_send_size_max(
1175
        std::string const& pp_type, bool reset) const
1176
    {
1177
        error_code ec(throwmode::lightweight);
1178
        parcelport* pp = find_parcelport(pp_type, ec);
1179
        return pp ? pp->get_zchunks_send_size_max(reset) : 0;
1180
    }
1181

1182
    // the maximum size of zero-copy chunks per message received
1183
    std::int64_t parcelhandler::get_zchunks_recv_size_max(
1184
        std::string const& pp_type, bool reset) const
1185
    {
1186
        error_code ec(throwmode::lightweight);
1187
        parcelport* pp = find_parcelport(pp_type, ec);
1188
        return pp ? pp->get_zchunks_recv_size_max(reset) : 0;
1189
    }
1190

1191
#if defined(HPX_HAVE_PARCELPORT_COUNTERS) &&                                   \
1192
    defined(HPX_HAVE_PARCELPORT_ACTION_COUNTERS)
1193
    // same as above, just separated data for each action
1194
    // number of parcels sent
1195
    std::int64_t parcelhandler::get_action_parcel_send_count(
1196
        std::string const& pp_type, std::string const& action, bool reset) const
1197
    {
1198
        error_code ec(throwmode::lightweight);
1199
        parcelport* pp = find_parcelport(pp_type, ec);
1200
        return pp ? pp->get_action_parcel_send_count(action, reset) : 0;
1201
    }
1202

1203
    // number of parcels received
1204
    std::int64_t parcelhandler::get_action_parcel_receive_count(
1205
        std::string const& pp_type, std::string const& action, bool reset) const
1206
    {
1207
        error_code ec(throwmode::lightweight);
1208
        parcelport* pp = find_parcelport(pp_type, ec);
1209
        return pp ? pp->get_action_parcel_receive_count(action, reset) : 0;
1210
    }
1211

1212
    // the total time it took for all sender-side serialization operations
1213
    // (nanoseconds)
1214
    std::int64_t parcelhandler::get_action_sending_serialization_time(
1215
        std::string const& pp_type, std::string const& action, bool reset) const
1216
    {
1217
        error_code ec(throwmode::lightweight);
1218
        parcelport* pp = find_parcelport(pp_type, ec);
1219
        return pp ? pp->get_action_sending_serialization_time(action, reset) :
1220
                    0;
1221
    }
1222

1223
    // the total time it took for all receiver-side serialization
1224
    // operations (nanoseconds)
1225
    std::int64_t parcelhandler::get_action_receiving_serialization_time(
1226
        std::string const& pp_type, std::string const& action, bool reset) const
1227
    {
1228
        error_code ec(throwmode::lightweight);
1229
        parcelport* pp = find_parcelport(pp_type, ec);
1230
        return pp ? pp->get_action_receiving_serialization_time(action, reset) :
1231
                    0;
1232
    }
1233

1234
    // total data sent (bytes)
1235
    std::int64_t parcelhandler::get_action_data_sent(
1236
        std::string const& pp_type, std::string const& action, bool reset) const
1237
    {
1238
        error_code ec(throwmode::lightweight);
1239
        parcelport* pp = find_parcelport(pp_type, ec);
1240
        return pp ? pp->get_action_data_sent(action, reset) : 0;
1241
    }
1242

1243
    // total data received (bytes)
1244
    std::int64_t parcelhandler::get_action_data_received(
1245
        std::string const& pp_type, std::string const& action, bool reset) const
1246
    {
1247
        error_code ec(throwmode::lightweight);
1248
        parcelport* pp = find_parcelport(pp_type, ec);
1249
        return pp ? pp->get_action_data_received(action, reset) : 0;
1250
    }
1251
#endif
1252
#endif
1253
    // connection stack statistics
1254
    std::int64_t parcelhandler::get_connection_cache_statistics(
×
1255
        std::string const& pp_type,
1256
        parcelport::connection_cache_statistics_type stat_type,
1257
        bool reset) const
1258
    {
1259
        error_code ec(throwmode::lightweight);
×
1260
        parcelport* pp = find_parcelport(pp_type, ec);
×
1261
        return pp ? pp->get_connection_cache_statistics(stat_type, reset) : 0;
×
1262
    }
×
1263

1264
    std::vector<plugins::parcelport_factory_base*>&
1265
    parcelhandler::get_parcelport_factories()
1,446✔
1266
    {
1267
        auto& factories = plugins::get_parcelport_factories();
1,446✔
1268
        if (factories.empty() && hpx::is_networking_enabled())
1,446✔
1269
        {
1270
            init_static_parcelport_factories(factories);
488✔
1271
        }
488✔
1272
        return factories;
1,446✔
1273
    }
1274

1275
    void parcelhandler::init(
283✔
1276
        int* argc, char*** argv, util::command_line_handling& cfg)
1277
    {
1278
        HPX_ASSERT(hpx::is_networking_enabled());
283✔
1279

1280
        for (plugins::parcelport_factory_base* factory :
566✔
1281
            get_parcelport_factories())
283✔
1282
        {
1283
            factory->init(argc, argv, cfg);
283✔
1284
        }
1285
    }
283✔
1286

1287
    // Pass the resource partitioner on to every parcelport factory. Asserts
    // that networking is enabled.
    void parcelhandler::init(hpx::resource::partitioner& rp)
    {
        HPX_ASSERT(hpx::is_networking_enabled());

        auto& factories = get_parcelport_factories();
        for (auto* pp_factory : factories)
        {
            pp_factory->init(rp);
        }
    }
283✔
1297

1298
    std::vector<std::string> load_runtime_configuration()
597✔
1299
    {
1300
        std::vector<std::string> ini_defs;
597✔
1301

1302
        ini_defs.emplace_back("[hpx.parcel]");
597✔
1303
        ini_defs.emplace_back(
597✔
1304
            "address = ${HPX_PARCEL_SERVER_ADDRESS:" HPX_INITIAL_IP_ADDRESS
1305
            "}");
1306
        ini_defs.emplace_back(
597✔
1307
            "port = ${HPX_PARCEL_SERVER_PORT:" HPX_PP_STRINGIZE(
1308
                HPX_INITIAL_IP_PORT) "}");
1309
        ini_defs.emplace_back(
597✔
1310
            "bootstrap = ${HPX_PARCEL_BOOTSTRAP:" HPX_PARCEL_BOOTSTRAP "}");
1311
        ini_defs.emplace_back(
597✔
1312
            "max_connections = ${HPX_PARCEL_MAX_CONNECTIONS:" HPX_PP_STRINGIZE(
1313
                HPX_PARCEL_MAX_CONNECTIONS) "}");
1314
        ini_defs.emplace_back(
597✔
1315
            "max_connections_per_locality = "
1316
            "${HPX_PARCEL_MAX_CONNECTIONS_PER_LOCALITY:" HPX_PP_STRINGIZE(
1317
                HPX_PARCEL_MAX_CONNECTIONS_PER_LOCALITY) "}");
1318
        ini_defs.emplace_back("max_message_size = "
597✔
1319
                              "${HPX_PARCEL_MAX_MESSAGE_SIZE:" HPX_PP_STRINGIZE(
1320
                                  HPX_PARCEL_MAX_MESSAGE_SIZE) "}");
1321
        ini_defs.emplace_back(
597✔
1322
            "max_outbound_message_size = "
1323
            "${HPX_PARCEL_MAX_OUTBOUND_MESSAGE_SIZE:" HPX_PP_STRINGIZE(
1324
                HPX_PARCEL_MAX_OUTBOUND_MESSAGE_SIZE) "}");
1325
        ini_defs.emplace_back(endian::native == endian::big ?
597✔
1326
                "endian_out = ${HPX_PARCEL_ENDIAN_OUT:big}" :
1327
                "endian_out = ${HPX_PARCEL_ENDIAN_OUT:little}");
1328
        ini_defs.emplace_back(
597✔
1329
            "array_optimization = ${HPX_PARCEL_ARRAY_OPTIMIZATION:1}");
1330
        ini_defs.emplace_back(
597✔
1331
            "zero_copy_optimization = ${HPX_PARCEL_ZERO_COPY_OPTIMIZATION:"
1332
            "$[hpx.parcel.array_optimization]}");
1333
        ini_defs.emplace_back(
597✔
1334
            "async_serialization = ${HPX_PARCEL_ASYNC_SERIALIZATION:1}");
1335
#if defined(HPX_HAVE_PARCEL_COALESCING)
1336
        ini_defs.emplace_back(
597✔
1337
            "message_handlers = ${HPX_PARCEL_MESSAGE_HANDLERS:1}");
1338
#else
1339
        ini_defs.emplace_back(
1340
            "message_handlers = ${HPX_PARCEL_MESSAGE_HANDLERS:0}");
1341
#endif
1342
        ini_defs.emplace_back(
597✔
1343
            "zero_copy_serialization_threshold = "
1344
            "${HPX_PARCEL_ZERO_COPY_SERIALIZATION_THRESHOLD:" HPX_PP_STRINGIZE(
1345
                HPX_ZERO_COPY_SERIALIZATION_THRESHOLD) "}");
1346
        ini_defs.emplace_back("max_background_threads = "
597✔
1347
                              "${HPX_PARCEL_MAX_BACKGROUND_THREADS:-1}");
1348

1349
        for (plugins::parcelport_factory_base* f :
1,194✔
1350
            parcelhandler::get_parcelport_factories())
597✔
1351
        {
1352
            f->get_plugin_info(ini_defs);
597✔
1353
        }
1354
        return ini_defs;
597✔
1355
    }
597✔
1356

1357
    ///////////////////////////////////////////////////////////////////////////
1358
    // Fill in parcel fields that have not been set yet: the source id and,
    // if parcel profiling is compiled in, the start time and the parcel id.
    void parcelhandler::init_parcel(parcel& p)
    {
        // ensure the source locality id is set (if no component id is given)
        if (!p.source_id())
        {
            hpx::id_type here(agas::get_locality(),
                hpx::id_type::management_type::unmanaged);
            p.set_source_id(HPX_MOVE(here));
        }

#if defined(HPX_HAVE_PARCEL_PROFILING)
        // set the current local time for this locality
        p.set_start_time(hpx::chrono::high_resolution_timer::now());

        if (!p.parcel_id())
        {
            error_code ec(throwmode::lightweight);    // ignore all errors
            std::uint32_t const this_locality = agas::get_locality_id(ec);
            p.parcel_id() =
                parcelset::parcel::generate_unique_id(this_locality);
        }
#endif
    }
459,398✔
1379
}    // namespace hpx::parcelset
1380

1381
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc