• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 6666188375

27 Oct 2023 10:45AM UTC coverage: 69.614% (-8.5%) from 78.12%
6666188375

push

github

web-flow
Merge pull request #290 from ska-sa/prepare-4.1.1

Prepare 4.1.1

4747 of 6819 relevant lines covered (69.61%)

123579.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/recv_chunk_stream.cpp
1
/* Copyright 2021-2023 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
16

17
/**
 * @file
 */
20

21
#include <vector>
22
#include <memory>
23
#include <cstddef>
24
#include <cassert>
25
#include <stdexcept>
26
#include <functional>
27
#include <algorithm>
28
#include <utility>
29
#include <new>
30
#include <spead2/common_defines.h>
31
#include <spead2/common_memory_allocator.h>
32
#include <spead2/recv_packet.h>
33
#include <spead2/recv_live_heap.h>
34
#include <spead2/recv_heap.h>
35
#include <spead2/recv_utils.h>
36
#include <spead2/recv_stream.h>
37
#include <spead2/recv_chunk_stream.h>
38

39
namespace spead2::recv
40
{
41

42
/**
 * Store a copy of @a item_ids in this configuration.
 *
 * @param item_ids  Values copied into the @c item_ids member.
 * @return A reference to this object, so that setters can be chained.
 */
chunk_stream_config &chunk_stream_config::set_items(const std::vector<item_pointer_t> &item_ids)
{
    chunk_stream_config &self = *this;
    self.item_ids = item_ids;
    return self;
}
47

48
/**
 * Set the @c max_chunks member.
 *
 * @param max_chunks  New value; must be non-zero.
 * @return A reference to this object, so that setters can be chained.
 * @throw std::invalid_argument if @a max_chunks is zero. The stored value is
 *        left unchanged in that case.
 */
chunk_stream_config &chunk_stream_config::set_max_chunks(std::size_t max_chunks)
{
    if (!max_chunks)
    {
        throw std::invalid_argument("max_chunks cannot be 0");
    }
    chunk_stream_config &self = *this;
    self.max_chunks = max_chunks;
    return self;
}
55

56
/**
 * Install the place function.
 *
 * @param place  Callable taken by value and moved into the @c place member.
 * @return A reference to this object, so that setters can be chained.
 */
chunk_stream_config &chunk_stream_config::set_place(chunk_place_function place)
{
    chunk_stream_config &self = *this;
    self.place = std::move(place);
    return self;
}
61

62
/**
 * Install the chunk allocation function.
 *
 * @param allocate  Callable taken by value and moved into the @c allocate member.
 * @return A reference to this object, so that setters can be chained.
 */
chunk_stream_config &chunk_stream_config::set_allocate(chunk_allocate_function allocate)
{
    chunk_stream_config &self = *this;
    self.allocate = std::move(allocate);
    return self;
}
67

68
/**
 * Install the chunk-ready function.
 *
 * @param ready  Callable taken by value and moved into the @c ready member.
 * @return A reference to this object, so that setters can be chained.
 */
chunk_stream_config &chunk_stream_config::set_ready(chunk_ready_function ready)
{
    chunk_stream_config &self = *this;
    self.ready = std::move(ready);
    return self;
}
73

74
/**
 * Turn on packet presence by recording a non-zero payload size.
 *
 * @param payload_size  Value stored in @c packet_presence_payload_size; must
 *                      be non-zero, since zero is the value written by
 *                      disable_packet_presence().
 * @return A reference to this object, so that setters can be chained.
 * @throw std::invalid_argument if @a payload_size is zero. The stored value is
 *        left unchanged in that case.
 */
chunk_stream_config &chunk_stream_config::enable_packet_presence(std::size_t payload_size)
{
    if (!payload_size)
    {
        throw std::invalid_argument("payload_size must not be zero");
    }
    chunk_stream_config &self = *this;
    self.packet_presence_payload_size = payload_size;
    return self;
}
81

82
/**
 * Turn off packet presence by resetting @c packet_presence_payload_size to
 * zero.
 *
 * @return A reference to this object, so that setters can be chained.
 */
chunk_stream_config &chunk_stream_config::disable_packet_presence()
{
    chunk_stream_config &self = *this;
    self.packet_presence_payload_size = 0;
    return self;
}
87

88
/**
 * Set the @c max_heap_extra member.
 *
 * @param max_heap_extra  New value; no validation is performed here.
 * @return A reference to this object, so that setters can be chained.
 */
chunk_stream_config &chunk_stream_config::set_max_heap_extra(std::size_t max_heap_extra)
{
    chunk_stream_config &self = *this;
    self.max_heap_extra = max_heap_extra;
    return self;
}
93

94

95
namespace detail
96
{
97

98
/**
 * Round @a size up to the next multiple of @a align.
 *
 * The result is derived from the remainder rather than computed as
 * <code>(size + align - 1) / align * align</code>: that sum can wrap around
 * for sizes close to the top of the @c std::size_t range and silently produce
 * a result smaller than @a size, even when the correct answer is
 * representable.
 *
 * @param size   Value to round up.
 * @param align  Granularity to round to; must be non-zero (it does not need
 *               to be a power of two).
 * @return The smallest multiple of @a align that is not less than @a size.
 *         If that value does not fit in @c std::size_t the result wraps.
 */
static std::size_t round_up(std::size_t size, std::size_t align)
{
    assert(align != 0);
    const std::size_t remainder = size % align;
    return remainder == 0 ? size : size + (align - remainder);
}
103

104
/// Construct the window, initialising the @c chunks member from @a max_chunks.
chunk_window::chunk_window(std::size_t max_chunks)
    : chunks(max_chunks)
{
}
×
105

106
/**
 * Set up the state shared by chunk streams.
 *
 * Besides initialising members from the two configuration objects, this
 * allocates a single block of raw storage holding the chunk_place_data
 * structure followed by the two arrays it points to (@c items and @c extra).
 * Ownership of that block is handed to @c place_data_storage.
 *
 * @param config        Stream configuration; supplies the original memcpy
 *                      function, the stream ID and the next statistic index.
 * @param chunk_config  Chunk configuration, used to initialise the
 *                      @c chunk_config member and to size the storage.
 *
 * @throw std::invalid_argument if the chunk configuration has no place
 *        function.
 * @throw std::bad_alloc if the storage cannot be allocated, including the
 *        case where max_heap_extra is so large that the total size would not
 *        fit in @c std::size_t.
 */
chunk_stream_state_base::chunk_stream_state_base(
    const stream_config &config, const chunk_stream_config &chunk_config)
    : orig_memcpy(config.get_memcpy()),
    chunk_config(chunk_config),
    stream_id(config.get_stream_id()),
    base_stat_index(config.next_stat_index()),
    chunks(chunk_config.get_max_chunks())
{
    if (!this->chunk_config.get_place())
        throw std::invalid_argument("chunk_config.place is not set");

    /* Compute the memory required for place_data_storage. The layout is
     * - chunk_place_data
     * - item pointers (with s_item_pointer_t alignment)
     * - extra (with max_align_t alignment)
     */
    constexpr std::size_t max_align = alignof(std::max_align_t);
    const std::size_t n_items = chunk_config.get_items().size();
    const std::size_t extra_size = chunk_config.get_max_heap_extra();
    std::size_t space = sizeof(chunk_place_data);
    space = round_up(space, alignof(s_item_pointer_t));
    const std::size_t item_offset = space;
    space += n_items * sizeof(s_item_pointer_t);
    space = round_up(space, max_align);
    const std::size_t extra_offset = space;
    /* max_heap_extra is an arbitrary caller-supplied value. Refuse sizes for
     * which adding it (plus the padding of the final round-up) would wrap
     * around: a wrapped total would yield an undersized allocation with the
     * extra pointer lying beyond its end.
     */
    constexpr std::size_t size_limit = static_cast<std::size_t>(-1) - (max_align - 1);
    if (extra_offset > size_limit || extra_size > size_limit - extra_offset)
        throw std::bad_alloc();
    space += extra_size;
    /* operator new is required to return a pointer suitably aligned for an
     * object of the requested size. Round up to a multiple of max_align so
     * that the library cannot infer a smaller alignment.
     */
    space = round_up(space, max_align);

    /* Allocate the memory, and use placement new to initialise it. For the
     * arrays the placement new shouldn't actually run any code, but it
     * officially starts the lifetime of the object in terms of the C++ spec.
     * It's not clear whether it's actually portable in C++17: implementations
     * used to be allowed to add overhead for array new, even when using
     * placement new.  CWG 2382 disallowed that for placement new, and in
     * practice it sounds like no compiler ever added overhead for scalar types
     * (MSVC used to do it for polymorphic classes).
     *
     * In C++20 it's probably not necessary to use the placement new due to
     * the rules about implicit-lifetime types, although the examples imply
     * it is necessary to use std::launder so it wouldn't be any simpler.
     */
    unsigned char *ptr = static_cast<unsigned char *>(operator new(space));
    place_data = new(ptr) chunk_place_data();
    if (n_items > 0)
        place_data->items = new(ptr + item_offset) s_item_pointer_t[n_items];
    else
        place_data->items = nullptr;
    if (extra_size > 0)
        place_data->extra = new(ptr + extra_offset) std::uint8_t[extra_size];
    else
        place_data->extra = nullptr;
    // From here on the deleter of place_data_storage destroys and frees the block.
    place_data_storage.reset(ptr);
}
×
162

163
void chunk_stream_state_base::free_place_data::operator()(unsigned char *ptr) const
×
164
{
165
    // It's not totally clear whether std::launder is required here, but
166
    // better to be safe.
167
    auto *place_data = std::launder(reinterpret_cast<chunk_place_data *>(ptr));
×
168
    place_data->~chunk_place_data();
×
169
    operator delete(ptr);
×
170
}
×
171

172
void chunk_stream_state_base::packet_memcpy(
×
173
    const memory_allocator::pointer &allocation,
174
    const packet_header &packet) const
175
{
176
    const heap_metadata &metadata = *get_heap_metadata(allocation);
×
177
    if (chunk_too_old(metadata.chunk_id))
×
178
    {
179
        // The packet corresponds to a chunk that has already been aged out
180
        // TODO: increment a counter / log a warning
181
        return;
×
182
    }
183
    orig_memcpy(allocation, packet);
×
184
    std::size_t payload_divide = chunk_config.get_packet_presence_payload_size();
×
185
    if (payload_divide != 0)
×
186
    {
187
        // TODO: could possibly optimise this using something like libdivide
188
        std::size_t index = metadata.heap_index + packet.payload_offset / payload_divide;
×
189
        assert(index < metadata.chunk_ptr->present_size);
×
190
        metadata.chunk_ptr->present[index] = true;
×
191
    }
192
}
193

194
/**
 * Handle a heap that the stream has finished with.
 *
 * Incomplete heaps are ignored. A complete heap is converted to a @c heap and,
 * provided it still belongs to a live chunk and packet presence is disabled,
 * the chunk's @c present entry for the heap is set. When packet presence is
 * enabled nothing is marked here; packet_memcpy() records presence per packet
 * instead.
 *
 * @param lh  The live heap; it is moved from only if it is complete.
 */
void chunk_stream_state_base::do_heap_ready(live_heap &&lh)
{
    if (!lh.is_complete())
        return;

    heap h(std::move(lh));
    const auto meta = get_heap_metadata(h.get_payload());
    if (!meta || !meta->chunk_ptr)
        return;
    // We need to check the chunk_id because the chunk might have been aged
    // out while the heap was incomplete.
    if (chunk_too_old(meta->chunk_id))
        return;
    if (get_chunk_config().get_packet_presence_payload_size())
        return;

    assert(meta->heap_index < meta->chunk_ptr->present_size);
    meta->chunk_ptr->present[meta->heap_index] = true;
}
×
211

212
/**
 * Retrieve the heap metadata stored as the deleter of an allocation.
 *
 * @param ptr  Allocation to inspect.
 * @return Whatever the deleter's @c target<heap_metadata>() yields: a pointer
 *         to the metadata held by the deleter, or a null pointer (callers
 *         such as do_heap_ready() test for null).
 */
const chunk_stream_state_base::heap_metadata *chunk_stream_state_base::get_heap_metadata(
    const memory_allocator::pointer &ptr)
{
    const auto &deleter = ptr.get_deleter();
    return deleter.target<heap_metadata>();
}
217

218
/**
 * Validate that the chunk configuration has the callbacks this manager needs.
 *
 * Nothing from @a chunk_config is stored; it is only checked.
 *
 * @param chunk_config  Configuration to validate.
 * @throw std::invalid_argument if the allocate function or the ready function
 *        is not set (the allocate function is checked first).
 */
chunk_manager_simple::chunk_manager_simple(const chunk_stream_config &chunk_config)
{
    if (!chunk_config.get_allocate())
    {
        throw std::invalid_argument("chunk_config.allocate is not set");
    }
    if (!chunk_config.get_ready())
    {
        throw std::invalid_argument("chunk_config.ready is not set");
    }
}
×
225

226
/**
 * Get a pointer to the batch statistics of the stream that owns @a state.
 *
 * @param state  Chunk stream state; it is downcast to @c chunk_stream, so it
 *               must be the base sub-object of a @c chunk_stream.
 * @return Pointer to the first element of the stream's @c batch_stats.
 */
std::uint64_t *chunk_manager_simple::get_batch_stats(chunk_stream_state<chunk_manager_simple> &state) const
{
    chunk_stream &owner = static_cast<chunk_stream &>(state);
    return owner.batch_stats.data();
}
230

231
/**
 * Obtain a new chunk from the user-supplied allocate function.
 *
 * @param state     Stream state providing the chunk configuration and the
 *                  place data (whose @c batch_stats is passed to the callback).
 * @param chunk_id  ID of the chunk to allocate; forwarded to the callback.
 * @return The chunk as a raw pointer. Ownership is released by this function
 *         and re-taken by ready_chunk().
 */
chunk *chunk_manager_simple::allocate_chunk(chunk_stream_state<chunk_manager_simple> &state, std::int64_t chunk_id)
{
    std::unique_ptr<chunk> fresh =
        state.chunk_config.get_allocate()(chunk_id, state.place_data->batch_stats);
    // ready_chunk will re-take ownership
    return fresh.release();
}
×
237

238
/**
 * Hand a chunk to the user-supplied ready function.
 *
 * @param state  Stream state providing the chunk configuration and the batch
 *               statistics passed to the callback.
 * @param c      Chunk to hand over. Ownership is taken here (see
 *               allocate_chunk(), which released it) and moved into the
 *               callback.
 */
void chunk_manager_simple::ready_chunk(chunk_stream_state<chunk_manager_simple> &state, chunk *c)
{
    // Take ownership first so the chunk is freed if anything below throws.
    std::unique_ptr<chunk> reclaimed(c);
    const auto &on_ready = state.chunk_config.get_ready();
    on_ready(std::move(reclaimed), get_batch_stats(state));
}
×
243

244
// Explicit instantiation of chunk_stream_state for chunk_manager_simple.
template class chunk_stream_state<chunk_manager_simple>;
245
// Explicit instantiation of chunk_stream_allocator for chunk_manager_simple.
template class chunk_stream_allocator<chunk_manager_simple>;
246

247
} // namespace detail
248

249
/**
 * Construct a chunk stream.
 *
 * The chunk state base is built from both configurations together with a
 * chunk_manager_simple (whose constructor validates @a chunk_config). The
 * @c stream base receives the I/O service and the result of
 * @c adjust_config(config) rather than @a config itself.
 *
 * @param io_service    I/O service reference, moved into the @c stream base.
 * @param config        Stream configuration.
 * @param chunk_config  Chunk configuration.
 */
chunk_stream::chunk_stream(
    io_service_ref io_service,
    const stream_config &config,
    const chunk_stream_config &chunk_config)
    : chunk_stream_state(
          config,
          chunk_config,
          detail::chunk_manager_simple(chunk_config)),
      stream(
          std::move(io_service),
          adjust_config(config))
{
}
×
257

258
/**
 * Forward a heap that the stream has finished with to do_heap_ready().
 *
 * @param lh  The live heap, passed on as an rvalue.
 */
void chunk_stream::heap_ready(live_heap &&lh)
{
    this->do_heap_ready(std::move(lh));
}
×
262

263
void chunk_stream::stop_received()
×
264
{
265
    stream::stop_received();
×
266
    flush_chunks();
×
267
}
×
268

269
void chunk_stream::stop()
×
270
{
271
    {
272
        std::lock_guard<std::mutex> lock(get_queue_mutex());
×
273
        flush_chunks();
×
274
    }
×
275
    stream::stop();
×
276
}
×
277

278
/**
 * Destroy the stream, calling stop() first so that the chunks are flushed
 * and the base stream is stopped before the members are destroyed.
 */
chunk_stream::~chunk_stream()
{
    this->stop();
}
×
282

283
} // namespace spead2::recv
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc