• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.0
/libs/full/parcelset/src/parcel.cpp
1
//  Copyright (c) 2007-2025 Hartmut Kaiser
2
//  Copyright (c)      2011 Bryce Lelbach
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#include <hpx/config.hpp>
9

10
#if defined(HPX_HAVE_NETWORKING)
11
#include <hpx/assert.hpp>
12
#include <hpx/modules/datastructures.hpp>
13
#include <hpx/modules/errors.hpp>
14
#include <hpx/modules/format.hpp>
15
#include <hpx/modules/itt_notify.hpp>
16
#include <hpx/modules/runtime_local.hpp>
17
#include <hpx/modules/serialization.hpp>
18
#include <hpx/modules/threading_base.hpp>
19
#include <hpx/modules/timing.hpp>
20

21
#include <hpx/actions/transfer_action.hpp>
22
#include <hpx/actions_base/detail/action_factory.hpp>
23
#include <hpx/components_base/agas_interface.hpp>
24
#include <hpx/components_base/component_type.hpp>
25
#include <hpx/naming/detail/preprocess_gid_types.hpp>
26
#include <hpx/parcelset/parcel.hpp>
27
#include <hpx/parcelset/parcelhandler.hpp>
28
#include <hpx/parcelset_base/parcel_interface.hpp>
29

30
#include <cstddef>
31
#include <cstdint>
32
#include <memory>
33
#include <string>
34
#include <type_traits>
35
#include <utility>
36

37
///////////////////////////////////////////////////////////////////////////////
38
namespace hpx::parcelset::detail {
39

414✔
40
    // Build an empty parcel header: source and destination are the invalid
    // gid, the address is default-constructed and no continuation is flagged.
    // With parcel profiling compiled in, the start time is zeroed and the
    // creation time is stamped from the high resolution timer.
    parcel_data::parcel_data()
      : source_id_(naming::invalid_gid)
      , dest_(naming::invalid_gid)
#if defined(HPX_HAVE_PARCEL_PROFILING)
      , start_time_(0)
      , creation_time_(chrono::high_resolution_timer::now())
#endif
      , has_continuation_(false)
    {
    }
51

52
    // Build a parcel header for a concrete target.
    //
    //   dest              destination gid, moved into the header
    //   addr              resolved address, moved into the header
    //   has_continuation  stored verbatim in has_continuation_
    //
    // The source id starts out as the invalid gid. With parcel profiling
    // compiled in, the start time is zeroed and the creation time is stamped
    // from the high resolution timer.
    parcel_data::parcel_data(
        naming::gid_type&& dest, naming::address&& addr, bool has_continuation)
      : source_id_(naming::invalid_gid)
      , dest_(HPX_MOVE(dest))
      , addr_(HPX_MOVE(addr))
#if defined(HPX_HAVE_PARCEL_PROFILING)
      , start_time_(0)
      , creation_time_(chrono::high_resolution_timer::now())
#endif
      , has_continuation_(has_continuation)
    {
    }
64

65
    // Move constructor: takes over all fields of rhs and then puts rhs back
    // into an explicit "empty" state (invalid gids, default address, zeroed
    // profiling data). Note that rhs.has_continuation_ is copied and left
    // unchanged in rhs.
    parcel_data::parcel_data(parcel_data&& rhs) noexcept
      : source_id_(HPX_MOVE(rhs.source_id_))
      , dest_(HPX_MOVE(rhs.dest_))
      , addr_(HPX_MOVE(rhs.addr_))
#if defined(HPX_HAVE_PARCEL_PROFILING)
      , parcel_id_(HPX_MOVE(rhs.parcel_id_))
      , start_time_(rhs.start_time_)
      , creation_time_(rhs.creation_time_)
#endif
      , has_continuation_(rhs.has_continuation_)
    {
        // explicitly invalidate the moved-from header
        rhs.source_id_ = naming::invalid_gid;
        rhs.dest_ = naming::invalid_gid;
        rhs.addr_ = naming::address();
#if defined(HPX_HAVE_PARCEL_PROFILING)
        rhs.parcel_id_ = naming::invalid_gid;
        rhs.start_time_ = 0;
        rhs.creation_time_ = 0;
#endif
    }
86

×
87
    // Move assignment: takes over all fields of rhs and then puts rhs back
    // into an explicit "empty" state (invalid gids, default address, zeroed
    // profiling data). rhs.has_continuation_ is copied and left unchanged.
    //
    // Returns *this.
    parcel_data& parcel_data::operator=(parcel_data&& rhs) noexcept
    {
        // Self-move must be a no-op: without this guard the invalidation of
        // rhs below would wipe the very fields that were just assigned.
        if (this == &rhs)
        {
            return *this;
        }

        source_id_ = HPX_MOVE(rhs.source_id_);
        dest_ = HPX_MOVE(rhs.dest_);
        addr_ = HPX_MOVE(rhs.addr_);
#if defined(HPX_HAVE_PARCEL_PROFILING)
        parcel_id_ = HPX_MOVE(rhs.parcel_id_);
        start_time_ = rhs.start_time_;
        creation_time_ = rhs.creation_time_;
#endif
        has_continuation_ = rhs.has_continuation_;

        // explicitly invalidate the moved-from header
        rhs.source_id_ = naming::invalid_gid;
        rhs.dest_ = naming::invalid_gid;
        rhs.addr_ = naming::address();
#if defined(HPX_HAVE_PARCEL_PROFILING)
        rhs.parcel_id_ = naming::invalid_gid;
        rhs.start_time_ = 0;
        rhs.creation_time_ = 0;
#endif
        return *this;
    }
109

×
110
    // Read the parcel header from an input archive. The field order must
    // mirror the output overload of serialize(): source id, destination,
    // address, (profiling fields when compiled in), continuation flag.
    // The version argument is unused.
    void parcel_data::serialize(serialization::input_archive& ar, unsigned)
    {
        ar >> source_id_;
        ar >> dest_;
        ar >> addr_;
#if defined(HPX_HAVE_PARCEL_PROFILING)
        ar >> parcel_id_;
        ar >> start_time_;
        ar >> creation_time_;
#endif
        ar >> has_continuation_;
    }
124

×
125
    void parcel_data::serialize(
126
        serialization::output_archive& ar, unsigned) const
127
    {
×
128
        ar << source_id_;
×
129
        ar << dest_;
×
130
        ar << addr_;
131

132
#if defined(HPX_HAVE_PARCEL_PROFILING)
133
        ar << parcel_id_;
134
        ar << start_time_;
135
        ar << creation_time_;
136
#endif
137

138
        ar << has_continuation_;
×
139
    }
140

141
    ///////////////////////////////////////////////////////////////////////////
142
#if defined(HPX_DEBUG)
143
    bool parcel::is_valid() const
144
    {
145
        // empty parcels are always valid
146
#if defined(HPX_HAVE_PARCEL_PROFILING)
147
        if (0.0 == data_.creation_time_)    //-V550
148
            return true;
149
#endif
150

151
        // verify target destination
152
        if (data_.dest_ && data_.addr_.locality_)
153
        {
154
            // if we have a destination we need an action as well
155
            if (!action_)
156
            {
157
                return false;
158
            }
159
        }
160

161
        // verify that the action targets the correct type
162
        if (action_ &&
163
            data_.addr_.type_ !=
164
                to_int(hpx::components::component_enum_type::invalid))
165
        {
166
            int const type = action_->get_component_type();
167
            if (!components::types_are_compatible(type, data_.addr_.type_))
168
            {
169
                return false;
170
            }
171
        }
172

173
        return true;
174
    }
×
175
#else
176
    // Only used in debug mode.
×
177
    bool parcel::is_valid() const
178
    {
179
        return true;
180
    }
414✔
181
#endif
414✔
182

414✔
183
    // Construct an empty parcel: default header, no action, and zeroed size
    // and chunk counters.
    parcel::parcel()
      : size_(0)
      , num_chunks_(0)
    {
    }
188

414✔
189
    // Defaulted destructor, defined out of line in this translation unit.
    parcel::~parcel() = default;
414✔
190

414✔
191
    // Construct a parcel that carries the action 'act' to 'dest'/'addr'.
    //
    //   dest  destination gid, moved into the header
    //   addr  resolved destination address, moved into the header
    //   act   action to transport; ownership is taken over
    //
    // 'act' is dereferenced to query has_continuation(), so it must not be
    // null. NOTE(review): this relies on data_ being initialized before
    // action_ (i.e. on member declaration order) - confirm against the header.
    parcel::parcel(naming::gid_type&& dest, naming::address&& addr,
        std::unique_ptr<actions::base_action> act)
      : data_(HPX_MOVE(dest), HPX_MOVE(addr), act->has_continuation())
      , action_(HPX_MOVE(act))
      , size_(0)
      , num_chunks_(0)
    {
    }
199

×
200
    void parcel::reset()
201
    {
×
202
        data_ = detail::parcel_data();
203
        action_.reset();
×
204
    }
205

×
206
    char const* parcel::get_action_name() const
207
    {
208
        return action_->get_action_name();
×
209
    }
210

×
211
    int parcel::get_component_type() const
212
    {
213
        return action_->get_component_type();
×
214
    }
215

×
216
    int parcel::get_action_type() const
217
    {
218
        return static_cast<int>(action_->get_action_type());
408✔
219
    }
220

408✔
221
    // Source of this parcel, returned as an unmanaged id built from the
    // gid stored in the header.
    hpx::id_type parcel::source_id() const
    {
        return {data_.source_id_, hpx::id_type::management_type::unmanaged};
    }
225

408✔
226
    // Record the gid of 'source_id' as the parcel's source. Passing
    // hpx::invalid_id is ignored, i.e. a previously stored source id is
    // left untouched.
    void parcel::set_source_id(hpx::id_type const& source_id)
    {
        if (source_id != hpx::invalid_id)
        {
            data_.source_id_ = source_id.get_gid();
        }
    }
233

×
234
    // Replace the destination gid of this parcel.
    //
    //   dest  new destination gid; taken by rvalue reference and moved into
    //         the header (a named rvalue reference is an lvalue, so it has
    //         to be moved explicitly - otherwise it would be copied)
    //
    // In debug builds the resulting parcel is checked with is_valid().
    void parcel::set_destination_id(naming::gid_type&& dest)
    {
        data_.dest_ = HPX_MOVE(dest);
        HPX_ASSERT(is_valid());
    }
239

240
    // Destination gid stored in the header. In debug builds the parcel is
    // checked with is_valid() first.
    naming::gid_type const& parcel::destination() const
    {
        HPX_ASSERT(is_valid());
        return data_.dest_;
    }
245

414✔
246
    // Read-only access to the destination address stored in the header.
    naming::address const& parcel::addr() const
    {
        return data_.addr_;
    }
250

408✔
251
    // Mutable access to the destination address stored in the header.
    naming::address& parcel::addr()
    {
        return data_.addr_;
    }
255

×
256
    std::uint32_t parcel::destination_locality_id() const
257
    {
258
        return naming::get_locality_id_from_gid(destination_locality());
414✔
259
    }
260

414✔
261
    // Gid of the destination locality, i.e. the locality_ member of the
    // destination address.
    naming::gid_type const& parcel::destination_locality() const
    {
        return addr().locality_;
    }
265

266
    double parcel::start_time() const
267
    {
268
#if defined(HPX_HAVE_PARCEL_PROFILING)
×
269
        return data_.start_time_;
270
#else
271
        return 0.0;
272
#endif
×
273
    }
274

275
    void parcel::set_start_time(double time)
276
    {
277
#if defined(HPX_HAVE_PARCEL_PROFILING)
278
        data_.start_time_ = time;
279
#else
×
280
        HPX_UNUSED(time);
281
#endif
×
282
    }
283

284
    double parcel::creation_time() const
285
    {
286
#if defined(HPX_HAVE_PARCEL_PROFILING)
×
287
        return data_.creation_time_;
288
#else
289
        return 0.0;
290
#endif
×
291
    }
292

×
293
    // Thread priority requested by the attached action. The action pointer
    // is dereferenced unconditionally, so the parcel must hold an action.
    threads::thread_priority parcel::get_thread_priority() const
    {
        return action_->get_thread_priority();
    }
297

×
298
    // Thread stack size requested by the attached action. The action
    // pointer is dereferenced unconditionally, so the parcel must hold an
    // action.
    threads::thread_stacksize parcel::get_thread_stacksize() const
    {
        return action_->get_thread_stacksize();
    }
302

×
303
    std::uint32_t parcel::get_parent_locality_id() const
304
    {
305
        return action_->get_parent_locality_id();
×
306
    }
307

×
308
    // Parent thread id as reported by the attached action. The action
    // pointer is dereferenced unconditionally, so the parcel must hold an
    // action.
    threads::thread_id_type parcel::get_parent_thread_id() const
    {
        return action_->get_parent_thread_id();
    }
312

×
313
    std::uint64_t parcel::get_parent_thread_phase() const
314
    {
315
        return action_->get_parent_thread_phase();
316
    }
317

318
#if defined(HPX_HAVE_PARCEL_PROFILING)
    // Read-only access to the parcel id stored in the header (only
    // available with parcel profiling).
    naming::gid_type const& parcel::parcel_id() const
    {
        return data_.parcel_id_;
    }

    // Mutable access to the parcel id stored in the header (only available
    // with parcel profiling).
    naming::gid_type& parcel::parcel_id()
    {
        return data_.parcel_id_;
    }
#endif
414✔
329

330
#if defined(HPX_HAVE_NETWORKING)
331
    // Serialization filter to use for this parcel. If the action embeds
    // another parcel, the filter of that embedded parcel is returned;
    // otherwise the action's own filter is used. The action pointer is
    // dereferenced unconditionally, so the parcel must hold an action.
    serialization::binary_filter* parcel::get_serialization_filter() const
    {
        hpx::optional<parcelset::parcel> const embedded =
            action_->get_embedded_parcel();
        if (embedded)
        {
            return embedded->get_serialization_filter();
        }
        return action_->get_serialization_filter();
    }
341

342
    // Message handler to use for sending this parcel to 'loc'. If the
    // action embeds another parcel, the handler of that embedded parcel is
    // returned; otherwise the action's own handler is used. The action
    // pointer is dereferenced unconditionally, so the parcel must hold an
    // action.
    policies::message_handler* parcel::get_message_handler(
        locality const& loc) const
    {
        hpx::optional<parcelset::parcel> const embedded =
            action_->get_embedded_parcel();
        if (embedded)
        {
            return embedded->get_message_handler(loc);
        }
        return action_->get_message_handler(loc);
    }
816✔
353
#endif
354

816✔
355
    bool parcel::does_termination_detection() const
356
    {
357
        return action_ ? action_->does_termination_detection() : false;
414✔
358
    }
359

360
    // Hand the stored split gids over to the caller. The member is swapped
    // with a default-constructed container, so the parcel holds none
    // afterwards (the member is modified even though the function is const).
    parcel::split_gids_type parcel::move_split_gids() const
    {
        split_gids_type extracted;
        std::swap(extracted, split_gids_);
        return extracted;
    }
366

367
    // Store a set of split gids in the parcel, taking ownership of the
    // argument. The parcel is expected not to hold any split gids yet
    // (asserted in debug builds).
    void parcel::set_split_gids(parcel::split_gids_type&& split_gids)
    {
        HPX_ASSERT(split_gids_.empty());
        split_gids_ = HPX_MOVE(split_gids);
    }
372

×
373
    std::size_t parcel::num_chunks() const
374
    {
375
        return num_chunks_;
837✔
376
    }
377

837✔
378
    std::size_t& parcel::num_chunks()
379
    {
380
        return num_chunks_;
×
381
    }
382

×
383
    std::size_t parcel::size() const
384
    {
385
        return size_;
837✔
386
    }
387

837✔
388
    std::size_t& parcel::size()
389
    {
390
        return size_;
391
    }
414✔
392

393
    std::pair<naming::address_type, naming::component_type>
414✔
394
    parcel::determine_lva() const
395
    {
396
        int comptype = action_->get_component_type();
414✔
397

398
        // decode the local virtual address of the parcel
399
        naming::address::address_type lva = data_.addr_.address_;
400

414✔
401
        // by convention, a zero address references either the local
402
        // runtime support component or one of the AGAS components
57✔
403
        if (nullptr == lva)
404
        {
33✔
405
            switch (static_cast<components::component_enum_type>(comptype))
33✔
406
            {
407
            case components::component_enum_type::runtime_support:
408
                lva = agas::get_runtime_support_lva();
×
409
                break;
×
410

411
            case components::component_enum_type::agas_primary_namespace:
412
                lva = agas::get_primary_ns_lva();
15✔
413
                break;
15✔
414

415
            case components::component_enum_type::agas_symbol_namespace:
416
                lva = agas::get_symbol_ns_lva();
417
                break;
418

419
            case components::component_enum_type::plain_function:
420
                break;
421

422
            default:
423
                HPX_ASSERT(false);
424
            }
425
        }
426

414✔
427
        // make sure the component_type of the action matches the
428
        // component type in the destination address
429
        if (HPX_UNLIKELY(
×
430
                !components::types_are_compatible(data_.addr_.type_, comptype)))
431
        {
432
            HPX_THROW_EXCEPTION(hpx::error::bad_component_type,
433
                "parcel::determine_lva",
434
                " types are not compatible: destination_type({}) "
435
                "action_type({}) parcel ({})",
436
                data_.addr_.type_, comptype, *this);
414✔
437
        }
438

439
        return std::make_pair(lva, comptype);
414✔
440
    }
441

442
    bool parcel::load_schedule(serialization::input_archive& ar,
414✔
443
        std::size_t num_thread, bool& deferred_schedule)
444
    {
445
        load_data(ar);
446

447
        // make sure this parcel destination matches the proper locality
448
        HPX_ASSERT(destination_locality() == data_.addr_.locality_);
414✔
449

450
        std::pair<naming::address_type, naming::component_type> const p =
451
            determine_lva();
414✔
452

414✔
453
        // make sure the target has not been migrated away
454
        auto const r = action_->was_object_migrated(data_.dest_, p.first);
455
        if (r.first)
×
456
        {
457
            // If the object was migrated, just load the action and return.
458
            action_->load(ar);
459
            return true;
460
        }
461

462
        // schedule later if this is de-serialized with zero-copy semantics
463
        if (ar.try_get_extra_data<
×
464
                serialization::detail::allow_zero_copy_receive>() != nullptr)
465
        {
466
            action_->load(ar);
467
            return false;
468
        }
414✔
469

470
        // continuation support, this is handled in the transfer action
471
        action_->load_schedule(ar, HPX_MOVE(data_.dest_), p.first, p.second,
472
            num_thread, deferred_schedule);
473

474
#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
475
        static util::itt::event parcel_recv("recv_parcel");
476
        util::itt::event_tick(parcel_recv);
477
#endif
478

479
#if defined(HPX_HAVE_APEX) && defined(HPX_HAVE_PARCEL_PROFILING)
480
        // tell APEX about the received parcel
481
        util::external_timer::recv(data_.parcel_id_.get_lsb(), size_,
482
            naming::get_locality_id_from_gid(data_.source_id_),
483
            reinterpret_cast<std::uint64_t>(
484
                action_->get_parent_thread_id().get()));
485
#endif
486

487
        return false;
×
488
    }
489

490
    bool parcel::schedule_action(std::size_t num_thread)
491
    {
492
        // make sure this parcel destination matches the proper locality
493
        HPX_ASSERT(destination_locality() == data_.addr_.locality_);
×
494

495
        std::pair<naming::address_type, naming::component_type> const p =
496
            determine_lva();
×
497

×
498
        // make sure the target has not been migrated away
499
        auto const r = action_->was_object_migrated(data_.dest_, p.first);
500
        if (r.first)
501
        {
502
            // If the object was migrated, just route.
503
            return true;
504
        }
505

×
506
        // dispatch action, register work item either with or without
×
507
        // continuation support, this is handled in the transfer action
508
        action_->schedule_thread(
509
            HPX_MOVE(data_.dest_), p.first, p.second, num_thread);
510
        return false;
414✔
511
    }
512

513
    // Read the parcel header and the action id from 'ar' and create the
    // matching (still unloaded) action object through the action registry.
    // Debug builds additionally read the action name from the archive and
    // pass it to the registry. This mirrors save_data().
    void parcel::load_data(serialization::input_archive& ar)
    {
        using hpx::actions::detail::action_registry;

        ar >> data_;

        std::uint32_t id;
        ar >> id;

#if !defined(HPX_DEBUG)
        action_.reset(action_registry::create(id, data_.has_continuation_));
#else
        std::string name;
        ar >> name;
        action_.reset(
            action_registry::create(id, data_.has_continuation_, &name));
#endif
    }
530

×
531
    // De-serialize the complete parcel: header and action id first (which
    // creates the action object), then the action's own data. The version
    // argument is unused.
    void parcel::load(serialization::input_archive& ar, unsigned)
    {
        load_data(ar);
        action_->load(ar);
    }
536

537
    // Write the parcel header followed by the id of the attached action to
    // 'ar'. Debug builds additionally write the action name. This mirrors
    // load_data(). The action pointer is dereferenced unconditionally, so
    // the parcel must hold an action.
    void parcel::save_data(serialization::output_archive& ar) const
    {
        // NOTE(review): 'access' is not referenced anywhere in this function
        using hpx::serialization::access;

        ar << data_;

        std::uint32_t const id = action_->get_action_id();
        ar << id;

#if defined(HPX_DEBUG)
        std::string const name(action_->get_action_name());
        ar << name;
#endif
    }
550

837✔
551
    // Serialize the complete parcel: header and action id first, then the
    // action's own data. The version argument is unused.
    void parcel::save(serialization::output_archive& ar, unsigned) const
    {
        save_data(ar);
        action_->save(ar);
    }
556

×
557
    std::ostream& operator<<(std::ostream& os, parcel const& p)
×
558
    {
559
        return hpx::util::format_to(
560
            os, "({}:{}:{})", p.destination(), p.addr(), p.get_action_name());
561
    }
562
}    // namespace hpx::parcelset::detail
563

564
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc