• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 5462348362

pending completion
5462348362

Pull #219

github

web-flow
Merge fbf2f7ae2 into 99464cfdf
Pull Request #219: Add chunk stream groups

589 of 589 new or added lines in 16 files covered. (100.0%)

5397 of 7206 relevant lines covered (74.9%)

52888.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.17
/src/recv_chunk_stream_group.cpp
1
/* Copyright 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <algorithm>
#include <limits>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <utility>
#include <vector>
#include <spead2/recv_chunk_stream.h>
#include <spead2/recv_chunk_stream_group.h>
28

29
namespace spead2
30
{
31
namespace recv
32
{
33

34
// Out-of-line definition required for ODR-use prior to C++17 inline variables.
constexpr std::size_t chunk_stream_group_config::default_max_chunks;
35

36
/**
 * Set the maximum number of chunks in the group's sliding window.
 *
 * @param max_chunks  New window size; must be positive.
 * @return *this, for fluent chaining.
 * @throws std::invalid_argument if @a max_chunks is 0.
 */
chunk_stream_group_config &chunk_stream_group_config::set_max_chunks(std::size_t max_chunks)
{
    if (max_chunks != 0)
    {
        this->max_chunks = max_chunks;
        return *this;
    }
    throw std::invalid_argument("max_chunks cannot be 0");
}
43

44
/**
 * Select the eviction mode (lossy vs lossless) for the group.
 *
 * @return *this, for fluent chaining.
 */
chunk_stream_group_config &chunk_stream_group_config::set_eviction_mode(eviction_mode mode)
{
    // No shadowing: assign the parameter straight to the member.
    eviction_mode_ = mode;
    return *this;
}
49

50
/**
 * Install the callback used to allocate a new chunk.
 *
 * The function object is taken by value and moved into place (sink parameter).
 *
 * @return *this, for fluent chaining.
 */
chunk_stream_group_config &chunk_stream_group_config::set_allocate(chunk_allocate_function allocate_fn)
{
    allocate = std::move(allocate_fn);
    return *this;
}
55

56
/**
 * Install the callback invoked when a chunk becomes ready.
 *
 * The function object is taken by value and moved into place (sink parameter).
 *
 * @return *this, for fluent chaining.
 */
chunk_stream_group_config &chunk_stream_group_config::set_ready(chunk_ready_function ready_fn)
{
    ready = std::move(ready_fn);
    return *this;
}
61

62
namespace detail
63
{
64

65
// Binds this manager to its owning group; the manager only stores the
// reference and delegates all real work to the group.
chunk_manager_group::chunk_manager_group(chunk_stream_group &group)
    : group(group)
{
}
69

70
/**
 * Return the per-stream batch statistics buffer.
 *
 * The state is known to be embedded in a chunk_stream_group_member, so the
 * downcast is safe (same invariant as head_updated).
 */
std::uint64_t *chunk_manager_group::get_batch_stats(chunk_stream_state<chunk_manager_group> &state) const
{
    auto &member = static_cast<chunk_stream_group_member &>(state);
    return member.batch_stats.data();
}
74

75
/**
 * Obtain the chunk for @a chunk_id on behalf of a member stream.
 *
 * Delegates to the group, which owns the shared chunk window; the stream only
 * supplies its identity and statistics buffer.
 */
chunk *chunk_manager_group::allocate_chunk(
    chunk_stream_state<chunk_manager_group> &state, std::int64_t chunk_id)
{
    const std::uintptr_t requester = state.stream_id;
    std::uint64_t *stats = state.place_data->batch_stats;
    return group.get_chunk(chunk_id, requester, stats);
}
80

81
void chunk_manager_group::head_updated(
542✔
82
    chunk_stream_state<chunk_manager_group> &state, std::uint64_t head_chunk)
83
{
84
    group.stream_head_updated(static_cast<chunk_stream_group_member &>(state), head_chunk);
542✔
85
}
542✔
86

87
} // namespace detail
88

89
// Construct the group, copying the configuration and sizing the shared chunk
// window to the configured maximum number of in-flight chunks.
chunk_stream_group::chunk_stream_group(const chunk_stream_group_config &config)
    : config(config), chunks(config.get_max_chunks())
{
}
93

94
// Destructor stops all member streams (see stop()) before the group's state
// is torn down, so no stream can still be referencing the chunk window.
chunk_stream_group::~chunk_stream_group()
{
    stop();
}
98

99
/* Iteration interface over the member streams. Each accessor simply wraps
 * the underlying container iterator in the group's (const_)iterator adapter.
 */

chunk_stream_group::iterator chunk_stream_group::begin() noexcept
{
    return iterator(streams.begin());
}

chunk_stream_group::iterator chunk_stream_group::end() noexcept
{
    return iterator(streams.end());
}

chunk_stream_group::const_iterator chunk_stream_group::begin() const noexcept
{
    return const_iterator(streams.begin());
}

chunk_stream_group::const_iterator chunk_stream_group::end() const noexcept
{
    return const_iterator(streams.end());
}

chunk_stream_group::const_iterator chunk_stream_group::cbegin() const noexcept
{
    return const_iterator(streams.begin());
}

chunk_stream_group::const_iterator chunk_stream_group::cend() const noexcept
{
    return const_iterator(streams.end());
}
128

129
// Convenience overload: constructs a plain chunk_stream_group_member by
// forwarding to the templated emplace_back (declared in the header).
chunk_stream_group_member &chunk_stream_group::emplace_back(
    io_service_ref io_service,
    const stream_config &config,
    const chunk_stream_config &chunk_config)
{
    return emplace_back<chunk_stream_group_member>(std::move(io_service), config, chunk_config);
}
136

137
// Stop every member stream. In lossless mode, first flush each stream's
// window to the far future so that no stream remains blocked inside
// get_chunk while we stop it.
void chunk_stream_group::stop()
{
    /* The mutex is not held while stopping streams, so that callbacks
     * triggered by stopping the streams can take the lock if necessary.
     *
     * It's safe to iterate streams without the mutex because this function
     * is called by the user, so a simultaneous call to emplace_back would
     * violate the requirement that the user doesn't call the API from more
     * than one thread at a time.
     */
    if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSLESS)
    {
        /* Stopping a stream that is currently waiting in get_chunk could
         * deadlock. In lossy mode, there are already provisions to guarantee
         * forward progress, but in lossless mode we need some help.
         */
        for (const auto &stream : streams)
        {
            stream->async_flush_until(std::numeric_limits<std::uint64_t>::max());
        }
    }
    for (const auto &stream : streams)
        stream->stop1();
}
161

162
/* Obtain the chunk with the given ID on behalf of stream @a stream_id,
 * allocating it via the configured allocate callback if it is not already in
 * the window. May block (on ready_condition) until the window can be
 * advanced far enough to admit @a chunk_id.
 */
chunk *chunk_stream_group::get_chunk(std::uint64_t chunk_id, std::uintptr_t stream_id, std::uint64_t *batch_stats)
{
    std::unique_lock<std::mutex> lock(mutex);
    /* Streams should not be requesting chunks older than their heads, and the group
     * head is at least as old as any stream head.
     */
    assert(chunk_id >= chunks.get_head_chunk());
    /* Any chunk old enough to be made ready needs to first be released by the
     * member streams. To do that, we request all the streams to flush, then
     * wait until it is safe, using the condition variable to wake up
     * whenever there is forward progress.
     *
     * Another thread may call get_chunk in the meantime and advance the
     * window, so we must be careful not to assume anything about the
     * state after a wait.
     */
    const std::size_t max_chunks = config.get_max_chunks();
    if (chunk_id - chunks.get_head_chunk() >= max_chunks)
    {
        std::uint64_t target = chunk_id - (max_chunks - 1);  // first chunk we don't need to flush
        if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSY
            && target > last_flush_until)
        {
            // Only request flushes we haven't already requested, to avoid
            // spamming the streams with duplicate flush commands.
            for (const auto &s : streams)
                s->async_flush_until(target);
            last_flush_until = target;
        }
        while (chunks.get_head_chunk() < target)
        {
            ready_condition.wait(lock);
        }
    }

    chunk *c = chunks.get_chunk(
        chunk_id,
        stream_id,
        [this, batch_stats](std::int64_t id) {
            // Hand ownership of the newly allocated chunk to the window.
            return config.get_allocate()(id, batch_stats).release();
        },
        [](chunk *) {
            // Should be unreachable, as we've ensured this by waiting above
            assert(false);
        },
        [](std::uint64_t) {}  // Don't need notification for head moving
    );
    return c;
}
209

210
/**
 * Pass a completed chunk to the user's ready callback.
 *
 * Ownership of @a c transfers to the callback via a freshly constructed
 * std::unique_ptr.
 */
void chunk_stream_group::ready_chunk(chunk *c, std::uint64_t *batch_stats)
{
    config.get_ready()(std::unique_ptr<chunk>(c), batch_stats);
}
215

216
/* Record that stream @a s has advanced its head to @a head_chunk, and if that
 * stream was (one of) the laggards holding the group head back, advance the
 * group window to the new minimum across all streams, readying any chunks
 * that fall off the back and waking waiters in get_chunk.
 */
void chunk_stream_group::stream_head_updated(chunk_stream_group_member &s, std::uint64_t head_chunk)
{
    std::lock_guard<std::mutex> lock(mutex);
    std::size_t stream_index = s.group_index;
    std::uint64_t old = head_chunks[stream_index];
    assert(head_chunk > old);  // head_updated should only be called on forward progress
    head_chunks[stream_index] = head_chunk;
    // Update so that our head chunk is min(head_chunks). We can skip the work
    // if we weren't previously the oldest.
    if (chunks.get_head_chunk() == old)
    {
        auto min_head_chunk = *std::min_element(head_chunks.begin(), head_chunks.end());
        chunks.flush_until(
            min_head_chunk,
            // Chunks leaving the group window are handed to the user.
            [this, &s](chunk *c) { ready_chunk(c, s.batch_stats.data()); },
            // Forward progress: wake any threads blocked in get_chunk.
            [this](std::uint64_t) { ready_condition.notify_all(); }
        );
    }
}
235

236
/* Construct a member stream belonging to @a group at slot @a group_index.
 *
 * The stream's own window may not exceed the group's window, since every
 * chunk a stream holds must also fit in the group's window.
 *
 * @throws std::invalid_argument if chunk_config allows more chunks than the
 *     group's configuration.
 */
chunk_stream_group_member::chunk_stream_group_member(
    chunk_stream_group &group,
    std::size_t group_index,
    io_service_ref io_service,
    const stream_config &config,
    const chunk_stream_config &chunk_config)
    : chunk_stream_state(config, chunk_config, detail::chunk_manager_group(group)),
    stream(std::move(io_service), adjust_config(config)),
    group(group), group_index(group_index)
{
    if (chunk_config.get_max_chunks() > group.config.get_max_chunks())
        throw std::invalid_argument("stream max_chunks must not be larger than group max_chunks");
}
249

250
// Route a completed live heap into the chunk-placement machinery provided by
// the chunk_stream_state base.
void chunk_stream_group_member::heap_ready(live_heap &&lh)
{
    do_heap_ready(std::move(lh));
}
254

255
/* Asynchronously request this stream to advance its window head to at least
 * @a chunk_id.
 *
 * The work is posted so it runs on the stream's own executor; the posted
 * lambda captures chunk_id by value and recovers the member from the
 * stream_base reference the executor hands back, so no dangling captures.
 */
void chunk_stream_group_member::async_flush_until(std::uint64_t chunk_id)
{
    post([chunk_id](stream_base &s) {
        chunk_stream_group_member &self = static_cast<chunk_stream_group_member &>(s);
        self.chunks.flush_until(
            chunk_id,
            // The per-stream flush does nothing with the chunk itself: the
            // group decides when chunks become ready.
            [](chunk *) {},
            [&self](std::uint64_t head_chunk) {
                // Report forward progress so the group can advance its window.
                self.group.stream_head_updated(self, head_chunk);
            }
        );
    });
}
268

269
// Stop this member: flush its remaining chunks while holding the queue
// mutex (released before stopping, to avoid holding it across stop's
// callbacks), then stop the underlying stream.
void chunk_stream_group_member::stop1()
{
    {
        std::lock_guard<std::mutex> lock(get_queue_mutex());
        flush_chunks();
    }
    stream::stop();
}
277

278
// Handle an in-band stop: let the base class process it first, flush this
// stream's chunks, then inform the group so it can react (e.g. track how
// many members are still live).
void chunk_stream_group_member::stop_received()
{
    stream::stop_received();
    flush_chunks();
    group.stream_stop_received(*this);
}
284

285
// A member stream cannot be stopped in isolation: stopping it stops the
// whole group (which in turn calls stop1 on every member, including this one).
void chunk_stream_group_member::stop()
{
    group.stop();
}
289

290
} // namespace recv
291
} // namespace spead2
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc