• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 16729047925

04 Aug 2025 04:44PM UTC coverage: 78.411% (-0.09%) from 78.504%
16729047925

push

github

bmerry
Wire set_pop_if_full up to Python

4 of 6 new or added lines in 1 file covered. (66.67%)

75 existing lines in 6 files now uncovered.

5586 of 7124 relevant lines covered (78.41%)

90538.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.97
/src/recv_chunk_stream.cpp
1
/* Copyright 2021-2023, 2025 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <vector>
22
#include <memory>
23
#include <cstddef>
24
#include <cassert>
25
#include <stdexcept>
26
#include <functional>
27
#include <algorithm>
28
#include <utility>
29
#include <new>
30
#include <spead2/common_defines.h>
31
#include <spead2/common_memory_allocator.h>
32
#include <spead2/recv_packet.h>
33
#include <spead2/recv_live_heap.h>
34
#include <spead2/recv_heap.h>
35
#include <spead2/recv_utils.h>
36
#include <spead2/recv_stream.h>
37
#include <spead2/recv_chunk_stream.h>
38

39
namespace spead2::recv
40
{
41

42
/**
 * Replace the list of item IDs stored in this configuration.
 *
 * @param item_ids  Item IDs to store. The vector is copied.
 * @return Reference to this object, so that setter calls can be chained.
 */
chunk_stream_config &chunk_stream_config::set_items(const std::vector<item_pointer_t> &item_ids)
{
    chunk_stream_config &self = *this;
    self.item_ids = item_ids;
    return self;
}
47

48
/**
 * Set the maximum number of chunks.
 *
 * @param max_chunks  New value; must be non-zero.
 * @return Reference to this object, so that setter calls can be chained.
 *
 * @throw std::invalid_argument if @a max_chunks is 0 (the stored value is
 *        left unchanged in that case)
 */
chunk_stream_config &chunk_stream_config::set_max_chunks(std::size_t max_chunks)
{
    if (max_chunks != 0)
    {
        this->max_chunks = max_chunks;
        return *this;
    }
    throw std::invalid_argument("max_chunks cannot be 0");
}
55

56
/**
 * Set the place function.
 *
 * @param place  Callable to store; it is moved into the configuration.
 * @return Reference to this object, so that setter calls can be chained.
 */
chunk_stream_config &chunk_stream_config::set_place(chunk_place_function place)
{
    chunk_stream_config &self = *this;
    self.place = std::move(place);
    return self;
}
61

62
/**
 * Set the chunk allocation function.
 *
 * @param allocate  Callable to store; it is moved into the configuration.
 * @return Reference to this object, so that setter calls can be chained.
 */
chunk_stream_config &chunk_stream_config::set_allocate(chunk_allocate_function allocate)
{
    chunk_stream_config &self = *this;
    self.allocate = std::move(allocate);
    return self;
}
67

68
/**
 * Set the chunk-ready function.
 *
 * @param ready  Callable to store; it is moved into the configuration.
 * @return Reference to this object, so that setter calls can be chained.
 */
chunk_stream_config &chunk_stream_config::set_ready(chunk_ready_function ready)
{
    chunk_stream_config &self = *this;
    self.ready = std::move(ready);
    return self;
}
73

74
/**
 * Enable packet presence by storing a non-zero payload size.
 *
 * @param payload_size  Payload size to record; must be non-zero (zero is
 *                      the value used by disable_packet_presence()).
 * @return Reference to this object, so that setter calls can be chained.
 *
 * @throw std::invalid_argument if @a payload_size is 0 (the stored value is
 *        left unchanged in that case)
 */
chunk_stream_config &chunk_stream_config::enable_packet_presence(std::size_t payload_size)
{
    if (payload_size != 0)
    {
        this->packet_presence_payload_size = payload_size;
        return *this;
    }
    throw std::invalid_argument("payload_size must not be zero");
}
81

82
/**
 * Disable packet presence by resetting the stored payload size to zero.
 *
 * @return Reference to this object, so that setter calls can be chained.
 */
chunk_stream_config &chunk_stream_config::disable_packet_presence()
{
    chunk_stream_config &self = *this;
    self.packet_presence_payload_size = 0;
    return self;
}
87

88
/**
 * Set the maximum number of bytes of per-heap extra data.
 *
 * @param max_heap_extra  New value; zero is accepted.
 * @return Reference to this object, so that setter calls can be chained.
 */
chunk_stream_config &chunk_stream_config::set_max_heap_extra(std::size_t max_heap_extra)
{
    chunk_stream_config &self = *this;
    self.max_heap_extra = max_heap_extra;
    return self;
}
93

UNCOV
94
/**
 * Store the pop-if-full flag.
 *
 * @param pop_if_full  New value of the flag.
 * @return Reference to this object, so that setter calls can be chained.
 */
chunk_stream_config &chunk_stream_config::set_pop_if_full(bool pop_if_full)
{
    chunk_stream_config &self = *this;
    self.pop_if_full = pop_if_full;
    return self;
}
99

100

101
namespace detail
102
{
103

104
/* Round size up to the nearest multiple of align (size is returned
 * unchanged if it is already a multiple). align must be non-zero.
 */
static std::size_t round_up(std::size_t size, std::size_t align)
{
    // Bump past the next boundary, then strip whatever overshoots it.
    const std::size_t bumped = size + align - 1;
    return bumped - bumped % align;
}
109

110
/**
 * Construct the window, initialising the @c chunks member from
 * @a max_chunks.
 */
chunk_window::chunk_window(std::size_t max_chunks)
    : chunks(max_chunks)
{
}
132✔
111

112
/**
 * Construct the shared chunk-stream state and allocate the scratch storage
 * referenced by @c place_data.
 *
 * A single allocation holds, in order: the chunk_place_data structure, an
 * array of s_item_pointer_t with one entry per configured item ID, and a
 * byte array of chunk_config.get_max_heap_extra() bytes. Ownership of the
 * allocation is handed to @c place_data_storage once it is initialised.
 *
 * @param config        Stream configuration; supplies the original memcpy
 *                      function, the stream ID and the next statistic index.
 * @param chunk_config  Chunk configuration, stored in this object.
 *
 * @throw std::invalid_argument if the chunk configuration has no place function
 */
chunk_stream_state_base::chunk_stream_state_base(
    const stream_config &config, const chunk_stream_config &chunk_config)
    : orig_memcpy(config.get_memcpy()),
    chunk_config(chunk_config),
    stream_id(config.get_stream_id()),
    base_stat_index(config.next_stat_index()),
    chunks(chunk_config.get_max_chunks())
{
    if (!this->chunk_config.get_place())
        throw std::invalid_argument("chunk_config.place is not set");

    /* Work out the layout of the place_data_storage allocation:
     * - chunk_place_data at offset 0
     * - the item pointers, aligned for s_item_pointer_t
     * - the extra bytes, aligned for max_align_t
     */
    constexpr std::size_t strictest_align = alignof(std::max_align_t);
    const std::size_t item_count = chunk_config.get_items().size();
    const std::size_t extra_bytes = chunk_config.get_max_heap_extra();

    const std::size_t items_begin = round_up(sizeof(chunk_place_data), alignof(s_item_pointer_t));
    const std::size_t items_end = items_begin + item_count * sizeof(s_item_pointer_t);
    const std::size_t extra_begin = round_up(items_end, strictest_align);
    /* operator new only has to return memory aligned suitably for an object
     * of the requested size, so pad the total to a multiple of the strictest
     * alignment to stop the library inferring that a weaker alignment is
     * good enough.
     */
    const std::size_t total_bytes = round_up(extra_begin + extra_bytes, strictest_align);

    /* Obtain the raw memory and start the lifetime of each object with
     * placement new. For the two arrays this should not execute any code;
     * it exists to satisfy the C++ object model. Portability in C++17 is
     * not entirely certain: implementations were once permitted to add
     * bookkeeping overhead to array new, placement form included. CWG 2382
     * removed that permission for placement new, and in practice no compiler
     * appears to have added overhead for scalar element types (MSVC used to
     * for polymorphic classes).
     *
     * Under C++20 the implicit-lifetime rules probably make the placement
     * new unnecessary, but the examples suggest std::launder would then be
     * needed, so that would not be any simpler.
     */
    unsigned char *raw = static_cast<unsigned char *>(operator new(total_bytes));
    place_data = new(raw) chunk_place_data();
    place_data->items =
        (item_count > 0) ? new(raw + items_begin) s_item_pointer_t[item_count] : nullptr;
    place_data->extra =
        (extra_bytes > 0) ? new(raw + extra_begin) std::uint8_t[extra_bytes] : nullptr;
    // Only hand over ownership once the chunk_place_data exists, because the
    // deleter runs its destructor.
    place_data_storage.reset(raw);
}
110✔
168

169
void chunk_stream_state_base::free_place_data::operator()(unsigned char *ptr) const
106✔
170
{
171
    // It's not totally clear whether std::launder is required here, but
172
    // better to be safe.
173
    auto *place_data = std::launder(reinterpret_cast<chunk_place_data *>(ptr));
106✔
174
    place_data->~chunk_place_data();
106✔
175
    operator delete(ptr);
106✔
176
}
106✔
177

178
void chunk_stream_state_base::packet_memcpy(
2,829✔
179
    const memory_allocator::pointer &allocation,
180
    const packet_header &packet) const
181
{
182
    const heap_metadata &metadata = *get_heap_metadata(allocation);
2,829✔
183
    if (chunk_too_old(metadata.chunk_id))
2,829✔
184
    {
185
        // The packet corresponds to a chunk that has already been aged out
186
        // TODO: increment a counter / log a warning
187
        return;
389✔
188
    }
189
    orig_memcpy(allocation, packet);
2,440✔
190
    std::size_t payload_divide = chunk_config.get_packet_presence_payload_size();
2,440✔
191
    if (payload_divide != 0)
2,440✔
192
    {
193
        // TODO: could possibly optimise this using something like libdivide
194
        std::size_t index = metadata.heap_index + packet.payload_offset / payload_divide;
3✔
195
        assert(index < metadata.chunk_ptr->present_size);
3✔
196
        metadata.chunk_ptr->present[index] = true;
3✔
197
    }
198
}
199

200
/**
 * Handle a heap that has left the live set. Incomplete heaps are ignored.
 * For a complete heap, its slot in the chunk's @c present array is set,
 * provided the chunk is still current and packet presence is disabled (when
 * it is enabled, packet_memcpy sets the presence flags per packet instead).
 *
 * @param lh  The live heap; it is moved from if it is complete.
 */
void chunk_stream_state_base::do_heap_ready(live_heap &&lh)
{
    if (!lh.is_complete())
        return;

    heap completed(std::move(lh));
    const heap_metadata *meta = get_heap_metadata(completed.get_payload());
    // The chunk ID has to be re-checked: the chunk may have been aged out
    // while the heap was still incomplete.
    const bool chunk_is_live = meta && meta->chunk_ptr && !chunk_too_old(meta->chunk_id);
    if (chunk_is_live && !get_chunk_config().get_packet_presence_payload_size())
    {
        assert(meta->heap_index < meta->chunk_ptr->present_size);
        meta->chunk_ptr->present[meta->heap_index] = true;
    }
}
2,819✔
217

218
/**
 * Retrieve the heap_metadata stored as the deleter of an allocation.
 *
 * @param ptr  Allocation to inspect.
 * @return Pointer to the metadata held by the deleter, or @c nullptr if the
 *         deleter does not hold a heap_metadata.
 */
const chunk_stream_state_base::heap_metadata *chunk_stream_state_base::get_heap_metadata(
    const memory_allocator::pointer &ptr)
{
    const auto &deleter = ptr.get_deleter();
    return deleter.target<heap_metadata>();
}
223

224
/**
 * Validate that the chunk configuration has the callbacks this manager
 * needs. Nothing is stored; the constructor only checks.
 *
 * @param chunk_config  Configuration to validate.
 *
 * @throw std::invalid_argument if the allocate function is not set, or
 *        (checked second) if the ready function is not set
 */
chunk_manager_simple::chunk_manager_simple(const chunk_stream_config &chunk_config)
{
    const bool has_allocate = static_cast<bool>(chunk_config.get_allocate());
    if (!has_allocate)
        throw std::invalid_argument("chunk_config.allocate is not set");

    const bool has_ready = static_cast<bool>(chunk_config.get_ready());
    if (!has_ready)
        throw std::invalid_argument("chunk_config.ready is not set");
}
13✔
231

232
/**
 * Get a pointer to the batch statistics of the stream that owns @a state.
 *
 * The state is downcast to chunk_stream with an unchecked static_cast, so
 * @a state must be the base subobject of a chunk_stream.
 *
 * @param state  Chunk stream state belonging to a chunk_stream.
 * @return Pointer to the first element of that stream's @c batch_stats.
 */
std::uint64_t *chunk_manager_simple::get_batch_stats(chunk_stream_state<chunk_manager_simple> &state) const
{
    chunk_stream &owner = static_cast<chunk_stream &>(state);
    return owner.batch_stats.data();
}
236

237
/**
 * Obtain a chunk by calling the configured allocate function with
 * @a chunk_id and the place data's batch statistics pointer.
 *
 * @param state     Stream state holding the configuration and place data.
 * @param chunk_id  ID of the chunk being allocated.
 * @return Raw pointer to the chunk. Ownership is released to the caller;
 *         ready_chunk() takes it back.
 */
chunk *chunk_manager_simple::allocate_chunk(chunk_stream_state<chunk_manager_simple> &state, std::int64_t chunk_id)
{
    // The temporary unique_ptr is emptied by release(), so nothing is freed
    // here; ready_chunk will re-take ownership.
    return state.chunk_config.get_allocate()(chunk_id, state.place_data->batch_stats).release();
}
91✔
243

244
/**
 * Hand a finished chunk to the configured ready function, together with the
 * stream's batch statistics.
 *
 * @param state  Stream state holding the configuration.
 * @param c      Chunk previously returned by allocate_chunk(); ownership is
 *               taken back here and passed to the ready function.
 */
void chunk_manager_simple::ready_chunk(chunk_stream_state<chunk_manager_simple> &state, chunk *c)
{
    // Take ownership first so the chunk is freed if anything below throws.
    std::unique_ptr<chunk> reclaimed(c);
    const auto &ready = state.chunk_config.get_ready();
    ready(std::move(reclaimed), get_batch_stats(state));
}
76✔
249

250
// Explicitly instantiate chunk_stream_state for chunk_manager_simple in this translation unit.
template class chunk_stream_state<chunk_manager_simple>;
251
// Explicitly instantiate chunk_stream_allocator for chunk_manager_simple in this translation unit.
template class chunk_stream_allocator<chunk_manager_simple>;
252

253
} // namespace detail
254

255
/**
 * Construct a chunk stream.
 *
 * The chunk state is built from @a config, @a chunk_config and a
 * chunk_manager_simple created from @a chunk_config. The underlying stream
 * is built from @a io_context and the result of adjust_config(config).
 *
 * @param io_context    I/O context reference, moved into the stream base.
 * @param config        Stream configuration.
 * @param chunk_config  Chunk configuration.
 *
 * @throw std::invalid_argument if the allocate or ready function of
 *        @a chunk_config is not set (raised by the chunk_manager_simple
 *        constructor)
 */
chunk_stream::chunk_stream(io_context_ref io_context,
                           const stream_config &config,
                           const chunk_stream_config &chunk_config)
    : chunk_stream_state(config, chunk_config, detail::chunk_manager_simple(chunk_config)),
    stream(std::move(io_context), adjust_config(config))
{
}
12✔
263

264
/**
 * Forward a heap to do_heap_ready().
 *
 * @param lh  The live heap, passed on as an rvalue.
 */
void chunk_stream::heap_ready(live_heap &&lh)
{
    live_heap &&forwarded = std::move(lh);
    do_heap_ready(std::move(forwarded));
}
654✔
268

269
void chunk_stream::stop_received()
12✔
270
{
271
    stream::stop_received();
12✔
272
    flush_chunks();
12✔
273
}
12✔
274

275
void chunk_stream::stop()
35✔
276
{
277
    {
278
        std::lock_guard<std::mutex> lock(get_queue_mutex());
35✔
279
        flush_chunks();
35✔
280
    }
35✔
281
    stream::stop();
35✔
282
}
35✔
283

284
/**
 * Destructor: stops the stream by calling stop().
 */
chunk_stream::~chunk_stream()
{
    this->stop();
}
12✔
288

289
} // namespace spead2::recv
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc