• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ska-sa / spead2 / 12652972619

07 Jan 2025 02:00PM UTC coverage: 78.871% (+0.02%) from 78.852%
12652972619

push

github

web-flow
Merge pull request #368 from ska-sa/bump-boost-1.87

Support Boost 1.87

115 of 137 new or added lines in 32 files covered. (83.94%)

2 existing lines in 2 files now uncovered.

5577 of 7071 relevant lines covered (78.87%)

91360.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.79
/src/send_tcp.cpp
1
/*
2
 * TCP sender for SPEAD protocol
3
 *
4
 * ICRAR - International Centre for Radio Astronomy Research
5
 * (c) UWA - The University of Western Australia, 2018
6
 * Copyright by UWA (in the framework of the ICRAR)
7
 *
8
 * This program is free software: you can redistribute it and/or modify it under
9
 * the terms of the GNU Lesser General Public License as published by the Free
10
 * Software Foundation, either version 3 of the License, or (at your option) any
11
 * later version.
12
 *
13
 * This program is distributed in the hope that it will be useful, but WITHOUT
14
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
16
 * details.
17
 *
18
 * You should have received a copy of the GNU Lesser General Public License
19
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
 */
21

22
#include <stdexcept>
23
#include <utility>
24
#include <spead2/common_socket.h>
25
#include <spead2/send_tcp.h>
26
#include <spead2/send_writer.h>
27

28
namespace spead2::send
29
{
30

31
static boost::asio::ip::tcp::socket make_socket(
42✔
32
    const io_context_ref &io_context,
33
    const std::vector<boost::asio::ip::tcp::endpoint> &endpoints,
34
    std::size_t buffer_size,
35
    const boost::asio::ip::address &interface_address)
36
{
37
    if (endpoints.size() != 1)
42✔
38
        throw std::invalid_argument("endpoints must contain exactly one element");
×
39
    const boost::asio::ip::tcp::endpoint &endpoint = endpoints[0];
42✔
40
    boost::asio::ip::tcp::socket socket(*io_context, endpoint.protocol());
42✔
41
    if (!interface_address.is_unspecified())
42✔
42
        socket.bind(boost::asio::ip::tcp::endpoint(interface_address, 0));
×
43
    set_socket_send_buffer_size(socket, buffer_size);
42✔
44
    return socket;
42✔
45
}
×
46

47
namespace
48
{
49

50
class tcp_writer : public writer
51
{
52
private:
53
    /// The underlying TCP socket
54
    boost::asio::ip::tcp::socket socket;
55
    /// Whether we were handled an already-connected socket
56
    const bool pre_connected;
57
    /// Endpoint to connect to (if not pre-connected)
58
    boost::asio::ip::tcp::endpoint endpoint;
59
    /// Callback once connected (if not pre-connected)
60
    std::function<void(const boost::system::error_code &)> connect_handler;
61
    // Scratch space for constructing packets
62
    std::unique_ptr<std::uint8_t[]> scratch;
63

64
    virtual void wakeup() override final;
65
    virtual void start() override final;
66

67
public:
68
    /**
69
     * Constructor. A callback is provided to indicate when the connection is
70
     * established.
71
     *
72
     * @warning The callback may be called before the constructor returns. The
73
     * implementation of the callback needs to be prepared to handle this case.
74
     *
75
     * @param io_context   I/O context for sending data
76
     * @param connect_handler  Callback when connection is established. It is called
77
     *                     with a @c boost::system::error_code to indicate whether
78
     *                     connection was successful.
79
     * @param endpoints    Destination host and port (must contain exactly one element)
80
     * @param config       Stream configuration
81
     * @param buffer_size  Socket buffer size (0 for OS default)
82
     * @param interface_address   Source address
83
     *                            @verbatim embed:rst:leading-asterisks
84
     *                            (see tips on :ref:`routing`)
85
     *                            @endverbatim
86
     */
87
    tcp_writer(
88
        io_context_ref io_context,
89
        std::function<void(const boost::system::error_code &)> &&connect_handler,
90
        const std::vector<boost::asio::ip::tcp::endpoint> &endpoints,
91
        const stream_config &config,
92
        std::size_t buffer_size,
93
        const boost::asio::ip::address &interface_address);
94

95
    /**
96
     * Constructor using an existing socket. The socket must be connected.
97
     */
98
    tcp_writer(
99
        io_context_ref io_context,
100
        boost::asio::ip::tcp::socket &&socket,
101
        const stream_config &config);
102

103
    virtual std::size_t get_num_substreams() const override final { return 1; }
68✔
104
};
105

106
void tcp_writer::wakeup()
878✔
107
{
108
    transmit_packet data;
878✔
109
    packet_result result = get_packet(data, scratch.get());
878✔
110
    switch (result)
878✔
111
    {
112
    case packet_result::SLEEP:
×
113
        sleep();
×
114
        return;
×
115
    case packet_result::EMPTY:
172✔
116
        request_wakeup();
172✔
117
        return;
172✔
118
    case packet_result::SUCCESS:
706✔
119
        break;
706✔
120
    }
121

122
    auto *item = data.item;
706✔
123
    bool last = data.last;
706✔
124
    auto handler = [this, item, last](const boost::system::error_code &ec, std::size_t bytes_transferred)
2,955✔
125
    {
126
        item->bytes_sent += bytes_transferred;
706✔
127
        if (!item->result)
706✔
128
            item->result = ec;
706✔
129
        if (last)
706✔
130
            groups_completed(1);
131✔
131
        wakeup();
706✔
132
    };
1,412✔
133
    boost::asio::async_write(socket, data.buffers, std::move(handler));
706✔
134
}
878✔
135

136
void tcp_writer::start()
68✔
137
{
138
    if (!pre_connected)
68✔
139
    {
140
        socket.async_connect(endpoint,
42✔
141
            [this] (const boost::system::error_code &ec)
84✔
142
            {
143
                connect_handler(ec);
42✔
144
                wakeup();
42✔
145
            });
42✔
146
    }
147
    else
148
        request_wakeup();
26✔
149
}
68✔
150

151
tcp_writer::tcp_writer(
42✔
152
    io_context_ref io_context,
153
    std::function<void(const boost::system::error_code &)> &&connect_handler,
154
    const std::vector<boost::asio::ip::tcp::endpoint> &endpoints,
155
    const stream_config &config,
156
    std::size_t buffer_size,
157
    const boost::asio::ip::address &interface_address)
42✔
158
    : writer(std::move(io_context), config),
42✔
159
    socket(make_socket(get_io_context(), endpoints, buffer_size, interface_address)),
42✔
160
    pre_connected(false),
42✔
161
    endpoint(endpoints[0]),
42✔
162
    connect_handler(std::move(connect_handler)),
42✔
163
    scratch(new std::uint8_t[config.get_max_packet_size()])
84✔
164
{
165
}
42✔
166

167
tcp_writer::tcp_writer(
26✔
168
    io_context_ref io_context,
169
    boost::asio::ip::tcp::socket &&socket,
170
    const stream_config &config)
26✔
171
    : writer(std::move(io_context), config),
26✔
172
    socket(std::move(socket)),
26✔
173
    pre_connected(true),
26✔
174
    scratch(new std::uint8_t[config.get_max_packet_size()])
52✔
175
{
176
    if (!socket_uses_io_context(this->socket, get_io_context()))
26✔
NEW
177
        throw std::invalid_argument("I/O context does not match the socket's I/O context");
×
178
}
26✔
179

180
} // anonymous namespace
181

182
tcp_stream::tcp_stream(
42✔
183
    io_context_ref io_context,
184
    std::function<void(const boost::system::error_code &)> &&connect_handler,
185
    const std::vector<boost::asio::ip::tcp::endpoint> &endpoints,
186
    const stream_config &config,
187
    std::size_t buffer_size,
188
    const boost::asio::ip::address &interface_address)
42✔
189
    : stream(std::make_unique<tcp_writer>(
42✔
190
        std::move(io_context),
42✔
191
        std::move(connect_handler),
42✔
192
        endpoints,
193
        config,
194
        buffer_size,
195
        interface_address))
84✔
196
{
197
}
42✔
198

199
tcp_stream::tcp_stream(
26✔
200
    io_context_ref io_context,
201
    boost::asio::ip::tcp::socket &&socket,
202
    const stream_config &config)
26✔
203
    : stream(std::make_unique<tcp_writer>(
26✔
204
        std::move(io_context),
26✔
205
        std::move(socket),
26✔
206
        config))
52✔
207
{
208
}
26✔
209

210
} // namespace spead2::send
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc