• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 7083073978

04 Dec 2023 06:48AM UTC coverage: 70.318%. First build
7083073978

push

github

bmerry
Add some debug logging to GSO state machine transitions

1 of 2 new or added lines in 1 file covered. (50.0%)

4930 of 7011 relevant lines covered (70.32%)

91100.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.46
/src/send_udp.cpp
1
/* Copyright 2015, 2019-2020, 2023 National Research Foundation (SARAO)
2
 *
3
 * This program is free software: you can redistribute it and/or modify it under
4
 * the terms of the GNU Lesser General Public License as published by the Free
5
 * Software Foundation, either version 3 of the License, or (at your option) any
6
 * later version.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11
 * details.
12
 *
13
 * You should have received a copy of the GNU Lesser General Public License
14
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
#include <cstddef>
18
#include <cstring>
19
#include <utility>
20
#include <algorithm>
21
#include <boost/asio.hpp>
22
#include <spead2/send_udp.h>
23
#include <spead2/send_writer.h>
24
#include <spead2/common_defines.h>
25
#include <spead2/common_socket.h>
26
#if SPEAD2_USE_SENDMMSG
27
# include <sys/types.h>
28
# include <sys/socket.h>
29
# include <netinet/in.h>
30
# include <netinet/udp.h>
31
#endif
32

33
namespace spead2::send
34
{
35

36
namespace
37
{
38

39
class udp_writer : public writer
40
{
41
private:
42
    // Some magic values for current_gso_size
43
    static constexpr int gso_inactive = 0;  ///< GSO allowed, but socket option not currently set
44
    static constexpr int gso_disabled = -1; ///< GSO failed; do not try again
45
    static constexpr int gso_probe = -2;    ///< Last send with GSO failed; retrying without GSO
46

47
    boost::asio::ip::udp::socket socket;
48
    std::vector<boost::asio::ip::udp::endpoint> endpoints;
49

50
    virtual void wakeup() override final;
51

52
    /* NB: Linux has a maximum of 64 segments for GSO (UDP_MAX_SEGMENTS in the
53
     * kernel, but it doesn't seem to be exposed to userspace). If max_batch
54
     * is increased, logic will need to be added to the GSO merging to prevent
55
     * creating messages bigger than this.
56
     */
57
    static constexpr int max_batch = 64;
58
#if SPEAD2_USE_SENDMMSG
59
    static constexpr int max_gso_message_size = 65535;  // maximum size the kernel will accept
60
    struct mmsghdr msgvec[max_batch];
61
    std::vector<struct iovec> msg_iov;
62
    struct
63
    {
64
        transmit_packet packet;
65
        std::unique_ptr<std::uint8_t[]> scratch;
66
        bool merged; // packet is part of the same message as the previous packet
67
    } packets[max_batch];
68
    int current_gso_size = gso_inactive;
69

70
#if SPEAD2_USE_GSO
71
    /// Set the socket option
72
    void set_gso_size(int size, boost::system::error_code &result);
73
#endif
74

75
    /**
76
     * Set up @ref msgvec from @ref msg_iov.
77
     *
78
     * The packets in [first_packet, last_packet) are assumed to have already
79
     * been set in @ref msg_iov, starting from @a first_iov. If @a gso_size is
80
     * positive, then multiple packets may be concatenated into a single
81
     * element of @ref msgvec, provided that all but the last have size
82
     * @a gso_size. Otherwise, each packet gets its own entry in @ref msgvec.
83
     *
84
     * @return The past-the-end index into @ref msgvec after the packets are
85
     * filled in.
86
     */
87
    int prepare_msgvec(int first_packet, int last_packet, int first_msg, int first_iov, int gso_size);
88
    void send_packets(int first_packet, int last_packet, int first_msg, int last_msg);
89
#else
90
    std::unique_ptr<std::uint8_t[]> scratch;
91
#endif
92

93
public:
94
    udp_writer(
95
        io_service_ref io_service,
96
        boost::asio::ip::udp::socket &&socket,
97
        const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
98
        const stream_config &config,
99
        std::size_t buffer_size);
100

101
    virtual std::size_t get_num_substreams() const override final { return endpoints.size(); }
149✔
102
};
103

104
#if SPEAD2_USE_SENDMMSG
105

106
#if SPEAD2_USE_GSO
107
void udp_writer::set_gso_size(int size, boost::system::error_code &result)
290✔
108
{
109
    if (setsockopt(socket.native_handle(), IPPROTO_UDP, UDP_SEGMENT,
290✔
110
                   &size, sizeof(size)) == -1)
290✔
111
    {
112
        result.assign(errno, boost::asio::error::get_system_category());
2✔
113
    }
114
    else
115
    {
116
        result.clear();
288✔
117
    }
118
}
290✔
119
#endif
120

121
void udp_writer::send_packets(int first_packet, int last_packet, int first_msg, int last_msg)
682✔
122
{
123
restart:
682✔
124
    // Try sending
125
    int sent = sendmmsg(socket.native_handle(), msgvec + first_msg, last_msg - first_msg, MSG_DONTWAIT);
682✔
126
    int groups = 0;
682✔
127
    boost::system::error_code result;
682✔
128
    if (sent < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
682✔
129
    {
130
        /* Not all device drivers support GSO. If we were trying with GSO, try again
131
         * without.
132
         */
133
        result.assign(errno, boost::asio::error::get_system_category());
10✔
134
#if SPEAD2_USE_GSO
135
        if (current_gso_size == gso_probe)
10✔
136
        {
137
            /* We tried sending with GSO and it failed, but resending without GSO
138
             * also failed, so the fault is probably not lack of GSO support. Allow
139
             * GSO to be used again.
140
             */
141
            current_gso_size = gso_inactive;
×
142
        }
143
        else if (current_gso_size > 0)
10✔
144
        {
145
            set_gso_size(0, result);
6✔
146
            if (!result)
6✔
147
            {
148
                /* Re-compute msgvec without GSO */
149
                current_gso_size = gso_probe;
6✔
150
                last_msg = prepare_msgvec(first_packet, last_packet, first_msg,
6✔
151
                                          msgvec[first_msg].msg_hdr.msg_iov - msg_iov.data(),
6✔
152
                                          0);
153
                goto restart;
6✔
154
            }
155
        }
156
#endif
157
        do
158
        {
159
            auto *item = packets[first_packet].packet.item;
4✔
160
            if (!item->result)
4✔
161
                item->result = result;
2✔
162
            groups += packets[first_packet].packet.last;
4✔
163
            first_packet++;
4✔
164
        } while (first_packet < last_packet && packets[first_packet].merged);
4✔
165
        first_msg++;
4✔
166
    }
4✔
167
    else if (sent > 0)
672✔
168
    {
169
        if (current_gso_size == gso_probe)
672✔
170
        {
171
            log_debug("disabling GSO because sending with it failed and without succeeded");
6✔
172
            // Sending with GSO failed and without GSO succeeded. The network
173
            // device probably does not support it, so don't try again.
174
            current_gso_size = gso_disabled;
6✔
175
        }
176
        for (int i = 0; i < sent; i++)
9,013✔
177
        {
178
            do
179
            {
180
                auto *item = packets[first_packet].packet.item;
16,794✔
181
                item->bytes_sent += packets[first_packet].packet.size;
16,794✔
182
                groups += packets[first_packet].packet.last;
16,794✔
183
                first_packet++;
16,794✔
184
            } while (first_packet < last_packet && packets[first_packet].merged);
16,794✔
185
        }
186
        first_msg += sent;
672✔
187
    }
188

189
    if (groups > 0)
676✔
190
        groups_completed(groups);
340✔
191
    if (first_msg < last_msg)
676✔
192
    {
193
        // We didn't manage to send it all: schedule a new attempt once there is
194
        // buffer space.
195
        socket.async_send(
4✔
196
            boost::asio::null_buffers(),
8✔
197
            [this, first_packet, last_packet, first_msg, last_msg](
4✔
198
                const boost::system::error_code &, std::size_t
199
            ) {
4✔
200
                send_packets(first_packet, last_packet, first_msg, last_msg);
4✔
201
            });
4✔
202
    }
203
    else
204
    {
205
        post_wakeup();
672✔
206
    }
207
}
676✔
208

209
int udp_writer::prepare_msgvec(int first_packet, int last_packet, int first_msg, int first_iov, int gso_size)
678✔
210
{
211
    int merged_size = 0;
678✔
212
    int iov = first_iov;
678✔
213
    int msg = first_msg;
678✔
214
    for (int i = first_packet; i < last_packet; i++)
17,706✔
215
    {
216
        /* Check if we can merge with the previous packet using generalised
217
         * segmentation offload. */
218
        if (!SPEAD2_USE_GSO
17,028✔
219
            || i == first_packet
220
            || (int) packets[i - 1].packet.size != gso_size
16,350✔
221
            || packets[i].packet.substream_index != packets[i - 1].packet.substream_index
14,259✔
222
            || merged_size + packets[i].packet.size > max_gso_message_size)
8,729✔
223
        {
224
            // Can't merge, so initialise a new header
225
            auto &hdr = msgvec[msg].msg_hdr;
8,442✔
226
            hdr.msg_iov = &msg_iov[iov];
8,442✔
227
            hdr.msg_iovlen = 0;
8,442✔
228
            const auto &endpoint = endpoints[packets[i].packet.substream_index];
8,442✔
229
            hdr.msg_name = (void *) endpoint.data();
8,442✔
230
            hdr.msg_namelen = endpoint.size();
8,442✔
231
            msg++;
8,442✔
232
            packets[i].merged = false;
8,442✔
233
            merged_size = 0;
8,442✔
234
        }
8,442✔
235
        else
236
        {
237
            packets[i].merged = true;
8,586✔
238
        }
239
        auto &hdr = msgvec[msg - 1].msg_hdr;
17,028✔
240
        hdr.msg_iovlen += packets[i].packet.buffers.size();
17,028✔
241
        merged_size += packets[i].packet.size;
17,028✔
242
        iov += packets[i].packet.buffers.size();
17,028✔
243
    }
244
    return msg;
678✔
245
}
246

247
void udp_writer::wakeup()
1,344✔
248
{
249
    packet_result result = get_packet(packets[0].packet, packets[0].scratch.get());
1,344✔
250
    switch (result)
1,344✔
251
    {
252
    case packet_result::SLEEP:
338✔
253
        sleep();
338✔
254
        return;
672✔
255
    case packet_result::EMPTY:
334✔
256
        request_wakeup();
334✔
257
        return;
334✔
258
    case packet_result::SUCCESS:
672✔
259
        break;
672✔
260
    }
261

262
    // We have at least one packet to send. See if we can get some more.
263
    int n;
264
    std::size_t n_iov = packets[0].packet.buffers.size();
672✔
265
    std::size_t max_size = packets[0].packet.size;
672✔
266
    for (n = 1; n < max_batch; n++)
16,798✔
267
    {
268
        result = get_packet(packets[n].packet, packets[n].scratch.get());
16,798✔
269
        if (result != packet_result::SUCCESS)
16,798✔
270
            break;
672✔
271
        n_iov += packets[n].packet.buffers.size();
16,126✔
272
        max_size = std::max(max_size, packets[n].packet.size);
16,126✔
273
    }
274

275
#if SPEAD2_USE_GSO
276
    int new_gso_size = max_size;
672✔
277
    if (new_gso_size != current_gso_size && current_gso_size >= 0)
672✔
278
    {
279
        boost::system::error_code result;
282✔
280
        set_gso_size(new_gso_size, result);
282✔
281
        if (!result)
282✔
282
            current_gso_size = new_gso_size;
280✔
283
        else if (result == boost::system::errc::no_protocol_option) // ENOPROTOOPT
2✔
284
        {
285
            /* Socket option is not supported on this platform. Just
286
             * disable GSO in our code.
287
             */
NEW
288
            log_debug("disabling GSO because socket option is not supported");
×
289
            current_gso_size = gso_disabled;
×
290
        }
291
        else
292
        {
293
            /* Something else has gone wrong. Make a best effort to disable
294
             * GSO on the socket.
295
             */
296
            log_warning("failed to set UDP_SEGMENT socket option to %1%: %2% (%3%)",
2✔
297
                        new_gso_size, result.value(), result.message());
2✔
298
            set_gso_size(0, result);
2✔
299
            if (!result)
2✔
300
                current_gso_size = 0;
2✔
301
        }
302
    }
303
#endif
304

305
    /* Fill in msg_iov from the packets */
306
    msg_iov.resize(n_iov);
672✔
307
    int iov = 0;
672✔
308
    for (int i = 0; i < n; i++)
17,470✔
309
    {
310
        for (const auto &buffer : packets[i].packet.buffers)
50,708✔
311
        {
312
            msg_iov[iov].iov_base = const_cast<void *>(
33,910✔
313
                boost::asio::buffer_cast<const void *>(buffer));
33,910✔
314
            msg_iov[iov].iov_len = boost::asio::buffer_size(buffer);
33,910✔
315
            iov++;
33,910✔
316
        }
317
    }
318
    int n_msgs = prepare_msgvec(0, n, 0, 0, current_gso_size);
672✔
319
    send_packets(0, n, 0, n_msgs);
672✔
320
}
321

322
#else // SPEAD2_USE_SENDMMSG
323

324
void udp_writer::wakeup()
325
{
326
    for (int i = 0; i < max_batch; i++)
327
    {
328
        transmit_packet data;
329
        packet_result result = get_packet(data, scratch.get());
330
        switch (result)
331
        {
332
        case packet_result::SLEEP:
333
            sleep();
334
            return;
335
        case packet_result::EMPTY:
336
            request_wakeup();
337
            return;
338
        case packet_result::SUCCESS:
339
            break;
340
        }
341

342
        // First try a synchronous send
343
        auto *item = data.item;
344
        bool last = data.last;
345
        const auto &endpoint = endpoints[data.substream_index];
346
        boost::system::error_code ec;
347
        std::size_t bytes = socket.send_to(data.buffers, endpoint, 0, ec);
348
        if (ec == boost::asio::error::would_block)
349
        {
350
            // Socket buffer is full, so do an asynchronous send
351
            auto handler = [this, item, last](const boost::system::error_code &ec, std::size_t bytes_transferred)
352
            {
353
                item->bytes_sent += bytes_transferred;
354
                if (!item->result)
355
                    item->result = ec;
356
                if (last)
357
                    groups_completed(1);
358
                wakeup();
359
            };
360
            socket.async_send_to(data.buffers, endpoints[data.substream_index],
361
                                 std::move(handler));
362
            return;
363
        }
364
        else
365
        {
366
            item->bytes_sent += bytes;
367
            if (!item->result)
368
                item->result = ec;
369
            if (last)
370
                groups_completed(1);
371
        }
372
    }
373
    post_wakeup();
374
}
375

376
#endif // !SPEAD2_USE_SENDMMSG
377

378
/**
 * Take ownership of @a socket and prepare it for sending to @a endpoints.
 *
 * The socket is given the requested send buffer size and switched to
 * non-blocking mode.
 *
 * @throws std::invalid_argument if the socket does not use the writer's I/O
 * service, or if an endpoint's protocol differs from that of the socket.
 */
udp_writer::udp_writer(
    io_service_ref io_service,
    boost::asio::ip::udp::socket &&socket,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size)
    : writer(std::move(io_service), config),
    socket(std::move(socket)),
    endpoints(endpoints)
#if !SPEAD2_USE_SENDMMSG
    , scratch(new std::uint8_t[config.get_max_packet_size()])
#endif
{
    // NB: `socket` names the (moved-from) parameter here; use this->socket for the member.
    if (!socket_uses_io_service(this->socket, get_io_service()))
        throw std::invalid_argument("I/O service does not match the socket's I/O service");

    const auto protocol = this->socket.local_endpoint().protocol();
    const bool protocol_mismatch = std::any_of(
        endpoints.begin(), endpoints.end(),
        [&protocol](const boost::asio::ip::udp::endpoint &candidate)
        {
            return candidate.protocol() != protocol;
        });
    if (protocol_mismatch)
        throw std::invalid_argument("Endpoint does not match protocol of the socket");

    set_socket_send_buffer_size(this->socket, buffer_size);
    this->socket.non_blocking(true);
#if SPEAD2_USE_SENDMMSG
    std::memset(msgvec, 0, sizeof(msgvec));
    // Every packet slot gets its own scratch buffer of the maximum packet size
    for (auto &slot : packets)
        slot.scratch.reset(new std::uint8_t[config.get_max_packet_size()]);
#endif
}
219✔
405

406
} // anonymous namespace
407

408
/**
 * Open a UDP socket for @a protocol on @a io_service.
 *
 * Unless @a interface_address is the unspecified address, the socket is
 * bound to it with port 0.
 */
static boost::asio::ip::udp::socket make_socket(
    boost::asio::io_service &io_service,
    const boost::asio::ip::udp &protocol,
    const boost::asio::ip::address &interface_address)
{
    boost::asio::ip::udp::socket result(io_service, protocol);
    const bool want_bind = !interface_address.is_unspecified();
    if (want_bind)
    {
        const boost::asio::ip::udp::endpoint local(interface_address, 0);
        result.bind(local);
    }
    return result;
}
×
418

419
/**
 * Return the protocol of the first endpoint in @a endpoints.
 *
 * @throws std::invalid_argument if @a endpoints is empty
 */
static boost::asio::ip::udp get_protocol(const std::vector<boost::asio::ip::udp::endpoint> &endpoints)
{
    if (endpoints.size() == 0)
    {
        throw std::invalid_argument("Endpoint list must be non-empty");
    }
    return endpoints.front().protocol();
}
425

426
/**
 * Construct a stream on a newly created socket.
 *
 * The socket uses the protocol of the first endpoint and is bound to
 * @a interface_address unless that address is unspecified.
 */
udp_stream::udp_stream(
    io_service_ref io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size,
    const boost::asio::ip::address &interface_address)
    : udp_stream(
        io_service,
        make_socket(*io_service, get_protocol(endpoints), interface_address),
        endpoints,
        config,
        buffer_size)
{
    // All the work is done by the delegated-to constructor
}
69✔
437

438
/**
 * Open a socket for sending to multicast @a endpoints with the given
 * multicast hop limit (@a ttl).
 *
 * The socket's protocol is that of the first endpoint.
 *
 * @throws std::invalid_argument if any endpoint is not a multicast address,
 * or if @a endpoints is empty.
 */
static boost::asio::ip::udp::socket make_multicast_socket(
    boost::asio::io_service &io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    int ttl)
{
    const bool all_multicast = std::all_of(
        endpoints.begin(), endpoints.end(),
        [](const boost::asio::ip::udp::endpoint &candidate)
        {
            return candidate.address().is_multicast();
        });
    if (!all_multicast)
        throw std::invalid_argument("endpoint is not a multicast address");

    boost::asio::ip::udp::socket result(io_service, get_protocol(endpoints));
    const boost::asio::ip::multicast::hops hop_limit(ttl);
    result.set_option(hop_limit);
    return result;
}
×
450

451
/**
 * Open an IPv4 socket for sending to multicast @a endpoints.
 *
 * The multicast hop limit is set to @a ttl. Unless @a interface_address is
 * unspecified, it is also set as the outbound multicast interface.
 *
 * @throws std::invalid_argument if any endpoint is not an IPv4 multicast
 * address, or if @a interface_address is specified but is not IPv4.
 */
static boost::asio::ip::udp::socket make_multicast_v4_socket(
    boost::asio::io_service &io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    int ttl,
    const boost::asio::ip::address &interface_address)
{
    for (const auto &candidate : endpoints)
    {
        const bool acceptable =
            candidate.address().is_v4() && candidate.address().is_multicast();
        if (!acceptable)
            throw std::invalid_argument("endpoint is not an IPv4 multicast address");
    }
    const bool have_interface = !interface_address.is_unspecified();
    if (have_interface && !interface_address.is_v4())
        throw std::invalid_argument("interface address is not an IPv4 address");

    boost::asio::ip::udp::socket result(io_service, boost::asio::ip::udp::v4());
    result.set_option(boost::asio::ip::multicast::hops(ttl));
    if (have_interface)
    {
        result.set_option(
            boost::asio::ip::multicast::outbound_interface(interface_address.to_v4()));
    }
    return result;
}
×
468

469
/**
 * Open an IPv6 socket for sending to multicast @a endpoints.
 *
 * The multicast hop limit is set to @a ttl and the outbound multicast
 * interface to @a interface_index.
 *
 * @throws std::invalid_argument if any endpoint is not an IPv6 multicast
 * address.
 */
static boost::asio::ip::udp::socket make_multicast_v6_socket(
    boost::asio::io_service &io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    int ttl, unsigned int interface_index)
{
    for (const auto &endpoint : endpoints)
        if (!endpoint.address().is_v6() || !endpoint.address().is_multicast())
            // This message previously said "IPv4" (copied from the v4 helper)
            throw std::invalid_argument("endpoint is not an IPv6 multicast address");
    boost::asio::ip::udp::socket socket(io_service, boost::asio::ip::udp::v6());
    socket.set_option(boost::asio::ip::multicast::hops(ttl));
    socket.set_option(boost::asio::ip::multicast::outbound_interface(interface_index));
    return socket;
}
×
482

483
/**
 * Construct a multicast stream on a newly created socket whose multicast
 * hop limit is @a ttl.
 *
 * Every endpoint must be a multicast address (std::invalid_argument is
 * thrown otherwise).
 */
udp_stream::udp_stream(
    io_service_ref io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size,
    int ttl)
    : udp_stream(io_service,
                 make_multicast_socket(*io_service, endpoints, ttl),
                 // endpoints is a const reference, so std::move on it would have no effect
                 endpoints, config, buffer_size)
{
}
×
494

495
/**
 * Construct an IPv4 multicast stream on a newly created socket.
 *
 * The socket's multicast hop limit is @a ttl and, unless
 * @a interface_address is unspecified, that address is used as the
 * outbound multicast interface. Every endpoint must be an IPv4 multicast
 * address (std::invalid_argument is thrown otherwise).
 */
udp_stream::udp_stream(
    io_service_ref io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size,
    int ttl,
    const boost::asio::ip::address &interface_address)
    : udp_stream(io_service,
                 make_multicast_v4_socket(*io_service, endpoints, ttl, interface_address),
                 // endpoints is a const reference, so std::move on it would have no effect
                 endpoints, config, buffer_size)
{
}
20✔
507

508
/**
 * Construct an IPv6 multicast stream on a newly created socket.
 *
 * The socket's multicast hop limit is @a ttl and its outbound multicast
 * interface is @a interface_index. Every endpoint must be an IPv6
 * multicast address (std::invalid_argument is thrown otherwise).
 */
udp_stream::udp_stream(
    io_service_ref io_service,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size,
    int ttl,
    unsigned int interface_index)
    : udp_stream(io_service,
                 make_multicast_v6_socket(*io_service, endpoints, ttl, interface_index),
                 // endpoints is a const reference, so std::move on it would have no effect
                 endpoints, config, buffer_size)
{
}
20✔
520

521
/**
 * Construct a stream that sends through an existing socket.
 *
 * Ownership of @a socket passes to a udp_writer, which the stream then
 * uses as its writer.
 */
udp_stream::udp_stream(
    io_service_ref io_service,
    boost::asio::ip::udp::socket &&socket,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config,
    std::size_t buffer_size)
    : stream(
        std::make_unique<udp_writer>(
            std::move(io_service), std::move(socket), endpoints, config, buffer_size))
{
    // Nothing further to do: the writer holds all the state
}
149✔
535

536
/**
 * Construct a stream that sends through an existing socket, passing a
 * buffer size of 0 to the constructor that takes an explicit buffer size.
 */
udp_stream::udp_stream(
    io_service_ref io_service,
    boost::asio::ip::udp::socket &&socket,
    const std::vector<boost::asio::ip::udp::endpoint> &endpoints,
    const stream_config &config)
    : udp_stream(
        io_service,
        std::move(socket),
        endpoints,
        config,
        0)
{
    // All the work is done by the delegated-to constructor
}
40✔
544

545
} // namespace spead2::send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc