• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 6314584092

26 Sep 2023 03:00PM UTC coverage: 78.166% (+0.1%) from 78.041%
6314584092

push

github

bmerry
Add StreamConfig tests for explicit_start

5320 of 6806 relevant lines covered (78.17%)

56165.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.14
/src/recv_udp.cpp
1
/* Copyright 2015, 2019-2020, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
/**
18
 * @file
19
 */
20

21
#ifndef _GNU_SOURCE
22
# define _GNU_SOURCE
23
#endif
24
#include <spead2/common_features.h>
25
#if SPEAD2_USE_RECVMMSG
26
# include <sys/socket.h>
27
# include <sys/types.h>
28
# include <unistd.h>
29
#endif
30
#include <system_error>
31
#include <cstdint>
32
#include <cstring>
33
#include <cstdlib>
34
#include <mutex>
35
#include <functional>
36
#include <boost/asio.hpp>
37
#include <boost/lexical_cast.hpp>
38
#include <spead2/recv_stream.h>
39
#include <spead2/recv_udp.h>
40
#include <spead2/recv_udp_base.h>
41
#include <spead2/recv_udp_ibv.h>
42
#include <spead2/common_logging.h>
43
#include <spead2/common_socket.h>
44

45
namespace spead2::recv
46
{
47

48
udp_reader::udp_reader(
289✔
49
    stream &owner,
50
    boost::asio::ip::udp::socket &&socket,
51
    std::size_t max_size)
289✔
52
    : udp_reader_base(owner), max_size(max_size),
289✔
53
#if SPEAD2_USE_RECVMMSG
54
    buffer(mmsg_count), iov(mmsg_count), msgvec(mmsg_count),
289✔
55
#else
56
    buffer(new std::uint8_t[max_size + 1]),
57
#endif
58
    socket(std::move(socket))
578✔
59
{
60
    assert(socket_uses_io_service(this->socket, get_io_service()));
289✔
61
#if SPEAD2_USE_RECVMMSG
62
    for (std::size_t i = 0; i < mmsg_count; i++)
18,785✔
63
    {
64
        // Allocate one extra byte so that overflow can be detected
65
        buffer[i].reset(new std::uint8_t[max_size + 1]);
18,496✔
66
        iov[i].iov_base = (void *) buffer[i].get();
18,496✔
67
        iov[i].iov_len = max_size + 1;
18,496✔
68
        std::memset(&msgvec[i], 0, sizeof(msgvec[i]));
18,496✔
69
        msgvec[i].msg_hdr.msg_iov = &iov[i];
18,496✔
70
        msgvec[i].msg_hdr.msg_iovlen = 1;
18,496✔
71
    }
72
#endif
73
}
289✔
74

75
void udp_reader::start()
289✔
76
{
77
    if (bind_endpoint)
289✔
78
        socket.bind(*bind_endpoint);
207✔
79
    enqueue_receive(make_handler_context());
288✔
80
}
288✔
81

82
static boost::asio::ip::udp::socket make_v4_socket(
41✔
83
    boost::asio::io_service &io_service,
84
    const boost::asio::ip::udp::endpoint &endpoint,
85
    std::size_t buffer_size,
86
    const boost::asio::ip::address &interface_address)
87
{
88
    if (!interface_address.is_v4())
41✔
89
        throw std::invalid_argument("interface address is not an IPv4 address");
×
90
    auto ep = endpoint;
41✔
91
    if (ep.address().is_unspecified())
41✔
92
        ep.address(interface_address);
×
93
    if (!ep.address().is_v4())
41✔
94
        throw std::invalid_argument("endpoint is not an IPv4 address");
×
95
    if (!ep.address().is_multicast() && ep.address() != interface_address)
41✔
96
        throw std::invalid_argument("endpoint is not multicast and does not match interface address");
×
97
    boost::asio::ip::udp::socket socket(io_service, ep.protocol());
41✔
98
    if (ep.address().is_multicast())
41✔
99
    {
100
        socket.set_option(boost::asio::socket_base::reuse_address(true));
41✔
101
        socket.set_option(boost::asio::ip::multicast::join_group(
41✔
102
            ep.address().to_v4(), interface_address.to_v4()));
82✔
103
    }
104
    set_socket_recv_buffer_size(socket, buffer_size);
41✔
105
    return socket;
82✔
106
}
×
107

108
static boost::asio::ip::udp::socket make_multicast_v6_socket(
41✔
109
    boost::asio::io_service &io_service,
110
    const boost::asio::ip::udp::endpoint &endpoint,
111
    std::size_t buffer_size,
112
    unsigned int interface_index)
113
{
114
    if (!endpoint.address().is_v6() || !endpoint.address().is_multicast())
41✔
115
        throw std::invalid_argument("endpoint is not an IPv6 multicast address");
×
116
    boost::asio::ip::udp::socket socket(io_service, endpoint.protocol());
41✔
117
    socket.set_option(boost::asio::socket_base::reuse_address(true));
41✔
118
    socket.set_option(boost::asio::ip::multicast::join_group(
41✔
119
        endpoint.address().to_v6(), interface_index));
41✔
120
    set_socket_recv_buffer_size(socket, buffer_size);
41✔
121
    return socket;
41✔
122
}
×
123

124
static boost::asio::ip::udp::socket make_socket(
125✔
125
    boost::asio::io_service &io_service,
126
    const boost::asio::ip::udp::endpoint &endpoint,
127
    std::size_t buffer_size)
128
{
129
    boost::asio::ip::udp::socket socket(io_service, endpoint.protocol());
125✔
130
    if (endpoint.address().is_multicast())
125✔
131
    {
132
        socket.set_option(boost::asio::socket_base::reuse_address(true));
×
133
        socket.set_option(boost::asio::ip::multicast::join_group(endpoint.address()));
×
134
    }
135
    set_socket_recv_buffer_size(socket, buffer_size);
125✔
136
    return socket;
125✔
137
}
×
138

139
udp_reader::udp_reader(
125✔
140
    stream &owner,
141
    const boost::asio::ip::udp::endpoint &endpoint,
142
    std::size_t max_size,
143
    std::size_t buffer_size)
125✔
144
    : udp_reader(
145
        owner,
146
        make_socket(owner.get_io_service(), endpoint, buffer_size),
250✔
147
        max_size)
125✔
148
{
149
    bind_endpoint = endpoint;
125✔
150
}
125✔
151

152
udp_reader::udp_reader(
41✔
153
    stream &owner,
154
    const boost::asio::ip::udp::endpoint &endpoint,
155
    std::size_t max_size,
156
    std::size_t buffer_size,
157
    const boost::asio::ip::address &interface_address)
41✔
158
    : udp_reader(
159
        owner,
160
        make_v4_socket(owner.get_io_service(),
82✔
161
                       endpoint, buffer_size, interface_address),
162
        max_size)
41✔
163
{
164
    auto ep = endpoint;
41✔
165
    // Match the logic in make_v4_socket
166
    if (ep.address().is_unspecified())
41✔
167
        ep.address(interface_address);
×
168
    bind_endpoint = ep;
41✔
169
}
41✔
170

171
udp_reader::udp_reader(
41✔
172
    stream &owner,
173
    const boost::asio::ip::udp::endpoint &endpoint,
174
    std::size_t max_size,
175
    std::size_t buffer_size,
176
    unsigned int interface_index)
41✔
177
    : udp_reader(
178
        owner,
179
        make_multicast_v6_socket(owner.get_io_service(),
82✔
180
                                 endpoint, buffer_size, interface_index),
181
        max_size)
41✔
182
{
183
    bind_endpoint = endpoint;
41✔
184
}
41✔
185

186
void udp_reader::packet_handler(
6,995✔
187
    handler_context ctx,
188
    stream_base::add_packet_state &state,
189
    const boost::system::error_code &error,
190
    [[maybe_unused]] std::size_t bytes_transferred)
191
{
192
    if (!error)
6,995✔
193
    {
194
#if SPEAD2_USE_RECVMMSG
195
        int received = recvmmsg(socket.native_handle(), msgvec.data(), msgvec.size(),
6,995✔
196
                                MSG_DONTWAIT, nullptr);
6,995✔
197
        log_debug("recvmmsg returned %1%", received);
6,995✔
198
        if (received == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
6,995✔
199
        {
200
            std::error_code code(errno, std::system_category());
×
201
            log_warning("recvmmsg failed: %1% (%2%)", code.value(), code.message());
×
202
        }
203
        for (int i = 0; i < received; i++)
21,471✔
204
        {
205
            bool stopped = process_one_packet(state,
14,763✔
206
                                              buffer[i].get(), msgvec[i].msg_len, max_size);
14,763✔
207
            if (stopped)
14,763✔
208
                break;
287✔
209
        }
210
#else
211
        process_one_packet(state, buffer.get(), bytes_transferred, max_size);
212
#endif
213
    }
214
    else if (error != boost::asio::error::operation_aborted)
×
215
        log_warning("Error in UDP receiver: %1%", error.message());
×
216

217
    if (!state.is_stopped())
6,995✔
218
    {
219
        enqueue_receive(std::move(ctx));
6,708✔
220
    }
221
}
6,995✔
222

223
void udp_reader::enqueue_receive(handler_context ctx)
6,996✔
224
{
225
    using namespace std::placeholders;
226
    socket.async_receive_from(
6,996✔
227
#if SPEAD2_USE_RECVMMSG
228
        boost::asio::null_buffers(),
×
229
#else
230
        boost::asio::buffer(buffer.get(), max_size + 1),
231
#endif
232
        sender_endpoint,
6,996✔
233
        bind_handler(std::move(ctx), std::bind(&udp_reader::packet_handler, this, _1, _2, _3, _4)));
13,992✔
234
}
6,996✔
235

236
void udp_reader::stop()
289✔
237
{
238
    /* asio guarantees that closing a socket will cancel any pending
239
     * operations on it.
240
     * Don't put any logging here: it could be running in a shutdown
241
     * path where it is no longer safe to do so.
242
     */
243
    socket.close();
289✔
244
}
289✔
245

246
/////////////////////////////////////////////////////////////////////////////
247

248
static bool ibv_override;
249
#if SPEAD2_USE_IBV
250
static int ibv_comp_vector;
251
#endif
252
static boost::asio::ip::address ibv_interface;
253
static std::once_flag ibv_once;
254

255
static void init_ibv_override()
2✔
256
{
257
    const char *interface = getenv("SPEAD2_IBV_INTERFACE");
2✔
258
    ibv_override = false;
2✔
259
    if (interface && interface[0])
2✔
260
    {
261
#if !SPEAD2_USE_IBV
262
        log_warning("SPEAD2_IBV_INTERFACE found, but ibverbs support not compiled in");
263
#else
264
        boost::system::error_code ec;
×
265
        ibv_interface = boost::asio::ip::address_v4::from_string(interface, ec);
×
266
        if (ec)
×
267
        {
268
            log_warning("SPEAD2_IBV_INTERFACE could not be parsed as an IPv4 address: %1%", ec.message());
×
269
        }
270
        else
271
        {
272
            ibv_override = true;
×
273
            const char *comp_vector = getenv("SPEAD2_IBV_COMP_VECTOR");
×
274
            if (comp_vector && comp_vector[0])
×
275
            {
276
                try
277
                {
278
                    ibv_comp_vector = boost::lexical_cast<int>(comp_vector);
×
279
                }
280
                catch (boost::bad_lexical_cast &)
×
281
                {
282
                    log_warning("SPEAD2_IBV_COMP_VECTOR is not a valid integer, ignoring");
×
283
                }
×
284
            }
285
        }
286
#endif
287
    }
288
}
2✔
289

290
std::unique_ptr<reader> reader_factory<udp_reader>::make_reader(
125✔
291
    stream &owner,
292
    const boost::asio::ip::udp::endpoint &endpoint,
293
    std::size_t max_size,
294
    std::size_t buffer_size)
295
{
296
    if (endpoint.address().is_v4())
125✔
297
    {
298
        std::call_once(ibv_once, init_ibv_override);
2✔
299
#if SPEAD2_USE_IBV
300
        if (ibv_override)
2✔
301
        {
302
            log_info("Overriding reader for %1%:%2% to use ibverbs",
×
303
                     endpoint.address().to_string(), endpoint.port());
×
304
            return reader_factory<udp_ibv_reader>::make_reader(
305
                owner,
306
                udp_ibv_config()
×
307
                    .add_endpoint(endpoint)
×
308
                    .set_interface_address(ibv_interface)
×
309
                    .set_max_size(max_size)
×
310
                    .set_buffer_size(buffer_size)
×
311
                    .set_comp_vector(ibv_comp_vector));
×
312
        }
313
#endif
314
    }
315
    return std::make_unique<udp_reader>(owner, endpoint, max_size, buffer_size);
125✔
316
}
317

318
std::unique_ptr<reader> reader_factory<udp_reader>::make_reader(
41✔
319
    stream &owner,
320
    const boost::asio::ip::udp::endpoint &endpoint,
321
    std::size_t max_size,
322
    std::size_t buffer_size,
323
    const boost::asio::ip::address &interface_address)
324
{
325
    if (endpoint.address().is_v4())
41✔
326
    {
327
        std::call_once(ibv_once, init_ibv_override);
41✔
328
#if SPEAD2_USE_IBV
329
        if (ibv_override)
41✔
330
        {
331
            log_info("Overriding reader for %1%:%2% to use ibverbs",
×
332
                     endpoint.address().to_string(), endpoint.port());
×
333
            return reader_factory<udp_ibv_reader>::make_reader(
334
                owner,
335
                udp_ibv_config()
×
336
                    .add_endpoint(endpoint)
×
337
                    .set_interface_address(interface_address)
×
338
                    .set_max_size(max_size)
×
339
                    .set_buffer_size(buffer_size)
×
340
                    .set_comp_vector(ibv_comp_vector));
×
341
        }
342
#endif
343
    }
344
    return std::make_unique<udp_reader>(owner, endpoint, max_size, buffer_size, interface_address);
41✔
345
}
346

347
std::unique_ptr<reader> reader_factory<udp_reader>::make_reader(
41✔
348
    stream &owner,
349
    const boost::asio::ip::udp::endpoint &endpoint,
350
    std::size_t max_size,
351
    std::size_t buffer_size,
352
    unsigned int interface_index)
353
{
354
    return std::make_unique<udp_reader>(owner, endpoint, max_size, buffer_size, interface_index);
41✔
355
}
356

357
std::unique_ptr<reader> reader_factory<udp_reader>::make_reader(
82✔
358
    stream &owner,
359
    boost::asio::ip::udp::socket &&socket,
360
    std::size_t max_size)
361
{
362
    return std::make_unique<udp_reader>(owner, std::move(socket), max_size);
82✔
363
}
364

365
} // namespace spead2::recv
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc