• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.7
/components/parcel_plugins/coalescing/src/coalescing_message_handler.cpp
1
//  Copyright (c) 2007-2025 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/config.hpp>
8

9
#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCEL_COALESCING)
10
#include <hpx/assert.hpp>
11
#include <hpx/modules/errors.hpp>
12
#include <hpx/modules/format.hpp>
13
#include <hpx/modules/functional.hpp>
14
#include <hpx/modules/plugin.hpp>
15
#include <hpx/modules/runtime_local.hpp>
16
#include <hpx/modules/thread_support.hpp>
17
#include <hpx/modules/timing.hpp>
18
#include <hpx/modules/util.hpp>
19

20
#include <hpx/parcel_coalescing/counter_registry.hpp>
21
#include <hpx/parcel_coalescing/message_handler.hpp>
22
#include <hpx/parcelset_base/parcelport.hpp>
23
#include <hpx/plugin_factories/message_handler_factory.hpp>
24

25
#include <boost/accumulators/accumulators.hpp>
26

27
#include <chrono>
28
#include <cstddef>
29
#include <cstdint>
30
#include <mutex>
31
#include <string>
32
#include <utility>
33
#include <vector>
34

35
#include <hpx/config/warnings_prefix.hpp>
36

37
namespace hpx::traits {
38

39
    // Inject additional configuration data into the factory registry for this
40
    // type. This information ends up in the system wide configuration database
41
    // under the plugin specific section:
42
    //
43
    //      [hpx.plugins.coalescing_message_handler]
44
    //      ...
45
    //      num_messages = 50
46
    //      interval = 100
    //      allow_background_flush = 1
47
    //
48
    template <>
49
    struct plugin_config_data<hpx::plugins::parcel::coalescing_message_handler>
50
    {
51
        static constexpr char const* call() noexcept
52
        {
53
            return "num_messages = 50\n"
54
                   "interval = 100\n"
55
                   "allow_background_flush = 1";
56
        }
57
    };
58
}    // namespace hpx::traits
64✔
59

960✔
60
///////////////////////////////////////////////////////////////////////////////
61
// Plugin registration.
// NOTE(review): judging by the macro names only, these presumably register
// this library as a dynamically loadable plugin module and install a message
// handler factory for hpx::plugins::parcel::coalescing_message_handler under
// the name 'coalescing_message_handler' -- confirm against the HPX macros.
HPX_REGISTER_PLUGIN_MODULE_DYNAMIC()
62
HPX_REGISTER_MESSAGE_HANDLER_FACTORY(
63
    hpx::plugins::parcel::coalescing_message_handler,
64
    coalescing_message_handler)
65

66
///////////////////////////////////////////////////////////////////////////////
67
namespace hpx::plugins::parcel {
6✔
68

69
    namespace detail {
6✔
70
        std::size_t get_num_messages(std::size_t num_messages)
71
        {
6✔
72
            return hpx::util::from_string<std::size_t>(hpx::get_config_entry(
73
                "hpx.plugins.coalescing_message_handler.num_messages",
74
                num_messages));
6✔
75
        }
76

6✔
77
        std::size_t get_interval(std::size_t interval)
6✔
78
        {
79
            return hpx::util::from_string<std::size_t>(hpx::get_config_entry(
80
                "hpx.plugins.coalescing_message_handler.interval", interval));
6✔
81
        }
82

6✔
83
        bool get_background_flush()
84
        {
12✔
85
            std::string const value = hpx::get_config_entry(
12✔
86
                "hpx.plugins.coalescing_message_handler.allow_background_flush",
87
                "1");
88
            return !value.empty() && value[0] != '0';
89
        }
×
90
    }    // namespace detail
91

×
92
    void coalescing_message_handler::update_num_messages()
×
93
    {
×
94
        std::lock_guard<mutex_type> l(mtx_);
×
95
        num_coalesced_parcels_ =
96
            detail::get_num_messages(num_coalesced_parcels_);
×
97
    }
98

×
99
    void coalescing_message_handler::update_interval()
×
100
    {
×
101
        std::lock_guard<mutex_type> l(mtx_);
102
        interval_ = detail::get_interval(interval_);
6✔
103
    }
104

6✔
105
    // Constructs a handler for the given action.
    //
    //   action_name  name used for the timer ("<action_name>_timer") and for
    //                registering the performance counter functions
    //   pp           parcelport that buffered or bypassing parcels are
    //                eventually handed to
    //   num          value forwarded to detail::get_num_messages()
    //   interval     value forwarded to detail::get_interval(); put_parcel
    //                interprets the stored value as microseconds
    coalescing_message_handler::coalescing_message_handler(
        char const* action_name, parcelset::parcelport* pp, std::size_t num,
        std::size_t interval)
      : pp_(pp)
      , num_coalesced_parcels_(detail::get_num_messages(num))
      , interval_(detail::get_interval(interval))
      , buffer_(num_coalesced_parcels_)
      , timer_(hpx::bind_back(&coalescing_message_handler::timer_flush, this),
            hpx::bind_back(&coalescing_message_handler::flush_terminate, this),
            std::string(action_name) + "_timer")
      , stopped_(false)
      , allow_background_flush_(detail::get_background_flush())
      , action_name_(action_name)
      , num_parcels_(0)
      , reset_num_parcels_(0)
      , reset_num_parcels_per_message_parcels_(0)
      , num_messages_(0)
      , reset_num_messages_(0)
      , reset_num_parcels_per_message_messages_(0)
      , started_at_(static_cast<std::int64_t>(
            hpx::chrono::high_resolution_clock::now()))
      , reset_time_num_parcels_(0)
      , last_parcel_time_(started_at_)
      , histogram_min_boundary_(-1)
      , histogram_max_boundary_(-1)
      , histogram_num_buckets_(-1)
    {
        using self_type = coalescing_message_handler;

        // make the counter accessors of this instance available through the
        // counter registry, keyed by the action name
        coalescing_counter_registry::instance().register_action(action_name,
            hpx::bind_front(&self_type::get_parcels_count, this),
            hpx::bind_front(&self_type::get_messages_count, this),
            hpx::bind_front(&self_type::get_parcels_per_message_count, this),
            hpx::bind_front(
                &self_type::get_average_time_between_parcels, this),
            hpx::bind_front(
                &self_type::get_time_between_parcels_histogram_creator,
                this));

        // install callbacks for the two configuration entries that
        // update_num_messages() / update_interval() re-read
        set_config_entry_callback(
            "hpx.plugins.coalescing_message_handler.num_messages",
            hpx::bind(&self_type::update_num_messages, this));
        set_config_entry_callback(
            "hpx.plugins.coalescing_message_handler.interval",
            hpx::bind(&self_type::update_interval, this));
    }
156

195✔
157
    // Accepts one parcel for destination 'dest' together with its write
    // handler 'f'.
    //
    // The parcel is either passed straight on to the parcelport (if
    // buffering was stopped, or if the buffer is empty and more than the
    // coalescing interval has passed since the previous parcel) or appended
    // to the buffer. Appending either (re)starts the flush timer or, if the
    // buffer reports being full, flushes it right away.
    //
    // Throws hpx::error::bad_parameter if message_buffer::append returns an
    // unexpected state.
    void coalescing_message_handler::put_parcel(parcelset::locality const& dest,
        parcelset::parcel p, write_handler_type f)
    {
        using append_state =
            detail::message_buffer::message_buffer_append_state;

        std::unique_lock<mutex_type> lock(mtx_);
        ++num_parcels_;

        // time elapsed since the previous parcel
        auto const current_time = static_cast<std::int64_t>(
            hpx::chrono::high_resolution_clock::now());
        std::int64_t const elapsed = current_time - last_parcel_time_;
        last_parcel_time_ = current_time;

        // feed the histogram, if one was created
        if (time_between_parcels_)
        {
            (*time_between_parcels_)(elapsed);
        }

        std::chrono::microseconds const window(interval_);

        // bypass the buffer if coalescing was stopped, or if nothing is
        // buffered and the previous parcel is older than the interval
        bool const send_directly = stopped_ ||
            (buffer_.empty() && std::chrono::nanoseconds(elapsed) > window);

        if (send_directly)
        {
            ++num_messages_;
            lock.unlock();

            pp_->put_parcel(dest, HPX_MOVE(p), HPX_MOVE(f));
            return;
        }

        append_state const state =
            buffer_.append(dest, HPX_MOVE(p), HPX_MOVE(f));

        if (state == detail::message_buffer::first_message ||
            state == detail::message_buffer::normal)
        {
            // arm the deadline timer that will flush the buffer; the lock is
            // released before touching the timer
            lock.unlock();
            timer_.start(window);
        }
        else if (state == detail::message_buffer::buffer_now_full)
        {
            flush_locked(lock,
                parcelset::policies::message_handler::flush_mode_buffer_full,
                false, true);
        }
        else
        {
            lock.unlock();
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
                "coalescing_message_handler::put_parcel",
                "unexpected return value from message_buffer::append");
            return;
        }
    }
×
217

218
    bool coalescing_message_handler::timer_flush()
×
219
    {
220
        // adjust timer if needed
221
        std::unique_lock<mutex_type> l(mtx_);
222
        if (!buffer_.empty())
223
        {
224
            flush_locked(l,
×
225
                parcelset::policies::message_handler::flush_mode_timer, false,
226
                false);
227
        }
435,327✔
228

229
        // do not restart timer for now, will be restarted on next parcel
230
        return false;
231
    }
435,327✔
232

870,654✔
233
    bool coalescing_message_handler::flush(
234
        parcelset::policies::message_handler::flush_mode mode,
235
        bool stop_buffering)
6✔
236
    {
237
        std::unique_lock<mutex_type> l(mtx_);
6✔
238
        return flush_locked(l, mode, stop_buffering, true);
6✔
239
    }
240

6✔
241
    void coalescing_message_handler::flush_terminate()
242
    {
435,333✔
243
        std::unique_lock<mutex_type> l(mtx_);
244
        flush_locked(l, parcelset::policies::message_handler::flush_mode_timer,
245
            true, true);
246
    }
247

248
    bool coalescing_message_handler::flush_locked(
249
        std::unique_lock<mutex_type>& l,
250
        parcelset::policies::message_handler::flush_mode mode,
435,333✔
251
        bool stop_buffering, bool cancel_timer)
252
    {
253
        HPX_ASSERT(l.owns_lock());
254

255
        // proceed with background work only if explicitly allowed
256
        if (!allow_background_flush_ &&
257
            mode ==
258
                parcelset::policies::message_handler::
435,333✔
259
                    flush_mode_background_work)
260
        {
6✔
261
            return false;
262
        }
263

6✔
264
        if (!stopped_ && stop_buffering)
265
        {
6✔
266
            stopped_ = true;
435,327✔
267
            {
268
                hpx::unlock_guard<std::unique_lock<mutex_type>> ul(l);
269
                timer_.stop();    // interrupt timer
435,327✔
270
            }
271
        }
272
        else if (cancel_timer)
435,333✔
273
        {
274
            hpx::unlock_guard<std::unique_lock<mutex_type>> ul(l);
275
            timer_.stop();    // interrupt timer
6✔
276
        }
6✔
277

278
        if (buffer_.empty())
6✔
279
            return false;
6✔
280

281
        detail::message_buffer buff(num_coalesced_parcels_);
282
        std::swap(buff, buffer_);
6✔
283

284
        ++num_messages_;
285

6✔
286
        // 26110: Caller failing to hold lock 'l'
287
#if defined(HPX_MSVC)
288
#pragma warning(push)
×
289
#pragma warning(disable : 26110)
290
#endif
291
        l.unlock();
×
292
#if defined(HPX_MSVC)
293
#pragma warning(pop)
×
294
#endif
295

×
296
        HPX_ASSERT(nullptr != pp_);
×
297
        buff(pp_);    // 'invoke' the buffer
×
298

299
        return true;
300
    }
×
301

×
302
    // performance counter values
303
    std::int64_t coalescing_message_handler::get_average_time_between_parcels(
×
304
        bool reset)
×
305
    {
×
306
        std::lock_guard<mutex_type> l(mtx_);
307
        auto const now = static_cast<std::int64_t>(
308
            hpx::chrono::high_resolution_clock::now());
309
        if (num_parcels_ == 0)
×
310
        {
311
            if (reset)
×
312
                started_at_ = now;
313
            return 0;
×
314
        }
×
315

316
        std::int64_t const num_parcels = num_parcels_ - reset_time_num_parcels_;
317
        if (num_parcels == 0)
318
        {
319
            if (reset)
320
                started_at_ = now;
×
321
            return 0;
322
        }
×
323

×
324
        HPX_ASSERT(now >= started_at_);
×
325
        std::int64_t const value = (now - started_at_) / num_parcels;
×
326

×
327
        if (reset)
328
        {
329
            started_at_ = now;
×
330
            reset_time_num_parcels_ = num_parcels_;
331
        }
332

×
333
        return value;
334
    }
×
335

336
    std::int64_t coalescing_message_handler::get_parcels_count(bool reset)
×
337
    {
338
        std::unique_lock<mutex_type> l(mtx_);
×
339
        std::int64_t const num_parcels = num_parcels_ - reset_num_parcels_;
×
340
        if (reset)
341
            reset_num_parcels_ = num_parcels_;
×
342
        return num_parcels;
343
    }
344

×
345
    std::int64_t coalescing_message_handler::get_parcels_per_message_count(
×
346
        bool reset)
×
347
    {
×
348
        std::unique_lock<mutex_type> l(mtx_);
349

×
350
        if (num_messages_ == 0)
351
        {
×
352
            if (reset)
×
353
            {
354
                reset_num_parcels_per_message_parcels_ = num_parcels_;
355
                reset_num_parcels_per_message_messages_ = num_messages_;
×
356
            }
357
            return 0;
358
        }
×
359

360
        std::int64_t const num_parcels =
361
            num_parcels_ - reset_num_parcels_per_message_parcels_;
×
362
        std::int64_t const num_messages =
363
            num_messages_ - reset_num_parcels_per_message_messages_;
×
364

×
365
        if (reset)
×
366
        {
×
367
            reset_num_parcels_per_message_parcels_ = num_parcels_;
×
368
            reset_num_parcels_per_message_messages_ = num_messages_;
369
        }
370

371
        if (num_messages == 0)
×
372
            return 0;
373

374
        return num_parcels / num_messages;
×
375
    }
376

×
377
    std::int64_t coalescing_message_handler::get_messages_count(bool reset)
×
378
    {
379
        std::unique_lock<mutex_type> l(mtx_);
×
380
        std::int64_t const num_messages = num_messages_ - reset_num_messages_;
×
381
        if (reset)
382
            reset_num_messages_ = num_messages_;
383
        return num_messages;
384
    }
385

386
    std::vector<std::int64_t>
387
    coalescing_message_handler::get_time_between_parcels_histogram(
388
        bool /* reset */)
389
    {
390
        std::vector<std::int64_t> result;
×
391

×
392
        std::unique_lock<mutex_type> l(mtx_);
×
393
        if (!time_between_parcels_)
394
        {
395
            l.unlock();
×
396
            HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
397
                "coalescing_message_handler::"
×
398
                "get_time_between_parcels_histogram",
399
                "parcel-arrival-histogram counter was not initialized for "
400
                "action type: {}",
×
401
                action_name_);
402
            return result;
403
        }
×
404

405
        // first add histogram parameters
406
        result.push_back(histogram_min_boundary_);
407
        result.push_back(histogram_max_boundary_);
408
        result.push_back(histogram_num_buckets_);
×
409

×
410
        auto const data = hpx::util::histogram(*time_between_parcels_);
411
        for (auto const& item : data)
412
        {
413
            result.push_back(static_cast<std::int64_t>(item.second * 1000));
414
        }
415

416
        return result;
417
    }
×
418

×
419
    void coalescing_message_handler::get_time_between_parcels_histogram_creator(
×
420
        std::int64_t min_boundary, std::int64_t max_boundary,
421
        std::int64_t num_buckets,
422
        hpx::function<std::vector<std::int64_t>(bool)>& result)
×
423
    {
×
424
        std::lock_guard<mutex_type> l(mtx_);
×
425
        if (time_between_parcels_)
×
426
        {
427
            result = hpx::bind_front(
428
                &coalescing_message_handler::get_time_between_parcels_histogram,
429
                this);
430
            return;
431
        }
432

433
        histogram_min_boundary_ = min_boundary;
434
        histogram_max_boundary_ = max_boundary;
1,088✔
435
        histogram_num_buckets_ = num_buckets;
436

437
        time_between_parcels_.reset(
1,088✔
438
            new histogram_collector_type(hpx::util::tag::histogram::num_bins =
1,088✔
439
                                             static_cast<double>(num_buckets),
2,176✔
440
                hpx::util::tag::histogram::min_range =
1,088✔
441
                    static_cast<double>(min_boundary),
442
                hpx::util::tag::histogram::max_range =
443
                    static_cast<double>(max_boundary)));
444
        last_parcel_time_ = static_cast<std::int64_t>(
445
            hpx::chrono::high_resolution_clock::now());
446

447
        result = hpx::bind_front(
448
            &coalescing_message_handler::get_time_between_parcels_histogram,
449
            this);
450
    }
451

452
    ///////////////////////////////////////////////////////////////////////////
453
    // register the given action (called during startup)
454
    void coalescing_message_handler::register_action(
455
        char const* action, error_code& ec)
456
    {
457
        coalescing_counter_registry::instance().register_action(action);
458
        if (&ec != &throws)
459
            ec = make_success_code();
460
    }
461
}    // namespace hpx::plugins::parcel
462

463
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc