• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #868

16 Jan 2023 08:21PM UTC coverage: 86.487%. Remained the same
#868

push

StellarBot
Merge #6137

6137: Adding example of a simple master/slave distributed application r=hkaiser a=hkaiser

The purpose of this example is to demonstrate how HPX actions can be used to build a simple master/slave application. The master (locality 0) assigns work to the slaves (all other localities). Note that if this application is run on one locality only, it uses the same locality for the master and the slave functionalities.

The slaves receive a message that encodes how many sub-tasks of a certain type they should spawn locally.


Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

72 of 72 new or added lines in 1 file covered. (100.0%)

174663 of 201952 relevant lines covered (86.49%)

1849169.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.88
/libs/full/parcelset/src/parcel.cpp
1
//  Copyright (c) 2007-2021 Hartmut Kaiser
2
//  Copyright (c)      2011 Bryce Lelbach
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#include <hpx/config.hpp>
9

10
#if defined(HPX_HAVE_NETWORKING)
11
#include <hpx/assert.hpp>
12
#include <hpx/modules/datastructures.hpp>
13
#include <hpx/modules/format.hpp>
14
#include <hpx/modules/itt_notify.hpp>
15
#include <hpx/modules/runtime_local.hpp>
16
#include <hpx/modules/serialization.hpp>
17
#include <hpx/modules/threading_base.hpp>
18
#include <hpx/modules/timing.hpp>
19

20
#include <hpx/actions/transfer_action.hpp>
21
#include <hpx/actions_base/detail/action_factory.hpp>
22
#include <hpx/components_base/agas_interface.hpp>
23
#include <hpx/components_base/component_type.hpp>
24
#include <hpx/naming/detail/preprocess_gid_types.hpp>
25
#include <hpx/parcelset/parcel.hpp>
26
#include <hpx/parcelset/parcelhandler.hpp>
27
#include <hpx/parcelset_base/parcel_interface.hpp>
28

29
#include <cstddef>
30
#include <cstdint>
31
#include <memory>
32
#include <string>
33
#include <type_traits>
34
#include <utility>
35

36
///////////////////////////////////////////////////////////////////////////////
37
namespace hpx::parcelset::detail {
38

39
    // Default constructor: source and destination are initialized to
    // naming::invalid_gid and the continuation flag is cleared.
    parcel_data::parcel_data()
      : source_id_(naming::invalid_gid)
      , dest_(naming::invalid_gid)
#if defined(HPX_HAVE_PARCEL_PROFILING)
      // profiling builds additionally stamp the creation time
      , start_time_(0)
      , creation_time_(chrono::high_resolution_timer::now())
#endif
      , has_continuation_(false)
    {
    }
475,099✔
50

51
    // Construct parcel data for a given destination.
    //
    // dest             - destination gid, moved into dest_
    // addr             - resolved destination address, moved into addr_
    // has_continuation - stored verbatim in has_continuation_
    //
    // The source id starts out as naming::invalid_gid.
    parcel_data::parcel_data(
        naming::gid_type&& dest, naming::address&& addr, bool has_continuation)
      : source_id_(naming::invalid_gid)
      , dest_(HPX_MOVE(dest))
      , addr_(HPX_MOVE(addr))
#if defined(HPX_HAVE_PARCEL_PROFILING)
      // profiling builds additionally stamp the creation time
      , start_time_(0)
      , creation_time_(chrono::high_resolution_timer::now())
#endif
      , has_continuation_(has_continuation)
    {
    }
470,181✔
63

64
    // Move constructor: takes over all members of rhs and afterwards resets
    // the ids, the address and (in profiling builds) the time stamps of rhs
    // to their invalid/zero values. rhs.has_continuation_ is left untouched.
    parcel_data::parcel_data(parcel_data&& rhs) noexcept
      : source_id_(HPX_MOVE(rhs.source_id_))
      , dest_(HPX_MOVE(rhs.dest_))
      , addr_(HPX_MOVE(rhs.addr_))
#if defined(HPX_HAVE_PARCEL_PROFILING)
      , parcel_id_(HPX_MOVE(rhs.parcel_id_))
      , start_time_(rhs.start_time_)
      , creation_time_(rhs.creation_time_)
#endif
      , has_continuation_(rhs.has_continuation_)
    {
        // leave the moved-from object in a well-defined 'empty' state
        rhs.source_id_ = naming::invalid_gid;
        rhs.dest_ = naming::invalid_gid;
        rhs.addr_ = naming::address();
#if defined(HPX_HAVE_PARCEL_PROFILING)
        rhs.parcel_id_ = naming::invalid_gid;
        rhs.start_time_ = 0;
        rhs.creation_time_ = 0;
#endif
    }
85

86
    // Move assignment: takes over all members of rhs and afterwards resets
    // the ids, the address and (in profiling builds) the time stamps of rhs
    // to their invalid/zero values. rhs.has_continuation_ is left untouched.
    //
    // Returns a reference to *this.
    parcel_data& parcel_data::operator=(parcel_data&& rhs) noexcept
    {
        // Self-move has to be a no-op: without this guard the reset of rhs
        // below would wipe the very data that was just assigned to *this.
        if (this == &rhs)
        {
            return *this;
        }

        source_id_ = HPX_MOVE(rhs.source_id_);
        dest_ = HPX_MOVE(rhs.dest_);
        addr_ = HPX_MOVE(rhs.addr_);
#if defined(HPX_HAVE_PARCEL_PROFILING)
        parcel_id_ = HPX_MOVE(rhs.parcel_id_);
        start_time_ = rhs.start_time_;
        creation_time_ = rhs.creation_time_;
#endif
        has_continuation_ = rhs.has_continuation_;

        // leave the moved-from object in a well-defined 'empty' state
        rhs.source_id_ = naming::invalid_gid;
        rhs.dest_ = naming::invalid_gid;
        rhs.addr_ = naming::address();
#if defined(HPX_HAVE_PARCEL_PROFILING)
        rhs.parcel_id_ = naming::invalid_gid;
        rhs.start_time_ = 0;
        rhs.creation_time_ = 0;
#endif
        return *this;
    }
108

109
    // Read the parcel data from the given input archive. The members are
    // extracted in exactly the order in which the output overload writes
    // them: source id, destination, address, (profiling data), continuation
    // flag. The version argument is not used.
    void parcel_data::serialize(serialization::input_archive& ar, unsigned)
    {
        ar >> source_id_;
        ar >> dest_;
        ar >> addr_;

#if defined(HPX_HAVE_PARCEL_PROFILING)
        // profiling data is part of the wire format only in profiling builds
        ar >> parcel_id_;
        ar >> start_time_;
        ar >> creation_time_;
#endif

        ar >> has_continuation_;
    }
×
123

124
    // Write the parcel data to the given output archive. The members are
    // inserted in the order: source id, destination, address, (profiling
    // data), continuation flag. The version argument is not used.
    void parcel_data::serialize(serialization::output_archive& ar, unsigned)
    {
        ar << source_id_;
        ar << dest_;
        ar << addr_;

#if defined(HPX_HAVE_PARCEL_PROFILING)
        // profiling data is part of the wire format only in profiling builds
        ar << parcel_id_;
        ar << start_time_;
        ar << creation_time_;
#endif

        ar << has_continuation_;
    }
×
138

139
    ///////////////////////////////////////////////////////////////////////////
140
#if defined(HPX_DEBUG)
141
    bool parcel::is_valid() const
488,565✔
142
    {
143
        // empty parcels are always valid
144
#if defined(HPX_HAVE_PARCEL_PROFILING)
145
        if (0.0 == data_.creation_time_)    //-V550
146
            return true;
147
#endif
148

149
        // verify target destination
150
        if (data_.dest_ && data_.addr_.locality_)
488,565✔
151
        {
152
            // if we have a destination we need an action as well
153
            if (!action_)
462,082✔
154
            {
155
                return false;
×
156
            }
157
        }
462,082✔
158

159
        // verify that the action targets the correct type
160
        if (action_ && data_.addr_.type_ != components::component_invalid)
488,565✔
161
        {
162
            int type = action_->get_component_type();
476,962✔
163
            if (!components::types_are_compatible(type, data_.addr_.type_))
476,962✔
164
            {
165
                return false;
×
166
            }
167
        }
476,962✔
168

169
        return true;
488,567✔
170
    }
488,567✔
171
#else
172
    // Only used in debug mode.
173
    bool parcel::is_valid() const
174
    {
175
        return true;
176
    }
177
#endif
178

179
    // Default constructor: default parcel data, no action, and zero size and
    // chunk count.
    parcel::parcel()
      : data_()
      , action_()
      , size_(0)
      , num_chunks_(0)
    {
    }
475,094✔
186

187
    // Out-of-line defaulted destructor.
    parcel::~parcel() = default;
1,890,590✔
188

189
    // Construct a parcel for the given destination and action.
    //
    // dest - destination gid (moved into the parcel data)
    // addr - destination address (moved into the parcel data)
    // act  - action to transport; the parcel takes ownership. It is
    //        dereferenced unconditionally, so it must not be null.
    //
    // NOTE(review): act->has_continuation() is evaluated while initializing
    // data_, i.e. this relies on data_ being declared before action_ in the
    // class so that act has not been moved from yet - confirm in the header.
    parcel::parcel(naming::gid_type&& dest, naming::address&& addr,
        std::unique_ptr<actions::base_action> act)
      : data_(HPX_MOVE(dest), HPX_MOVE(addr), act->has_continuation())
      , action_(HPX_MOVE(act))
      , size_(0)
      , num_chunks_(0)
    {
    }
470,181✔
197

198
    void parcel::reset()
×
199
    {
200
        data_ = detail::parcel_data();
×
201
        action_.reset();
×
202
    }
×
203

204
    char const* parcel::get_action_name() const
323✔
205
    {
206
        return action_->get_action_name();
323✔
207
    }
208

209
    int parcel::get_component_type() const
320✔
210
    {
211
        return action_->get_component_type();
320✔
212
    }
213

214
    int parcel::get_action_type() const
320✔
215
    {
216
        return static_cast<int>(action_->get_action_type());
320✔
217
    }
218

219
    // Source id of this parcel, wrapped into an unmanaged hpx::id_type.
    hpx::id_type parcel::source_id() const
    {
        return hpx::id_type(
            data_.source_id_, hpx::id_type::management_type::unmanaged);
    }
224

225
    // Store the gid of the given id as the source of this parcel. Passing
    // hpx::invalid_id leaves the currently stored source id unchanged.
    void parcel::set_source_id(hpx::id_type const& source_id)
    {
        bool const is_usable = (source_id != hpx::invalid_id);
        if (is_usable)
        {
            data_.source_id_ = source_id.get_gid();
        }
    }
469,898✔
232

233
    // Replace the destination gid of this parcel.
    //
    // dest - new destination; taken by rvalue reference and moved into the
    //        parcel data (the original copied it even though the caller
    //        already gave up ownership).
    //
    // Debug builds assert that the parcel is still valid afterwards.
    void parcel::set_destination_id(naming::gid_type&& dest)
    {
        data_.dest_ = HPX_MOVE(dest);
        HPX_ASSERT(is_valid());
    }
×
238

239
    // Destination gid of this parcel. Asserts parcel validity first (the
    // check is a no-op that returns true in non-debug builds).
    naming::gid_type const& parcel::destination() const
    {
        HPX_ASSERT(is_valid());
        return data_.dest_;
    }
244

245
    // Read-only access to the destination address stored in the parcel data.
    naming::address const& parcel::addr() const
    {
        return data_.addr_;
    }
249

250
    // Mutable access to the destination address stored in the parcel data.
    naming::address& parcel::addr()
    {
        return data_.addr_;
    }
254

255
    std::uint32_t parcel::destination_locality_id() const
×
256
    {
257
        return naming::get_locality_id_from_gid(destination_locality());
×
258
    }
259

260
    // Gid of the locality this parcel is addressed to, i.e. the locality_
    // member of the destination address.
    naming::gid_type const& parcel::destination_locality() const
    {
        naming::address const& destination_address = addr();
        return destination_address.locality_;
    }
264

265
    double parcel::start_time() const
320✔
266
    {
267
#if defined(HPX_HAVE_PARCEL_PROFILING)
268
        return data_.start_time_;
269
#else
270
        return 0.0;
320✔
271
#endif
272
    }
273

274
    void parcel::set_start_time(double time)
×
275
    {
276
#if defined(HPX_HAVE_PARCEL_PROFILING)
277
        data_.start_time_ = time;
278
#else
279
        HPX_UNUSED(time);
×
280
#endif
281
    }
×
282

283
    double parcel::creation_time() const
×
284
    {
285
#if defined(HPX_HAVE_PARCEL_PROFILING)
286
        return data_.creation_time_;
287
#else
288
        return 0.0;
×
289
#endif
290
    }
291

292
    // Thread priority requested by the carried action (forwarded to the
    // action, which must be set).
    threads::thread_priority parcel::get_thread_priority() const
    {
        threads::thread_priority const priority =
            action_->get_thread_priority();
        return priority;
    }
296

297
    // Thread stack size requested by the carried action (forwarded to the
    // action, which must be set).
    threads::thread_stacksize parcel::get_thread_stacksize() const
    {
        threads::thread_stacksize const stacksize =
            action_->get_thread_stacksize();
        return stacksize;
    }
301

302
    std::uint32_t parcel::get_parent_locality_id() const
320✔
303
    {
304
        return action_->get_parent_locality_id();
320✔
305
    }
306

307
    // Parent thread id as reported by the carried action (the action must
    // be set).
    threads::thread_id_type parcel::get_parent_thread_id() const
    {
        return action_->get_parent_thread_id();
    }
311

312
    std::uint64_t parcel::get_parent_thread_phase() const
320✔
313
    {
314
        return action_->get_parent_thread_phase();
320✔
315
    }
316

317
#if defined(HPX_HAVE_PARCEL_PROFILING)
    // Read-only access to the parcel id (available in profiling builds
    // only).
    naming::gid_type const& parcel::parcel_id() const
    {
        return data_.parcel_id_;
    }

    // Mutable access to the parcel id (available in profiling builds only).
    naming::gid_type& parcel::parcel_id()
    {
        return data_.parcel_id_;
    }
#endif
328

329
#if defined(HPX_HAVE_NETWORKING)
330
    // Serialization filter to use for this parcel. If the action carries an
    // embedded parcel, the query is delegated to that parcel; otherwise the
    // action itself is asked.
    serialization::binary_filter* parcel::get_serialization_filter() const
    {
        hpx::optional<parcelset::parcel> embedded =
            action_->get_embedded_parcel();
        if (embedded)
        {
            return embedded->get_serialization_filter();
        }
        return action_->get_serialization_filter();
    }
455,743✔
339

340
    // Message handler to use for this parcel and the given locality. If the
    // action carries an embedded parcel, the query is delegated to that
    // parcel; otherwise the action itself is asked.
    policies::message_handler* parcel::get_message_handler(
        locality const& loc) const
    {
        hpx::optional<parcelset::parcel> embedded =
            action_->get_embedded_parcel();
        if (embedded)
        {
            return embedded->get_message_handler(loc);
        }
        return action_->get_message_handler(loc);
    }
471,655✔
350
#endif
351

352
    bool parcel::does_termination_detection() const
473,666✔
353
    {
354
        return action_ ? action_->does_termination_detection() : false;
473,666✔
355
    }
356

357
    // Hand the stored split gids over to the caller. The member is swapped
    // with a default constructed container, i.e. it is empty afterwards.
    parcel::split_gids_type parcel::move_split_gids() const
    {
        split_gids_type extracted;
        std::swap(extracted, split_gids_);
        return extracted;
    }
466,523✔
363

364
    // Store the given split gids in the parcel (moved in). Asserts that no
    // split gids are stored yet.
    void parcel::set_split_gids(parcel::split_gids_type&& split_gids)
    {
        HPX_ASSERT(split_gids_.empty());
        split_gids_ = HPX_MOVE(split_gids);
    }
10,684✔
369

370
    std::size_t parcel::num_chunks() const
×
371
    {
372
        return num_chunks_;
×
373
    }
374

375
    std::size_t& parcel::num_chunks()
933,427✔
376
    {
377
        return num_chunks_;
933,427✔
378
    }
379

380
    std::size_t parcel::size() const
×
381
    {
382
        return size_;
×
383
    }
384

385
    std::size_t& parcel::size()
933,519✔
386
    {
387
        return size_;
933,518✔
388
    }
389

390
    std::pair<naming::address_type, naming::component_type>
391
    parcel::determine_lva()
481,720✔
392
    {
393
        int comptype = action_->get_component_type();
481,723✔
394

395
        // decode the local virtual address of the parcel
396
        naming::address::address_type lva = data_.addr_.address_;
481,723✔
397

398
        // by convention, a zero address references either the local
399
        // runtime support component or one of the AGAS components
400
        if (nullptr == lva)
481,723✔
401
        {
402
            switch (comptype)
57,037✔
403
            {
404
            case components::component_runtime_support:
405
                lva = agas::get_runtime_support_lva();
5,092✔
406
                break;
5,092✔
407

408
            case components::component_agas_primary_namespace:
409
                lva = agas::get_primary_ns_lva();
18,720✔
410
                break;
18,720✔
411

412
            case components::component_agas_symbol_namespace:
413
                lva = agas::get_symbol_ns_lva();
17,333✔
414
                break;
17,333✔
415

416
            case components::component_plain_function:
417
                break;
15,891✔
418

419
            default:
420
                HPX_ASSERT(false);
×
421
            }
×
422
        }
57,036✔
423

424
        // make sure the component_type of the action matches the
425
        // component type in the destination address
426
        if (HPX_UNLIKELY(
481,714✔
427
                !components::types_are_compatible(data_.addr_.type_, comptype)))
428
        {
429
            HPX_THROW_EXCEPTION(hpx::error::bad_component_type,
×
430
                "parcel::determine_lva",
431
                " types are not compatible: destination_type({}) "
432
                "action_type({}) parcel ({})",
433
                data_.addr_.type_, comptype, *this);
434
        }
435

436
        return std::make_pair(lva, comptype);
481,715✔
437
    }
×
438

439
    bool parcel::load_schedule(serialization::input_archive& ar,
466,490✔
440
        std::size_t num_thread, bool& deferred_schedule)
441
    {
442
        load_data(ar);
466,500✔
443

444
        // make sure this parcel destination matches the proper locality
445
        HPX_ASSERT(destination_locality() == data_.addr_.locality_);
466,490✔
446

447
        std::pair<naming::address_type, naming::component_type> p =
448
            determine_lva();
466,494✔
449

450
        // make sure the target has not been migrated away
451
        auto r = action_->was_object_migrated(data_.dest_, p.first);
466,494✔
452
        if (r.first)
466,494✔
453
        {
454
            // If the object was migrated, just load the action and return.
455
            action_->load(ar);
×
456
            return true;
×
457
        }
458

459
        // continuation support, this is handled in the transfer action
460
        action_->load_schedule(ar, HPX_MOVE(data_.dest_), p.first, p.second,
932,988✔
461
            num_thread, deferred_schedule);
466,522✔
462

463
#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
464
        static util::itt::event parcel_recv("recv_parcel");
465
        util::itt::event_tick(parcel_recv);
466
#endif
467

468
#if defined(HPX_HAVE_APEX) && defined(HPX_HAVE_PARCEL_PROFILING)
469
        // tell APEX about the received parcel
470
        util::external_timer::recv(data_.parcel_id_.get_lsb(), size_,
471
            naming::get_locality_id_from_gid(data_.source_id_),
472
            reinterpret_cast<std::uint64_t>(
473
                action_->get_parent_thread_id().get()));
474
#endif
475

476
        return false;
466,522✔
477
    }
466,522✔
478

479
    bool parcel::schedule_action(std::size_t num_thread)
15,231✔
480
    {
481
        // make sure this parcel destination matches the proper locality
482
        HPX_ASSERT(destination_locality() == data_.addr_.locality_);
15,231✔
483

484
        std::pair<naming::address_type, naming::component_type> p =
485
            determine_lva();
15,228✔
486

487
        // make sure the target has not been migrated away
488
        auto r = action_->was_object_migrated(data_.dest_, p.first);
15,228✔
489
        if (r.first)
15,228✔
490
        {
491
            // If the object was migrated, just route.
492
            return true;
×
493
        }
494

495
        // dispatch action, register work item either with or without
496
        // continuation support, this is handled in the transfer action
497
        action_->schedule_thread(
30,456✔
498
            HPX_MOVE(data_.dest_), p.first, p.second, num_thread);
15,231✔
499
        return false;
15,231✔
500
    }
15,231✔
501

502
    // Read the parcel data and the action id from the archive and create the
    // matching action instance through the action registry. Debug builds
    // additionally read the action name and pass it on to the registry.
    // The action arguments themselves are not read here.
    void parcel::load_data(serialization::input_archive& ar)
    {
        using hpx::actions::detail::action_registry;
        ar >> data_;

        std::uint32_t action_id;
        ar >> action_id;

#if !defined(HPX_DEBUG)
        action_.reset(
            action_registry::create(action_id, data_.has_continuation_));
#else
        // debug archives carry the action name right after the id
        std::string action_name;
        ar >> action_name;
        action_.reset(action_registry::create(
            action_id, data_.has_continuation_, &action_name));
#endif
    }
475,104✔
519

520
    // Deserialize the complete parcel: parcel data and action instance
    // first, then the action's own data. The version argument is not used.
    void parcel::load(serialization::input_archive& ar, unsigned)
    {
        load_data(ar);
        action_->load(ar);
    }
8,598✔
525

526
    // Write the parcel data followed by the action id to the archive. Debug
    // builds additionally write the action name. The action arguments
    // themselves are not written here. The action must be set.
    void parcel::save_data(serialization::output_archive& ar) const
    {
        using hpx::serialization::access;
        ar << data_;

        std::uint32_t const action_id = action_->get_action_id();
        ar << action_id;

#if defined(HPX_DEBUG)
        // debug archives carry the action name right after the id
        std::string const action_name(action_->get_action_name());
        ar << action_name;
#endif
    }
967,002✔
539

540
    // Serialize the complete parcel: parcel data and action id first, then
    // the action's own data. The version argument is not used.
    void parcel::save(serialization::output_archive& ar, unsigned) const
    {
        save_data(ar);
        action_->save(ar);
    }
966,996✔
545

546
    std::ostream& operator<<(std::ostream& os, parcel const& p)
×
547
    {
548
        return hpx::util::format_to(
×
549
            os, "({}:{}:{})", p.destination(), p.addr(), p.get_action_name());
×
550
    }
551
}    // namespace hpx::parcelset::detail
552

553
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc