• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 16780666019

06 Aug 2025 02:51PM UTC coverage: 78.72% (+0.3%) from 78.411%
16780666019

push

github

bmerry
Add discarded_chunks statistic for pop_if_full feature

This required some major rework of the way ready callbacks are handled.
It turned out the previous approach didn't carry enough information for
this to be workable for chunk stream groups, because the batch_stats
pointer is only useful if you know the statistic to slot mapping, and
that is specific to the stream rather than the group (even if it will
typically be uniform across the streams).

This thus required introducing a new variant on the chunk_ready_function
type (group_chunk_ready_function) and an awkward backwards compatibility
mechanism.

The same thing probably ought to be done for chunk_allocation_function,
but I've only partly gotten around to that.

26 of 32 new or added lines in 3 files covered. (81.25%)

7 existing lines in 2 files now uncovered.

5623 of 7143 relevant lines covered (78.72%)

90321.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.0
/src/recv_chunk_stream_group.cpp
1
/* Copyright 2023, 2025 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <cstddef>
22
#include <vector>
23
#include <mutex>
24
#include <algorithm>
25
#include <limits>
26
#include <spead2/recv_chunk_stream.h>
27
#include <spead2/recv_chunk_stream_group.h>
28

29
namespace spead2::recv
30
{
31

32
// Set the size of the group's shared chunk window (builder-style; returns *this).
chunk_stream_group_config &chunk_stream_group_config::set_max_chunks(std::size_t max_chunks)
{
    // A zero-sized window could never hold a chunk, so reject it up front.
    if (max_chunks == 0)
    {
        throw std::invalid_argument("max_chunks cannot be 0");
    }
    this->max_chunks = max_chunks;
    return *this;
}
39

40
// Select the eviction mode (builder-style; returns *this).
chunk_stream_group_config &chunk_stream_group_config::set_eviction_mode(eviction_mode eviction_mode_)
{
    this->eviction_mode_ = eviction_mode_;
    return *this;
}
45

46
// Install the chunk allocation callback (builder-style; returns *this).
chunk_stream_group_config &chunk_stream_group_config::set_allocate(chunk_allocate_function allocate)
{
    this->allocate = std::move(allocate);
    return *this;
}
51

UNCOV
52
/* Install a ready callback using the legacy (non-group) signature.
 *
 * The callback is stored twice: verbatim in `ready` (kept for backwards
 * compatibility) and wrapped in `group_ready`, adapted to the group
 * signature by discarding the stream argument.
 */
chunk_stream_group_config &chunk_stream_group_config::set_ready(chunk_ready_function ready)
{
    // Take a copy before the move below consumes the parameter.
    this->ready = ready;
    this->group_ready = [ready = std::move(ready)](std::unique_ptr<chunk> &&c, std::uint64_t *batch_stats, chunk_stream_group_member &)
    {
        ready(std::move(c), batch_stats);
    };
    return *this;
}
61

62
/* Install the group-aware ready callback (builder-style; returns *this).
 *
 * Clears the legacy `ready` callback so that only one of the two variants
 * is considered active.
 */
chunk_stream_group_config &chunk_stream_group_config::set_group_ready(group_chunk_ready_function ready)
{
    this->ready = nullptr;
    // Move rather than copy: `ready` is a by-value sink parameter, and
    // copying a std::function may allocate. This also matches set_allocate,
    // which already moves its argument.
    this->group_ready = std::move(ready);
    return *this;
}
68

UNCOV
69
// Enable/disable pop-if-full behaviour (builder-style; returns *this).
chunk_stream_group_config &chunk_stream_group_config::set_pop_if_full(bool pop_if_full)
{
    this->pop_if_full = pop_if_full;
    return *this;
}
74

75
namespace detail
76
{
77

78
// Chunk manager that delegates chunk allocation and head tracking to the
// owning group (non-owning reference; the group must outlive the manager).
chunk_manager_group::chunk_manager_group(chunk_stream_group &group)
    : group(group)
{
}
82

83
// Return the batch statistics array of the member stream that owns this state.
std::uint64_t *chunk_manager_group::get_batch_stats(chunk_stream_state<chunk_manager_group> &state) const
{
    // The state object is always embedded in a chunk_stream_group_member,
    // so this downcast is safe.
    auto &member = static_cast<chunk_stream_group_member &>(state);
    return member.batch_stats.data();
}
87

88
// Obtain the chunk for @a chunk_id by asking the group, which coordinates
// the shared chunk window across all member streams.
chunk *chunk_manager_group::allocate_chunk(
    chunk_stream_state<chunk_manager_group> &state, std::int64_t chunk_id)
{
    auto &member = static_cast<chunk_stream_group_member &>(state);
    return group.get_chunk(chunk_id, state.stream_id, state.place_data->batch_stats, member);
}
95

96
void chunk_manager_group::head_updated(
541✔
97
    chunk_stream_state<chunk_manager_group> &state, std::uint64_t head_chunk)
98
{
99
    group.stream_head_updated(static_cast<chunk_stream_group_member &>(state), head_chunk);
541✔
100
}
541✔
101

102
} // namespace detail
103

104
// Construct the group, sizing the shared chunk window from the config.
chunk_stream_group::chunk_stream_group(const chunk_stream_group_config &config)
    : config(config), chunks(config.get_max_chunks())
{
}
108

109
chunk_stream_group::~chunk_stream_group()
{
    // Ensure every member stream is stopped (and chunks flushed) before
    // the group's state is torn down.
    stop();
}
113

114
// Iterator to the first member stream.
chunk_stream_group::iterator chunk_stream_group::begin() noexcept
{
    auto first = streams.begin();
    return iterator(first);
}
118

119
// Past-the-end iterator over the member streams.
chunk_stream_group::iterator chunk_stream_group::end() noexcept
{
    auto last = streams.end();
    return iterator(last);
}
123

124
// Const iterator to the first member stream.
chunk_stream_group::const_iterator chunk_stream_group::begin() const noexcept
{
    auto first = streams.begin();
    return const_iterator(first);
}
128

129
// Const past-the-end iterator over the member streams.
chunk_stream_group::const_iterator chunk_stream_group::end() const noexcept
{
    auto last = streams.end();
    return const_iterator(last);
}
133

134
// Const iterator to the first member stream (explicit const form).
chunk_stream_group::const_iterator chunk_stream_group::cbegin() const noexcept
{
    // streams.begin() already yields a const iterator here since *this is const.
    auto first = streams.begin();
    return const_iterator(first);
}
138

139
// Const past-the-end iterator over the member streams (explicit const form).
chunk_stream_group::const_iterator chunk_stream_group::cend() const noexcept
{
    auto last = streams.end();
    return const_iterator(last);
}
143

144
// Convenience overload: add a plain chunk_stream_group_member to the group
// by delegating to the templated emplace_back.
chunk_stream_group_member &chunk_stream_group::emplace_back(
    io_context_ref io_context,
    const stream_config &config,
    const chunk_stream_config &chunk_config)
{
    return emplace_back<chunk_stream_group_member>(std::move(io_context), config, chunk_config);
}
151

152
// Stop every member stream, avoiding deadlock with any thread blocked in
// get_chunk.
void chunk_stream_group::stop()
{
    /* The mutex is not held while stopping streams, so that callbacks
     * triggered by stopping the streams can take the lock if necessary.
     *
     * It's safe to iterate streams without the mutex because this function
     * is called by the user, so a simultaneous call to emplace_back would
     * violate the requirement that the user doesn't call the API from more
     * than one thread at a time.
     */
    if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSLESS)
    {
        /* Stopping a stream that is currently waiting in get_chunk could
         * deadlock. In lossy mode, there are already provisions to guarantee
         * forward progress, but in lossless mode we need some help.
         */
        for (const auto &stream : streams)
        {
            // Flushing to the maximum chunk ID releases every chunk a waiter
            // in get_chunk could be blocked on.
            stream->async_flush_until(std::numeric_limits<std::uint64_t>::max());
        }
    }
    for (const auto &stream : streams)
        stream->stop1();
}
176

177
/* Obtain (allocating if necessary) the chunk with ID @a chunk_id on behalf
 * of a member stream, blocking until the shared window can accommodate it.
 * Called from chunk_manager_group::allocate_chunk.
 */
chunk *chunk_stream_group::get_chunk(std::uint64_t chunk_id, std::uintptr_t stream_id, std::uint64_t *batch_stats, chunk_stream_group_member &)
{
    std::unique_lock<std::mutex> lock(mutex);
    /* Streams should not be requesting chunks older than their heads, and the group
     * head is at least as old as any stream head.
     */
    assert(chunk_id >= chunks.get_head_chunk());
    /* Any chunk old enough be made ready needs to first be released by the
     * member streams. To do that, we request all the streams to flush, then
     * wait until it is safe, using the condition variable to wake up
     * whenever there is forward progress.
     *
     * Another thread may call get_chunk in the meantime and advance the
     * window, so we must be careful not to assume anything about the
     * state after a wait.
     */
    const std::size_t max_chunks = config.get_max_chunks();
    if (chunk_id - chunks.get_head_chunk() >= max_chunks)
    {
        std::uint64_t target = chunk_id - (max_chunks - 1);  // first chunk we don't need to flush
        // last_flush_until avoids re-issuing flush requests another caller
        // has already posted for the same (or a later) target.
        if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSY
            && target > last_flush_until)
        {
            for (const auto &s : streams)
                s->async_flush_until(target);
            last_flush_until = target;
        }
        // Waiters are woken by stream_head_updated via ready_condition.
        while (chunks.get_head_chunk() < target)
        {
            ready_condition.wait(lock);
        }
    }

    chunk *c = chunks.get_chunk(
        chunk_id,
        stream_id,
        // Allocation callback: defer to the user-provided allocator.
        [this, batch_stats](std::int64_t id) {
            return config.get_allocate()(id, batch_stats).release();
        },
        [](chunk *) {
            // Should be unreachable, as we've ensured this by waiting above
            assert(false);
        },
        [](std::uint64_t) {}  // Don't need notification for head moving
    );
    return c;
}
224

225
// Pass a completed chunk to the user's group-ready callback, transferring
// ownership. The statistics handed over are those of the stream @a s that
// triggered the flush.
void chunk_stream_group::ready_chunk(chunk *c, chunk_stream_group_member &s)
{
    std::unique_ptr<chunk> owned(c);
    config.get_group_ready()(std::move(owned), s.batch_stats.data(), s);
}
231

232
/* Record that member stream @a s has advanced its head to @a head_chunk,
 * and flush any group chunks that all members have now released.
 */
void chunk_stream_group::stream_head_updated(chunk_stream_group_member &s, std::uint64_t head_chunk)
{
    std::lock_guard<std::mutex> lock(mutex);
    std::size_t stream_index = s.group_index;
    std::uint64_t old = head_chunks[stream_index];
    assert(head_chunk > old);  // head_updated should only be called on forward progress
    head_chunks[stream_index] = head_chunk;
    // Update so that our head chunk is min(head_chunks). We can skip the work
    // if we weren't previously the oldest.
    if (chunks.get_head_chunk() == old)
    {
        auto min_head_chunk = *std::min_element(head_chunks.begin(), head_chunks.end());
        chunks.flush_until(
            min_head_chunk,
            // Flushed chunks become ready; the head-moved callback wakes any
            // thread blocked in get_chunk waiting for window space.
            [this, &s](chunk *c) { ready_chunk(c, s); },
            [this](std::uint64_t) { ready_condition.notify_all(); }
        );
    }
}
251

252
// Construct a member stream bound to @a group at slot @a group_index.
// Throws std::invalid_argument if the stream's chunk window is larger than
// the group's, which the shared-window logic could not accommodate.
chunk_stream_group_member::chunk_stream_group_member(
    chunk_stream_group &group,
    std::size_t group_index,
    io_context_ref io_context,
    const stream_config &config,
    const chunk_stream_config &chunk_config)
    : chunk_stream_state(config, chunk_config, detail::chunk_manager_group(group)),
    stream(std::move(io_context), adjust_config(config)),
    group(group), group_index(group_index)
{
    if (chunk_config.get_max_chunks() > group.config.get_max_chunks())
        throw std::invalid_argument("stream max_chunks must not be larger than group max_chunks");
}
265

266
// Hook invoked when a live heap completes: route it into the chunk
// placement machinery.
void chunk_stream_group_member::heap_ready(live_heap &&lh)
{
    do_heap_ready(std::move(lh));
}
270

271
/* Asynchronously release all of this stream's chunks older than
 * @a chunk_id. The work runs on the stream's own executor via post(), so
 * it is serialised with normal packet processing.
 */
void chunk_stream_group_member::async_flush_until(std::uint64_t chunk_id)
{
    post([chunk_id](stream_base &s) {
        chunk_stream_group_member &self = static_cast<chunk_stream_group_member &>(s);
        self.chunks.flush_until(
            chunk_id,
            // Individual chunks need no action here; only the head movement
            // matters, and it is reported to the group.
            [](chunk *) {},
            [&self](std::uint64_t head_chunk) {
                self.group.stream_head_updated(self, head_chunk);
            }
        );
    });
}
284

285
void chunk_stream_group_member::stop1()
252✔
286
{
287
    {
288
        std::lock_guard<std::mutex> lock(get_queue_mutex());
252✔
289
        flush_chunks();
252✔
290
    }
252✔
291
    stream::stop();
252✔
292
}
252✔
293

294
// Handle a stop arriving on this member stream: let the base class process
// it first, flush this stream's chunks, then notify the group.
void chunk_stream_group_member::stop_received()
{
    stream::stop_received();
    flush_chunks();
    group.stream_stop_received(*this);
}
300

301
// A member stream cannot be stopped in isolation: stopping it stops the
// whole group (which in turn stops every member via stop1).
void chunk_stream_group_member::stop()
{
    group.stop();
}
305

306
} // namespace spead2::recv
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc