• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 12652972619

07 Jan 2025 02:00PM UTC coverage: 78.871% (+0.02%) from 78.852%
12652972619

push

github

web-flow
Merge pull request #368 from ska-sa/bump-boost-1.87

Support Boost 1.87

115 of 137 new or added lines in 32 files covered. (83.94%)

2 existing lines in 2 files now uncovered.

5577 of 7071 relevant lines covered (78.87%)

91360.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.35
/src/send_udp.cpp
1
/* Copyright 2015, 2019-2020, 2023, 2025 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
#include <cstddef>
18
#include <cstring>
19
#include <utility>
20
#include <algorithm>
21
#include <boost/asio.hpp>
22
#include <spead2/send_udp.h>
23
#include <spead2/send_writer.h>
24
#include <spead2/common_defines.h>
25
#include <spead2/common_socket.h>
26
#if SPEAD2_USE_SENDMMSG
27
# include <sys/types.h>
28
# include <sys/socket.h>
29
# include <netinet/in.h>
30
# include <netinet/udp.h>
31
#endif
32

33
namespace spead2::send
34
{
35

36
namespace
37
{
38

39
class udp_writer : public writer
40
{
41
private:
42
    boost::asio::ip::udp::socket socket;
43
    std::vector<boost::asio::ip::udp::endpoint> endpoints;
44

45
    virtual void wakeup() override final;
46

47
    /* NB: Linux has a maximum of 64 segments for GSO (UDP_MAX_SEGMENTS in the
48
     * kernel, but it doesn't seem to be exposed to userspace). If max_batch
49
     * is increased, logic will need to be added to the GSO merging to prevent
50
     * creating messages bigger than this.
51
     */
52
    static constexpr int max_batch = 64;
53
#if SPEAD2_USE_SENDMMSG
54
    // Some magic values for current_gso_size
55
    /// GSO allowed, but socket option not currently set
56
    [[maybe_unused]] static constexpr int gso_inactive = 0;
57
    /// GSO failed; do not try again
58
    [[maybe_unused]] static constexpr int gso_disabled = -1;
59
    /// Last send with GSO failed; retrying without GSO
60
    [[maybe_unused]] static constexpr int gso_probe = -2;
61

62
    static constexpr int max_gso_message_size = 65535;  // maximum size the kernel will accept
63
    struct mmsghdr msgvec[max_batch];
64
    std::vector<struct iovec> msg_iov;
65
    struct
66
    {
67
        transmit_packet packet;
68
        std::unique_ptr<std::uint8_t[]> scratch;
69
        bool merged; // packet is part of the same message as the previous packet
70
    } packets[max_batch];
71
    int current_gso_size = gso_inactive;
72

73
#if SPEAD2_USE_GSO
74
    /// Set the socket option
75
    void set_gso_size(int size, boost::system::error_code &result);
76
#endif
77

78
    /**
79
     * Set up @ref msgvec from @ref msg_iov.
80
     *
81
     * The packets in [first_packet, last_packet) are assumed to have already
82
     * been set in @ref msg_iov, starting from @a first_iov. If @a gso_size is
83
     * positive, then multiple packets may be concatenated into a single
84
     * element of @ref msgvec, provided that all but the last have size
85
     * @a gso_size. Otherwise, each packet gets its own entry in @ref msgvec.
86
     *
87
     * @return The past-the-end index into @ref msgvec after the packets are
88
     * filled in.
89
     */
90
    int prepare_msgvec(int first_packet, int last_packet, int first_msg, int first_iov, int gso_size);
91
    void send_packets(int first_packet, int last_packet, int first_msg, int last_msg);
92
#else
93
    std::unique_ptr<std::uint8_t[]> scratch;
94
#endif
95

96
public:
97
    udp_writer(
98
        io_context_ref io_context,
99
        boost::asio::ip::udp::socket &&socket,
100
        const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
101
        const stream_config &config,
102
        std::size_t buffer_size);
103

104
    virtual std::size_t get_num_substreams() const override final { return endpoints.size(); }
148✔
105
};
106

107
#if SPEAD2_USE_SENDMMSG
108

109
#if SPEAD2_USE_GSO
110
void udp_writer::set_gso_size(int size, boost::system::error_code &result)
290✔
111
{
112
    if (setsockopt(socket.native_handle(), IPPROTO_UDP, UDP_SEGMENT,
290✔
113
                   &size, sizeof(size)) == -1)
290✔
114
    {
115
        result.assign(errno, boost::asio::error::get_system_category());
2✔
116
    }
117
    else
118
    {
119
        result.clear();
288✔
120
    }
121
}
290✔
122
#endif
123

124
void udp_writer::send_packets(int first_packet, int last_packet, int first_msg, int last_msg)
724✔
125
{
126
#if SPEAD2_USE_GSO
127
restart:
724✔
128
#endif
129
    // Try sending
130
    int sent = sendmmsg(socket.native_handle(), msgvec + first_msg, last_msg - first_msg, MSG_DONTWAIT);
724✔
131
    int groups = 0;
724✔
132
    boost::system::error_code result;
724✔
133
    if (sent < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
724✔
134
    {
135
        /* Not all device drivers support GSO. If we were trying with GSO, try again
136
         * without.
137
         */
138
        result.assign(errno, boost::asio::error::get_system_category());
10✔
139
#if SPEAD2_USE_GSO
140
        if (current_gso_size == gso_probe)
10✔
141
        {
142
            /* We tried sending with GSO and it failed, but resending without GSO
143
             * also failed, so the fault is probably not lack of GSO support. Allow
144
             * GSO to be used again.
145
             */
146
            current_gso_size = gso_inactive;
×
147
        }
148
        else if (current_gso_size > 0)
10✔
149
        {
150
            set_gso_size(0, result);
6✔
151
            if (!result)
6✔
152
            {
153
                /* Re-compute msgvec without GSO */
154
                current_gso_size = gso_probe;
6✔
155
                last_msg = prepare_msgvec(first_packet, last_packet, first_msg,
6✔
156
                                          msgvec[first_msg].msg_hdr.msg_iov - msg_iov.data(),
6✔
157
                                          0);
158
                goto restart;
6✔
159
            }
160
        }
161
#endif
162
        do
163
        {
164
            auto *item = packets[first_packet].packet.item;
4✔
165
            if (!item->result)
4✔
166
                item->result = result;
2✔
167
            groups += packets[first_packet].packet.last;
4✔
168
            first_packet++;
4✔
169
        } while (first_packet < last_packet && packets[first_packet].merged);
4✔
170
        first_msg++;
4✔
171
    }
4✔
172
    else if (sent > 0)
714✔
173
    {
174
        if (current_gso_size == gso_probe)
714✔
175
        {
176
            log_debug("disabling GSO because sending with it failed and without succeeded");
6✔
177
            // Sending with GSO failed and without GSO succeeded. The network
178
            // device probably does not support it, so don't try again.
179
            current_gso_size = gso_disabled;
6✔
180
        }
181
        for (int i = 0; i < sent; i++)
9,055✔
182
        {
183
            do
184
            {
185
                auto *item = packets[first_packet].packet.item;
16,794✔
186
                item->bytes_sent += packets[first_packet].packet.size;
16,794✔
187
                groups += packets[first_packet].packet.last;
16,794✔
188
                first_packet++;
16,794✔
189
            } while (first_packet < last_packet && packets[first_packet].merged);
16,794✔
190
        }
191
        first_msg += sent;
714✔
192
    }
193

194
    if (groups > 0)
718✔
195
        groups_completed(groups);
382✔
196
    if (first_msg < last_msg)
718✔
197
    {
198
        // We didn't manage to send it all: schedule a new attempt once there is
199
        // buffer space.
200
        socket.async_wait(
4✔
201
            socket.wait_write,
202
            [this, first_packet, last_packet, first_msg, last_msg](
4✔
203
                const boost::system::error_code &
204
            ) {
4✔
205
                send_packets(first_packet, last_packet, first_msg, last_msg);
4✔
206
            });
4✔
207
    }
208
    else
209
    {
210
        post_wakeup();
714✔
211
    }
212
}
718✔
213

214
int udp_writer::prepare_msgvec(int first_packet, int last_packet, int first_msg, int first_iov, int gso_size)
720✔
215
{
216
    int merged_size = 0;
720✔
217
    int iov = first_iov;
720✔
218
    int msg = first_msg;
720✔
219
    for (int i = first_packet; i < last_packet; i++)
17,748✔
220
    {
221
        /* Check if we can merge with the previous packet using generic
222
         * segmentation offload. */
223
        if (!SPEAD2_USE_GSO
17,028✔
224
            || i == first_packet
225
            || (int) packets[i - 1].packet.size != gso_size
16,308✔
226
            || packets[i].packet.substream_index != packets[i - 1].packet.substream_index
14,221✔
227
            || merged_size + packets[i].packet.size > max_gso_message_size)
8,729✔
228
        {
229
            // Can't merge, so initialise a new header
230
            auto &hdr = msgvec[msg].msg_hdr;
8,442✔
231
            hdr.msg_iov = &msg_iov[iov];
8,442✔
232
            hdr.msg_iovlen = 0;
8,442✔
233
            const auto &endpoint = endpoints[packets[i].packet.substream_index];
8,442✔
234
            hdr.msg_name = (void *) endpoint.data();
8,442✔
235
            hdr.msg_namelen = endpoint.size();
8,442✔
236
            msg++;
8,442✔
237
            packets[i].merged = false;
8,442✔
238
            merged_size = 0;
8,442✔
239
        }
8,442✔
240
        else
241
        {
242
            packets[i].merged = true;
8,586✔
243
        }
244
        auto &hdr = msgvec[msg - 1].msg_hdr;
17,028✔
245
        hdr.msg_iovlen += packets[i].packet.buffers.size();
17,028✔
246
        merged_size += packets[i].packet.size;
17,028✔
247
        iov += packets[i].packet.buffers.size();
17,028✔
248
    }
249
    return msg;
720✔
250
}
251

252
void udp_writer::wakeup()
1,424✔
253
{
254
    packet_result result = get_packet(packets[0].packet, packets[0].scratch.get());
1,424✔
255
    switch (result)
1,424✔
256
    {
257
    case packet_result::SLEEP:
338✔
258
        sleep();
338✔
259
        return;
710✔
260
    case packet_result::EMPTY:
372✔
261
        request_wakeup();
372✔
262
        return;
372✔
263
    case packet_result::SUCCESS:
714✔
264
        break;
714✔
265
    }
266

267
    // We have at least one packet to send. See if we can get some more.
268
    int n;
269
    std::size_t n_iov = packets[0].packet.buffers.size();
714✔
270
    std::size_t max_size = packets[0].packet.size;
714✔
271
    for (n = 1; n < max_batch; n++)
16,798✔
272
    {
273
        result = get_packet(packets[n].packet, packets[n].scratch.get());
16,798✔
274
        if (result != packet_result::SUCCESS)
16,798✔
275
            break;
714✔
276
        n_iov += packets[n].packet.buffers.size();
16,084✔
277
        max_size = std::max(max_size, packets[n].packet.size);
16,084✔
278
    }
279

280
#if SPEAD2_USE_GSO
281
    int new_gso_size = max_size;
714✔
282
    if (new_gso_size != current_gso_size && current_gso_size >= 0)
714✔
283
    {
284
        boost::system::error_code result;
282✔
285
        set_gso_size(new_gso_size, result);
282✔
286
        if (!result)
282✔
287
            current_gso_size = new_gso_size;
280✔
288
        else if (result == boost::system::errc::no_protocol_option) // ENOPROTOOPT
2✔
289
        {
290
            /* Socket option is not supported on this platform. Just
291
             * disable GSO in our code.
292
             */
293
            log_debug("disabling GSO because socket option is not supported");
×
294
            current_gso_size = gso_disabled;
×
295
        }
296
        else
297
        {
298
            /* Something else has gone wrong. Make a best effort to disable
299
             * GSO on the socket.
300
             */
301
            log_warning("failed to set UDP_SEGMENT socket option to %1%: %2% (%3%)",
2✔
302
                        new_gso_size, result.value(), result.message());
2✔
303
            set_gso_size(0, result);
2✔
304
            if (!result)
2✔
305
                current_gso_size = 0;
2✔
306
        }
307
    }
308
#endif
309

310
    /* Fill in msg_iov from the packets */
311
    msg_iov.resize(n_iov);
714✔
312
    int iov = 0;
714✔
313
    for (int i = 0; i < n; i++)
17,512✔
314
    {
315
        for (const auto &buffer : packets[i].packet.buffers)
50,708✔
316
        {
317
            msg_iov[iov].iov_base = const_cast<void *>(buffer.data());
33,910✔
318
            msg_iov[iov].iov_len = buffer.size();
33,910✔
319
            iov++;
33,910✔
320
        }
321
    }
322
    int n_msgs = prepare_msgvec(0, n, 0, 0, current_gso_size);
714✔
323
    send_packets(0, n, 0, n_msgs);
714✔
324
}
325

326
#else // SPEAD2_USE_SENDMMSG
327

328
void udp_writer::wakeup()
329
{
330
    for (int i = 0; i < max_batch; i++)
331
    {
332
        transmit_packet data;
333
        packet_result result = get_packet(data, scratch.get());
334
        switch (result)
335
        {
336
        case packet_result::SLEEP:
337
            sleep();
338
            return;
339
        case packet_result::EMPTY:
340
            request_wakeup();
341
            return;
342
        case packet_result::SUCCESS:
343
            break;
344
        }
345

346
        // First try a synchronous send
347
        auto *item = data.item;
348
        bool last = data.last;
349
        const auto &endpoint = endpoints[data.substream_index];
350
        boost::system::error_code ec;
351
        std::size_t bytes = socket.send_to(data.buffers, endpoint, 0, ec);
352
        if (ec == boost::asio::error::would_block)
353
        {
354
            // Socket buffer is full, so do an asynchronous send
355
            auto handler = [this, item, last](const boost::system::error_code &ec, std::size_t bytes_transferred)
356
            {
357
                item->bytes_sent += bytes_transferred;
358
                if (!item->result)
359
                    item->result = ec;
360
                if (last)
361
                    groups_completed(1);
362
                wakeup();
363
            };
364
            socket.async_send_to(data.buffers, endpoints[data.substream_index],
365
                                 std::move(handler));
366
            return;
367
        }
368
        else
369
        {
370
            item->bytes_sent += bytes;
371
            if (!item->result)
372
                item->result = ec;
373
            if (last)
374
                groups_completed(1);
375
        }
376
    }
377
    post_wakeup();
378
}
379

380
#endif // !SPEAD2_USE_SENDMMSG
381

382
udp_writer::udp_writer(
149✔
383
    io_context_ref io_context,
384
    boost::asio::ip::udp::socket &&socket,
385
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
386
    const stream_config &config,
387
    std::size_t buffer_size)
149✔
388
    : writer(std::move(io_context), config),
149✔
389
    socket(std::move(socket)),
149✔
390
    endpoints(endpoints)
9,899✔
391
#if !SPEAD2_USE_SENDMMSG
392
    , scratch(new std::uint8_t[config.get_max_packet_size()])
393
#endif
394
{
395
    if (!socket_uses_io_context(this->socket, get_io_context()))
149✔
NEW
396
        throw std::invalid_argument("I/O context does not match the socket's I/O context");
×
397
    auto protocol = this->socket.local_endpoint().protocol();
149✔
398
    for (const auto &endpoint : endpoints)
445✔
399
        if (endpoint.protocol() != protocol)
297✔
400
            throw std::invalid_argument("Endpoint does not match protocol of the socket");
1✔
401
    set_socket_send_buffer_size(this->socket, buffer_size);
148✔
402
    this->socket.non_blocking(true);
148✔
403
#if SPEAD2_USE_SENDMMSG
404
    std::memset(&msgvec, 0, sizeof(msgvec));
148✔
405
    for (int i = 0; i < max_batch; i++)
9,620✔
406
        packets[i].scratch.reset(new std::uint8_t[config.get_max_packet_size()]);
9,472✔
407
#endif
408
}
218✔
409

410
} // anonymous namespace
411

412
static boost::asio::ip::udp::socket make_socket(
69✔
413
    boost::asio::io_context &io_context,
414
    const boost::asio::ip::udp &protocol,
415
    const boost::asio::ip::address &interface_address)
416
{
417
    boost::asio::ip::udp::socket socket(io_context, protocol);
69✔
418
    if (!interface_address.is_unspecified())
69✔
419
        socket.bind(boost::asio::ip::udp::endpoint(interface_address, 0));
×
420
    return socket;
69✔
421
}
×
422

423
static boost::asio::ip::udp get_protocol(const std::vector<boost::asio::ip::udp::endpoint> &endpoints)
70✔
424
{
425
    if (endpoints.empty())
70✔
426
        throw std::invalid_argument("Endpoint list must be non-empty");
1✔
427
    return endpoints[0].protocol();
69✔
428
}
429

430
udp_stream::udp_stream(
70✔
431
    io_context_ref io_context,
432
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
433
    const stream_config &config,
434
    std::size_t buffer_size,
435
    const boost::asio::ip::address &interface_address)
70✔
436
    : udp_stream(io_context,
437
                 make_socket(*io_context, get_protocol(endpoints), interface_address),
72✔
438
                 endpoints, config, buffer_size)
70✔
439
{
440
}
68✔
441

442
static boost::asio::ip::udp::socket make_multicast_socket(
×
443
    boost::asio::io_context &io_context,
444
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
445
    int ttl)
446
{
447
    for (const auto &endpoint : endpoints)
×
448
        if (!endpoint.address().is_multicast())
×
449
            throw std::invalid_argument("endpoint is not a multicast address");
×
NEW
450
    boost::asio::ip::udp::socket socket(io_context, get_protocol(endpoints));
×
451
    socket.set_option(boost::asio::ip::multicast::hops(ttl));
×
452
    return socket;
×
453
}
×
454

455
static boost::asio::ip::udp::socket make_multicast_v4_socket(
20✔
456
    boost::asio::io_context &io_context,
457
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
458
    int ttl,
459
    const boost::asio::ip::address &interface_address)
460
{
461
    for (const auto &endpoint : endpoints)
61✔
462
        if (!endpoint.address().is_v4() || !endpoint.address().is_multicast())
41✔
463
            throw std::invalid_argument("endpoint is not an IPv4 multicast address");
×
464
    if (!interface_address.is_unspecified() && !interface_address.is_v4())
20✔
465
        throw std::invalid_argument("interface address is not an IPv4 address");
×
466
    boost::asio::ip::udp::socket socket(io_context, boost::asio::ip::udp::v4());
20✔
467
    socket.set_option(boost::asio::ip::multicast::hops(ttl));
20✔
468
    if (!interface_address.is_unspecified())
20✔
469
        socket.set_option(boost::asio::ip::multicast::outbound_interface(interface_address.to_v4()));
20✔
470
    return socket;
20✔
471
}
×
472

473
static boost::asio::ip::udp::socket make_multicast_v6_socket(
20✔
474
    boost::asio::io_context &io_context,
475
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
476
    int ttl, unsigned int interface_index)
477
{
478
    for (const auto &endpoint : endpoints)
61✔
479
        if (!endpoint.address().is_v6() || !endpoint.address().is_multicast())
41✔
480
            throw std::invalid_argument("endpoint is not an IPv4 multicast address");
×
481
    boost::asio::ip::udp::socket socket(io_context, boost::asio::ip::udp::v6());
20✔
482
    socket.set_option(boost::asio::ip::multicast::hops(ttl));
20✔
483
    socket.set_option(boost::asio::ip::multicast::outbound_interface(interface_index));
20✔
484
    return socket;
20✔
485
}
×
486

487
udp_stream::udp_stream(
×
488
    io_context_ref io_context,
489
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
490
    const stream_config &config,
491
    std::size_t buffer_size,
492
    int ttl)
×
493
    : udp_stream(io_context,
NEW
494
                 make_multicast_socket(*io_context, endpoints, ttl),
×
UNCOV
495
                 std::move(endpoints), config, buffer_size)
×
496
{
497
}
×
498

499
udp_stream::udp_stream(
20✔
500
    io_context_ref io_context,
501
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
502
    const stream_config &config,
503
    std::size_t buffer_size,
504
    int ttl,
505
    const boost::asio::ip::address &interface_address)
20✔
506
    : udp_stream(io_context,
507
                 make_multicast_v4_socket(*io_context, endpoints, ttl, interface_address),
20✔
508
                 std::move(endpoints), config, buffer_size)
40✔
509
{
510
}
20✔
511

512
udp_stream::udp_stream(
20✔
513
    io_context_ref io_context,
514
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
515
    const stream_config &config,
516
    std::size_t buffer_size,
517
    int ttl,
518
    unsigned int interface_index)
20✔
519
    : udp_stream(io_context,
520
                 make_multicast_v6_socket(*io_context, endpoints, ttl, interface_index),
20✔
521
                 std::move(endpoints), config, buffer_size)
40✔
522
{
523
}
20✔
524

525
udp_stream::udp_stream(
149✔
526
    io_context_ref io_context,
527
    boost::asio::ip::udp::socket &&socket,
528
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
529
    const stream_config &config,
530
    std::size_t buffer_size)
149✔
531
    : stream(std::make_unique<udp_writer>(
149✔
532
        std::move(io_context),
149✔
533
        std::move(socket),
149✔
534
        endpoints,
535
        config,
536
        buffer_size))
297✔
537
{
538
}
148✔
539

540
udp_stream::udp_stream(
40✔
541
    io_context_ref io_context,
542
    boost::asio::ip::udp::socket &&socket,
543
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
544
    const stream_config &config)
40✔
545
    : udp_stream(io_context, std::move(socket), endpoints, config, 0)
40✔
546
{
547
}
40✔
548

549
} // namespace spead2::send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc