• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.94
/libs/core/serialization/include/hpx/serialization/input_container.hpp
1
//  Copyright (c) 2007-2025 Hartmut Kaiser
2
//  Copyright (c)      2014 Thomas Heller
3
//
4
//  SPDX-License-Identifier: BSL-1.0
5
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
6
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7

8
#pragma once
9

10
#include <hpx/config.hpp>
11
#include <hpx/assert.hpp>
12
#include <hpx/modules/errors.hpp>
13
#include <hpx/serialization/binary_filter.hpp>
14
#include <hpx/serialization/container.hpp>
15
#include <hpx/serialization/serialization_chunk.hpp>
16
#include <hpx/serialization/traits/serialization_access_data.hpp>
17

18
#include <cstddef>    // for size_t
19
#include <cstdint>
20
#include <cstring>    // for memcpy
21
#include <memory>
22
#include <vector>
23

24
namespace hpx::serialization {
25

26
    HPX_CXX_EXPORT template <typename Container>
27
    struct input_container : erased_input_container
28
    {
29
    private:
30
        using access_traits = traits::serialization_access_data<Container>;
31

32
        [[nodiscard]] constexpr std::size_t get_chunk_size(
33
            std::size_t chunk) const noexcept
34
        {
35
            return (*chunks_)[chunk].size_;
×
36
        }
37

38
        [[nodiscard]] constexpr chunk_type get_chunk_type(
39
            std::size_t chunk) const noexcept
40
        {
41
            return (*chunks_)[chunk].type_;
42
        }
43

44
        constexpr chunk_data const& get_chunk_data(
45
            std::size_t chunk) const noexcept
46
        {
47
            return (*chunks_)[chunk].data_;
48
        }
49

50
        constexpr chunk_data& get_chunk_data(std::size_t chunk) noexcept
51
        {
52
            return (*chunks_)[chunk].data_;
53
        }
54

55
        [[nodiscard]] constexpr std::size_t get_num_chunks() const noexcept
56
        {
57
            return chunks_->size();
58
        }
59

60
    public:
61
        input_container(
62
            Container const& cont, std::size_t inbound_data_size) noexcept
63
          : cont_(cont)
64
          , current_(0)
65
          , decompressed_size_(inbound_data_size)
66
          , zero_copy_serialization_threshold_(
67
                HPX_ZERO_COPY_SERIALIZATION_THRESHOLD)
68
          , chunks_(nullptr)
69
          , current_chunk_(static_cast<std::size_t>(-1))
70
          , current_chunk_size_(0)
71
        {
72
        }
73

74
        input_container(Container const& cont,
415✔
75
            std::vector<serialization_chunk>* chunks,
76
            std::size_t inbound_data_size) noexcept
77
          : cont_(cont)
415✔
78
          , current_(0)
415✔
79
          , decompressed_size_(inbound_data_size)
415✔
80
          , zero_copy_serialization_threshold_(
415✔
81
                HPX_ZERO_COPY_SERIALIZATION_THRESHOLD)
82
          , chunks_(nullptr)
415✔
83
          , current_chunk_(static_cast<std::size_t>(-1))
415✔
84
          , current_chunk_size_(0)
415✔
85
        {
86
            if (chunks && chunks->size() != 0)
415✔
87
            {
88
                chunks_ = chunks;
×
89
                current_chunk_ = 0;
×
90
            }
91
        }
92

93
        void set_filter(binary_filter* filter) override
×
94
        {
95
            filter_.reset(filter);
96
            if (filter)
×
97
            {
98
                current_ = access_traits::init_data(
×
99
                    cont_, filter_.get(), current_, decompressed_size_);
100

101
                if (decompressed_size_ < current_)
×
102
                {
103
                    HPX_THROW_EXCEPTION(hpx::error::serialization_error,
×
104
                        "input_container::set_filter",
105
                        "archive data binary stream is too short");
106
                }
107
            }
108
        }
×
109

110
        void set_zero_copy_serialization_threshold(
415✔
111
            std::size_t zero_copy_serialization_threshold) override
112
        {
113
            zero_copy_serialization_threshold_ =
415✔
114
                zero_copy_serialization_threshold;
115
            if (zero_copy_serialization_threshold_ == 0)
415✔
116
            {
117
                zero_copy_serialization_threshold_ =
1✔
118
                    HPX_ZERO_COPY_SERIALIZATION_THRESHOLD;
119
            }
120
        }
415✔
121

122
        void load_binary(void* address, std::size_t count) override
5,574✔
123
        {
124
            if (filter_ != nullptr)
5,574✔
125
            {
126
                filter_->load(address, count);
×
127
            }
128
            else
129
            {
130
                std::size_t new_current = current_ + count;
5,574✔
131
                if (new_current > access_traits::size(cont_))
5,574✔
132
                {
133
                    HPX_THROW_EXCEPTION(hpx::error::serialization_error,
×
134
                        "input_container::load_binary",
135
                        "archive data binary stream is too short");
136
                }
137

138
                access_traits::read(cont_, count, current_, address);
5,568✔
139

140
                current_ = new_current;
5,574✔
141

142
                if (chunks_ != nullptr)
5,574✔
143
                {
144
                    current_chunk_size_ += count;
×
145

146
                    // make sure we switch to the next serialization_chunk if
147
                    // necessary
148
                    std::size_t const current_chunk_size =
149
                        get_chunk_size(current_chunk_);
×
150
                    if (current_chunk_size != 0 &&
×
151
                        current_chunk_size_ >= current_chunk_size)
152
                    {
153
                        // raise an error if we read past the serialization_chunk
154
                        if (current_chunk_size_ > current_chunk_size)
×
155
                        {
156
                            HPX_THROW_EXCEPTION(hpx::error::serialization_error,
×
157
                                "input_container::load_binary",
158
                                "archive data binary stream structure "
159
                                "mismatch");
160
                        }
×
161
                        ++current_chunk_;
×
162
                        current_chunk_size_ = 0;
163
                    }
164
                }
165
            }
5,574✔
166
        }
167

3✔
168
        void load_binary_chunk(void* address, std::size_t count,
169
            bool allow_zero_copy_receive) override
170
        {
171
            HPX_ASSERT(static_cast<std::int64_t>(count) >= 0);
172

3✔
173
            if (chunks_ == nullptr ||
3✔
174
                count < zero_copy_serialization_threshold_ ||
175
                filter_ != nullptr)
176
            {
177
                // fall back to serialization_chunk-less archive
3✔
178
                this->input_container::load_binary(address, count);
179
            }
180
            else
181
            {
182
                HPX_ASSERT(current_chunk_ != static_cast<std::size_t>(-1));
183
                HPX_ASSERT(get_chunk_type(current_chunk_) ==
184
                    chunk_type::chunk_type_pointer);
185

×
186
                if (get_chunk_size(current_chunk_) != count)
187
                {
×
188
                    HPX_THROW_EXCEPTION(hpx::error::serialization_error,
189
                        "input_container::load_binary_chunk",
190
                        "archive data binary stream data chunk size mismatch");
191
                }
192

193
                auto*& buffer = get_chunk_data(current_chunk_).pos_;
×
194
                if (allow_zero_copy_receive)
195
                {
196
                    // If the receiving end supports zer-copy serialization of
197
                    // larger chunks, the de-serialization pass should not copy
198
                    // the data, but simply return the address of the buffer
199
                    // where the data will be placed by the networking layer.
200
                    HPX_ASSERT(buffer == nullptr);
×
201
                    buffer = address;
202
                }
203
                else
204
                {
205
                    // Unfortunately we can't implement a zero copy policy on
206
                    // the receiving end as the parcelport doesn't support this.
207
                    // The memory was already allocated by the serialization
208
                    // code, thus we copy the received data.
209
                    HPX_ASSERT(buffer != nullptr);
×
210
                    std::memcpy(address, buffer, count);
211
                }
×
212
                ++current_chunk_;
213
            }
3✔
214
        }
215

216
        Container const& cont_;
217
        std::size_t current_;
218
        std::unique_ptr<binary_filter> filter_;
219
        std::size_t decompressed_size_;
220
        std::size_t zero_copy_serialization_threshold_;
221

222
        std::vector<serialization_chunk>* chunks_;
223
        std::size_t current_chunk_;
224
        std::size_t current_chunk_size_;
225
    };
226
}    // namespace hpx::serialization
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc