• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.88
/libs/full/parcelset/src/parcelhandler.cpp
1
//  Copyright (c) 2007-2025 Hartmut Kaiser
2
//  Copyright (c) 2013-2014 Thomas Heller
3
//  Copyright (c) 2007      Richard D Guidry Jr
4
//  Copyright (c) 2011      Bryce Lelbach & Katelyn Kufahl
5
//
6
//  SPDX-License-Identifier: BSL-1.0
7
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
8
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9

10
#include <hpx/config.hpp>
11

12
#if defined(HPX_HAVE_NETWORKING)
13
#include <hpx/assert.hpp>
14
#include <hpx/modules/errors.hpp>
15
#include <hpx/modules/format.hpp>
16
#include <hpx/modules/functional.hpp>
17
#include <hpx/modules/futures.hpp>
18
#include <hpx/modules/io_service.hpp>
19
#include <hpx/modules/itt_notify.hpp>
20
#include <hpx/modules/logging.hpp>
21
#include <hpx/modules/preprocessor.hpp>
22
#include <hpx/modules/resource_partitioner.hpp>
23
#include <hpx/modules/runtime_configuration.hpp>
24
#include <hpx/modules/runtime_local.hpp>
25
#include <hpx/modules/string_util.hpp>
26
#include <hpx/modules/synchronization.hpp>
27
#include <hpx/modules/thread_support.hpp>
28
#include <hpx/modules/threading_base.hpp>
29
#include <hpx/modules/threadmanager.hpp>
30
#include <hpx/modules/type_support.hpp>
31
#include <hpx/modules/util.hpp>
32

33
#include <hpx/components_base/agas_interface.hpp>
34
#include <hpx/naming_base/gid_type.hpp>
35
#include <hpx/parcelset/init_parcelports.hpp>
36
#include <hpx/parcelset/message_handler_fwd.hpp>
37
#include <hpx/parcelset/parcelhandler.hpp>
38
#include <hpx/parcelset_base/parcelset_base_fwd.hpp>
39
#include <hpx/parcelset_base/policies/message_handler.hpp>
40
#include <hpx/plugin_factories/parcelport_factory_base.hpp>
41

42
#include <asio/error.hpp>
43

44
#include <algorithm>
45
#include <cstddef>
46
#include <cstdint>
47
#include <exception>
48
#include <iostream>
49
#include <map>
50
#include <memory>
51
#include <mutex>
52
#include <sstream>
53
#include <string>
54
#include <system_error>
55
#include <utility>
56
#include <vector>
57
#if defined(HPX_HAVE_PARCEL_PROFILING)
58
#include <chrono>
59
#endif
60

61
#include <hpx/config/warnings_prefix.hpp>
62

63
///////////////////////////////////////////////////////////////////////////////
64
namespace hpx::detail {
65

66
    void dijkstra_make_black();    // forward declaration only
67
}    // namespace hpx::detail
68

69
///////////////////////////////////////////////////////////////////////////////
70
namespace hpx::parcelset {
71

72
    ///////////////////////////////////////////////////////////////////////////
73
    // A parcel is submitted for transport at the source locality site to
74
    // the parcel set of the locality with the put-parcel command
75
    // This function is synchronous.
3✔
76
    void parcelhandler::sync_put_parcel(parcel p)    //-V669
    {
        // The promise is fulfilled from the write handler; the matching
        // future is what this function blocks on.
        hpx::promise<void> sent;
        future<void> done = sent.get_future();

        // Completion callback: signals the waiting future. Both the error code
        // and the parcel passed to the handler are ignored.
        auto on_sent = [&sent](std::error_code const&, parcel const&) {
            sent.set_value();
        };

        // schedule parcel send
        put_parcel(HPX_MOVE(p), HPX_MOVE(on_sent));

        // wait for the parcel to be sent
        done.get();
    }
86

32✔
87
    // Construct a parcelhandler from the runtime configuration.
    //
    // No parcelports are attached here; the thread manager pointer starts out
    // as nullptr and is assigned later by set_notification_policies().
    parcelhandler::parcelhandler(util::runtime_configuration const& cfg)
      : tm_(nullptr)
      , use_alternative_parcelports_(false)
      , enable_parcel_handling_(true)
      // message handlers are loaded only if the configuration entry
      // 'hpx.parcel.message_handlers' is a non-zero integer (default: 0)
      , load_message_handlers_(0 !=
            util::get_entry_as<int>(cfg, "hpx.parcel.message_handlers", 0))
      , count_routed_(0)
      , write_handler_(&default_write_handler)
#if defined(HPX_HAVE_NETWORKING)
      , is_networking_enabled_(cfg.enable_networking())
#else
      , is_networking_enabled_(false)
#endif
    {
        LPROGRESS_;
    }
103

32✔
104
    // Defaulted destructor, defined out of line in this translation unit.
    parcelhandler::~parcelhandler() = default;
105

32✔
106
    void parcelhandler::set_notification_policies(
107
        util::runtime_configuration const& cfg, threads::threadmanager* tm,
108
        threads::policies::callback_notifier const& notifier)
109
    {
32✔
110
        is_networking_enabled_ = hpx::is_networking_enabled();
32✔
111
        tm_ = tm;
112

38✔
113
        if (is_networking_enabled_ &&
50✔
114
            cfg.get_entry("hpx.parcel.enable", "1") != "0")
115
        {
6✔
116
            for (plugins::parcelport_factory_base* factory :
30✔
117
                get_parcelport_factories())
118
            {
18✔
119
                std::shared_ptr<parcelport> pp(factory->create(cfg, notifier));
18✔
120
                attach_parcelport(pp);
121
            }
122
        }
32✔
123
    }
124

50✔
125
    std::shared_ptr<parcelport> parcelhandler::get_bootstrap_parcelport() const
126
    {
50✔
127
        std::string const cfgkey("hpx.parcel.bootstrap");
50✔
128
        if (!pports_.empty())
129
        {
130
            auto const it =
24✔
131
                pports_.find(get_priority(get_config_entry(cfgkey, "tcp")));
24✔
132
            if (it != pports_.end() && it->first > 0)
133
            {
134
                return it->second;
135
            }
136
        }
137

26✔
138
        for (auto const& pp : pports_)
139
        {
×
140
            if (pp.first > 0 && pp.second->can_bootstrap())
141
            {
142
                return pp.second;
143
            }
144
        }
145

26✔
146
        if (hpx::is_networking_enabled())
147
        {
×
148
            std::cerr << "Could not find usable bootstrap parcelport.\n";
×
149
            std::cerr << hpx::util::format(
150
                "Preconfigured bootstrap parcelport: '{}'\n",
×
151
                get_config_entry(cfgkey, "tcp"));
152

×
153
            if (pports_.empty())
154
            {
×
155
                std::cerr << "No available parcelports\n";
156
            }
157
            else
158
            {
×
159
                std::cerr << "List of available parcelports:\n";
×
160
                for (auto const& pp : pports_)
161
                {
×
162
                    std::cerr << hpx::util::format(
163
                        "  {}, priority: {}, can bootstrap: {}\n",
×
164
                        pp.second->type(), pp.first,
×
165
                        pp.second->can_bootstrap());
166
                }
×
167
                std::cerr << "\n";
168
            }
169

170
            // terminate this locality as there is nothing else we can do
×
171
            std::terminate();
172
        }
173

174
        return {};
175
    }
176

32✔
177
    void parcelhandler::initialize()
178
    {
32✔
179
        exception_list exceptions;
32✔
180
        std::vector<int> failed_pps;
50✔
181
        for (pports_type::value_type& pp : pports_)
182
        {
183
            // protect against exceptions thrown by a parcelport during
184
            // initialization
185
            hpx::detail::try_catch_exception_ptr(
54✔
186
                [&]() {
36✔
187
                    if (pp.second != get_bootstrap_parcelport())
188
                    {
12✔
189
                        if (pp.first > 0)
4✔
190
                            pp.second->run(false);
191
                    }
192

193
                    // early parcel handling is finished, normal operation is
194
                    // about to start
18✔
195
                    if (pp.first > 0)
196
                    {
10✔
197
                        pp.second->initialized();
198
                    }
18✔
199
                },
36✔
200
                [&](std::exception_ptr&& e) {
×
201
                    exceptions.add(HPX_MOVE(e));
×
202
                    failed_pps.push_back(pp.first);
×
203
                });
204
        }
205

206
        // handle exceptions
32✔
207
        if (exceptions.size() != 0)
208
        {
×
209
            if (failed_pps.size() == pports_.size())
210
            {
×
211
                std::cerr << hpx::util::format(
212
                    "all parcelports failed initializing on locality {}, "
213
                    "exiting:\n{}\n",
×
214
                    agas::get_locality_id(), exceptions.get_message());
×
215
                std::terminate();
216
            }
217
            else
218
            {
×
219
                std::cerr << hpx::util::format(
220
                    "warning: the following errors were detected while "
221
                    "initializing parcelports on locality {}:\n{}\n",
×
222
                    agas::get_locality_id(), exceptions.get_message());
223
            }
224

225
            // clean up parcelports that have failed initialization
×
226
            std::cerr << "the following parcelports will be disabled:\n";
×
227
            for (int pp : failed_pps)
228
            {
229
                auto it = pports_.find(pp);
×
230
                if (it != pports_.end())
231
                {
×
232
                    std::cerr << "  " << (*it).second->type() << "\n";
×
233
                    (*it).second->stop();
234
                    pports_.erase(it);
235
                }
236
            }
×
237
            std::cerr << "\n";
238
        }
32✔
239
    }
240

×
241
    void parcelhandler::list_parcelport(std::ostringstream& strm,
242
        std::string const& ppname, int priority, bool bootstrap) const
243
    {
×
244
        hpx::util::format_to(strm, "parcel port: {}", ppname);
245

×
246
        std::string const cfgkey("hpx.parcel." + ppname + ".enable");
247
        bool const enabled =
×
248
            hpx::util::from_string<int>(get_config_entry(cfgkey, "0"), 0);
×
249
        strm << (enabled ? ", enabled" : ", not enabled");
250

×
251
        if (bootstrap)
×
252
            strm << ", bootstrap";
253

×
254
        hpx::util::format_to(strm, ", priority {}\n", priority);
×
255
    }
256

257
    // list available parcel ports
×
258
    void parcelhandler::list_parcelports(std::ostringstream& strm) const
259
    {
×
260
        for (pports_type::value_type const& pp : pports_)
261
        {
×
262
            list_parcelport(strm, pp.second->type(), pp.second->priority(),
×
263
                pp.second == get_bootstrap_parcelport());
264
        }
×
265
        strm << '\n';
×
266
    }
267

×
268
    // Install 'f' as the new write handler and return the handler that was
    // installed before. The exchange is performed while holding mtx_.
    write_handler_type parcelhandler::set_write_handler(write_handler_type f)
    {
        std::lock_guard<mutex_type> guard(mtx_);
        write_handler_type previous =
            std::exchange(write_handler_, HPX_MOVE(f));
        return previous;
    }
274

32✔
275
    bool parcelhandler::enum_parcelports(
276
        hpx::move_only_function<bool(std::string const&)> const& f) const
277
    {
50✔
278
        for (pports_type::value_type const& pp : pports_)
279
        {
28✔
280
            if (pp.first > 0 && !f(pp.second->type()))
281
            {
282
                return false;
283
            }
284
        }
285
        return true;
286
    }
287

53✔
288
    int parcelhandler::get_priority(std::string const& name) const
289
    {
290
        auto const it = priority_.find(name);
53✔
291
        if (it == priority_.end())
292
            return 0;
53✔
293
        return it->second;
294
    }
295

29✔
296
    // Look up the parcelport registered under the given type name.
    //
    // Returns a non-owning pointer to the parcelport, or nullptr if the type
    // is unknown, if it is registered with a non-positive priority, or if no
    // parcelport is stored under the recorded priority. The error_code
    // argument is not used.
    parcelport* parcelhandler::find_parcelport(
        std::string const& type, error_code&) const
    {
        int const priority = get_priority(type);
        if (priority <= 0)
        {
            return nullptr;
        }

        // Single lookup. priority_ and pports_ are expected to be consistent
        // (hence the assertion), but never dereference end() when assertions
        // are compiled out.
        auto const it = pports_.find(priority);
        HPX_ASSERT(it != pports_.end());
        if (it == pports_.end())
        {
            return nullptr;
        }
        return it->second.get();
    }
305

18✔
306
    void parcelhandler::attach_parcelport(std::shared_ptr<parcelport> const& pp)
307
    {
18✔
308
        if (!hpx::is_networking_enabled() || !pp)
309
        {
×
310
            return;
311
        }
312

313
        // add the new parcelport to the list of parcel-ports we care about
18✔
314
        int priority = pp->priority();
315
        std::string const cfgkey(
18✔
316
            std::string("hpx.parcel.") + pp->type() + ".enable");
36✔
317
        if (get_config_entry(cfgkey, "0") != "1")
318
        {
8✔
319
            priority = -priority;
320
        }
18✔
321
        pports_[priority] = pp;
18✔
322
        priority_[pp->type()] = priority;
323

324
        // add the endpoint of the new parcelport
325
        HPX_ASSERT(pp->type() == pp->here().type());
18✔
326
        if (priority > 0)
10✔
327
            endpoints_[pp->type()] = pp->here();
328
    }
329

330
    ///////////////////////////////////////////////////////////////////////////
331
    /// \brief Make sure the specified locality is not held by any
332
    /// connection caches anymore
×
333
    void parcelhandler::remove_from_connection_cache(
334
        naming::gid_type const& gid, endpoints_type const& endpoints) const
335
    {
×
336
        for (endpoints_type::value_type const& loc : endpoints)
337
        {
×
338
            for (pports_type::value_type const& pp : pports_)
339
            {
×
340
                if (std::string(pp.second->type()) == loc.second.type())
341
                {
×
342
                    pp.second->remove_from_connection_cache(loc.second);
343
                }
344
            }
345
        }
346

×
347
        agas::remove_resolved_locality(gid);
×
348
    }
349

350
    ///////////////////////////////////////////////////////////////////////////
686,019✔
351
    bool parcelhandler::do_background_work(std::size_t num_thread,
352
        bool stop_buffering, parcelport_background_mode mode)
353
    {
354
        bool did_some_work = false;
686,019✔
355
        if (!is_networking_enabled_)
356
        {
357
            return did_some_work;
358
        }
359

686,019✔
360
        LPT_(debug).format(
361
            "parcelhandler::do_background_work: thread {}, mode {}", num_thread,
×
362
            get_parcelport_background_mode_name(mode));
363

364
        // flush all parcel buffers
686,019✔
365
        if (0 == num_thread &&
366
            (mode & parcelport_background_mode::flush_buffers))
367
        {
686,019✔
368
            std::unique_lock<mutex_type> l(handlers_mtx_, std::try_to_lock);
369

686,019✔
370
            if (l.owns_lock())
371
            {
372
                using parcelset::policies::message_handler;
373
                constexpr message_handler::flush_mode flush_mode =
374
                    message_handler::flush_mode_background_work;
375

376
                auto const end = handlers_.end();
1,121,346✔
377
                for (auto it = handlers_.begin(); it != end; ++it)
378
                {
435,327✔
379
                    if ((*it).second)
380
                    {
381
                        std::shared_ptr<policies::message_handler> const p(
382
                            (*it).second);
383
                        unlock_guard<std::unique_lock<mutex_type>> ul(l);
435,327✔
384
                        did_some_work = p->flush(flush_mode, stop_buffering) ||
385
                            did_some_work;
386
                    }
387
                }
388
            }
389
        }
390

391
        // make sure all pending parcels are being handled
2,744,076✔
392
        for (pports_type::value_type const& pp : pports_)
393
        {
2,058,057✔
394
            if (pp.first > 0)
395
            {
396
                did_some_work =
1,353,189✔
397
                    pp.second->do_background_work(num_thread, mode) ||
398
                    did_some_work;
399
            }
400
        }
401

402
        return did_some_work;
403
    }
404

24✔
405
    void parcelhandler::flush_parcels() const
406
    {
24✔
407
        if (is_networking_enabled_)
408
        {
409
            // now flush all parcel ports to be shut down
96✔
410
            for (pports_type::value_type const& pp : pports_)
411
            {
72✔
412
                if (pp.first > 0)
413
                {
40✔
414
                    pp.second->flush_parcels();
415
                }
416
            }
417
        }
24✔
418
    }
419

95✔
420
    void parcelhandler::stop(bool blocking)
421
    {
422
        // now stop all parcel ports
149✔
423
        for (pports_type::value_type const& pp : pports_)
424
        {
54✔
425
            if (pp.first > 0)
426
            {
30✔
427
                pp.second->stop(blocking);
428
            }
429
        }
430

431
        // release all message handlers
432
        handlers_.clear();
95✔
433
    }
434

4✔
435
    bool parcelhandler::get_raw_remote_localities(
436
        std::vector<naming::gid_type>& locality_ids,
437
        components::component_type type, error_code& ec) const
438
    {
4✔
439
        std::vector<naming::gid_type> allprefixes;
4✔
440
        bool const result = get_raw_localities(allprefixes, type, ec);
4✔
441
        if (ec || !result)
442
            return false;
443

4✔
444
        std::remove_copy(allprefixes.begin(), allprefixes.end(),
4✔
445
            std::back_inserter(locality_ids), agas::get_locality());
446

4✔
447
        return !locality_ids.empty();
448
    }
449

39✔
450
    bool parcelhandler::get_raw_localities(
451
        std::vector<naming::gid_type>& locality_ids,
452
        components::component_type type, error_code&) const
453
    {
39✔
454
        std::vector<std::uint32_t> const ids = agas::get_all_locality_ids(type);
455

456
        locality_ids.clear();
39✔
457
        locality_ids.reserve(ids.size());
84✔
458
        for (auto const& id : ids)
459
        {
45✔
460
            locality_ids.emplace_back(naming::get_gid_from_locality_id(id));
461
        }
462

78✔
463
        return !locality_ids.empty();
464
    }
465

466
    std::pair<std::shared_ptr<parcelport>, locality>
408✔
467
    parcelhandler::find_appropriate_destination(
468
        naming::gid_type const& dest_gid)
469
    {
408✔
470
        endpoints_type const& dest_endpoints = agas::resolve_locality(dest_gid);
471

408✔
472
        for (pports_type::value_type& pp : pports_)
473
        {
408✔
474
            if (pp.first > 0)
475
            {
476
                if (locality const& dest =
408✔
477
                        find_endpoint(dest_endpoints, pp.second->type());
816✔
478
                    dest &&
408✔
479
                    pp.second->can_connect(dest, use_alternative_parcelports_))
480
                {
816✔
481
                    return std::make_pair(pp.second, dest);
482
                }
483
            }
484
        }
485

×
486
        std::ostringstream strm;
×
487
        strm << "target locality: " << dest_gid << "\n";
×
488
        strm << "available destination endpoints:\n" << dest_endpoints << "\n";
×
489
        strm << "available parcelports:\n";
×
490
        for (auto const& pp : pports_)
491
        {
×
492
            list_parcelport(strm, pp.second->type(), pp.second->priority(),
×
493
                pp.second == get_bootstrap_parcelport());
×
494
            strm << "\t [" << pp.second->here() << "]\n";
495
        }
496

×
497
        HPX_THROW_EXCEPTION(hpx::error::network_error,
498
            "parcelhandler::find_appropriate_destination",
499
            "The locality gid cannot be resolved to a valid endpoint. "
500
            "No valid parcelport configured. Detailed information:\n{}",
501
            strm.str());
×
502
    }
503

408✔
504
    // Return the endpoint stored in 'eps' under the key 'name', or a
    // default-constructed locality if there is no such entry.
    locality parcelhandler::find_endpoint(
        endpoints_type const& eps, std::string const& name)
    {
        auto const pos = eps.find(name);
        if (pos == eps.end())
        {
            return {};
        }
        return pos->second;
    }
512

513
    // Return the reference to an existing io_service
×
514
    // Ask each attached parcelport in turn for the io_service pool called
    // 'name' and return the first non-null answer, or nullptr if no
    // parcelport knows a pool of that name.
    util::io_service_pool* parcelhandler::get_thread_pool(
        char const* name) const
    {
        for (auto const& entry : pports_)
        {
            if (util::io_service_pool* pool =
                    entry.second->get_thread_pool(name))
            {
                return pool;
            }
        }
        return nullptr;
    }
526

527
    namespace detail {
528

408✔
529
        void parcel_sent_handler(
530
            parcelhandler::write_handler_type const& f,    //-V669
531
            std::error_code const& ec, parcelset::parcel const& p)
532
        {
533
            // inform termination detection of a sent message
408✔
534
            if (!p.does_termination_detection())
535
            {
384✔
536
                hpx::detail::dijkstra_make_black();
537
            }
538

539
            // invoke the original handler
540
            f(ec, p);
541

542
#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
543
            static util::itt::event parcel_send("send_parcel");
544
            util::itt::event_tick(parcel_send);
545
#endif
546

547
#if defined(HPX_HAVE_APEX) && defined(HPX_HAVE_PARCEL_PROFILING)
548
            // tell APEX about the sent parcel
549
            util::external_timer::send(
550
                p.parcel_id().get_lsb(), p.size(), p.destination_locality_id());
551
#endif
408✔
552
        }
553
    }    // namespace detail
554

×
555
    // Submit a parcel for sending without a user-supplied completion
    // handler. Once the parcel has been handled only the write handler
    // installed on this parcelhandler is invoked.
    void parcelhandler::put_parcel(parcelset::parcel p)
    {
        LPT_(debug).format(
            "parcelhandler::put_parcel: submitted: {}", p.parcel_id());

        // inform termination detection of a sent message
        bool const is_termination_parcel = p.does_termination_detection();
        if (!is_termination_parcel)
        {
            hpx::detail::dijkstra_make_black();
        }

        auto on_handled = [this](std::error_code const& ec,
                              parcelset::parcel const& sent) {
            invoke_write_handler(ec, sent);

            LPT_(debug).format(
                "parcelhandler::put_parcel: handled: {}", sent.parcel_id());
        };

        put_parcel_impl(HPX_MOVE(p), HPX_MOVE(on_handled));
    }
576

408✔
577
    // Submit a parcel for sending together with a completion handler 'f'.
    // Once the parcel has been handled the write handler installed on this
    // parcelhandler is invoked first, followed by 'f'.
    void parcelhandler::put_parcel(parcelset::parcel p, write_handler_type f)
    {
        LPT_(debug).format(
            "parcelhandler::put_parcel (with handler): submitted: {}",
            p.parcel_id());

        // inform termination detection of a sent message
        bool const is_termination_parcel = p.does_termination_detection();
        if (!is_termination_parcel)
        {
            hpx::detail::dijkstra_make_black();
        }

        auto on_handled = [this, user_handler = HPX_MOVE(f)](
                              std::error_code const& ec,
                              parcelset::parcel const& sent) {
            invoke_write_handler(ec, sent);
            user_handler(ec, sent);

            LPT_(debug).format(
                "parcelhandler::put_parcel: handled: {}", sent.parcel_id());
        };

        put_parcel_impl(HPX_MOVE(p), HPX_MOVE(on_handled));
    }
600

408✔
601
    // Common implementation behind both put_parcel overloads.
    //
    // If the current context lacks sufficient stack space while the thread
    // manager is running, the request is re-submitted as a new HPX thread.
    // Otherwise the parcel is initialized and its destination address is
    // resolved locally if not already known. Locally resolvable parcels are
    // handed to a message handler (if enabled and available) or directly to
    // the selected parcelport; all other parcels are routed through AGAS.
    void parcelhandler::put_parcel_impl(parcel&& p, write_handler_type&& f)
    {
        HPX_ASSERT(is_networking_enabled_);

        naming::gid_type const& gid = p.destination();
        naming::address& addr = p.addr();

        // During bootstrap this is handled separately (see
        // addressing_service::resolve_locality).

        // if this isn't an HPX thread, the stack space check will return false
        if (!this_thread::has_sufficient_stack_space() &&
            hpx::threads::threadmanager_is(hpx::state::running))
        {
            LPT_(debug).format("parcelhandler::put_parcel_impl: reschedule: {}",
                p.parcel_id());

            // reschedule request as an HPX thread to avoid hangs
            //
            // NOTE(review): this re-enters the public two-argument put_parcel
            // overload with a handler that the visible put_parcel overloads
            // have already wrapped; on this path the wrapping (and the
            // termination-detection notification) appears to be applied a
            // second time - confirm that this is intended.
            void (parcelhandler::*resubmit)(parcel, write_handler_type) =
                &parcelhandler::put_parcel;

            threads::thread_init_data data(
                threads::make_thread_function_nullary(
                    resubmit, this, HPX_MOVE(p), HPX_MOVE(f)),
                "parcelhandler::put_parcel", threads::thread_priority::boost,
                threads::thread_schedule_hint(),
                threads::thread_stacksize::medium,
                threads::thread_schedule_state::pending, true);
            threads::register_thread(data);
            return;
        }

        // properly initialize parcel
        init_parcel(p);

        // an already valid address counts as locally resolved
        bool needs_routing = false;
        if (!addr)
        {
            needs_routing = !agas::resolve_local(gid, addr);
        }

        write_handler_type sent_handler =
            hpx::bind_front(&detail::parcel_sent_handler, HPX_MOVE(f));

        if (needs_routing)
        {
            // At least one of the addresses is locally unknown, route the
            // parcel to the AGAS managing the destination.
            ++count_routed_;

            agas::route(HPX_MOVE(p), HPX_MOVE(sent_handler));
            return;
        }

        // We were able to resolve the address(es) locally, send the parcel
        // directly to the destination.
        auto const target = find_appropriate_destination(addr.locality_);

        // dispatch to the message handler which is associated with the
        // encapsulated action
        if (load_message_handlers_ && !hpx::is_stopped_or_shutting_down())
        {
            if (policies::message_handler* mh =
                    p.get_message_handler(target.second))
            {
                mh->put_parcel(
                    target.second, HPX_MOVE(p), HPX_MOVE(sent_handler));
                return;
            }
        }

        target.first->put_parcel(
            target.second, HPX_MOVE(p), HPX_MOVE(sent_handler));
    }
682

×
683
    void parcelhandler::put_parcels(std::vector<parcel> parcels)
684
    {
×
685
        LPT_(debug).format("parcelhandler::put_parcels: submitted: {} parcels",
×
686
            parcels.size());
687

688
        // inform termination detection of a sent message
×
689
        for (auto const& p : parcels)
690
        {
×
691
            if (!p.does_termination_detection())
692
            {
×
693
                hpx::detail::dijkstra_make_black();
694
                break;
695
            }
696
        }
697

698
        std::vector<write_handler_type> handlers(parcels.size(),
×
699
            [this](std::error_code const& ec, parcel const& p) -> void {
×
700
                invoke_write_handler(ec, p);
×
701
                LPT_(debug).format(
702
                    "parcelhandler::put_parcels: handled: {}", p.parcel_id());
×
703
            });
704

×
705
        put_parcels_impl(HPX_MOVE(parcels), HPX_MOVE(handlers));
×
706
    }
707

×
708
    void parcelhandler::put_parcels(
709
        std::vector<parcel> parcels, std::vector<write_handler_type> funcs)
710
    {
×
711
        LPT_(debug).format(
712
            "parcelhandler::put_parcels (with handlers): submitted: {} parcels",
×
713
            parcels.size());
714

715
        // inform termination detection of a sent message
×
716
        for (auto const& p : parcels)
717
        {
×
718
            if (!p.does_termination_detection())
719
            {
×
720
                hpx::detail::dijkstra_make_black();
721
                break;
722
            }
723
        }
724

×
725
        std::vector<write_handler_type> handlers;
726

×
727
        handlers.reserve(parcels.size());
×
728
        for (std::size_t i = 0; i != parcels.size(); ++i)
729
        {
×
730
            handlers.emplace_back([this, f = HPX_MOVE(funcs[i])](
731
                                      std::error_code const& ec,
×
732
                                      parcel const& p) -> void {
×
733
                invoke_write_handler(ec, p);
734
                f(ec, p);
735

×
736
                LPT_(debug).format(
737
                    "parcelhandler::put_parcels: handled: {}", p.parcel_id());
×
738
            });
739
        }
740

×
741
        put_parcels_impl(HPX_MOVE(parcels), HPX_MOVE(handlers));
×
742
    }
743

×
744
    void parcelhandler::put_parcels_impl(std::vector<parcel>&& parcels,
745
        std::vector<write_handler_type>&& handlers)
746
    {
747
        HPX_ASSERT(is_networking_enabled_);
748

×
749
        if (parcels.size() != handlers.size())
750
        {
×
751
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
752
                "parcelhandler::put_parcels",
753
                "mismatched number of parcels and handlers");
754
        }
755

756
        // if this isn't an HPX thread, the stack space check will return false
×
757
        if (!this_thread::has_sufficient_stack_space() &&
×
758
            hpx::threads::threadmanager_is(hpx::state::running))
759
        {
×
760
            LPT_(debug).format(
761
                "parcelhandler::put_parcels_impl: rescheduling: {}",
×
762
                parcels.size());
763

764
            // reschedule request as an HPX thread to avoid hangs
765
            void (parcelhandler::*put_parcels_ptr)(std::vector<parcel>,
766
                std::vector<write_handler_type>) = &parcelhandler::put_parcels;
767

768
            threads::thread_init_data data(
×
769
                threads::make_thread_function_nullary(put_parcels_ptr, this,
×
770
                    HPX_MOVE(parcels), HPX_MOVE(handlers)),
771
                "parcelhandler::put_parcels", threads::thread_priority::boost,
772
                threads::thread_schedule_hint(),
773
                threads::thread_stacksize::medium,
774
                threads::thread_schedule_state::pending, true);
775
            threads::register_thread(data);
×
776
            return;
777
        }
778

779
        // partition parcels depending on whether their destination can be
780
        // resolved locally
781
        std::size_t const num_parcels = parcels.size();
782

783
        std::vector<parcel> resolved_parcels;
×
784
        resolved_parcels.reserve(num_parcels);
×
785
        std::vector<write_handler_type> resolved_handlers;
×
786
        resolved_handlers.reserve(num_parcels);
×
787

788
        using destination_pair =
789
            std::pair<std::shared_ptr<parcelport>, locality>;
790

791
        destination_pair resolved_dest;
×
792

793
        std::vector<parcel> nonresolved_parcels;
×
794
        std::vector<write_handler_type> nonresolved_handlers;
×
795

796
        nonresolved_parcels.reserve(num_parcels);
×
797
        nonresolved_handlers.reserve(num_parcels);
×
798

799
        naming::gid_type dest_locality;
×
800
        for (std::size_t i = 0; i != num_parcels; ++i)
×
801
        {
802
            parcel& p = parcels[i];
803

804
            // properly initialize parcel
805
            init_parcel(p);
×
806

807
            bool resolved_locally = true;
808
            naming::address& addr = p.addr();
×
809

810
            if (!addr)
811
            {
812
                resolved_locally = agas::resolve_local(p.destination(), addr);
×
813
            }
814

815
            write_handler_type f = hpx::bind_front(
×
816
                &detail::parcel_sent_handler, HPX_MOVE(handlers[i]));
×
817

818
            // make sure all parcels go to the same locality
819
            if (i == 0)
×
820
            {
821
                dest_locality = p.destination_locality();
×
822
            }
823
            else if (dest_locality != p.destination_locality())
×
824
            {
825
                HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
×
826
                    "parcelhandler::put_parcels",
827
                    "mismatched destinations, all parcels are expected to "
828
                    "target the same locality");
829
            }
830

831
            // If we were able to resolve the address(es) locally we would send
832
            // the parcel directly to the destination.
833
            if (resolved_locally)
×
834
            {
835
                // dispatch to the message handler which is associated with the
836
                // encapsulated action
837
                destination_pair dest =
838
                    find_appropriate_destination(addr.locality_);
×
839

840
                if (load_message_handlers_)
×
841
                {
842
                    if (policies::message_handler* mh =
×
843
                            p.get_message_handler(dest.second))
×
844
                    {
845
                        mh->put_parcel(dest.second, HPX_MOVE(p), HPX_MOVE(f));
×
846
                        continue;
847
                    }
848
                }
849

850
                resolved_parcels.emplace_back(HPX_MOVE(p));
×
851
                resolved_handlers.emplace_back(HPX_MOVE(f));
×
852
                if (!resolved_dest.second)
×
853
                {
854
                    resolved_dest = dest;
855
                }
856
                else
857
                {
858
                    HPX_ASSERT(resolved_dest == dest);
859
                }
860
            }
×
861
            else
862
            {
863
                nonresolved_parcels.emplace_back(HPX_MOVE(p));
×
864
                nonresolved_handlers.emplace_back(HPX_MOVE(f));
×
865
            }
866
        }
867

868
        // handle parcels that have been locally resolved
869
        if (!resolved_parcels.empty())
×
870
        {
871
            HPX_ASSERT(!!resolved_dest.first && !!resolved_dest.second);
872
            resolved_dest.first->put_parcels(resolved_dest.second,
×
873
                HPX_MOVE(resolved_parcels), HPX_MOVE(resolved_handlers));
874
        }
875

876
        // At least one of the addresses is locally unknown, route the
877
        // parcel to the AGAS managing the destination.
878
        for (std::size_t i = 0; i != nonresolved_parcels.size(); ++i)
×
879
        {
880
            ++count_routed_;
881
            agas::route(HPX_MOVE(nonresolved_parcels[i]),
×
882
                HPX_MOVE(nonresolved_handlers[i]));
883
        }
884
    }
×
885

886
    void parcelhandler::invoke_write_handler(
408✔
887
        std::error_code const& ec, parcel const& p) const
888
    {
889
        write_handler_type const f = write_handler_;
890
        f(ec, p);
891
    }
408✔
892

893
    ///////////////////////////////////////////////////////////////////////////
894
    std::int64_t parcelhandler::get_outgoing_queue_length(bool reset) const
×
895
    {
896
        std::int64_t parcel_count = 0;
897
        for (pports_type::value_type const& pp : pports_)
×
898
        {
899
            parcel_count += pp.second->get_pending_parcels_count(reset);
×
900
        }
901
        return parcel_count;
×
902
    }
903

904
    ///////////////////////////////////////////////////////////////////////////
905
    // default callback for put_parcel
906
    void default_write_handler(std::error_code const& ec, parcel const& p)
603✔
907
    {
908
        if (ec)
603✔
909
        {
910
            // If we are in a stopped state, ignore some errors
911
            if (hpx::is_stopped_or_shutting_down())
×
912
            {
913
                using ::asio::error::make_error_code;
914
                if (ec == make_error_code(::asio::error::connection_aborted) ||
915
                    ec == make_error_code(::asio::error::connection_reset) ||
916
                    ec == make_error_code(::asio::error::broken_pipe) ||
917
                    ec == make_error_code(::asio::error::not_connected) ||
918
                    ec == make_error_code(::asio::error::eof))
919
                {
920
                    return;
×
921
                }
922
            }
923
            else if (hpx::tolerate_node_faults())
×
924
            {
925
                if (ec ==
926
                    ::asio::error::make_error_code(
927
                        ::asio::error::connection_reset))
928
                {
×
929
                    return;
930
                }
931
            }
932

933
            // all unhandled exceptions terminate the whole application
×
934
            std::exception_ptr const exception = hpx::detail::get_exception(
×
935
                hpx::exception(ec), "default_write_handler", __FILE__, __LINE__,
×
936
                parcelset::dump_parcel(p));
937

×
938
            hpx::report_error(exception);
939
        }
940
    }
941

942
    ///////////////////////////////////////////////////////////////////////////
195✔
943
    // Looks up the message handler registered in handlers_ under the key
    // (loc, action), creating and registering one if no entry exists yet.
    //
    // Returns nullptr if networking is disabled, if the stored entry is
    // empty, or if no handler could be created or stored. Otherwise returns
    // a non-owning pointer to the handler held by handlers_.
    //
    // If ec is not hpx::throws it is set to success when a handler is
    // returned and to bad_parameter when an empty entry is found. If ec is
    // hpx::throws, finding an empty entry on the first lookup throws
    // bad_parameter.
    policies::message_handler* parcelhandler::get_message_handler(
        char const* action, char const* message_handler_type,
        std::size_t num_messages, std::size_t interval, locality const& loc,
        error_code& ec)
    {
        if (!is_networking_enabled_)
        {
            return nullptr;
        }

        std::unique_lock<mutex_type> lock(handlers_mtx_);

        handler_key_type key(loc, action);
        auto pos = handlers_.find(key);

        if (pos != handlers_.end())
        {
            if (!pos->second)
            {
                // an empty entry marks a handler that could not be created
                lock.unlock();
                if (&ec != &throws)
                {
                    ec = make_error_code(
                        hpx::error::bad_parameter, throwmode::lightweight);
                }
                else
                {
                    HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
                        "parcelhandler::get_message_handler",
                        "couldn't find an appropriate message handler");
                }
                return nullptr;    // no message handler available
            }
        }
        else
        {
            std::shared_ptr<policies::message_handler> created;

            {
                // Just ignore the handlers_mtx_ while checking. We need to hold
                // the lock here to avoid multiple registrations that happens
                // right now in the parcel coalescing plugin
                [[maybe_unused]] hpx::util::ignore_while_checking const il(
                    &lock);

                created.reset(hpx::create_message_handler(message_handler_type,
                    action, find_parcelport(loc.type()), num_messages, interval,
                    ec));
            }

            pos = handlers_.find(key);
            if (pos != handlers_.end())
            {
                // if some other thread has created the entry in the meantime
                lock.unlock();
                if (&ec != &throws)
                {
                    if (pos->second)
                    {
                        ec = make_success_code();
                    }
                    else
                    {
                        ec = make_error_code(
                            hpx::error::bad_parameter, throwmode::lightweight);
                    }
                }
                return pos->second.get();
            }

            if (ec || !created)
            {
                // insert an empty entry into the map to avoid trying to create
                // this handler again
                created.reset();
                auto const stored = handlers_.emplace(key, created);

                lock.unlock();
                if (!stored.second)
                {
                    HPX_THROWS_IF(ec, hpx::error::internal_server_error,
                        "parcelhandler::get_message_handler",
                        "could not store empty message handler");
                }
                return nullptr;    // no message handler available
            }

            auto const stored = handlers_.emplace(key, created);

            lock.unlock();
            if (!stored.second)
            {
                HPX_THROWS_IF(ec, hpx::error::internal_server_error,
                    "parcelhandler::get_message_handler",
                    "could not store newly created message handler");
                return nullptr;
            }
            pos = stored.first;
        }

        if (&ec != &throws)
        {
            ec = make_success_code();
        }

        return pos->second.get();
    }
1042

1043
    ///////////////////////////////////////////////////////////////////////////
×
1044
    std::string parcelhandler::get_locality_name() const
1045
    {
×
1046
        for (pports_type::value_type const& pp : pports_)
1047
        {
×
1048
            if (pp.first > 0)
1049
            {
×
1050
                std::string name = pp.second->get_locality_name();
×
1051
                if (!name.empty())
×
1052
                    return name;
1053
            }
1054
        }
×
1055
        return "<unknown>";
1056
    }
1057

1058
    ///////////////////////////////////////////////////////////////////////////
1059
    // Performance counter data
1060

1061
    // number of parcels routed
×
1062
    std::int64_t parcelhandler::get_parcel_routed_count(bool reset)
1063
    {
×
1064
        return util::get_and_reset_value(count_routed_, reset);
1065
    }
1066

1067
#if defined(HPX_HAVE_PARCELPORT_COUNTERS)
1068
    // number of parcels sent
1069
    std::int64_t parcelhandler::get_parcel_send_count(
1070
        std::string const& pp_type, bool reset) const
1071
    {
1072
        error_code ec(throwmode::lightweight);
1073
        parcelport* pp = find_parcelport(pp_type, ec);
1074
        return pp ? pp->get_parcel_send_count(reset) : 0;
1075
    }
1076

1077
    // number of messages sent
1078
    std::int64_t parcelhandler::get_message_send_count(
1079
        std::string const& pp_type, bool reset) const
1080
    {
1081
        error_code ec(throwmode::lightweight);
1082
        parcelport* pp = find_parcelport(pp_type, ec);
1083
        return pp ? pp->get_message_send_count(reset) : 0;
1084
    }
1085

1086
    // number of parcels received
1087
    std::int64_t parcelhandler::get_parcel_receive_count(
1088
        std::string const& pp_type, bool reset) const
1089
    {
1090
        error_code ec(throwmode::lightweight);
1091
        parcelport* pp = find_parcelport(pp_type, ec);
1092
        return pp ? pp->get_parcel_receive_count(reset) : 0;
1093
    }
1094

1095
    // number of messages received
1096
    std::int64_t parcelhandler::get_message_receive_count(
1097
        std::string const& pp_type, bool reset) const
1098
    {
1099
        error_code ec(throwmode::lightweight);
1100
        parcelport* pp = find_parcelport(pp_type, ec);
1101
        return pp ? pp->get_message_receive_count(reset) : 0;
1102
    }
1103

1104
    // the total time it took for all sends, from async_write to the
1105
    // completion handler (nanoseconds)
1106
    std::int64_t parcelhandler::get_sending_time(
1107
        std::string const& pp_type, bool reset) const
1108
    {
1109
        error_code ec(throwmode::lightweight);
1110
        parcelport* pp = find_parcelport(pp_type, ec);
1111
        return pp ? pp->get_sending_time(reset) : 0;
1112
    }
1113

1114
    // the total time it took for all receives, from async_read to the
1115
    // completion handler (nanoseconds)
1116
    std::int64_t parcelhandler::get_receiving_time(
1117
        std::string const& pp_type, bool reset) const
1118
    {
1119
        error_code ec(throwmode::lightweight);
1120
        parcelport* pp = find_parcelport(pp_type, ec);
1121
        return pp ? pp->get_receiving_time(reset) : 0;
1122
    }
1123

1124
    // the total time it took for all sender-side serialization operations
1125
    // (nanoseconds)
1126
    std::int64_t parcelhandler::get_sending_serialization_time(
1127
        std::string const& pp_type, bool reset) const
1128
    {
1129
        error_code ec(throwmode::lightweight);
1130
        parcelport* pp = find_parcelport(pp_type, ec);
1131
        return pp ? pp->get_sending_serialization_time(reset) : 0;
1132
    }
1133

1134
    // the total time it took for all receiver-side serialization
1135
    // operations (nanoseconds)
1136
    std::int64_t parcelhandler::get_receiving_serialization_time(
1137
        std::string const& pp_type, bool reset) const
1138
    {
1139
        error_code ec(throwmode::lightweight);
1140
        parcelport* pp = find_parcelport(pp_type, ec);
1141
        return pp ? pp->get_receiving_serialization_time(reset) : 0;
1142
    }
1143

1144
    // total data sent (bytes)
1145
    std::int64_t parcelhandler::get_data_sent(
1146
        std::string const& pp_type, bool reset) const
1147
    {
1148
        error_code ec(throwmode::lightweight);
1149
        parcelport* pp = find_parcelport(pp_type, ec);
1150
        return pp ? pp->get_data_sent(reset) : 0;
1151
    }
1152

1153
    // total data (uncompressed) sent (bytes)
1154
    std::int64_t parcelhandler::get_raw_data_sent(
1155
        std::string const& pp_type, bool reset) const
1156
    {
1157
        error_code ec(throwmode::lightweight);
1158
        parcelport* pp = find_parcelport(pp_type, ec);
1159
        return pp ? pp->get_raw_data_sent(reset) : 0;
1160
    }
1161

1162
    // total data received (bytes)
1163
    std::int64_t parcelhandler::get_data_received(
1164
        std::string const& pp_type, bool reset) const
1165
    {
1166
        error_code ec(throwmode::lightweight);
1167
        parcelport* pp = find_parcelport(pp_type, ec);
1168
        return pp ? pp->get_data_received(reset) : 0;
1169
    }
1170

1171
    // total data (uncompressed) received (bytes)
1172
    std::int64_t parcelhandler::get_raw_data_received(
1173
        std::string const& pp_type, bool reset) const
1174
    {
1175
        error_code ec(throwmode::lightweight);
1176
        parcelport* pp = find_parcelport(pp_type, ec);
1177
        return pp ? pp->get_raw_data_received(reset) : 0;
1178
    }
1179

1180
    std::int64_t parcelhandler::get_buffer_allocate_time_sent(
1181
        std::string const& pp_type, bool reset) const
1182
    {
1183
        error_code ec(throwmode::lightweight);
1184
        parcelport* pp = find_parcelport(pp_type, ec);
1185
        return pp ? pp->get_buffer_allocate_time_sent(reset) : 0;
1186
    }
1187
    std::int64_t parcelhandler::get_buffer_allocate_time_received(
1188
        std::string const& pp_type, bool reset) const
1189
    {
1190
        error_code ec(throwmode::lightweight);
1191
        parcelport* pp = find_parcelport(pp_type, ec);
1192
        return pp ? pp->get_buffer_allocate_time_received(reset) : 0;
1193
    }
1194

1195
    // total zero-copy chunks sent
1196
    std::int64_t parcelhandler::get_zchunks_send_count(
1197
        std::string const& pp_type, bool reset) const
1198
    {
1199
        error_code ec(throwmode::lightweight);
1200
        parcelport* pp = find_parcelport(pp_type, ec);
1201
        return pp ? pp->get_zchunks_send_count(reset) : 0;
1202
    }
1203

1204
    // total zero-copy chunks received
1205
    std::int64_t parcelhandler::get_zchunks_recv_count(
1206
        std::string const& pp_type, bool reset) const
1207
    {
1208
        error_code ec(throwmode::lightweight);
1209
        parcelport* pp = find_parcelport(pp_type, ec);
1210
        return pp ? pp->get_zchunks_recv_count(reset) : 0;
1211
    }
1212

1213
    // the maximum number of zero-copy chunks per message sent
1214
    std::int64_t parcelhandler::get_zchunks_send_per_msg_count_max(
1215
        std::string const& pp_type, bool reset) const
1216
    {
1217
        error_code ec(throwmode::lightweight);
1218
        parcelport* pp = find_parcelport(pp_type, ec);
1219
        return pp ? pp->get_zchunks_send_per_msg_count_max(reset) : 0;
1220
    }
1221

1222
    // the maximum number of zero-copy chunks per message received
1223
    std::int64_t parcelhandler::get_zchunks_recv_per_msg_count_max(
1224
        std::string const& pp_type, bool reset) const
1225
    {
1226
        error_code ec(throwmode::lightweight);
1227
        parcelport* pp = find_parcelport(pp_type, ec);
1228
        return pp ? pp->get_zchunks_recv_per_msg_count_max(reset) : 0;
1229
    }
1230

1231
    // the size of zero-copy chunks per message sent
1232
    std::int64_t parcelhandler::get_zchunks_send_size(
1233
        std::string const& pp_type, bool reset) const
1234
    {
1235
        error_code ec(throwmode::lightweight);
1236
        parcelport* pp = find_parcelport(pp_type, ec);
1237
        return pp ? pp->get_zchunks_send_size(reset) : 0;
1238
    }
1239

1240
    // the size of zero-copy chunks per message received
1241
    std::int64_t parcelhandler::get_zchunks_recv_size(
1242
        std::string const& pp_type, bool reset) const
1243
    {
1244
        error_code ec(throwmode::lightweight);
1245
        parcelport* pp = find_parcelport(pp_type, ec);
1246
        return pp ? pp->get_zchunks_recv_size(reset) : 0;
1247
    }
1248

1249
    // the maximum size of zero-copy chunks per message sent
1250
    std::int64_t parcelhandler::get_zchunks_send_size_max(
1251
        std::string const& pp_type, bool reset) const
1252
    {
1253
        error_code ec(throwmode::lightweight);
1254
        parcelport* pp = find_parcelport(pp_type, ec);
1255
        return pp ? pp->get_zchunks_send_size_max(reset) : 0;
1256
    }
1257

1258
    // the maximum size of zero-copy chunks per message received
1259
    std::int64_t parcelhandler::get_zchunks_recv_size_max(
1260
        std::string const& pp_type, bool reset) const
1261
    {
1262
        error_code ec(throwmode::lightweight);
1263
        parcelport* pp = find_parcelport(pp_type, ec);
1264
        return pp ? pp->get_zchunks_recv_size_max(reset) : 0;
1265
    }
1266

1267
#if defined(HPX_HAVE_PARCELPORT_COUNTERS) &&                                   \
1268
    defined(HPX_HAVE_PARCELPORT_ACTION_COUNTERS)
1269
    // same as above, just separated data for each action
1270
    // number of parcels sent
1271
    std::int64_t parcelhandler::get_action_parcel_send_count(
1272
        std::string const& pp_type, std::string const& action, bool reset) const
1273
    {
1274
        error_code ec(throwmode::lightweight);
1275
        parcelport* pp = find_parcelport(pp_type, ec);
1276
        return pp ? pp->get_action_parcel_send_count(action, reset) : 0;
1277
    }
1278

1279
    // number of parcels received
1280
    std::int64_t parcelhandler::get_action_parcel_receive_count(
1281
        std::string const& pp_type, std::string const& action, bool reset) const
1282
    {
1283
        error_code ec(throwmode::lightweight);
1284
        parcelport* pp = find_parcelport(pp_type, ec);
1285
        return pp ? pp->get_action_parcel_receive_count(action, reset) : 0;
1286
    }
1287

1288
    // the total time it took for all sender-side serialization operations
1289
    // (nanoseconds)
1290
    std::int64_t parcelhandler::get_action_sending_serialization_time(
1291
        std::string const& pp_type, std::string const& action, bool reset) const
1292
    {
1293
        error_code ec(throwmode::lightweight);
1294
        parcelport* pp = find_parcelport(pp_type, ec);
1295
        return pp ? pp->get_action_sending_serialization_time(action, reset) :
1296
                    0;
1297
    }
1298

1299
    // the total time it took for all receiver-side serialization
1300
    // operations (nanoseconds)
1301
    std::int64_t parcelhandler::get_action_receiving_serialization_time(
1302
        std::string const& pp_type, std::string const& action, bool reset) const
1303
    {
1304
        error_code ec(throwmode::lightweight);
1305
        parcelport* pp = find_parcelport(pp_type, ec);
1306
        return pp ? pp->get_action_receiving_serialization_time(action, reset) :
1307
                    0;
1308
    }
1309

1310
    // total data sent (bytes)
1311
    std::int64_t parcelhandler::get_action_data_sent(
1312
        std::string const& pp_type, std::string const& action, bool reset) const
1313
    {
1314
        error_code ec(throwmode::lightweight);
1315
        parcelport* pp = find_parcelport(pp_type, ec);
1316
        return pp ? pp->get_action_data_sent(action, reset) : 0;
1317
    }
1318

1319
    // total data received (bytes)
1320
    std::int64_t parcelhandler::get_action_data_received(
1321
        std::string const& pp_type, std::string const& action, bool reset) const
1322
    {
1323
        error_code ec(throwmode::lightweight);
1324
        parcelport* pp = find_parcelport(pp_type, ec);
1325
        return pp ? pp->get_action_data_received(action, reset) : 0;
1326
    }
1327
#endif
1328
#endif
1329
    // connection stack statistics
×
1330
    std::int64_t parcelhandler::get_connection_cache_statistics(
1331
        std::string const& pp_type,
1332
        parcelport::connection_cache_statistics_type stat_type,
1333
        bool reset) const
1334
    {
1335
        error_code ec(throwmode::lightweight);
×
1336
        parcelport* pp = find_parcelport(pp_type, ec);
×
1337
        return pp ? pp->get_connection_cache_statistics(stat_type, reset) : 0;
1338
    }
1339

1340
    std::vector<plugins::parcelport_factory_base*>&
50✔
1341
    parcelhandler::get_parcelport_factories()
1342
    {
50✔
1343
        auto& factories = plugins::get_parcelport_factories();
50✔
1344
        if (factories.empty() && hpx::is_networking_enabled())
1345
        {
32✔
1346
            init_static_parcelport_factories(factories);
1347
        }
50✔
1348
        return factories;
1349
    }
1350

6✔
1351
    void parcelhandler::init(
1352
        int* argc, char*** argv, util::command_line_handling& cfg)
1353
    {
1354
        HPX_ASSERT(hpx::is_networking_enabled());
1355

6✔
1356
        for (plugins::parcelport_factory_base* factory :
30✔
1357
            get_parcelport_factories())
1358
        {
18✔
1359
            factory->init(argc, argv, cfg);
1360
        }
6✔
1361
    }
1362

6✔
1363
    // Passes the resource partitioner to init() of every factory returned
    // by get_parcelport_factories(). Asserts that networking is enabled.
    void parcelhandler::init(hpx::resource::partitioner& rp)
    {
        HPX_ASSERT(hpx::is_networking_enabled());

        auto& factories = get_parcelport_factories();
        for (auto* factory : factories)
        {
            factory->init(rp);
        }
    }
1373

32✔
1374
    std::vector<std::string> load_runtime_configuration()
1375
    {
32✔
1376
        std::vector<std::string> ini_defs;
1377

32✔
1378
        ini_defs.emplace_back("[hpx.parcel]");
32✔
1379
        ini_defs.emplace_back(
1380
            "address = ${HPX_PARCEL_SERVER_ADDRESS:" HPX_INITIAL_IP_ADDRESS
1381
            "}");
32✔
1382
        ini_defs.emplace_back(
1383
            "port = ${HPX_PARCEL_SERVER_PORT:" HPX_PP_STRINGIZE(
1384
                HPX_INITIAL_IP_PORT) "}");
32✔
1385
        ini_defs.emplace_back(
1386
            "bootstrap = ${HPX_PARCEL_BOOTSTRAP:" HPX_PARCEL_BOOTSTRAP "}");
32✔
1387
        ini_defs.emplace_back(
1388
            "max_connections = ${HPX_PARCEL_MAX_CONNECTIONS:" HPX_PP_STRINGIZE(
1389
                HPX_PARCEL_MAX_CONNECTIONS) "}");
32✔
1390
        ini_defs.emplace_back(
1391
            "max_connections_per_locality = "
1392
            "${HPX_PARCEL_MAX_CONNECTIONS_PER_LOCALITY:" HPX_PP_STRINGIZE(
1393
                HPX_PARCEL_MAX_CONNECTIONS_PER_LOCALITY) "}");
32✔
1394
        ini_defs.emplace_back("max_message_size = "
1395
                              "${HPX_PARCEL_MAX_MESSAGE_SIZE:" HPX_PP_STRINGIZE(
1396
                                  HPX_PARCEL_MAX_MESSAGE_SIZE) "}");
32✔
1397
        ini_defs.emplace_back(
1398
            "max_outbound_message_size = "
1399
            "${HPX_PARCEL_MAX_OUTBOUND_MESSAGE_SIZE:" HPX_PP_STRINGIZE(
1400
                HPX_PARCEL_MAX_OUTBOUND_MESSAGE_SIZE) "}");
32✔
1401
        ini_defs.emplace_back(endian::native == endian::big ?
1402
                "endian_out = ${HPX_PARCEL_ENDIAN_OUT:big}" :
1403
                "endian_out = ${HPX_PARCEL_ENDIAN_OUT:little}");
32✔
1404
        ini_defs.emplace_back(
1405
            "array_optimization = ${HPX_PARCEL_ARRAY_OPTIMIZATION:1}");
32✔
1406
        ini_defs.emplace_back(
1407
            "zero_copy_optimization = ${HPX_PARCEL_ZERO_COPY_OPTIMIZATION:"
1408
            "$[hpx.parcel.array_optimization]}");
32✔
1409
        ini_defs.emplace_back("zero_copy_receive_optimization = "
1410
                              "${HPX_PARCEL_ZERO_COPY_RECEIVE_OPTIMIZATION:"
1411
                              "$[hpx.parcel.zero_copy_optimization]}");
32✔
1412
        ini_defs.emplace_back(
1413
            "async_serialization = ${HPX_PARCEL_ASYNC_SERIALIZATION:1}");
1414
#if defined(HPX_HAVE_PARCEL_COALESCING)
32✔
1415
        ini_defs.emplace_back(
1416
            "message_handlers = ${HPX_PARCEL_MESSAGE_HANDLERS:1}");
1417
#else
1418
        ini_defs.emplace_back(
1419
            "message_handlers = ${HPX_PARCEL_MESSAGE_HANDLERS:0}");
1420
#endif
32✔
1421
        ini_defs.emplace_back(
1422
            "zero_copy_serialization_threshold = "
1423
            "${HPX_PARCEL_ZERO_COPY_SERIALIZATION_THRESHOLD:" HPX_PP_STRINGIZE(
1424
                HPX_ZERO_COPY_SERIALIZATION_THRESHOLD) "}");
32✔
1425
        ini_defs.emplace_back("max_background_threads = "
1426
                              "${HPX_PARCEL_MAX_BACKGROUND_THREADS:-1}");
1427

32✔
1428
        for (plugins::parcelport_factory_base* f :
160✔
1429
            parcelhandler::get_parcelport_factories())
1430
        {
96✔
1431
            f->get_plugin_info(ini_defs);
1432
        }
32✔
1433
        return ini_defs;
×
1434
    }
1435

1436
    ///////////////////////////////////////////////////////////////////////////
408✔
1437
    // Prepares a parcel for sending: sets a source id if none is present
    // and, when HPX_HAVE_PARCEL_PROFILING is defined, records the start time
    // and assigns a parcel id if the parcel has none.
    void parcelhandler::init_parcel(parcel& p)
    {
        // ensure the source locality id is set (if no component id is given)
        if (!p.source_id())
        {
            hpx::id_type here(agas::get_locality(),
                hpx::id_type::management_type::unmanaged);
            p.set_source_id(HPX_MOVE(here));
        }

#if defined(HPX_HAVE_PARCEL_PROFILING)
        // set the current local time for this locality
        p.set_start_time(hpx::chrono::high_resolution_timer::now());

        if (!p.parcel_id())
        {
            error_code ignored(throwmode::lightweight);    // ignore all errors
            std::uint32_t const this_locality =
                agas::get_locality_id(ignored);
            p.parcel_id() =
                parcelset::parcel::generate_unique_id(this_locality);
        }
#endif
    }
1458
}    // namespace hpx::parcelset
1459

1460
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc