• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 7083965150

04 Dec 2023 08:29AM UTC coverage: 70.332% (+0.01%) from 70.318%
7083965150

push

github

bmerry
Add some dev docs on GSO

4931 of 7011 relevant lines covered (70.33%)

91105.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.46
/src/send_udp.cpp
1
/* Copyright 2015, 2019-2020, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
#include <cstddef>
18
#include <cstring>
19
#include <utility>
20
#include <algorithm>
21
#include <boost/asio.hpp>
22
#include <spead2/send_udp.h>
23
#include <spead2/send_writer.h>
24
#include <spead2/common_defines.h>
25
#include <spead2/common_socket.h>
26
#if SPEAD2_USE_SENDMMSG
27
# include <sys/types.h>
28
# include <sys/socket.h>
29
# include <netinet/in.h>
30
# include <netinet/udp.h>
31
#endif
32

33
namespace spead2::send
34
{
35

36
namespace
37
{
38

39
class udp_writer : public writer
40
{
41
private:
42
    boost::asio::ip::udp::socket socket;
43
    std::vector<boost::asio::ip::udp::endpoint> endpoints;
44

45
    virtual void wakeup() override final;
46

47
    /* NB: Linux has a maximum of 64 segments for GSO (UDP_MAX_SEGMENTS in the
48
     * kernel, but it doesn't seem to be exposed to userspace). If max_batch
49
     * is increased, logic will need to be added to the GSO merging to prevent
50
     * creating messages bigger than this.
51
     */
52
    static constexpr int max_batch = 64;
53
#if SPEAD2_USE_SENDMMSG
54
    // Some magic values for current_gso_size
55
    ///< GSO allowed, but socket option not currently set
56
    [[maybe_unused]] static constexpr int gso_inactive = 0;
57
    ///< GSO failed; do not try again
58
    [[maybe_unused]] static constexpr int gso_disabled = -1;
59
    ///< Last send with GSO failed; retrying without GSO
60
    [[maybe_unused]] static constexpr int gso_probe = -2;
61

62
    static constexpr int max_gso_message_size = 65535;  // maximum size the kernel will accept
63
    struct mmsghdr msgvec[max_batch];
64
    std::vector<struct iovec> msg_iov;
65
    struct
66
    {
67
        transmit_packet packet;
68
        std::unique_ptr<std::uint8_t[]> scratch;
69
        bool merged; // packet is part of the same message as the previous packet
70
    } packets[max_batch];
71
    int current_gso_size = gso_inactive;
72

73
#if SPEAD2_USE_GSO
74
    /// Set the socket option
75
    void set_gso_size(int size, boost::system::error_code &result);
76
#endif
77

78
    /**
79
     * Set up @ref msgvec from @ref msg_iov.
80
     *
81
     * The packets in [first_packet, last_packet) are assumed to have already
82
     * been set in @ref msg_iov, starting from @a first_iov. If @a gso_size is
83
     * positive, then multiple packets may be concatenated into a single
84
     * element of @ref msgvec, provided that all but the last have size
85
     * @a gso_size. Otherwise, each packet gets its own entry in @ref msgvec.
86
     *
87
     * @return The past-the-end index into @ref msgvec after the packets are
88
     * filled in.
89
     */
90
    int prepare_msgvec(int first_packet, int last_packet, int first_msg, int first_iov, int gso_size);
91
    void send_packets(int first_packet, int last_packet, int first_msg, int last_msg);
92
#else
93
    std::unique_ptr<std::uint8_t[]> scratch;
94
#endif
95

96
public:
97
    udp_writer(
98
        io_service_ref io_service,
99
        boost::asio::ip::udp::socket &&socket,
100
        const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
101
        const stream_config &config,
102
        std::size_t buffer_size);
103

104
    virtual std::size_t get_num_substreams() const override final { return endpoints.size(); }
149✔
105
};
106

107
#if SPEAD2_USE_SENDMMSG
108

109
#if SPEAD2_USE_GSO
110
void udp_writer::set_gso_size(int size, boost::system::error_code &result)
290✔
111
{
112
    if (setsockopt(socket.native_handle(), IPPROTO_UDP, UDP_SEGMENT,
290✔
113
                   &size, sizeof(size)) == -1)
290✔
114
    {
115
        result.assign(errno, boost::asio::error::get_system_category());
2✔
116
    }
117
    else
118
    {
119
        result.clear();
288✔
120
    }
121
}
290✔
122
#endif
123

124
void udp_writer::send_packets(int first_packet, int last_packet, int first_msg, int last_msg)
682✔
125
{
126
restart:
682✔
127
    // Try sending
128
    int sent = sendmmsg(socket.native_handle(), msgvec + first_msg, last_msg - first_msg, MSG_DONTWAIT);
682✔
129
    int groups = 0;
682✔
130
    boost::system::error_code result;
682✔
131
    if (sent < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
682✔
132
    {
133
        /* Not all device drivers support GSO. If we were trying with GSO, try again
134
         * without.
135
         */
136
        result.assign(errno, boost::asio::error::get_system_category());
10✔
137
#if SPEAD2_USE_GSO
138
        if (current_gso_size == gso_probe)
10✔
139
        {
140
            /* We tried sending with GSO and it failed, but resending without GSO
141
             * also failed, so the fault is probably not lack of GSO support. Allow
142
             * GSO to be used again.
143
             */
144
            current_gso_size = gso_inactive;
×
145
        }
146
        else if (current_gso_size > 0)
10✔
147
        {
148
            set_gso_size(0, result);
6✔
149
            if (!result)
6✔
150
            {
151
                /* Re-compute msgvec without GSO */
152
                current_gso_size = gso_probe;
6✔
153
                last_msg = prepare_msgvec(first_packet, last_packet, first_msg,
6✔
154
                                          msgvec[first_msg].msg_hdr.msg_iov - msg_iov.data(),
6✔
155
                                          0);
156
                goto restart;
6✔
157
            }
158
        }
159
#endif
160
        do
161
        {
162
            auto *item = packets[first_packet].packet.item;
4✔
163
            if (!item->result)
4✔
164
                item->result = result;
2✔
165
            groups += packets[first_packet].packet.last;
4✔
166
            first_packet++;
4✔
167
        } while (first_packet < last_packet && packets[first_packet].merged);
4✔
168
        first_msg++;
4✔
169
    }
4✔
170
    else if (sent > 0)
672✔
171
    {
172
        if (current_gso_size == gso_probe)
672✔
173
        {
174
            log_debug("disabling GSO because sending with it failed and without succeeded");
6✔
175
            // Sending with GSO failed and without GSO succeeded. The network
176
            // device probably does not support it, so don't try again.
177
            current_gso_size = gso_disabled;
6✔
178
        }
179
        for (int i = 0; i < sent; i++)
9,013✔
180
        {
181
            do
182
            {
183
                auto *item = packets[first_packet].packet.item;
16,794✔
184
                item->bytes_sent += packets[first_packet].packet.size;
16,794✔
185
                groups += packets[first_packet].packet.last;
16,794✔
186
                first_packet++;
16,794✔
187
            } while (first_packet < last_packet && packets[first_packet].merged);
16,794✔
188
        }
189
        first_msg += sent;
672✔
190
    }
191

192
    if (groups > 0)
676✔
193
        groups_completed(groups);
340✔
194
    if (first_msg < last_msg)
676✔
195
    {
196
        // We didn't manage to send it all: schedule a new attempt once there is
197
        // buffer space.
198
        socket.async_send(
4✔
199
            boost::asio::null_buffers(),
8✔
200
            [this, first_packet, last_packet, first_msg, last_msg](
4✔
201
                const boost::system::error_code &, std::size_t
202
            ) {
4✔
203
                send_packets(first_packet, last_packet, first_msg, last_msg);
4✔
204
            });
4✔
205
    }
206
    else
207
    {
208
        post_wakeup();
672✔
209
    }
210
}
676✔
211

212
int udp_writer::prepare_msgvec(int first_packet, int last_packet, int first_msg, int first_iov, int gso_size)
678✔
213
{
214
    int merged_size = 0;
678✔
215
    int iov = first_iov;
678✔
216
    int msg = first_msg;
678✔
217
    for (int i = first_packet; i < last_packet; i++)
17,706✔
218
    {
219
        /* Check if we can merge with the previous packet using generalised
220
         * segmentation offload. */
221
        if (!SPEAD2_USE_GSO
17,028✔
222
            || i == first_packet
223
            || (int) packets[i - 1].packet.size != gso_size
16,350✔
224
            || packets[i].packet.substream_index != packets[i - 1].packet.substream_index
14,259✔
225
            || merged_size + packets[i].packet.size > max_gso_message_size)
8,729✔
226
        {
227
            // Can't merge, so initialise a new header
228
            auto &hdr = msgvec[msg].msg_hdr;
8,442✔
229
            hdr.msg_iov = &msg_iov[iov];
8,442✔
230
            hdr.msg_iovlen = 0;
8,442✔
231
            const auto &endpoint = endpoints[packets[i].packet.substream_index];
8,442✔
232
            hdr.msg_name = (void *) endpoint.data();
8,442✔
233
            hdr.msg_namelen = endpoint.size();
8,442✔
234
            msg++;
8,442✔
235
            packets[i].merged = false;
8,442✔
236
            merged_size = 0;
8,442✔
237
        }
8,442✔
238
        else
239
        {
240
            packets[i].merged = true;
8,586✔
241
        }
242
        auto &hdr = msgvec[msg - 1].msg_hdr;
17,028✔
243
        hdr.msg_iovlen += packets[i].packet.buffers.size();
17,028✔
244
        merged_size += packets[i].packet.size;
17,028✔
245
        iov += packets[i].packet.buffers.size();
17,028✔
246
    }
247
    return msg;
678✔
248
}
249

250
void udp_writer::wakeup()
1,344✔
251
{
252
    packet_result result = get_packet(packets[0].packet, packets[0].scratch.get());
1,344✔
253
    switch (result)
1,344✔
254
    {
255
    case packet_result::SLEEP:
338✔
256
        sleep();
338✔
257
        return;
672✔
258
    case packet_result::EMPTY:
334✔
259
        request_wakeup();
334✔
260
        return;
334✔
261
    case packet_result::SUCCESS:
672✔
262
        break;
672✔
263
    }
264

265
    // We have at least one packet to send. See if we can get some more.
266
    int n;
267
    std::size_t n_iov = packets[0].packet.buffers.size();
672✔
268
    std::size_t max_size = packets[0].packet.size;
672✔
269
    for (n = 1; n < max_batch; n++)
16,798✔
270
    {
271
        result = get_packet(packets[n].packet, packets[n].scratch.get());
16,798✔
272
        if (result != packet_result::SUCCESS)
16,798✔
273
            break;
672✔
274
        n_iov += packets[n].packet.buffers.size();
16,126✔
275
        max_size = std::max(max_size, packets[n].packet.size);
16,126✔
276
    }
277

278
#if SPEAD2_USE_GSO
279
    int new_gso_size = max_size;
672✔
280
    if (new_gso_size != current_gso_size && current_gso_size >= 0)
672✔
281
    {
282
        boost::system::error_code result;
282✔
283
        set_gso_size(new_gso_size, result);
282✔
284
        if (!result)
282✔
285
            current_gso_size = new_gso_size;
280✔
286
        else if (result == boost::system::errc::no_protocol_option) // ENOPROTOOPT
2✔
287
        {
288
            /* Socket option is not supported on this platform. Just
289
             * disable GSO in our code.
290
             */
291
            log_debug("disabling GSO because socket option is not supported");
×
292
            current_gso_size = gso_disabled;
×
293
        }
294
        else
295
        {
296
            /* Something else has gone wrong. Make a best effort to disable
297
             * GSO on the socket.
298
             */
299
            log_warning("failed to set UDP_SEGMENT socket option to %1%: %2% (%3%)",
2✔
300
                        new_gso_size, result.value(), result.message());
2✔
301
            set_gso_size(0, result);
2✔
302
            if (!result)
2✔
303
                current_gso_size = 0;
2✔
304
        }
305
    }
306
#endif
307

308
    /* Fill in msg_iov from the packets */
309
    msg_iov.resize(n_iov);
672✔
310
    int iov = 0;
672✔
311
    for (int i = 0; i < n; i++)
17,470✔
312
    {
313
        for (const auto &buffer : packets[i].packet.buffers)
50,708✔
314
        {
315
            msg_iov[iov].iov_base = const_cast<void *>(
33,910✔
316
                boost::asio::buffer_cast<const void *>(buffer));
33,910✔
317
            msg_iov[iov].iov_len = boost::asio::buffer_size(buffer);
33,910✔
318
            iov++;
33,910✔
319
        }
320
    }
321
    int n_msgs = prepare_msgvec(0, n, 0, 0, current_gso_size);
672✔
322
    send_packets(0, n, 0, n_msgs);
672✔
323
}
324

325
#else // SPEAD2_USE_SENDMMSG
326

327
void udp_writer::wakeup()
328
{
329
    for (int i = 0; i < max_batch; i++)
330
    {
331
        transmit_packet data;
332
        packet_result result = get_packet(data, scratch.get());
333
        switch (result)
334
        {
335
        case packet_result::SLEEP:
336
            sleep();
337
            return;
338
        case packet_result::EMPTY:
339
            request_wakeup();
340
            return;
341
        case packet_result::SUCCESS:
342
            break;
343
        }
344

345
        // First try a synchronous send
346
        auto *item = data.item;
347
        bool last = data.last;
348
        const auto &endpoint = endpoints[data.substream_index];
349
        boost::system::error_code ec;
350
        std::size_t bytes = socket.send_to(data.buffers, endpoint, 0, ec);
351
        if (ec == boost::asio::error::would_block)
352
        {
353
            // Socket buffer is full, so do an asynchronous send
354
            auto handler = [this, item, last](const boost::system::error_code &ec, std::size_t bytes_transferred)
355
            {
356
                item->bytes_sent += bytes_transferred;
357
                if (!item->result)
358
                    item->result = ec;
359
                if (last)
360
                    groups_completed(1);
361
                wakeup();
362
            };
363
            socket.async_send_to(data.buffers, endpoints[data.substream_index],
364
                                 std::move(handler));
365
            return;
366
        }
367
        else
368
        {
369
            item->bytes_sent += bytes;
370
            if (!item->result)
371
                item->result = ec;
372
            if (last)
373
                groups_completed(1);
374
        }
375
    }
376
    post_wakeup();
377
}
378

379
#endif // !SPEAD2_USE_SENDMMSG
380

381
/**
 * Take ownership of @a socket and prepare it for sending.
 *
 * The socket is given the requested send buffer size and put into
 * non-blocking mode, and the per-packet scratch buffers are allocated with
 * the maximum packet size from @a config.
 *
 * @throw std::invalid_argument if the socket does not use the writer's I/O
 *        service, or if any endpoint has a different protocol from the
 *        socket's local endpoint.
 */
udp_writer::udp_writer(
    io_service_ref io_service,
    boost::asio::ip::udp::socket &&socket,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size)
    : writer(std::move(io_service), config),
    socket(std::move(socket)),
    endpoints(endpoints)
#if !SPEAD2_USE_SENDMMSG
    , scratch(new std::uint8_t[config.get_max_packet_size()])
#endif
{
    // NB: the parameter has been moved from, so always go through this->socket
    if (!socket_uses_io_service(this->socket, get_io_service()))
        throw std::invalid_argument("I/O service does not match the socket's I/O service");

    const auto protocol = this->socket.local_endpoint().protocol();
    const bool mismatched = std::any_of(
        endpoints.begin(), endpoints.end(),
        [&protocol](const boost::asio::ip::udp::endpoint &endpoint)
        {
            return endpoint.protocol() != protocol;
        });
    if (mismatched)
        throw std::invalid_argument("Endpoint does not match protocol of the socket");

    set_socket_send_buffer_size(this->socket, buffer_size);
    this->socket.non_blocking(true);
#if SPEAD2_USE_SENDMMSG
    std::memset(&msgvec, 0, sizeof(msgvec));
    for (auto &slot : packets)
        slot.scratch.reset(new std::uint8_t[config.get_max_packet_size()]);
#endif
}
219✔
408

409
} // anonymous namespace
410

411
/**
 * Open a UDP socket for @a protocol, binding it to @a interface_address
 * (with port 0) unless that address is unspecified.
 */
static boost::asio::ip::udp::socket make_socket(
    boost::asio::io_service &io_service,
    const boost::asio::ip::udp &protocol,
    const boost::asio::ip::address &interface_address)
{
    boost::asio::ip::udp::socket socket(io_service, protocol);
    const bool bind_requested = !interface_address.is_unspecified();
    if (bind_requested)
    {
        const boost::asio::ip::udp::endpoint local(interface_address, 0);
        socket.bind(local);
    }
    return socket;
}
×
421

422
/**
 * Return the protocol of the first endpoint in the list.
 *
 * @throw std::invalid_argument if @a endpoints is empty
 */
static boost::asio::ip::udp get_protocol(const std::vector<boost::asio::ip::udp::endpoint> &endpoints)
{
    if (!endpoints.empty())
        return endpoints.front().protocol();
    throw std::invalid_argument("Endpoint list must be non-empty");
}
428

429
/// Create a socket matching the first endpoint's protocol (optionally bound
/// to @a interface_address) and delegate to the socket-taking constructor.
udp_stream::udp_stream(
    io_service_ref io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size,
    const boost::asio::ip::address &interface_address)
    : udp_stream(
        io_service,
        make_socket(*io_service, get_protocol(endpoints), interface_address),
        endpoints,
        config,
        buffer_size)
{
}
69✔
440

441
/**
 * Open a socket for the endpoints' protocol with the multicast hop limit set
 * to @a ttl.
 *
 * @throw std::invalid_argument if any endpoint is not a multicast address,
 *        or (via get_protocol) if the list is empty
 */
static boost::asio::ip::udp::socket make_multicast_socket(
    boost::asio::io_service &io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    int ttl)
{
    const bool all_multicast = std::all_of(
        endpoints.begin(), endpoints.end(),
        [](const boost::asio::ip::udp::endpoint &endpoint)
        {
            return endpoint.address().is_multicast();
        });
    if (!all_multicast)
        throw std::invalid_argument("endpoint is not a multicast address");

    boost::asio::ip::udp::socket socket(io_service, get_protocol(endpoints));
    const boost::asio::ip::multicast::hops hop_limit(ttl);
    socket.set_option(hop_limit);
    return socket;
}
×
453

454
/**
 * Open an IPv4 socket with the multicast hop limit set to @a ttl and, if
 * @a interface_address is specified, that address as the outbound multicast
 * interface.
 *
 * @throw std::invalid_argument if any endpoint is not an IPv4 multicast
 *        address, or if @a interface_address is specified but not IPv4
 */
static boost::asio::ip::udp::socket make_multicast_v4_socket(
    boost::asio::io_service &io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    int ttl,
    const boost::asio::ip::address &interface_address)
{
    for (const auto &endpoint : endpoints)
    {
        const auto address = endpoint.address();
        if (!(address.is_v4() && address.is_multicast()))
            throw std::invalid_argument("endpoint is not an IPv4 multicast address");
    }
    const bool have_interface = !interface_address.is_unspecified();
    if (have_interface && !interface_address.is_v4())
        throw std::invalid_argument("interface address is not an IPv4 address");

    boost::asio::ip::udp::socket socket(io_service, boost::asio::ip::udp::v4());
    socket.set_option(boost::asio::ip::multicast::hops(ttl));
    if (have_interface)
    {
        const boost::asio::ip::multicast::outbound_interface outbound(interface_address.to_v4());
        socket.set_option(outbound);
    }
    return socket;
}
×
471

472
/**
 * Open an IPv6 socket with the multicast hop limit set to @a ttl and
 * @a interface_index as the outbound multicast interface.
 *
 * @throw std::invalid_argument if any endpoint is not an IPv6 multicast
 *        address
 */
static boost::asio::ip::udp::socket make_multicast_v6_socket(
    boost::asio::io_service &io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    int ttl, unsigned int interface_index)
{
    for (const auto &endpoint : endpoints)
        if (!endpoint.address().is_v6() || !endpoint.address().is_multicast())
            // This helper validates IPv6 endpoints, so the message must say IPv6
            throw std::invalid_argument("endpoint is not an IPv6 multicast address");
    boost::asio::ip::udp::socket socket(io_service, boost::asio::ip::udp::v6());
    socket.set_option(boost::asio::ip::multicast::hops(ttl));
    socket.set_option(boost::asio::ip::multicast::outbound_interface(interface_index));
    return socket;
}
×
485

486
/// Create a multicast socket (protocol taken from the first endpoint) with
/// hop limit @a ttl and delegate to the socket-taking constructor.
udp_stream::udp_stream(
    io_service_ref io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size,
    int ttl)
    : udp_stream(io_service,
                 make_multicast_socket(*io_service, endpoints, ttl),
                 // endpoints is a const reference: it can only be copied, so
                 // pass it directly rather than through a no-op std::move
                 endpoints, config, buffer_size)
{
}
×
497

498
/// Create an IPv4 multicast socket with hop limit @a ttl, optionally sending
/// via @a interface_address, and delegate to the socket-taking constructor.
udp_stream::udp_stream(
    io_service_ref io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size,
    int ttl,
    const boost::asio::ip::address &interface_address)
    : udp_stream(io_service,
                 make_multicast_v4_socket(*io_service, endpoints, ttl, interface_address),
                 // endpoints is a const reference: it can only be copied, so
                 // pass it directly rather than through a no-op std::move
                 endpoints, config, buffer_size)
{
}
20✔
510

511
/// Create an IPv6 multicast socket with hop limit @a ttl, sending via the
/// interface with index @a interface_index, and delegate to the
/// socket-taking constructor.
udp_stream::udp_stream(
    io_service_ref io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size,
    int ttl,
    unsigned int interface_index)
    : udp_stream(io_service,
                 make_multicast_v6_socket(*io_service, endpoints, ttl, interface_index),
                 // endpoints is a const reference: it can only be copied, so
                 // pass it directly rather than through a no-op std::move
                 endpoints, config, buffer_size)
{
}
20✔
523

524
/// Build the stream around a udp_writer that takes ownership of @a socket
/// and sends to @a endpoints.
udp_stream::udp_stream(
    io_service_ref io_service,
    boost::asio::ip::udp::socket &&socket,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size)
    : stream(
        std::make_unique<udp_writer>(
            std::move(io_service), std::move(socket), endpoints, config, buffer_size))
{
}
149✔
538

539
/// Same as the socket-taking constructor, passing 0 as the buffer size.
udp_stream::udp_stream(
    io_service_ref io_service,
    boost::asio::ip::udp::socket &&socket,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config)
    : udp_stream(
        io_service,
        std::move(socket),
        endpoints,
        config,
        0)
{
}
40✔
547

548
} // namespace spead2::send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc