• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 5462348362

pending completion
5462348362

Pull #219

github

web-flow
Merge fbf2f7ae2 into 99464cfdf
Pull Request #219: Add chunk stream groups

589 of 589 new or added lines in 16 files covered. (100.0%)

5397 of 7206 relevant lines covered (74.9%)

52888.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.12
/src/recv_chunk_stream.cpp
1
/* Copyright 2021-2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <vector>
22
#include <memory>
23
#include <cstddef>
24
#include <cassert>
25
#include <stdexcept>
26
#include <functional>
27
#include <algorithm>
28
#include <utility>
29
#include <spead2/common_defines.h>
30
#include <spead2/common_memory_allocator.h>
31
#include <spead2/recv_packet.h>
32
#include <spead2/recv_live_heap.h>
33
#include <spead2/recv_heap.h>
34
#include <spead2/recv_utils.h>
35
#include <spead2/recv_stream.h>
36
#include <spead2/recv_chunk_stream.h>
37

38
namespace spead2
39
{
40
namespace recv
41
{
42

43
constexpr std::size_t chunk_stream_config::default_max_chunks;
44

45
chunk_stream_config &chunk_stream_config::set_items(const std::vector<item_pointer_t> &item_ids)
26✔
46
{
47
    this->item_ids = item_ids;
26✔
48
    return *this;
26✔
49
}
50

51
chunk_stream_config &chunk_stream_config::set_max_chunks(std::size_t max_chunks)
26✔
52
{
53
    if (max_chunks == 0)
26✔
54
        throw std::invalid_argument("max_chunks cannot be 0");
1✔
55
    this->max_chunks = max_chunks;
25✔
56
    return *this;
25✔
57
}
58

59
chunk_stream_config &chunk_stream_config::set_place(chunk_place_function place)
68✔
60
{
61
    this->place = std::move(place);
68✔
62
    return *this;
68✔
63
}
64

65
chunk_stream_config &chunk_stream_config::set_allocate(chunk_allocate_function allocate)
13✔
66
{
67
    this->allocate = std::move(allocate);
13✔
68
    return *this;
13✔
69
}
70

71
chunk_stream_config &chunk_stream_config::set_ready(chunk_ready_function ready)
13✔
72
{
73
    this->ready = std::move(ready);
13✔
74
    return *this;
13✔
75
}
76

77
chunk_stream_config &chunk_stream_config::enable_packet_presence(std::size_t payload_size)
1✔
78
{
79
    if (payload_size == 0)
1✔
80
        throw std::invalid_argument("payload_size must not be zero");
×
81
    this->packet_presence_payload_size = payload_size;
1✔
82
    return *this;
1✔
83
}
84

85
chunk_stream_config &chunk_stream_config::disable_packet_presence()
×
86
{
87
    packet_presence_payload_size = 0;
×
88
    return *this;
×
89
}
90

91
chunk_stream_config &chunk_stream_config::set_max_heap_extra(std::size_t max_heap_extra)
10✔
92
{
93
    this->max_heap_extra = max_heap_extra;
10✔
94
    return *this;
10✔
95
}
96

97

98
namespace detail
99
{
100

101
// Round a size up to the next multiple of align
102
static std::size_t round_up(std::size_t size, std::size_t align)
312✔
103
{
104
    return (size + align - 1) / align * align;
312✔
105
}
106

107
chunk_window::chunk_window(std::size_t max_chunks) : chunks(max_chunks) {}
129✔
108

109
chunk_stream_state_base::chunk_stream_state_base(
105✔
110
    const stream_config &config, const chunk_stream_config &chunk_config)
105✔
111
    : orig_memcpy(config.get_memcpy()),
105✔
112
    chunk_config(chunk_config),
105✔
113
    stream_id(config.get_stream_id()),
105✔
114
    base_stat_index(config.next_stat_index()),
105✔
115
    chunks(chunk_config.get_max_chunks())
105✔
116
{
117
    if (!this->chunk_config.get_place())
105✔
118
        throw std::invalid_argument("chunk_config.place is not set");
1✔
119

120
    /* Compute the memory required for place_data_storage. The layout is
121
     * - chunk_place_data
122
     * - item pointers (with s_item_pointer_t alignment)
123
     * - extra (with max_align_t alignment)
124
     */
125
    constexpr std::size_t max_align = alignof(std::max_align_t);
104✔
126
    const std::size_t n_items = chunk_config.get_items().size();
104✔
127
    std::size_t space = sizeof(chunk_place_data);
104✔
128
    space = round_up(space, alignof(s_item_pointer_t));
104✔
129
    std::size_t item_offset = space;
104✔
130
    space += n_items * sizeof(s_item_pointer_t);
104✔
131
    space = round_up(space, max_align);
104✔
132
    std::size_t extra_offset = space;
104✔
133
    space += chunk_config.get_max_heap_extra();
104✔
134
    /* operator new is required to return a pointer suitably aligned for an
135
     * object of the requested size. Round up to a multiple of max_align so
136
     * that the library cannot infer a smaller alignment.
137
     */
138
    space = round_up(space, max_align);
104✔
139

140
    /* Allocate the memory, and use placement new to initialise it. For the
141
     * arrays the placement new shouldn't actually run any code, but it
142
     * officially starts the lifetime of the object in terms of the C++ spec.
143
     * It's not if it's actually portable in C++11: implementations used to be
144
     * allowed to add overhead for array new, even when using placement new.
145
     * CWG 2382 disallowed that for placement new, and in practice it sounds
146
     * like no compiler ever added overhead for scalar types (MSVC used to do
147
     * it for polymorphic classes).
148
     *
149
     * In C++20 it's probably not necessary to use the placement new due to
150
     * the rules about implicit-lifetime types, although the examples imply
151
     * it is necessary to use std::launder so it wouldn't be any simpler.
152
     */
153
    unsigned char *ptr = reinterpret_cast<unsigned char *>(operator new(space));
104✔
154
    place_data = new(ptr) chunk_place_data();
104✔
155
    if (n_items > 0)
104✔
156
        place_data->items = new(ptr + item_offset) s_item_pointer_t[n_items];
64✔
157
    else
158
        place_data->items = nullptr;
40✔
159
    if (chunk_config.get_max_heap_extra() > 0)
104✔
160
        place_data->extra = new(ptr + extra_offset) std::uint8_t[chunk_config.get_max_heap_extra()];
10✔
161
    else
162
        place_data->extra = nullptr;
94✔
163
    place_data_storage.reset(ptr);
104✔
164
}
108✔
165

166
void chunk_stream_state_base::free_place_data::operator()(unsigned char *ptr) const
104✔
167
{
168
    // TODO: should this use std::launder in C++17?
169
    auto *place_data = reinterpret_cast<chunk_place_data *>(ptr);
104✔
170
    place_data->~chunk_place_data();
104✔
171
    operator delete(ptr);
104✔
172
}
104✔
173

174
void chunk_stream_state_base::packet_memcpy(
3,713✔
175
    const memory_allocator::pointer &allocation,
176
    const packet_header &packet) const
177
{
178
    const heap_metadata &metadata = *get_heap_metadata(allocation);
3,713✔
179
    if (chunk_too_old(metadata.chunk_id))
3,713✔
180
    {
181
        // The packet corresponds to a chunk that has already been aged out
182
        // TODO: increment a counter / log a warning
183
        return;
1,275✔
184
    }
185
    orig_memcpy(allocation, packet);
2,438✔
186
    std::size_t payload_divide = chunk_config.get_packet_presence_payload_size();
2,438✔
187
    if (payload_divide != 0)
2,438✔
188
    {
189
        // TODO: could possibly optimise this using something like libdivide
190
        std::size_t index = metadata.heap_index + packet.payload_offset / payload_divide;
3✔
191
        assert(index < metadata.chunk_ptr->present_size);
192
        metadata.chunk_ptr->present[index] = true;
3✔
193
    }
194
}
195

196
void chunk_stream_state_base::do_heap_ready(live_heap &&lh)
3,703✔
197
{
198
    if (lh.is_complete())
3,703✔
199
    {
200
        heap h(std::move(lh));
3,702✔
201
        auto metadata = get_heap_metadata(h.get_payload());
3,698✔
202
        // We need to check the chunk_id because the chunk might have been aged
203
        // out while the heap was incomplete.
204
        if (metadata && metadata->chunk_ptr
3,698✔
205
            && !chunk_too_old(metadata->chunk_id)
2,432✔
206
            && !get_chunk_config().get_packet_presence_payload_size())
7,396✔
207
        {
208
            assert(metadata->heap_index < metadata->chunk_ptr->present_size);
209
            metadata->chunk_ptr->present[metadata->heap_index] = true;
2,430✔
210
        }
211
    }
3,698✔
212
}
3,702✔
213

214
const chunk_stream_state_base::heap_metadata *chunk_stream_state_base::get_heap_metadata(
7,411✔
215
    const memory_allocator::pointer &ptr)
216
{
217
    return ptr.get_deleter().target<heap_metadata>();
7,411✔
218
}
219

220
chunk_manager_simple::chunk_manager_simple(const chunk_stream_config &chunk_config)
13✔
221
{
222
    if (!chunk_config.get_allocate())
13✔
223
        throw std::invalid_argument("chunk_config.allocate is not set");
×
224
    if (!chunk_config.get_ready())
13✔
225
        throw std::invalid_argument("chunk_config.ready is not set");
×
226
}
13✔
227

228
std::uint64_t *chunk_manager_simple::get_batch_stats(chunk_stream_state<chunk_manager_simple> &state) const
1,583✔
229
{
230
    return static_cast<chunk_stream *>(&state)->batch_stats.data();
1,583✔
231
}
232

233
chunk *chunk_manager_simple::allocate_chunk(chunk_stream_state<chunk_manager_simple> &state, std::int64_t chunk_id)
173✔
234
{
235
    const auto &allocate = state.chunk_config.get_allocate();
173✔
236
    std::unique_ptr<chunk> owned = allocate(chunk_id, state.place_data->batch_stats);
173✔
237
    return owned.release();  // ready_chunk will re-take ownership
346✔
238
}
173✔
239

240
void chunk_manager_simple::ready_chunk(chunk_stream_state<chunk_manager_simple> &state, chunk *c)
76✔
241
{
242
    std::unique_ptr<chunk> owned(c);
76✔
243
    state.chunk_config.get_ready()(std::move(owned), get_batch_stats(state));
76✔
244
}
76✔
245

246
template class chunk_stream_state<chunk_manager_simple>;
247
template class chunk_stream_allocator<chunk_manager_simple>;
248

249
} // namespace detail
250

251
chunk_stream::chunk_stream(
13✔
252
    io_service_ref io_service,
253
    const stream_config &config,
254
    const chunk_stream_config &chunk_config)
13✔
255
    : chunk_stream_state(config, chunk_config, detail::chunk_manager_simple(chunk_config)),
256
    stream(std::move(io_service), adjust_config(config))
13✔
257
{
258
}
12✔
259

260
void chunk_stream::heap_ready(live_heap &&lh)
1,499✔
261
{
262
    do_heap_ready(std::move(lh));
1,499✔
263
}
1,499✔
264

265
void chunk_stream::stop_received()
12✔
266
{
267
    stream::stop_received();
12✔
268
    flush_chunks();
12✔
269
}
12✔
270

271
void chunk_stream::stop()
35✔
272
{
273
    {
274
        std::lock_guard<std::mutex> lock(get_queue_mutex());
35✔
275
        flush_chunks();
35✔
276
    }
35✔
277
    stream::stop();
35✔
278
}
35✔
279

280
chunk_stream::~chunk_stream()
12✔
281
{
282
    stop();
12✔
283
}
12✔
284

285
} // namespace recv
286
} // namespace spead2
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc