• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 18546574407

16 Oct 2025 12:36AM UTC coverage: 48.253% (+0.6%) from 47.661%
18546574407

push

github

web-flow
Merge pull request #2684 from mavlink/pr-add-reconnection-systemtests

Re-connection fixes and system tests

294 of 299 new or added lines in 3 files covered. (98.33%)

11 existing lines in 4 files now uncovered.

17429 of 36120 relevant lines covered (48.25%)

472.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.0
/src/mavsdk/core/tcp_server_connection.cpp
1
#include "tcp_server_connection.h"
2
#include "log.h"
3

4
#include <cassert>
5
#include <fcntl.h>
6
#include <sstream>
7

8
#ifdef WINDOWS
9
#ifndef MINGW
10
#pragma comment(lib, "Ws2_32.lib") // Without this, Ws2_32.lib is not included in static library.
11
#endif
12
#include <Windows.h>
13
#include <Winsock2.h>
14
#include <ws2tcpip.h>
15
#else
16
#include <netinet/in.h>
17
#include <sys/select.h>
18
#include <sys/socket.h>
19
#include <arpa/inet.h>
20
#include <errno.h>
21
#include <netdb.h>
22
#endif
23

24
namespace mavsdk {
25
/// Constructs a TCP server connection without opening any socket yet.
///
/// Both receiver callbacks, the MavsdkImpl reference and the forwarding option are
/// passed on to the Connection base class. The local IP and port are only stored here;
/// the listening socket is created later by start().
TcpServerConnection::TcpServerConnection(
    Connection::ReceiverCallback receiver_callback,
    Connection::LibmavReceiverCallback libmav_receiver_callback,
    MavsdkImpl& mavsdk_impl,
    std::string local_ip,
    int local_port,
    ForwardingOption forwarding_option) :
    // The callbacks and the IP string are taken by value, so they are moved onwards.
    Connection(
        std::move(receiver_callback),
        std::move(libmav_receiver_callback),
        mavsdk_impl,
        forwarding_option),
    _local_ip(std::move(local_ip)),
    _local_port(local_port)
{}
4✔
40

41
/// Tears the connection down by calling stop(); the returned result is deliberately
/// discarded because a destructor has nobody to report it to.
TcpServerConnection::~TcpServerConnection()
{
    static_cast<void>(stop());
}
8✔
45

46
/// Brings the server up: starts the MAVLink and libmav receivers, opens a listening TCP
/// socket on `_local_port` (bound to INADDR_ANY) and spawns the thread that runs
/// accept_client().
///
/// @return ConnectionResult::ConnectionsExhausted if either receiver fails to start,
///         ConnectionResult::SocketError if Winsock startup, socket(), bind() or
///         listen() fails, ConnectionResult::Success otherwise.
ConnectionResult TcpServerConnection::start()
{
    // Short-circuit keeps the original order: libmav is only started if MAVLink succeeded.
    if (!start_mavlink_receiver() || !start_libmav_receiver()) {
        return ConnectionResult::ConnectionsExhausted;
    }

#ifdef WINDOWS
    WSADATA wsa;
    if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0) {
        LogErr() << "Error: Winsock failed, error: " << get_socket_error_string(WSAGetLastError());
        return ConnectionResult::SocketError;
    }
#endif

    // Held until return, i.e. also while the accept thread is being created.
    std::lock_guard<std::mutex> lock(_mutex);

    _server_socket_fd.reset(socket(AF_INET, SOCK_STREAM, 0));
    if (_server_socket_fd.empty()) {
        LogErr() << "socket error: " << strerror(errno);
        return ConnectionResult::SocketError;
    }

    // SO_REUSEADDR avoids "Address already in use" errors.
    // A const char* option pointer is accepted by both the Winsock and POSIX prototypes.
    const int reuse_addr = 1;
    setsockopt(
        _server_socket_fd.get(),
        SOL_SOCKET,
        SO_REUSEADDR,
        reinterpret_cast<const char*>(&reuse_addr),
        sizeof(reuse_addr));

    // Listen on every local interface at the configured port.
    sockaddr_in listen_addr{};
    listen_addr.sin_family = AF_INET;
    listen_addr.sin_addr.s_addr = INADDR_ANY;
    listen_addr.sin_port = htons(_local_port);

    const int bind_result = bind(
        _server_socket_fd.get(), reinterpret_cast<sockaddr*>(&listen_addr), sizeof(listen_addr));
    if (bind_result < 0) {
        LogErr() << "bind error: " << strerror(errno);
        return ConnectionResult::SocketError;
    }

    const int listen_result = listen(_server_socket_fd.get(), 3);
    if (listen_result < 0) {
        LogErr() << "listen error: " << strerror(errno);
        return ConnectionResult::SocketError;
    }

    _accept_receive_thread =
        std::make_unique<std::thread>(&TcpServerConnection::accept_client, this);

    return ConnectionResult::Success;
}
4✔
103

104
/// Shuts the connection down: asks the accept/receive thread to exit, joins it, closes
/// the client and server sockets and finally stops the MAVLink receiver.
///
/// @return always ConnectionResult::Success.
ConnectionResult TcpServerConnection::stop()
{
    _should_exit = true;

    const bool worker_joinable = _accept_receive_thread && _accept_receive_thread->joinable();
    if (worker_joinable) {
        _accept_receive_thread->join();
        _accept_receive_thread.reset();
    }

    {
        std::lock_guard<std::mutex> lock(_mutex);
        _client_socket_fd.close();
        _server_socket_fd.close();
    }

    // Order matters: the receive thread has to be gone first, otherwise stopping the
    // receiver could interfere with the parsing of a message.
    stop_mavlink_receiver();

    return ConnectionResult::Success;
}
125

126
std::pair<bool, std::string> TcpServerConnection::send_message(const mavlink_message_t& message)
36✔
127
{
128
    // Convert message to raw bytes and use common send path
129
    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
130
    uint16_t buffer_len = mavlink_msg_to_send_buffer(buffer, &message);
36✔
131

132
    assert(buffer_len <= MAVLINK_MAX_PACKET_LEN);
36✔
133

134
    return send_raw_bytes(reinterpret_cast<const char*>(buffer), buffer_len);
36✔
135
}
136

137
void TcpServerConnection::accept_client()
4✔
138
{
139
    // Get server socket fd with mutex protection for setup
140
    SocketHolder::DescriptorType server_socket_fd;
141
    {
142
        std::lock_guard<std::mutex> lock(_mutex);
4✔
143
        server_socket_fd = _server_socket_fd.get();
4✔
144
    }
4✔
145

146
#ifdef WINDOWS
147
    // Set server socket to non-blocking
148
    u_long iMode = 1;
149
    int iResult = ioctlsocket(server_socket_fd, FIONBIO, &iMode);
150
    if (iResult != 0) {
151
        LogErr() << "ioctlsocket failed with error: " << get_socket_error_string(WSAGetLastError());
152
    }
153
#else
154
    // Set server socket to non-blocking
155
    int flags = fcntl(server_socket_fd, F_GETFL, 0);
4✔
156
    fcntl(server_socket_fd, F_SETFL, flags | O_NONBLOCK);
4✔
157
#endif
158

159
    while (!_should_exit) {
11✔
160
        fd_set readfds;
161
        FD_ZERO(&readfds);
119✔
162
        FD_SET(server_socket_fd, &readfds);
7✔
163

164
        // Set timeout to 1 second
165
        timeval timeout;
166
        timeout.tv_sec = 1;
7✔
167
        timeout.tv_usec = 0;
7✔
168

169
        const int activity = select(server_socket_fd + 1, &readfds, nullptr, nullptr, &timeout);
7✔
170

171
        if (activity < 0 && errno != EINTR) {
7✔
172
            LogErr() << "select error: " << strerror(errno);
×
173
            continue;
2✔
174
        }
175

176
        if (activity == 0) {
7✔
177
            // Timeout, no incoming connection
178
            continue;
2✔
179
        }
180

181
        if (FD_ISSET(server_socket_fd, &readfds)) {
5✔
182
            sockaddr_in client_addr{};
5✔
183
            socklen_t client_addr_len = sizeof(client_addr);
5✔
184

185
            int new_fd = accept(
5✔
186
                server_socket_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_addr_len);
187

188
            if (new_fd < 0) {
5✔
189
                if (_should_exit) {
×
190
                    return;
×
191
                }
192
                LogErr() << "accept error: " << strerror(errno);
×
193
                continue;
×
194
            }
195

196
            // Set receive timeout on client socket
197
            const unsigned timeout_ms = 500;
5✔
198
#if defined(WINDOWS)
199
            setsockopt(
200
                new_fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
201
#else
202
            struct timeval tv;
203
            tv.tv_sec = 0;
5✔
204
            tv.tv_usec = timeout_ms * 1000;
5✔
205
            setsockopt(new_fd, SOL_SOCKET, SO_RCVTIMEO, (const void*)&tv, sizeof(tv));
5✔
206
#endif
207

208
            // Now store the new client socket with mutex protection
209
            {
210
                std::lock_guard<std::mutex> lock(_mutex);
5✔
211
                _client_socket_fd.reset(new_fd);
5✔
212
            }
5✔
213

214
            receive();
5✔
215
        }
216
    }
217
}
218

219
void TcpServerConnection::receive()
5✔
220
{
221
    std::array<char, 2048> buffer{};
5✔
222

223
    // Get client socket fd with mutex protection, then release mutex before blocking recv
224
    SocketHolder::DescriptorType client_socket_fd;
225
    {
226
        std::lock_guard<std::mutex> lock(_mutex);
5✔
227
        if (_client_socket_fd.empty()) {
5✔
NEW
228
            return;
×
229
        }
230
        client_socket_fd = _client_socket_fd.get();
5✔
231
    }
5✔
232

233
    bool dataReceived = false;
5✔
234
    while (!dataReceived && !_should_exit) {
47✔
235
        const auto recv_len = recv(client_socket_fd, buffer.data(), buffer.size(), 0);
46✔
236

237
#ifdef WINDOWS
238
        if (recv_len == SOCKET_ERROR) {
239
            // On Windows, on the first try, select says there is something
240
            // but recv doesn't succeed yet, and we just need to try again.
241
            if (WSAGetLastError() == WSAEWOULDBLOCK) {
242
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
243
                continue;
244
            }
245
            // And at the end, we get an abort that we can silently ignore.
246
            if (WSAGetLastError() == WSAECONNABORTED) {
247
                return;
248
            }
249
        }
250
#else
251
        if (recv_len < 0) {
46✔
252
            // On macOS we presumably see the same thing, and have to try again.
253
            if (errno == EAGAIN) {
13✔
254
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
13✔
255
                continue;
13✔
256
            }
257

258
            // Connection reset - if shutting down, exit quietly; otherwise log and exit
259
            if (errno == ECONNRESET) {
×
260
                if (!_should_exit) {
×
261
                    LogErr() << "recv failed: " << strerror(errno);
×
262
                }
263
                return;
×
264
            }
265

266
            LogErr() << "recv failed: " << strerror(errno);
×
267
            return;
×
268
        }
269
#endif
270

271
        if (recv_len == 0) {
33✔
272
            // Client disconnected, close the socket and go back to accept new connections
273
            LogInfo() << "TCP client disconnected, waiting for new connection...";
4✔
274
            {
275
                std::lock_guard<std::mutex> lock(_mutex);
4✔
276
                _client_socket_fd.close();
4✔
277
            }
4✔
278
            return;
4✔
279
        }
280

281
        _mavlink_receiver->set_new_datagram(buffer.data(), static_cast<int>(recv_len));
29✔
282

283
        // Parse all mavlink messages in one data packet. Once exhausted, we'll exit while.
284
        while (_mavlink_receiver->parse_message()) {
60✔
285
            receive_message(_mavlink_receiver->get_last_message(), this);
31✔
286
        }
287

288
        // Also parse with libmav if available
289
        if (_libmav_receiver) {
29✔
290
            _libmav_receiver->set_new_datagram(buffer.data(), static_cast<int>(recv_len));
29✔
291

292
            while (_libmav_receiver->parse_message()) {
58✔
293
                receive_libmav_message(_libmav_receiver->get_last_message(), this);
29✔
294
            }
295
        }
296
    }
297
}
298

299
std::pair<bool, std::string> TcpServerConnection::send_raw_bytes(const char* bytes, size_t length)
36✔
300
{
301
    // Basic implementation for TCP server connections
302
    std::pair<bool, std::string> result;
36✔
303

304
    // Get client socket fd with mutex protection, then release mutex before blocking send
305
    SocketHolder::DescriptorType client_socket_fd;
306
    {
307
        std::lock_guard<std::mutex> lock(_mutex);
36✔
308
        if (_client_socket_fd.empty()) {
36✔
309
            result.first = false;
1✔
310
            result.second = "Not connected";
1✔
311
            return result;
1✔
312
        }
313
        client_socket_fd = _client_socket_fd.get();
35✔
314
    }
36✔
315

316
#if !defined(MSG_NOSIGNAL)
317
    auto flags = 0;
318
#else
319
    auto flags = MSG_NOSIGNAL;
35✔
320
#endif
321

322
    const auto send_len = send(client_socket_fd, bytes, length, flags);
35✔
323

324
    if (send_len != static_cast<std::remove_cv_t<decltype(send_len)>>(length)) {
35✔
325
        // Broken pipe is expected during shutdown, don't log it
326
        std::stringstream ss;
×
327
#ifdef WINDOWS
328
        int err = WSAGetLastError();
329
        ss << "Send failure: " << get_socket_error_string(err);
330
        if (err != WSAECONNRESET || !_should_exit) {
331
            LogErr() << ss.str();
332
        }
333
#else
334
        ss << "Send failure: " << strerror(errno);
×
335
        if (errno != EPIPE || !_should_exit) {
×
336
            LogErr() << ss.str();
×
337
        }
338
#endif
339
        result.first = false;
×
340
        result.second = ss.str();
×
341
        return result;
×
342
    }
×
343

344
    result.first = true;
35✔
345
    return result;
35✔
346
}
347

348
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc