• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.54
/libs/full/parcelset_base/src/parcelport.cpp
1
//  Copyright (c) 2007-2025 Hartmut Kaiser
2
//  Copyright (c) 2013-2014 Thomas Heller
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
// This is needed to make everything work with the Intel MPI library header
9
#include <hpx/config.hpp>
10

11
#if defined(HPX_HAVE_NETWORKING)
12
#include <hpx/assert.hpp>
13
#include <hpx/modules/errors.hpp>
14
#include <hpx/modules/io_service.hpp>
15
#include <hpx/modules/runtime_configuration.hpp>
16
#include <hpx/modules/runtime_local.hpp>
17
#include <hpx/modules/threading.hpp>
18
#include <hpx/modules/util.hpp>
19
#if defined(HPX_HAVE_APEX)
20
#include <hpx/modules/threading_base.hpp>
21
#endif
22

23
#include <hpx/parcelset_base/parcelport.hpp>
24

25
#include <cstddef>
26
#include <cstdint>
27
#include <exception>
28
#include <string>
29
#include <system_error>
30
#include <utility>
31

32
namespace hpx::parcelset {
33

34
    ///////////////////////////////////////////////////////////////////////////
35
    parcelport::parcelport(util::runtime_configuration const& ini,
18✔
36
        locality here, std::string const& type,
37
        std::size_t zero_copy_serialization_threshold)
18✔
38
      : num_parcel_destinations_(0)
39
      , here_(HPX_MOVE(here))
18✔
40
      , max_inbound_message_size_(0)
18✔
41
      , max_outbound_message_size_(0)
18✔
42
      , allow_array_optimizations_(true)
18✔
43
      , allow_zero_copy_optimizations_(true)
18✔
44
      , allow_zero_copy_receive_optimizations_(true)
18✔
45
      , async_serialization_(false)
18✔
46
      , priority_(hpx::util::get_entry_as<int>(
18✔
47
            ini, "hpx.parcel." + type + ".priority", 0))
18✔
48
      , type_(type)
18✔
49
      , zero_copy_serialization_threshold_(zero_copy_serialization_threshold)
18✔
50
    {
18✔
51
        std::string key("hpx.parcel.");
18✔
52
        key += type;
53

18✔
54
        // clang-format off
55
        max_inbound_message_size_ = static_cast<std::int64_t>(
56
            ini.get_max_inbound_message_size(type));
36✔
57
        max_outbound_message_size_ = static_cast<std::int64_t>(
58
            ini.get_max_outbound_message_size(type));
59
        // clang-format on
×
60

×
61
        if (hpx::util::get_entry_as<int>(ini, key + ".array_optimization", 1) ==
×
62
            0)
63
        {
18✔
64
            allow_array_optimizations_ = false;
36✔
65
            allow_zero_copy_optimizations_ = false;
66
            allow_zero_copy_receive_optimizations_ = false;
×
67
        }
×
68
        else if (hpx::util::get_entry_as<int>(
69
                     ini, key + ".zero_copy_optimization", 1) == 0)
18✔
70
        {
36✔
71
            allow_zero_copy_optimizations_ = false;
72
            allow_zero_copy_receive_optimizations_ = false;
×
73
        }
74
        else if (hpx::util::get_entry_as<int>(
75
                     ini, key + ".zero_copy_receive_optimization", 1) == 0)
18✔
76
        {
36✔
77
            allow_zero_copy_receive_optimizations_ = false;
78
        }
18✔
79

80
        if (hpx::util::get_entry_as<int>(
18✔
81
                ini, key + ".async_serialization", 0) != 0)
82
        {
18✔
83
            async_serialization_ = true;
84
        }
18✔
85
    }
86

87
    int parcelport::priority() const noexcept
464✔
88
    {
89
        return priority_;
464✔
90
    }
91

92
    std::string const& parcelport::type() const noexcept
558✔
93
    {
94
        return type_;
95
    }
558✔
96

97
    std::size_t parcelport::get_zero_copy_serialization_threshold()
98
        const noexcept
10✔
99
    {
100
        return zero_copy_serialization_threshold_;
10✔
101
    }
102

103
    locality const& parcelport::here() const noexcept
8✔
104
    {
105
        return here_;
408✔
106
    }
107

108
    void parcelport::initialized() {}
408✔
109

110
    bool parcelport::can_connect(
111
        locality const&, bool use_alternative_parcelport)
112
    {
113
        return use_alternative_parcelport || can_bootstrap();
114
    }
115

116
    ///////////////////////////////////////////////////////////////////////////
117
    // Update performance counter data
118
#if defined(HPX_HAVE_PARCELPORT_COUNTERS)
119
    void parcelport::add_received_data(parcelset::data_point const& data)
120
    {
121
        parcels_received_.add_data(data);
122
    }
123

124
    void parcelport::add_sent_data(parcelset::data_point const& data)
125
    {
126
        parcels_sent_.add_data(data);
127
    }
128
#endif
129
#if defined(HPX_HAVE_PARCELPORT_COUNTERS) &&                                   \
130
    defined(HPX_HAVE_PARCELPORT_ACTION_COUNTERS)
131
    void parcelport::add_received_data(
132
        char const* action, parcelset::data_point const& data)
133
    {
134
        action_parcels_received_.add_data(action, data);
135
    }
136

137
    void parcelport::add_sent_data(
138
        char const* action, parcelset::data_point const& data)
139
    {
140
        action_parcels_sent_.add_data(action, data);
141
    }
142
#endif
143

144
    ///////////////////////////////////////////////////////////////////////////
145
    // number of parcels sent
146
#if defined(HPX_HAVE_PARCELPORT_COUNTERS)
147
    std::int64_t parcelport::get_parcel_send_count(bool reset)
148
    {
149
        return parcels_sent_.num_parcels(reset);
150
    }
151

152
    // number of messages sent
153
    std::int64_t parcelport::get_message_send_count(bool reset)
154
    {
155
        return parcels_sent_.num_messages(reset);
156
    }
157

158
    // number of parcels received
159
    std::int64_t parcelport::get_parcel_receive_count(bool reset)
160
    {
161
        return parcels_received_.num_parcels(reset);
162
    }
163

164
    // number of messages received
165
    std::int64_t parcelport::get_message_receive_count(bool reset)
166
    {
167
        return parcels_received_.num_messages(reset);
168
    }
169

170
    // the total time it took for all sends, from async_write to the
171
    // completion handler (nanoseconds)
172
    std::int64_t parcelport::get_sending_time(bool reset)
173
    {
174
        return parcels_sent_.total_time(reset);
175
    }
176

177
    // the total time it took for all receives, from async_read to the
178
    // completion handler (nanoseconds)
179
    std::int64_t parcelport::get_receiving_time(bool reset)
180
    {
181
        return parcels_received_.total_time(reset);
182
    }
183

184
    // the total time it took for all sender-side serialization operations
185
    // (nanoseconds)
186
    std::int64_t parcelport::get_sending_serialization_time(bool reset)
187
    {
188
        return parcels_sent_.total_serialization_time(reset);
189
    }
190

191
    // the total time it took for all receiver-side serialization
192
    // operations (nanoseconds)
193
    std::int64_t parcelport::get_receiving_serialization_time(bool reset)
194
    {
195
        return parcels_received_.total_serialization_time(reset);
196
    }
197

198
    // total data sent (bytes)
199
    std::int64_t parcelport::get_data_sent(bool reset)
200
    {
201
        return parcels_sent_.total_bytes(reset);
202
    }
203

204
    // total data (uncompressed) sent (bytes)
205
    std::int64_t parcelport::get_raw_data_sent(bool reset)
206
    {
207
        return parcels_sent_.total_raw_bytes(reset);
208
    }
209

210
    // total data received (bytes)
211
    std::int64_t parcelport::get_data_received(bool reset)
212
    {
213
        return parcels_received_.total_bytes(reset);
214
    }
215

216
    // total data (uncompressed) received (bytes)
217
    std::int64_t parcelport::get_raw_data_received(bool reset)
218
    {
219
        return parcels_received_.total_raw_bytes(reset);
220
    }
221

222
    std::int64_t parcelport::get_buffer_allocate_time_sent(bool reset)
223
    {
224
        return parcels_sent_.total_buffer_allocate_time(reset);
225
    }
226

227
    std::int64_t parcelport::get_buffer_allocate_time_received(bool reset)
228
    {
229
        return parcels_received_.total_buffer_allocate_time(reset);
230
    }
231

232
    //// total zero-copy chunks sent
233
    std::int64_t parcelport::get_zchunks_send_count(bool reset)
234
    {
235
        return parcels_sent_.num_zchunks(reset);
236
    }
237

238
    //// total zero-copy chunks received
239
    std::int64_t parcelport::get_zchunks_recv_count(bool reset)
240
    {
241
        return parcels_received_.num_zchunks(reset);
242
    }
243

244
    //// the maximum number of zero-copy chunks per message sent
245
    std::int64_t parcelport::get_zchunks_send_per_msg_count_max(bool reset)
246
    {
247
        return parcels_sent_.num_zchunks_per_msg_max(reset);
248
    }
249

250
    //// the maximum number of zero-copy chunks per message received
251
    std::int64_t parcelport::get_zchunks_recv_per_msg_count_max(bool reset)
252
    {
253
        return parcels_received_.num_zchunks_per_msg_max(reset);
254
    }
255

256
    //// the size of zero-copy chunks per message sent
257
    std::int64_t parcelport::get_zchunks_send_size(bool reset)
258
    {
259
        return parcels_sent_.size_zchunks_total(reset);
260
    }
261

262
    //// the size of zero-copy chunks per message received
263
    std::int64_t parcelport::get_zchunks_recv_size(bool reset)
264
    {
265
        return parcels_received_.size_zchunks_total(reset);
266
    }
267

268
    //// the maximum size of zero-copy chunks per message sent
269
    std::int64_t parcelport::get_zchunks_send_size_max(bool reset)
270
    {
271
        return parcels_sent_.size_zchunks_max(reset);
272
    }
273

274
    //// the maximum size of zero-copy chunks per message received
275
    std::int64_t parcelport::get_zchunks_recv_size_max(bool reset)
276
    {
277
        return parcels_received_.size_zchunks_max(reset);
278
    }
279
#endif
280
    ///////////////////////////////////////////////////////////////////////////
281
#if defined(HPX_HAVE_PARCELPORT_COUNTERS) &&                                   \
282
    defined(HPX_HAVE_PARCELPORT_ACTION_COUNTERS)
283
    // same as above, just separated data for each action
284
    // number of parcels sent
285
    std::int64_t parcelport::get_action_parcel_send_count(
286
        std::string const& action, bool reset)
287
    {
288
        if (action.empty())
289
            return parcels_sent_.num_parcels(reset);
290
        return action_parcels_sent_.num_parcels(action, reset);
291
    }
292

293
    // number of parcels received
294
    std::int64_t parcelport::get_action_parcel_receive_count(
295
        std::string const& action, bool reset)
296
    {
297
        if (action.empty())
298
            return parcels_received_.num_parcels(reset);
299
        return action_parcels_received_.num_parcels(action, reset);
300
    }
301

302
    // the total time it took for all sender-side serialization operations
303
    // (nanoseconds)
304
    std::int64_t parcelport::get_action_sending_serialization_time(
305
        std::string const& action, bool reset)
306
    {
307
        if (action.empty())
308
            return parcels_sent_.total_serialization_time(reset);
309
        return action_parcels_sent_.total_serialization_time(action, reset);
310
    }
311

312
    // the total time it took for all receiver-side serialization
313
    // operations (nanoseconds)
314
    std::int64_t parcelport::get_action_receiving_serialization_time(
315
        std::string const& action, bool reset)
316
    {
317
        if (action.empty())
318
            return parcels_received_.total_serialization_time(reset);
319
        return action_parcels_received_.total_serialization_time(action, reset);
320
    }
321

322
    // total data sent (bytes)
323
    std::int64_t parcelport::get_action_data_sent(
324
        std::string const& action, bool reset)
325
    {
326
        if (action.empty())
327
            return parcels_sent_.total_bytes(reset);
328
        return action_parcels_sent_.total_bytes(action, reset);
329
    }
330

331
    // total data received (bytes)
332
    std::int64_t parcelport::get_action_data_received(
333
        std::string const& action, bool reset)
334
    {
335
        if (action.empty())
70✔
336
            return parcels_received_.total_bytes(reset);
337
        return action_parcels_received_.total_bytes(action, reset);
70✔
338
    }
339
#endif
98✔
340
    std::int64_t parcelport::get_pending_parcels_count(bool /*reset*/)
341
    {
28✔
342
        std::lock_guard<hpx::spinlock> l(mtx_);
343
        std::int64_t count = 0;
344
        for (auto&& p : pending_parcels_)
345
        {
70✔
346
            count += static_cast<std::int64_t>(hpx::get<0>(p.second).size());
347
            HPX_ASSERT(
348
                hpx::get<0>(p.second).size() == hpx::get<1>(p.second).size());
349
        }
×
350
        return count;
351
    }
×
352

353
    ///////////////////////////////////////////////////////////////////////////
354
    std::int64_t get_max_inbound_size(parcelport const& pp)
9✔
355
    {
356
        return pp.get_max_inbound_message_size();
9✔
357
    }
358

359
    std::int64_t parcelport::get_max_inbound_message_size() const noexcept
414✔
360
    {
361
        return max_inbound_message_size_;
414✔
362
    }
363

364
    std::int64_t parcelport::get_max_outbound_message_size() const noexcept
18✔
365
    {
366
        return max_outbound_message_size_;
18✔
367
    }
368

369
    bool parcelport::allow_array_optimizations() const noexcept
18✔
370
    {
371
        return allow_array_optimizations_;
18✔
372
    }
373

374
    bool parcelport::allow_zero_copy_optimizations() const noexcept
294✔
375
    {
376
        return allow_zero_copy_optimizations_;
294✔
377
    }
378

379
    bool parcelport::allow_zero_copy_receive_optimizations() const noexcept
×
380
    {
381
        return allow_zero_copy_receive_optimizations_;
×
382
    }
383

384
    bool parcelport::async_serialization() const noexcept
385
    {
386
        return async_serialization_;
6✔
387
    }
388

389
    ///////////////////////////////////////////////////////////////////////////
6✔
390
    // the code below is needed to bootstrap the parcel layer
391
    void parcelport::early_pending_parcel_handler(
392
        std::error_code const& ec, parcel const& p)
393
    {
×
394
        if (ec)
395
        {
396
            // all errors during early parcel handling are fatal
397
            std::exception_ptr const exception =
398
                HPX_GET_EXCEPTION(ec, "early_pending_parcel_handler",
×
399
                    "error while handling early parcel: " + ec.message() + "(" +
400
                        std::to_string(ec.value()) + ")" +
401
                        parcelset::dump_parcel(p));
402

403
            hpx::report_error(exception);
404
            return;
405
        }
406

407
#if defined(HPX_HAVE_APEX) && defined(HPX_HAVE_PARCEL_PROFILING)
408
        // tell APEX about the parcel sent
409
        util::external_timer::send(
410
            p.parcel_id().get_lsb(), p.size(), p.destination_locality_id());
411
#endif
412
    }
413

414
}    // namespace hpx::parcelset
415

416
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc