• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 6666188375

27 Oct 2023 10:45AM UTC coverage: 69.614% (-8.5%) from 78.12%
6666188375

push

github

web-flow
Merge pull request #290 from ska-sa/prepare-4.1.1

Prepare 4.1.1

4747 of 6819 relevant lines covered (69.61%)

123579.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/recv_chunk_stream_group.cpp
1
/* Copyright 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#include <cstddef>
22
#include <vector>
23
#include <mutex>
24
#include <algorithm>
25
#include <limits>
26
#include <spead2/recv_chunk_stream.h>
27
#include <spead2/recv_chunk_stream_group.h>
28

29
namespace spead2::recv
30
{
31

32
chunk_stream_group_config &chunk_stream_group_config::set_max_chunks(std::size_t max_chunks)
×
33
{
34
    if (max_chunks == 0)
×
35
        throw std::invalid_argument("max_chunks cannot be 0");
×
36
    this->max_chunks = max_chunks;
×
37
    return *this;
×
38
}
39

40
chunk_stream_group_config &chunk_stream_group_config::set_eviction_mode(eviction_mode eviction_mode_)
×
41
{
42
    this->eviction_mode_ = eviction_mode_;
×
43
    return *this;
×
44
}
45

46
chunk_stream_group_config &chunk_stream_group_config::set_allocate(chunk_allocate_function allocate)
×
47
{
48
    this->allocate = std::move(allocate);
×
49
    return *this;
×
50
}
51

52
chunk_stream_group_config &chunk_stream_group_config::set_ready(chunk_ready_function ready)
×
53
{
54
    this->ready = std::move(ready);
×
55
    return *this;
×
56
}
57

58
namespace detail
59
{
60

61
chunk_manager_group::chunk_manager_group(chunk_stream_group &group)
×
62
    : group(group)
×
63
{
64
}
×
65

66
std::uint64_t *chunk_manager_group::get_batch_stats(chunk_stream_state<chunk_manager_group> &state) const
×
67
{
68
    return static_cast<chunk_stream_group_member *>(&state)->batch_stats.data();
×
69
}
70

71
chunk *chunk_manager_group::allocate_chunk(
×
72
    chunk_stream_state<chunk_manager_group> &state, std::int64_t chunk_id)
73
{
74
    return group.get_chunk(chunk_id, state.stream_id, state.place_data->batch_stats);
×
75
}
76

77
void chunk_manager_group::head_updated(
×
78
    chunk_stream_state<chunk_manager_group> &state, std::uint64_t head_chunk)
79
{
80
    group.stream_head_updated(static_cast<chunk_stream_group_member &>(state), head_chunk);
×
81
}
×
82

83
} // namespace detail
84

85
chunk_stream_group::chunk_stream_group(const chunk_stream_group_config &config)
×
86
    : config(config), chunks(config.get_max_chunks())
×
87
{
88
}
×
89

90
chunk_stream_group::~chunk_stream_group()
×
91
{
92
    stop();
×
93
}
×
94

95
chunk_stream_group::iterator chunk_stream_group::begin() noexcept
×
96
{
97
    return iterator(streams.begin());
×
98
}
99

100
chunk_stream_group::iterator chunk_stream_group::end() noexcept
×
101
{
102
    return iterator(streams.end());
×
103
}
104

105
chunk_stream_group::const_iterator chunk_stream_group::begin() const noexcept
×
106
{
107
    return const_iterator(streams.begin());
×
108
}
109

110
chunk_stream_group::const_iterator chunk_stream_group::end() const noexcept
×
111
{
112
    return const_iterator(streams.end());
×
113
}
114

115
chunk_stream_group::const_iterator chunk_stream_group::cbegin() const noexcept
×
116
{
117
    return const_iterator(streams.begin());
×
118
}
119

120
chunk_stream_group::const_iterator chunk_stream_group::cend() const noexcept
×
121
{
122
    return const_iterator(streams.end());
×
123
}
124

125
chunk_stream_group_member &chunk_stream_group::emplace_back(
×
126
    io_service_ref io_service,
127
    const stream_config &config,
128
    const chunk_stream_config &chunk_config)
129
{
130
    return emplace_back<chunk_stream_group_member>(std::move(io_service), config, chunk_config);
×
131
}
132

133
void chunk_stream_group::stop()
×
134
{
135
    /* The mutex is not held while stopping streams, so that callbacks
136
     * triggered by stopping the streams can take the lock if necessary.
137
     *
138
     * It's safe to iterate streams without the mutex because this function
139
     * is called by the user, so a simultaneous call to emplace_back would
140
     * violate the requirement that the user doesn't call the API from more
141
     * than one thread at a time.
142
     */
143
    if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSLESS)
×
144
    {
145
        /* Stopping a stream that is currently waiting in get_chunk could
146
         * deadlock. In lossy mode, there are already provisions to guarantee
147
         * forward progress, but in lossless mode we need some help.
148
         */
149
        for (const auto &stream : streams)
×
150
        {
151
            stream->async_flush_until(std::numeric_limits<std::uint64_t>::max());
×
152
        }
153
    }
154
    for (const auto &stream : streams)
×
155
        stream->stop1();
×
156
}
×
157

158
chunk *chunk_stream_group::get_chunk(std::uint64_t chunk_id, std::uintptr_t stream_id, std::uint64_t *batch_stats)
×
159
{
160
    std::unique_lock<std::mutex> lock(mutex);
×
161
    /* Streams should not be requesting chunks older than their heads, and the group
162
     * head is at least as old as any stream head.
163
     */
164
    assert(chunk_id >= chunks.get_head_chunk());
×
165
    /* Any chunk old enough be made ready needs to first be released by the
166
     * member streams. To do that, we request all the streams to flush, then
167
     * wait until it is safe, using the condition variable to wake up
168
     * whenever there is forward progress.
169
     *
170
     * Another thread may call get_chunk in the meantime and advance the
171
     * window, so we must be careful not to assume anything about the
172
     * state after a wait.
173
     */
174
    const std::size_t max_chunks = config.get_max_chunks();
×
175
    if (chunk_id - chunks.get_head_chunk() >= max_chunks)
×
176
    {
177
        std::uint64_t target = chunk_id - (max_chunks - 1);  // first chunk we don't need to flush
×
178
        if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSY
×
179
            && target > last_flush_until)
×
180
        {
181
            for (const auto &s : streams)
×
182
                s->async_flush_until(target);
×
183
            last_flush_until = target;
×
184
        }
185
        while (chunks.get_head_chunk() < target)
×
186
        {
187
            ready_condition.wait(lock);
×
188
        }
189
    }
190

191
    chunk *c = chunks.get_chunk(
×
192
        chunk_id,
193
        stream_id,
194
        [this, batch_stats](std::int64_t id) {
×
195
            return config.get_allocate()(id, batch_stats).release();
×
196
        },
197
        [](chunk *) {
×
198
            // Should be unreachable, as we've ensured this by waiting above
199
            assert(false);
×
200
        },
201
        [](std::uint64_t) {}  // Don't need notification for head moving
×
202
    );
203
    return c;
×
204
}
×
205

206
void chunk_stream_group::ready_chunk(chunk *c, std::uint64_t *batch_stats)
×
207
{
208
    std::unique_ptr<chunk> owned(c);
×
209
    config.get_ready()(std::move(owned), batch_stats);
×
210
}
×
211

212
void chunk_stream_group::stream_head_updated(chunk_stream_group_member &s, std::uint64_t head_chunk)
×
213
{
214
    std::lock_guard<std::mutex> lock(mutex);
×
215
    std::size_t stream_index = s.group_index;
×
216
    std::uint64_t old = head_chunks[stream_index];
×
217
    assert(head_chunk > old);  // head_updated should only be called on forward progress
×
218
    head_chunks[stream_index] = head_chunk;
×
219
    // Update so that our head chunk is min(head_chunks). We can skip the work
220
    // if we weren't previously the oldest.
221
    if (chunks.get_head_chunk() == old)
×
222
    {
223
        auto min_head_chunk = *std::min_element(head_chunks.begin(), head_chunks.end());
×
224
        chunks.flush_until(
×
225
            min_head_chunk,
226
            [this, &s](chunk *c) { ready_chunk(c, s.batch_stats.data()); },
×
227
            [this](std::uint64_t) { ready_condition.notify_all(); }
×
228
        );
229
    }
230
}
×
231

232
chunk_stream_group_member::chunk_stream_group_member(
×
233
    chunk_stream_group &group,
234
    std::size_t group_index,
235
    io_service_ref io_service,
236
    const stream_config &config,
237
    const chunk_stream_config &chunk_config)
×
238
    : chunk_stream_state(config, chunk_config, detail::chunk_manager_group(group)),
239
    stream(std::move(io_service), adjust_config(config)),
×
240
    group(group), group_index(group_index)
×
241
{
242
    if (chunk_config.get_max_chunks() > group.config.get_max_chunks())
×
243
        throw std::invalid_argument("stream max_chunks must not be larger than group max_chunks");
×
244
}
×
245

246
void chunk_stream_group_member::heap_ready(live_heap &&lh)
×
247
{
248
    do_heap_ready(std::move(lh));
×
249
}
×
250

251
void chunk_stream_group_member::async_flush_until(std::uint64_t chunk_id)
×
252
{
253
    post([chunk_id](stream_base &s) {
×
254
        chunk_stream_group_member &self = static_cast<chunk_stream_group_member &>(s);
×
255
        self.chunks.flush_until(
×
256
            chunk_id,
257
            [](chunk *) {},
×
258
            [&self](std::uint64_t head_chunk) {
×
259
                self.group.stream_head_updated(self, head_chunk);
×
260
            }
×
261
        );
262
    });
×
263
}
×
264

265
void chunk_stream_group_member::stop1()
×
266
{
267
    {
268
        std::lock_guard<std::mutex> lock(get_queue_mutex());
×
269
        flush_chunks();
×
270
    }
×
271
    stream::stop();
×
272
}
×
273

274
void chunk_stream_group_member::stop_received()
×
275
{
276
    stream::stop_received();
×
277
    flush_chunks();
×
278
    group.stream_stop_received(*this);
×
279
}
×
280

281
void chunk_stream_group_member::stop()
×
282
{
283
    group.stop();
×
284
}
×
285

286
} // namespace spead2::recv
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc