• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/examples/quickstart/zerocopy_rdma.cpp
1
//  Copyright (c) 2014-2025 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#include <hpx/config.hpp>
8
#if !defined(HPX_COMPUTE_DEVICE_CODE)
9
#include <hpx/hpx.hpp>
10
#include <hpx/hpx_main.hpp>
11
#include <hpx/modules/type_support.hpp>
12

13
#include <hpx/serialization.hpp>
14

15
#include <cstddef>
16
#include <iostream>
17
#include <utility>
18
#include <vector>
19

20
///////////////////////////////////////////////////////////////////////////////
21
constexpr std::size_t ZEROCOPY_DATASIZE = 1'048'576;
22

23
///////////////////////////////////////////////////////////////////////////////
24
// A custom allocator which takes a pointer in its constructor and then returns
25
// this pointer in response to any allocate request. It is here to try to fool
26
// the hpx serialization into copying directly into a user provided buffer
27
// without copying from a result into another buffer.
28
template <typename T>
29
class pointer_allocator
30
{
31
public:
32
    typedef T value_type;
33
    typedef T* pointer;
34
    typedef T const* const_pointer;
35
    typedef T& reference;
36
    typedef T const& const_reference;
37
    typedef std::size_t size_type;
38
    typedef std::ptrdiff_t difference_type;
39

40
    pointer_allocator() noexcept
41
      : pointer_(nullptr)
42
      , size_(0)
43
    {
44
    }
45

46
    pointer_allocator(pointer p, size_type size) noexcept
×
47
      : pointer_(p)
×
48
      , size_(size)
×
49
    {
50
    }
51

52
    static pointer address(reference value)
53
    {
54
        return &value;
55
    }
56
    static const_pointer address(const_reference value)
57
    {
58
        return &value;
59
    }
60

61
    pointer allocate(size_type n, void const* = nullptr)
62
    {
63
        HPX_ASSERT(n == size_);
64
        HPX_UNUSED(n);
65
        return static_cast<T*>(pointer_);
×
66
    }
67

68
    void deallocate(pointer p, size_type n) noexcept
69
    {
70
        HPX_ASSERT(p == pointer_ && n == size_);
71
        HPX_UNUSED(p);
72
        HPX_UNUSED(n);
73
    }
74

75
private:
76
    // serialization support
77
    friend class hpx::serialization::access;
78

79
    template <typename Archive>
80
    void load(Archive& ar, unsigned int const)
×
81
    {
82
        std::size_t t = 0;
83
        ar >> size_ >> t;
84
        pointer_ = reinterpret_cast<pointer>(t);
×
85
    }
×
86

87
    template <typename Archive>
88
    void save(Archive& ar, unsigned int const) const
×
89
    {
90
        std::size_t t = reinterpret_cast<std::size_t>(pointer_);
×
91
        ar << size_ << t;
92
    }
×
93

94
    HPX_SERIALIZATION_SPLIT_MEMBER()
×
95

96
private:
97
    pointer pointer_;
98
    size_type size_;
99
};
100

101
///////////////////////////////////////////////////////////////////////////////
102
// Buffer object used on the client side to specify where to place the received
103
// data
104
typedef hpx::serialization::serialize_buffer<double> general_buffer_type;
105

106
// Buffer object used for sending the data back to the receiver.
107
typedef hpx::serialization::serialize_buffer<double, pointer_allocator<double>>
108
    transfer_buffer_type;
109

110
///////////////////////////////////////////////////////////////////////////////
111
struct zerocopy_server : hpx::components::component_base<zerocopy_server>
112
{
113
private:
114
    void release_lock()
×
115
    {
116
        // all we need to do is to unlock the data
117
        mtx_.unlock();
118
    }
×
119

120
public:
121
    explicit zerocopy_server(std::size_t size = 0)
×
122
      : data_(size, 3.1415)
×
123
    {
124
    }
×
125

126
    ///////////////////////////////////////////////////////////////////////////
127
    // Retrieve an array of doubles to the given address
128
    transfer_buffer_type get_here(std::size_t size, std::size_t remote_buffer)
×
129
    {
130
        pointer_allocator<double> const allocator(
131
            reinterpret_cast<double*>(remote_buffer), size);
×
132

133
        // lock the mutex, will be unlocked by the transfer buffer's deleter
134
        mtx_.lock();
×
135

136
        // we use our data directly without copying
137
        return transfer_buffer_type(data_.data(), size,
138
            transfer_buffer_type::reference,
139
            hpx::bind(&zerocopy_server::release_lock, this), allocator);
×
140
    }
141
    HPX_DEFINE_COMPONENT_ACTION(zerocopy_server, get_here, get_here_action)
142

143
    ///////////////////////////////////////////////////////////////////////////
144
    // Retrieve an array of doubles
145
    general_buffer_type get(std::size_t size)
×
146
    {
147
        // lock the mutex, will be unlocked by the transfer buffer's deleter
148
        mtx_.lock();
×
149

150
        // we use our data directly without copying
151
        return general_buffer_type(data_.data(), size,
152
            general_buffer_type::reference,
153
            hpx::bind(&zerocopy_server::release_lock, this));
×
154
    }
155
    HPX_DEFINE_COMPONENT_ACTION(zerocopy_server, get, get_action)
156

157
private:
158
    std::vector<double> data_;
159
    hpx::spinlock mtx_;
160
};
161

162
typedef hpx::components::component<zerocopy_server> server_type;
163
HPX_REGISTER_COMPONENT(server_type, zerocopy_server)
×
164

165
typedef zerocopy_server::get_here_action zerocopy_get_here_action;
166
HPX_REGISTER_ACTION_DECLARATION(zerocopy_get_here_action)
167
HPX_REGISTER_ACTION(zerocopy_get_here_action)
×
168

169
typedef zerocopy_server::get_action zerocopy_get_action;
170
HPX_REGISTER_ACTION_DECLARATION(zerocopy_get_action)
171
HPX_REGISTER_ACTION(zerocopy_get_action)
×
172

173
///////////////////////////////////////////////////////////////////////////////
174
struct zerocopy : hpx::components::client_base<zerocopy, zerocopy_server>
×
175
{
176
private:
177
    // Copy he data once into the destination buffer if the get() operation was
178
    // entirely local (no data copies have been made so far).
179
    static void transfer_data(
×
180
        general_buffer_type recv, hpx::future<transfer_buffer_type> f)
181
    {
182
        transfer_buffer_type buffer(f.get());
×
183
        if (buffer.data() != recv.data())
×
184
        {
185
            std::copy(
186
                buffer.data(), buffer.data() + buffer.size(), recv.data());
×
187
        }
188
    }
×
189

190
public:
191
    typedef hpx::components::client_base<zerocopy, zerocopy_server> base_type;
192

193
    zerocopy(hpx::future<hpx::id_type>&& fid)
194
      : base_type(std::move(fid))
×
195
    {
196
    }
197

198
    //
199
    hpx::future<void> get_here(general_buffer_type& buff) const
×
200
    {
201
        zerocopy_get_here_action act;
202

203
        using hpx::placeholders::_1;
204
        std::size_t buffer_address = reinterpret_cast<std::size_t>(buff.data());
×
205
        return hpx::async(act, this->get_id(), buff.size(), buffer_address)
×
206
            .then(hpx::bind(&zerocopy::transfer_data, buff, _1));
×
207
    }
208
    void get_here(hpx::launch::sync_policy, general_buffer_type& buff) const
×
209
    {
210
        get_here(buff).get();
×
211
    }
×
212

213
    //
214
    hpx::future<general_buffer_type> get(std::size_t size) const
×
215
    {
216
        zerocopy_get_action act;
217
        return hpx::async(act, this->get_id(), size);
×
218
    }
219
    general_buffer_type get(hpx::launch::sync_policy, std::size_t size) const
×
220
    {
221
        return get(size).get();
×
222
    }
223
};
224

225
///////////////////////////////////////////////////////////////////////////////
226
int main()
×
227
{
228
    for (hpx::id_type const& id : hpx::find_all_localities())
×
229
    {
230
        zerocopy zc = hpx::new_<zerocopy_server>(id, ZEROCOPY_DATASIZE);
×
231

232
        general_buffer_type buffer(new double[ZEROCOPY_DATASIZE],
×
233
            ZEROCOPY_DATASIZE, general_buffer_type::take);
234

×
235
        {
×
236
            hpx::chrono::high_resolution_timer t;
237

238
            for (int i = 0; i != 100; ++i)
239
            {
240
                [[maybe_unused]] auto r =
×
241
                    zc.get(hpx::launch::sync, ZEROCOPY_DATASIZE);
×
242
            }
243

244
            double d = t.elapsed();
245
            std::cout << "Elapsed time 'get' (locality "
246
                      << hpx::naming::get_locality_id_from_id(id) << "): " << d
×
247
                      << "[s]\n";
248
        }
249

250
        {
251
            hpx::chrono::high_resolution_timer t;
252

×
253
            for (int i = 0; i != 100; ++i)
×
254
                zc.get_here(hpx::launch::sync, buffer);
255

256
            double const d = t.elapsed();
257
            std::cout << "Elapsed time 'get_here' (locality "
258
                      << hpx::naming::get_locality_id_from_id(id) << "): " << d
×
259
                      << "[s]\n";
260
        }
261
    }
262

263
    return 0;
×
264
}
265

266
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc