• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 5998177654

28 Aug 2023 09:34AM UTC coverage: 77.911% (+2.9%) from 74.976%
5998177654

push

github

web-flow
Merge pull request #246 from ska-sa/meson

Switch to Meson build system

5446 of 6990 relevant lines covered (77.91%)

54679.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.25
/src/recv_chunk_stream_group.cpp
1
/* Copyright 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <utility>
#include <vector>
#include <spead2/recv_chunk_stream.h>
#include <spead2/recv_chunk_stream_group.h>
28

29
namespace spead2::recv
30
{
31

32
/// Set the maximum number of chunks in the group's window.
///
/// Returns *this to allow fluent chaining with the other setters.
/// @throws std::invalid_argument if @a max_chunks is zero
chunk_stream_group_config &chunk_stream_group_config::set_max_chunks(std::size_t max_chunks)
{
    if (max_chunks < 1)
        throw std::invalid_argument("max_chunks cannot be 0");
    this->max_chunks = max_chunks;
    return *this;
}
39

40
/// Set the eviction mode (LOSSY or LOSSLESS); returns *this for chaining.
chunk_stream_group_config &chunk_stream_group_config::set_eviction_mode(eviction_mode eviction_mode_)
{
    this->eviction_mode_ = eviction_mode_;
    return *this;
}
45

46
/// Set the callback used to allocate a new chunk; returns *this for chaining.
chunk_stream_group_config &chunk_stream_group_config::set_allocate(chunk_allocate_function allocate)
{
    this->allocate = std::move(allocate);
    return *this;
}
51

52
/// Set the callback invoked when a chunk is complete; returns *this for chaining.
chunk_stream_group_config &chunk_stream_group_config::set_ready(chunk_ready_function ready)
{
    this->ready = std::move(ready);
    return *this;
}
57

58
namespace detail
59
{
60

61
/// Store a reference to the owning group on whose behalf chunks are managed.
chunk_manager_group::chunk_manager_group(chunk_stream_group &group)
    : group(group)
{
}
65

66
/// Return the per-stream batch statistics array for @a state.
///
/// The cast is safe because the group manager is only ever paired with
/// chunk_stream_group_member streams.
std::uint64_t *chunk_manager_group::get_batch_stats(chunk_stream_state<chunk_manager_group> &state) const
{
    auto &member = static_cast<chunk_stream_group_member &>(state);
    return member.batch_stats.data();
}
70

71
/// Obtain a chunk for @a chunk_id by delegating to the group, which owns
/// the shared window and may block until the chunk can be accommodated.
chunk *chunk_manager_group::allocate_chunk(
    chunk_stream_state<chunk_manager_group> &state, std::int64_t chunk_id)
{
    std::uint64_t *stats = state.place_data->batch_stats;
    return group.get_chunk(chunk_id, state.stream_id, stats);
}
76

77
void chunk_manager_group::head_updated(
537✔
78
    chunk_stream_state<chunk_manager_group> &state, std::uint64_t head_chunk)
79
{
80
    group.stream_head_updated(static_cast<chunk_stream_group_member &>(state), head_chunk);
537✔
81
}
537✔
82

83
} // namespace detail
84

85
/// Construct the group, sizing the shared chunk window from the config.
chunk_stream_group::chunk_stream_group(const chunk_stream_group_config &config)
    : config(config), chunks(config.get_max_chunks())
{
}
89

90
chunk_stream_group::~chunk_stream_group()
{
    // Ensure all member streams are stopped before members are destroyed.
    stop();
}
94

95
/// Iterator to the first member stream.
chunk_stream_group::iterator chunk_stream_group::begin() noexcept
{
    return iterator(streams.begin());
}
99

100
/// Past-the-end iterator over the member streams.
chunk_stream_group::iterator chunk_stream_group::end() noexcept
{
    return iterator(streams.end());
}
104

105
/// Const iterator to the first member stream.
chunk_stream_group::const_iterator chunk_stream_group::begin() const noexcept
{
    return const_iterator(streams.begin());
}
109

110
/// Const past-the-end iterator over the member streams.
chunk_stream_group::const_iterator chunk_stream_group::end() const noexcept
{
    return const_iterator(streams.end());
}
114

115
/// Const iterator to the first member stream (explicit const form).
chunk_stream_group::const_iterator chunk_stream_group::cbegin() const noexcept
{
    return const_iterator(streams.begin());
}
119

120
/// Const past-the-end iterator over the member streams (explicit const form).
chunk_stream_group::const_iterator chunk_stream_group::cend() const noexcept
{
    return const_iterator(streams.end());
}
124

125
/// Add a new member stream, forwarding to the templated emplace_back
/// with the default member type.
chunk_stream_group_member &chunk_stream_group::emplace_back(
    io_service_ref io_service,
    const stream_config &config,
    const chunk_stream_config &chunk_config)
{
    return emplace_back<chunk_stream_group_member>(std::move(io_service), config, chunk_config);
}
132

133
/// Stop all member streams (and hence the group). Safe to call more than once.
void chunk_stream_group::stop()
{
    /* The mutex is not held while stopping streams, so that callbacks
     * triggered by stopping the streams can take the lock if necessary.
     *
     * It's safe to iterate streams without the mutex because this function
     * is called by the user, so a simultaneous call to emplace_back would
     * violate the requirement that the user doesn't call the API from more
     * than one thread at a time.
     */
    if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSLESS)
    {
        /* Stopping a stream that is currently waiting in get_chunk could
         * deadlock. In lossy mode, there are already provisions to guarantee
         * forward progress, but in lossless mode we need some help.
         */
        for (const auto &stream : streams)
        {
            // Flushing to the maximum chunk ID releases every chunk the
            // stream holds, which unblocks any get_chunk waiters.
            stream->async_flush_until(std::numeric_limits<std::uint64_t>::max());
        }
    }
    for (const auto &stream : streams)
        stream->stop1();
}
157

158
/**
 * Obtain the chunk with ID @a chunk_id on behalf of the stream identified by
 * @a stream_id, blocking on @c ready_condition until the shared window can
 * accommodate it. @a batch_stats is passed through to the allocate callback.
 */
chunk *chunk_stream_group::get_chunk(std::uint64_t chunk_id, std::uintptr_t stream_id, std::uint64_t *batch_stats)
{
    std::unique_lock<std::mutex> lock(mutex);
    /* Streams should not be requesting chunks older than their heads, and the group
     * head is at least as old as any stream head.
     */
    assert(chunk_id >= chunks.get_head_chunk());
    /* Any chunk old enough be made ready needs to first be released by the
     * member streams. To do that, we request all the streams to flush, then
     * wait until it is safe, using the condition variable to wake up
     * whenever there is forward progress.
     *
     * Another thread may call get_chunk in the meantime and advance the
     * window, so we must be careful not to assume anything about the
     * state after a wait.
     */
    const std::size_t max_chunks = config.get_max_chunks();
    if (chunk_id - chunks.get_head_chunk() >= max_chunks)
    {
        std::uint64_t target = chunk_id - (max_chunks - 1);  // first chunk we don't need to flush
        if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSY
            && target > last_flush_until)
        {
            // In lossy mode, actively ask every stream to release old chunks.
            for (const auto &s : streams)
                s->async_flush_until(target);
            // Remember the high-water mark so a later call doesn't re-issue
            // flush requests that are already in flight.
            last_flush_until = target;
        }
        while (chunks.get_head_chunk() < target)
        {
            ready_condition.wait(lock);
        }
    }

    chunk *c = chunks.get_chunk(
        chunk_id,
        stream_id,
        [this, batch_stats](std::int64_t id) {
            return config.get_allocate()(id, batch_stats).release();
        },
        [](chunk *) {
            // Should be unreachable, as we've ensured this by waiting above
            assert(false);
        },
        [](std::uint64_t) {}  // Don't need notification for head moving
    );
    return c;
}
205

206
/// Hand ownership of @a c to the user-provided ready callback, passing
/// along @a batch_stats.
void chunk_stream_group::ready_chunk(chunk *c, std::uint64_t *batch_stats)
{
    config.get_ready()(std::unique_ptr<chunk>(c), batch_stats);
}
211

212
/**
 * Record that stream @a s has advanced its head to @a head_chunk, and if
 * that stream was previously holding back the group, advance the group
 * window (making chunks ready and waking get_chunk waiters).
 */
void chunk_stream_group::stream_head_updated(chunk_stream_group_member &s, std::uint64_t head_chunk)
{
    std::lock_guard<std::mutex> lock(mutex);
    std::size_t stream_index = s.group_index;
    std::uint64_t old = head_chunks[stream_index];
    assert(head_chunk > old);  // head_updated should only be called on forward progress
    head_chunks[stream_index] = head_chunk;
    // Update so that our head chunk is min(head_chunks). We can skip the work
    // if we weren't previously the oldest.
    if (chunks.get_head_chunk() == old)
    {
        auto min_head_chunk = *std::min_element(head_chunks.begin(), head_chunks.end());
        chunks.flush_until(
            min_head_chunk,
            // Fully-released chunks become ready for the user...
            [this, &s](chunk *c) { ready_chunk(c, s.batch_stats.data()); },
            // ...and forward progress wakes anyone blocked in get_chunk.
            [this](std::uint64_t) { ready_condition.notify_all(); }
        );
    }
}
231

232
/**
 * Construct a member stream belonging to @a group at position @a group_index.
 *
 * @throws std::invalid_argument if the stream's chunk window is larger than
 * the group's window (the group could then never satisfy the stream).
 */
chunk_stream_group_member::chunk_stream_group_member(
    chunk_stream_group &group,
    std::size_t group_index,
    io_service_ref io_service,
    const stream_config &config,
    const chunk_stream_config &chunk_config)
    : chunk_stream_state(config, chunk_config, detail::chunk_manager_group(group)),
    stream(std::move(io_service), adjust_config(config)),
    group(group), group_index(group_index)
{
    if (chunk_config.get_max_chunks() > group.config.get_max_chunks())
        throw std::invalid_argument("stream max_chunks must not be larger than group max_chunks");
}
245

246
/// Forward a completed live heap to the shared chunk-stream handling.
void chunk_stream_group_member::heap_ready(live_heap &&lh)
{
    do_heap_ready(std::move(lh));
}
250

251
/**
 * Asynchronously request this stream to release all chunks with IDs below
 * @a chunk_id. The flush runs via post(), and each head advance is reported
 * back to the group through stream_head_updated.
 */
void chunk_stream_group_member::async_flush_until(std::uint64_t chunk_id)
{
    post([chunk_id](stream_base &s) {
        chunk_stream_group_member &self = static_cast<chunk_stream_group_member &>(s);
        self.chunks.flush_until(
            chunk_id,
            [](chunk *) {},  // no per-chunk action needed here
            [&self](std::uint64_t head_chunk) {
                self.group.stream_head_updated(self, head_chunk);
            }
        );
    });
}
264

265
/// Stop this member: flush its chunks (under the queue mutex) and then
/// stop the underlying stream. Called by chunk_stream_group::stop.
void chunk_stream_group_member::stop1()
{
    {
        std::lock_guard<std::mutex> lock(get_queue_mutex());
        flush_chunks();
    }
    stream::stop();
}
273

274
/// Handle a stop item arriving on the stream: flush our chunks and
/// notify the group that this member has stopped.
void chunk_stream_group_member::stop_received()
{
    stream::stop_received();
    flush_chunks();
    group.stream_stop_received(*this);
}
280

281
/// Stopping any one member stops the entire group (see chunk_stream_group::stop).
void chunk_stream_group_member::stop()
{
    group.stop();
}
285

286
} // namespace spead2::recv
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc