• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 15058650823

16 May 2025 01:31AM UTC coverage: 44.401% (+0.1%) from 44.27%
15058650823

Pull #2573

github

web-flow
Merge eb4adc5fc into e6d9d7827
Pull Request #2573: Add incoming and outgoing message queue, fixing threading/locking issues

161 of 187 new or added lines in 7 files covered. (86.1%)

36 existing lines in 9 files now uncovered.

14925 of 33614 relevant lines covered (44.4%)

284.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.42
/src/mavsdk/core/mavsdk_impl.cpp
1
#include "mavsdk_impl.h"
2

3
#include <algorithm>
4
#include <mutex>
5
#include <tcp_server_connection.h>
6

7
#include "connection.h"
8
#include "log.h"
9
#include "tcp_client_connection.h"
10
#include "tcp_server_connection.h"
11
#include "udp_connection.h"
12
#include "system.h"
13
#include "system_impl.h"
14
#include "serial_connection.h"
15
#include "version.h"
16
#include "server_component_impl.h"
17
#include "plugin_base.h"
18
#include "mavlink_channels.h"
19
#include "callback_list.tpp"
20
#include "hostname_to_ip.h"
21

22
namespace mavsdk {
23

24
template class CallbackList<>;
25

26
MavsdkImpl::MavsdkImpl(const Mavsdk::Configuration& configuration) :
87✔
27
    timeout_handler(time),
87✔
28
    call_every_handler(time)
174✔
29
{
30
    LogInfo() << "MAVSDK version: " << mavsdk_version;
87✔
31

32
    if (const char* env_p = std::getenv("MAVSDK_CALLBACK_DEBUGGING")) {
87✔
33
        if (std::string(env_p) == "1") {
×
34
            LogDebug() << "Callback debugging is on.";
×
35
            _callback_debugging = true;
×
36
        }
37
    }
38

39
    if (const char* env_p = std::getenv("MAVSDK_MESSAGE_DEBUGGING")) {
87✔
40
        if (std::string(env_p) == "1") {
×
41
            LogDebug() << "Message debugging is on.";
×
42
            _message_logging_on = true;
×
43
        }
44
    }
45

46
    if (const char* env_p = std::getenv("MAVSDK_SYSTEM_DEBUGGING")) {
87✔
47
        if (std::string(env_p) == "1") {
×
48
            LogDebug() << "System debugging is on.";
×
49
            _system_debugging = true;
×
50
        }
51
    }
52

53
    set_configuration(configuration);
87✔
54

55
    // Start the user callback thread first, so it is ready for anything generated by
56
    // the work thread.
57

58
    _process_user_callbacks_thread =
174✔
59
        new std::thread(&MavsdkImpl::process_user_callbacks_thread, this);
87✔
60

61
    _work_thread = new std::thread(&MavsdkImpl::work_thread, this);
87✔
62
}
87✔
63

64
MavsdkImpl::~MavsdkImpl()
87✔
65
{
66
    {
67
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
87✔
68
        call_every_handler.remove(_heartbeat_send_cookie);
87✔
69
    }
87✔
70

71
    _should_exit = true;
87✔
72

73
    // Stop work first because we don't want to trigger anything that would
74
    // potentially want to call into user code.
75

76
    if (_work_thread != nullptr) {
87✔
77
        _work_thread->join();
87✔
78
        delete _work_thread;
87✔
79
        _work_thread = nullptr;
87✔
80
    }
81

82
    if (_process_user_callbacks_thread != nullptr) {
87✔
83
        _user_callback_queue.stop();
87✔
84
        _process_user_callbacks_thread->join();
87✔
85
        delete _process_user_callbacks_thread;
87✔
86
        _process_user_callbacks_thread = nullptr;
87✔
87
    }
88

89
    std::lock_guard lock(_mutex);
87✔
90

91
    _systems.clear();
87✔
92
    _connections.clear();
87✔
93
}
174✔
94

95
std::string MavsdkImpl::version()
1✔
96
{
97
    static unsigned version_counter = 0;
98

99
    ++version_counter;
1✔
100

101
    switch (version_counter) {
1✔
102
        case 10:
×
103
            return "You were wondering about the name of this library?";
×
104
        case 11:
×
105
            return "Let's look at the history:";
×
106
        case 12:
×
107
            return "DroneLink";
×
108
        case 13:
×
109
            return "DroneCore";
×
110
        case 14:
×
111
            return "DronecodeSDK";
×
112
        case 15:
×
113
            return "MAVSDK";
×
114
        case 16:
×
115
            return "And that's it...";
×
116
        case 17:
×
117
            return "At least for now ¯\\_(ツ)_/¯.";
×
118
        default:
1✔
119
            return mavsdk_version;
1✔
120
    }
121
}
122

123
std::vector<std::shared_ptr<System>> MavsdkImpl::systems() const
86✔
124
{
125
    std::vector<std::shared_ptr<System>> systems_result{};
86✔
126

127
    std::lock_guard lock(_mutex);
86✔
128
    for (auto& system : _systems) {
129✔
129
        // We ignore the 0 entry because it's just a null system.
130
        // It's only created because the older, deprecated API needs a
131
        // reference.
132
        if (system.first == 0) {
43✔
133
            continue;
×
134
        }
135
        systems_result.push_back(system.second);
43✔
136
    }
137

138
    return systems_result;
86✔
139
}
86✔
140

141
std::optional<std::shared_ptr<System>> MavsdkImpl::first_autopilot(double timeout_s)
33✔
142
{
143
    {
144
        std::lock_guard lock(_mutex);
33✔
145
        for (auto system : _systems) {
33✔
146
            if (system.second->is_connected() && system.second->has_autopilot()) {
×
147
                return system.second;
×
148
            }
149
        }
×
150
    }
33✔
151

152
    if (timeout_s == 0.0) {
33✔
153
        // Don't wait at all.
154
        return {};
×
155
    }
156

157
    auto prom = std::promise<std::shared_ptr<System>>();
33✔
158

159
    std::once_flag flag;
33✔
160
    auto handle = subscribe_on_new_system([this, &prom, &flag]() {
66✔
161
        // Check all systems, not just the first one
162
        auto all_systems = systems();
33✔
163
        for (auto& system : all_systems) {
33✔
164
            if (system->is_connected() && system->has_autopilot()) {
33✔
165
                std::call_once(flag, [&prom, &system]() { prom.set_value(system); });
66✔
166
                break;
33✔
167
            }
168
        }
169
    });
66✔
170

171
    auto fut = prom.get_future();
33✔
172

173
    if (timeout_s > 0.0) {
33✔
174
        if (fut.wait_for(std::chrono::milliseconds(int64_t(timeout_s * 1e3))) ==
33✔
175
            std::future_status::ready) {
176
            unsubscribe_on_new_system(handle);
33✔
177
            return fut.get();
33✔
178

179
        } else {
180
            unsubscribe_on_new_system(handle);
×
181
            return std::nullopt;
×
182
        }
183
    } else {
184
        fut.wait();
×
185
        unsubscribe_on_new_system(handle);
×
186
        return std::optional(fut.get());
×
187
    }
188
}
33✔
189

190
std::shared_ptr<ServerComponent> MavsdkImpl::server_component(unsigned instance)
47✔
191
{
192
    std::lock_guard lock(_mutex);
47✔
193

194
    auto component_type = _configuration.get_component_type();
47✔
195
    switch (component_type) {
47✔
196
        case ComponentType::Autopilot:
47✔
197
        case ComponentType::GroundStation:
198
        case ComponentType::CompanionComputer:
199
        case ComponentType::Camera:
200
        case ComponentType::Gimbal:
201
        case ComponentType::RemoteId:
202
        case ComponentType::Custom:
203
            return server_component_by_type(component_type, instance);
47✔
204
        default:
×
205
            LogErr() << "Unknown component type";
×
206
            return {};
×
207
    }
208
}
47✔
209

210
std::shared_ptr<ServerComponent>
211
MavsdkImpl::server_component_by_type(ComponentType server_component_type, unsigned instance)
47✔
212
{
213
    switch (server_component_type) {
47✔
214
        case ComponentType::Autopilot:
33✔
215
            if (instance == 0) {
33✔
216
                return server_component_by_id(MAV_COMP_ID_AUTOPILOT1);
33✔
217
            } else {
218
                LogErr() << "Only autopilot instance 0 is valid";
×
219
                return {};
×
220
            }
221

222
        case ComponentType::GroundStation:
×
223
            if (instance == 0) {
×
224
                return server_component_by_id(MAV_COMP_ID_MISSIONPLANNER);
×
225
            } else {
226
                LogErr() << "Only one ground station supported at this time";
×
227
                return {};
×
228
            }
229

230
        case ComponentType::CompanionComputer:
1✔
231
            if (instance == 0) {
1✔
232
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER);
1✔
233
            } else if (instance == 1) {
×
234
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER2);
×
235
            } else if (instance == 2) {
×
236
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER3);
×
237
            } else if (instance == 3) {
×
238
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER4);
×
239
            } else {
240
                LogErr() << "Only companion computer 0..3 are supported";
×
241
                return {};
×
242
            }
243

244
        case ComponentType::Camera:
13✔
245
            if (instance == 0) {
13✔
246
                return server_component_by_id(MAV_COMP_ID_CAMERA);
13✔
247
            } else if (instance == 1) {
×
248
                return server_component_by_id(MAV_COMP_ID_CAMERA2);
×
249
            } else if (instance == 2) {
×
250
                return server_component_by_id(MAV_COMP_ID_CAMERA3);
×
251
            } else if (instance == 3) {
×
252
                return server_component_by_id(MAV_COMP_ID_CAMERA4);
×
253
            } else if (instance == 4) {
×
254
                return server_component_by_id(MAV_COMP_ID_CAMERA5);
×
255
            } else if (instance == 5) {
×
256
                return server_component_by_id(MAV_COMP_ID_CAMERA6);
×
257
            } else {
258
                LogErr() << "Only camera 0..5 are supported";
×
259
                return {};
×
260
            }
261

262
        default:
×
263
            LogErr() << "Unknown server component type";
×
264
            return {};
×
265
    }
266
}
267

268
std::shared_ptr<ServerComponent> MavsdkImpl::server_component_by_id(uint8_t component_id)
47✔
269
{
270
    if (component_id == 0) {
47✔
271
        LogErr() << "Server component with component ID 0 not allowed";
×
272
        return nullptr;
×
273
    }
274

275
    std::lock_guard lock(_server_components_mutex);
47✔
276

277
    return server_component_by_id_with_lock(component_id);
47✔
278
}
47✔
279

280
std::shared_ptr<ServerComponent> MavsdkImpl::server_component_by_id_with_lock(uint8_t component_id)
176✔
281
{
282
    for (auto& it : _server_components) {
177✔
283
        if (it.first == component_id) {
89✔
284
            if (it.second != nullptr) {
88✔
285
                return it.second;
88✔
286
            } else {
287
                it.second = std::make_shared<ServerComponent>(*this, component_id);
×
288
            }
289
        }
290
    }
291

292
    _server_components.emplace_back(std::pair<uint8_t, std::shared_ptr<ServerComponent>>(
176✔
293
        component_id, std::make_shared<ServerComponent>(*this, component_id)));
176✔
294

295
    return _server_components.back().second;
88✔
296
}
297

298
void MavsdkImpl::forward_message(mavlink_message_t& message, Connection* connection)
×
299
{
300
    // Forward_message Function implementing Mavlink routing rules.
301
    // See https://mavlink.io/en/guide/routing.html
302

303
    bool forward_heartbeats_enabled = true;
×
304
    const uint8_t target_system_id = get_target_system_id(message);
×
305
    const uint8_t target_component_id = get_target_component_id(message);
×
306

307
    // If it's a message only for us, we keep it, otherwise, we forward it.
308
    const bool targeted_only_at_us =
309
        (target_system_id == get_own_system_id() && target_component_id == get_own_component_id());
×
310

311
    // We don't forward heartbeats unless it's specifically enabled.
312
    const bool heartbeat_check_ok =
×
313
        (message.msgid != MAVLINK_MSG_ID_HEARTBEAT || forward_heartbeats_enabled);
×
314

315
    if (!targeted_only_at_us && heartbeat_check_ok) {
×
316
        unsigned successful_emissions = 0;
×
317
        for (auto& entry : _connections) {
×
318
            // Check whether the connection is not the one from which we received the message.
319
            // And also check if the connection was set to forward messages.
320
            if (entry.connection.get() == connection ||
×
321
                !entry.connection->should_forward_messages()) {
×
322
                continue;
×
323
            }
324
            auto result = (*entry.connection).send_message(message);
×
325
            if (result.first) {
×
326
                successful_emissions++;
×
327
            } else {
328
                _connections_errors_subscriptions.queue(
×
329
                    Mavsdk::ConnectionError{result.second, entry.handle},
×
330
                    [this](const auto& func) { call_user_callback(func); });
×
331
            }
332
        }
×
333
        if (successful_emissions == 0) {
×
334
            LogErr() << "Message forwarding failed";
×
335
        }
336
    }
337
}
×
338

339
void MavsdkImpl::receive_message(mavlink_message_t& message, Connection* connection)
2,093✔
340
{
341
    {
342
        std::lock_guard lock(_received_messages_mutex);
2,093✔
343
        _received_messages.emplace(ReceivedMessage{std::move(message), connection});
2,092✔
344
    }
2,093✔
345
    _received_messages_cv.notify_one();
2,093✔
346
}
2,093✔
347

348
void MavsdkImpl::process_messages()
18,659✔
349
{
350
    std::unique_lock lock(_received_messages_mutex);
18,659✔
351
    while (!_received_messages.empty()) {
20,509✔
352
        auto message_copied = _received_messages.front();
2,084✔
353
        lock.unlock();
2,084✔
354
        process_message(message_copied.message, message_copied.connection_ptr);
2,085✔
355
        lock.lock();
2,084✔
356
        _received_messages.pop();
2,085✔
357
    }
358
}
18,435✔
359

360
void MavsdkImpl::process_message(mavlink_message_t& message, Connection* connection)
2,085✔
361
{
362
    // Assumes _received_messages_mutex
363

364
    if (_message_logging_on) {
2,085✔
365
        LogDebug() << "Processing message " << message.msgid << " from "
×
366
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid);
×
367
    }
368

369
    if (_should_exit) {
2,085✔
370
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
UNCOV
371
        return;
×
372
    }
373

374
    {
375
        std::lock_guard lock(_mutex);
2,084✔
376

377
        // This is a low level interface where incoming messages can be tampered
378
        // with or even dropped.
379
        {
380
            bool keep = true;
2,085✔
381
            {
382
                std::lock_guard<std::mutex> intercept_lock(_intercept_callbacks_mutex);
2,085✔
383
                if (_intercept_incoming_messages_callback != nullptr) {
2,085✔
384
                    keep = _intercept_incoming_messages_callback(message);
241✔
385
                }
386
            }
2,084✔
387

388
            if (!keep) {
2,085✔
389
                LogDebug() << "Dropped incoming message: " << int(message.msgid);
33✔
390
                return;
33✔
391
            }
392
        }
393

394
        if (_should_exit) {
2,052✔
395
            // If we're meant to clean up, let's not try to acquire any more locks but bail.
NEW
396
            return;
×
397
        }
398

399
        /** @note: Forward message if option is enabled and multiple interfaces are connected.
400
         *  Performs message forwarding checks for every messages if message forwarding
401
         *  is enabled on at least one connection, and in case of a single forwarding connection,
402
         *  we check that it is not the one which received the current message.
403
         *
404
         * Conditions:
405
         * 1. At least 2 connections.
406
         * 2. At least 1 forwarding connection.
407
         * 3. At least 2 forwarding connections or current connection is not forwarding.
408
         */
409

410
        if (_connections.size() > 1 && mavsdk::Connection::forwarding_connections_count() > 0 &&
2,052✔
411
            (mavsdk::Connection::forwarding_connections_count() > 1 ||
×
412
             !connection->should_forward_messages())) {
×
413
            if (_message_logging_on) {
×
414
                LogDebug() << "Forwarding message " << message.msgid << " from "
×
415
                           << static_cast<int>(message.sysid) << "/"
×
416
                           << static_cast<int>(message.compid);
×
417
            }
418
            forward_message(message, connection);
×
419
        }
420

421
        // Don't ever create a system with sysid 0.
422
        if (message.sysid == 0) {
2,052✔
NEW
423
            if (_message_logging_on) {
×
NEW
424
                LogDebug() << "Ignoring message with sysid == 0";
×
425
            }
NEW
426
            return;
×
427
        }
428

429
        // Filter out messages by QGroundControl, however, only do that if MAVSDK
430
        // is also implementing a ground station and not if it is used in another
431
        // configuration, e.g. on a companion.
432
        //
433
        // This is a workaround because PX4 started forwarding messages between
434
        // mavlink instances which leads to existing implementations (including
435
        // examples and integration tests) to connect to QGroundControl by accident
436
        // instead of PX4 because the check `has_autopilot()` is not used.
437

438
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
2,052✔
439
            message.sysid == 255 && message.compid == MAV_COMP_ID_MISSIONPLANNER) {
2,052✔
NEW
440
            if (_message_logging_on) {
×
NEW
441
                LogDebug() << "Ignoring messages from QGC as we are also a ground station";
×
442
            }
NEW
443
            return;
×
444
        }
445

446
        bool found_system = false;
2,052✔
447
        for (auto& system : _systems) {
2,052✔
448
            if (system.first == message.sysid) {
1,966✔
449
                system.second->system_impl()->add_new_component(message.compid);
1,966✔
450
                found_system = true;
1,966✔
451
                break;
1,966✔
452
            }
453
        }
454

455
        if (!found_system) {
2,051✔
456
            if (_system_debugging) {
86✔
NEW
457
                LogWarn() << "Create new system/component " << (int)message.sysid << "/"
×
NEW
458
                          << (int)message.compid;
×
NEW
459
                LogWarn() << "From message " << (int)message.msgid << " with len "
×
NEW
460
                          << (int)message.len;
×
NEW
461
                std::string bytes = "";
×
NEW
462
                for (unsigned i = 0; i < 12 + message.len; ++i) {
×
NEW
463
                    bytes += std::to_string(reinterpret_cast<uint8_t*>(&message)[i]) + ' ';
×
464
                }
NEW
465
                LogWarn() << "Bytes: " << bytes;
×
466
            }
×
467
            make_system_with_component(message.sysid, message.compid);
86✔
468

469
            // We now better talk back.
470
            start_sending_heartbeats();
86✔
471
        }
472

473
        if (_should_exit) {
2,051✔
474
            // Don't try to call at() if systems have already been destroyed
475
            // in destructor.
NEW
476
            return;
×
477
        }
478
    }
2,085✔
479

480
    mavlink_message_handler.process_message(message);
2,052✔
481

482
    for (auto& system : _systems) {
2,052✔
483
        if (system.first == message.sysid) {
2,052✔
484
            system.second->system_impl()->process_mavlink_message(message);
2,052✔
485
            break;
2,051✔
486
        }
487
    }
488
}
489

490
bool MavsdkImpl::send_message(mavlink_message_t& message)
2,223✔
491
{
492
    // Create a copy of the message to avoid reference issues
493
    mavlink_message_t message_copy = message;
2,223✔
494

495
    {
496
        std::lock_guard lock(_messages_to_send_mutex);
2,223✔
497
        _messages_to_send.push(std::move(message_copy));
2,222✔
498
    }
2,223✔
499

500
    // For heartbeat messages, we want to process them immediately to speed up system discovery
501
    if (message.msgid == MAVLINK_MSG_ID_HEARTBEAT) {
2,223✔
502
        // Trigger message processing in the work thread
503
        // This is a hint to process messages sooner, but doesn't block
504
        std::this_thread::yield();
232✔
505
    }
506

507
    return true;
2,223✔
508
}
509

510
void MavsdkImpl::deliver_messages()
20,888✔
511
{
512
    // Process messages one at a time to avoid holding the mutex while delivering
513
    while (true) {
514
        mavlink_message_t message;
20,888✔
515
        {
516
            std::lock_guard lock(_messages_to_send_mutex);
20,760✔
517
            if (_messages_to_send.empty()) {
20,618✔
518
                break;
18,619✔
519
            }
520
            message = _messages_to_send.front();
2,222✔
521
            _messages_to_send.pop();
2,221✔
522
        }
20,842✔
523
        deliver_message(message);
2,219✔
524
    }
2,223✔
525
}
18,376✔
526

527
void MavsdkImpl::deliver_message(mavlink_message_t& message)
2,223✔
528
{
529
    if (_message_logging_on) {
2,223✔
530
        LogDebug() << "Sending message " << message.msgid << " from "
×
531
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid)
×
532
                   << " to " << static_cast<int>(get_target_system_id(message)) << "/"
×
533
                   << static_cast<int>(get_target_component_id(message));
×
534
    }
535

536
    // This is a low level interface where outgoing messages can be tampered
537
    // with or even dropped.
538
    bool keep = true;
2,223✔
539
    {
540
        std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
2,223✔
541
        if (_intercept_outgoing_messages_callback != nullptr) {
2,222✔
542
            keep = _intercept_outgoing_messages_callback(message);
220✔
543
        }
544
    }
2,221✔
545

546
    if (!keep) {
2,223✔
547
        // We fake that everything was sent as instructed because
548
        // a potential loss would happen later, and we would not be informed
549
        // about it.
550
        LogDebug() << "Dropped outgoing message: " << int(message.msgid);
87✔
551
        return;
87✔
552
    }
553

554
    std::lock_guard lock(_mutex);
2,136✔
555

556
    if (_connections.empty()) {
2,135✔
557
        // We obviously can't send any messages without a connection added, so
558
        // we silently ignore this.
559
        return;
43✔
560
    }
561

562
    uint8_t successful_emissions = 0;
2,092✔
563
    for (auto& _connection : _connections) {
4,184✔
564
        const uint8_t target_system_id = get_target_system_id(message);
2,092✔
565

566
        if (target_system_id != 0 && !(*_connection.connection).has_system_id(target_system_id)) {
2,092✔
567
            continue;
×
568
        }
569
        const auto result = (*_connection.connection).send_message(message);
2,093✔
570
        if (result.first) {
2,093✔
571
            successful_emissions++;
2,093✔
572
        } else {
573
            _connections_errors_subscriptions.queue(
×
574
                Mavsdk::ConnectionError{result.second, _connection.handle},
×
575
                [this](const auto& func) { call_user_callback(func); });
×
576
        }
577
    }
2,093✔
578

579
    if (successful_emissions == 0) {
2,093✔
580
        LogErr() << "Sending message failed";
×
581
    }
582
}
2,135✔
583

584
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_any_connection(
86✔
585
    const std::string& connection_url, ForwardingOption forwarding_option)
586
{
587
    CliArg cli_arg;
86✔
588
    if (!cli_arg.parse(connection_url)) {
86✔
589
        return {ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle{}};
×
590
    }
591

592
    return std::visit(
86✔
593
        overloaded{
172✔
594
            [](std::monostate) {
×
595
                // Should not happen anyway.
596
                return std::pair<ConnectionResult, Mavsdk::ConnectionHandle>{
×
597
                    ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle()};
×
598
            },
599
            [this, forwarding_option](const CliArg::Udp& udp) {
84✔
600
                return add_udp_connection(udp, forwarding_option);
84✔
601
            },
602
            [this, forwarding_option](const CliArg::Tcp& tcp) {
2✔
603
                return add_tcp_connection(tcp, forwarding_option);
2✔
604
            },
605
            [this, forwarding_option](const CliArg::Serial& serial) {
×
606
                return add_serial_connection(
×
607
                    serial.path, serial.baudrate, serial.flow_control_enabled, forwarding_option);
×
608
            }},
609
        cli_arg.protocol);
86✔
610
}
86✔
611

612
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
613
MavsdkImpl::add_udp_connection(const CliArg::Udp& udp, ForwardingOption forwarding_option)
84✔
614
{
615
    auto new_conn = std::make_unique<UdpConnection>(
84✔
616
        [this](mavlink_message_t& message, Connection* connection) {
2,074✔
617
            receive_message(message, connection);
2,074✔
618
        },
2,074✔
619
        udp.mode == CliArg::Udp::Mode::In ? udp.host : "0.0.0.0",
210✔
620
        udp.mode == CliArg::Udp::Mode::In ? udp.port : 0,
168✔
621
        forwarding_option);
252✔
622

623
    if (!new_conn) {
84✔
624
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
625
    }
626

627
    ConnectionResult ret = new_conn->start();
84✔
628

629
    if (ret != ConnectionResult::Success) {
84✔
630
        return {ret, Mavsdk::ConnectionHandle{}};
×
631
    }
632

633
    if (udp.mode == CliArg::Udp::Mode::Out) {
84✔
634
        // We need to add the IP rather than a hostname, otherwise we end up with two remotes:
635
        // one for the IP, and one for a hostname.
636
        auto remote_ip = resolve_hostname_to_ip(udp.host);
42✔
637

638
        if (!remote_ip) {
42✔
639
            return {ConnectionResult::DestinationIpUnknown, Mavsdk::ConnectionHandle{}};
×
640
        }
641

642
        new_conn->add_remote(remote_ip.value(), udp.port);
42✔
643
        std::lock_guard lock(_mutex);
42✔
644

645
        // With a UDP remote, we need to initiate the connection by sending heartbeats.
646
        auto new_configuration = get_configuration();
42✔
647
        new_configuration.set_always_send_heartbeats(true);
42✔
648
        set_configuration(new_configuration);
42✔
649
    }
42✔
650

651
    auto handle = add_connection(std::move(new_conn));
84✔
652

653
    return {ConnectionResult::Success, handle};
84✔
654
}
84✔
655

656
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
657
MavsdkImpl::add_tcp_connection(const CliArg::Tcp& tcp, ForwardingOption forwarding_option)
2✔
658
{
659
    if (tcp.mode == CliArg::Tcp::Mode::Out) {
2✔
660
        auto new_conn = std::make_unique<TcpClientConnection>(
1✔
661
            [this](mavlink_message_t& message, Connection* connection) {
7✔
662
                receive_message(message, connection);
7✔
663
            },
7✔
664
            tcp.host,
1✔
665
            tcp.port,
1✔
666
            forwarding_option);
1✔
667
        if (!new_conn) {
1✔
668
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
669
        }
670
        ConnectionResult ret = new_conn->start();
1✔
671
        if (ret == ConnectionResult::Success) {
1✔
672
            return {ret, add_connection(std::move(new_conn))};
1✔
673
        } else {
674
            return {ret, Mavsdk::ConnectionHandle{}};
×
675
        }
676
    } else {
1✔
677
        auto new_conn = std::make_unique<TcpServerConnection>(
1✔
678
            [this](mavlink_message_t& message, Connection* connection) {
12✔
679
                receive_message(message, connection);
12✔
680
            },
12✔
681
            tcp.host,
1✔
682
            tcp.port,
1✔
683
            forwarding_option);
1✔
684
        if (!new_conn) {
1✔
685
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
686
        }
687
        ConnectionResult ret = new_conn->start();
1✔
688
        if (ret == ConnectionResult::Success) {
1✔
689
            return {ret, add_connection(std::move(new_conn))};
1✔
690
        } else {
691
            return {ret, Mavsdk::ConnectionHandle{}};
×
692
        }
693
    }
1✔
694
}
695

696
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_serial_connection(
×
697
    const std::string& dev_path,
698
    int baudrate,
699
    bool flow_control,
700
    ForwardingOption forwarding_option)
701
{
702
    auto new_conn = std::make_unique<SerialConnection>(
×
703
        [this](mavlink_message_t& message, Connection* connection) {
×
704
            receive_message(message, connection);
×
705
        },
×
706
        dev_path,
707
        baudrate,
708
        flow_control,
709
        forwarding_option);
×
710
    if (!new_conn) {
×
711
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
712
    }
713
    ConnectionResult ret = new_conn->start();
×
714
    if (ret == ConnectionResult::Success) {
×
715
        auto handle = add_connection(std::move(new_conn));
×
716

717
        auto new_configuration = get_configuration();
×
718

719
        // PX4 starting with v1.13 does not send heartbeats by default, so we need
720
        // to initiate the MAVLink connection by sending heartbeats.
721
        // Therefore, we override the default here and enable sending heartbeats.
722
        new_configuration.set_always_send_heartbeats(true);
×
723
        set_configuration(new_configuration);
×
724

725
        return {ret, handle};
×
726

727
    } else {
728
        return {ret, Mavsdk::ConnectionHandle{}};
×
729
    }
730
}
×
731

732
Mavsdk::ConnectionHandle MavsdkImpl::add_connection(std::unique_ptr<Connection>&& new_connection)
86✔
733
{
734
    std::lock_guard lock(_mutex);
86✔
735
    auto handle = _connections_handle_factory.create();
86✔
736
    _connections.emplace_back(ConnectionEntry{std::move(new_connection), handle});
86✔
737

738
    return handle;
172✔
739
}
86✔
740

741
void MavsdkImpl::remove_connection(Mavsdk::ConnectionHandle handle)
×
742
{
NEW
743
    std::lock_guard lock(_mutex);
×
744

745
    _connections.erase(std::remove_if(_connections.begin(), _connections.end(), [&](auto&& entry) {
×
746
        return (entry.handle == handle);
×
747
    }));
748
}
×
749

750
Mavsdk::Configuration MavsdkImpl::get_configuration() const
42✔
751
{
752
    std::lock_guard configuration_lock(_mutex);
42✔
753
    return _configuration;
84✔
754
}
42✔
755

756
void MavsdkImpl::set_configuration(Mavsdk::Configuration new_configuration)
129✔
757
{
758
    std::lock_guard server_components_lock(_server_components_mutex);
129✔
759
    // We just point the default to the newly created component. This means
760
    // that the previous default component will be deleted if it is not
761
    // used/referenced anywhere.
762
    _default_server_component =
129✔
763
        server_component_by_id_with_lock(new_configuration.get_component_id());
129✔
764

765
    if (new_configuration.get_always_send_heartbeats() &&
214✔
766
        !_configuration.get_always_send_heartbeats()) {
85✔
767
        start_sending_heartbeats();
43✔
768
    } else if (
86✔
769
        !new_configuration.get_always_send_heartbeats() &&
130✔
770
        _configuration.get_always_send_heartbeats() && !is_any_system_connected()) {
130✔
771
        stop_sending_heartbeats();
×
772
    }
773

774
    _configuration = new_configuration;
129✔
775
    // We cache these values as atomic to avoid having to lock any mutex for them.
776
    _our_system_id = new_configuration.get_system_id();
129✔
777
    _our_component_id = new_configuration.get_component_id();
129✔
778
}
129✔
779

780
uint8_t MavsdkImpl::get_own_system_id() const
5,140✔
781
{
782
    return _our_system_id;
5,140✔
783
}
784

785
uint8_t MavsdkImpl::get_own_component_id() const
1,383✔
786
{
787
    return _our_component_id;
1,383✔
788
}
789

790
uint8_t MavsdkImpl::channel() const
×
791
{
792
    // TODO
793
    return 0;
×
794
}
795

796
Autopilot MavsdkImpl::autopilot() const
×
797
{
798
    // TODO
799
    return Autopilot::Px4;
×
800
}
801

802
// FIXME: this should be per component
803
uint8_t MavsdkImpl::get_mav_type() const
233✔
804
{
805
    return _configuration.get_mav_type();
233✔
806
}
807

808
void MavsdkImpl::make_system_with_component(uint8_t system_id, uint8_t comp_id)
86✔
809
{
810
    // Needs _systems_lock
811

812
    if (_should_exit) {
86✔
813
        // When the system got destroyed in the destructor, we have to give up.
814
        return;
×
815
    }
816

817
    if (static_cast<int>(system_id) == 0 && static_cast<int>(comp_id) == 0) {
86✔
818
        LogDebug() << "Initializing connection to remote system...";
×
819
    } else {
820
        LogDebug() << "New system ID: " << static_cast<int>(system_id)
258✔
821
                   << " Comp ID: " << static_cast<int>(comp_id);
258✔
822
    }
823

824
    // Make a system with its first component
825
    auto new_system = std::make_shared<System>(*this);
86✔
826
    new_system->init(system_id, comp_id);
86✔
827

828
    _systems.emplace_back(system_id, new_system);
86✔
829
}
86✔
830

831
void MavsdkImpl::notify_on_discover()
86✔
832
{
833
    // Queue the callbacks without holding the mutex to avoid deadlocks
834
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
129✔
835
}
86✔
836

837
void MavsdkImpl::notify_on_timeout()
×
838
{
839
    // Queue the callbacks without holding the mutex to avoid deadlocks
840
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
×
841
}
×
842

843
Mavsdk::NewSystemHandle
844
MavsdkImpl::subscribe_on_new_system(const Mavsdk::NewSystemCallback& callback)
43✔
845
{
846
    std::lock_guard lock(_mutex);
43✔
847

848
    const auto handle = _new_system_callbacks.subscribe(callback);
43✔
849

850
    if (is_any_system_connected()) {
43✔
851
        _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
×
852
    }
853

854
    return handle;
86✔
855
}
43✔
856

857
void MavsdkImpl::unsubscribe_on_new_system(Mavsdk::NewSystemHandle handle)
42✔
858
{
859
    _new_system_callbacks.unsubscribe(handle);
42✔
860
}
42✔
861

862
bool MavsdkImpl::is_any_system_connected() const
43✔
863
{
864
    std::vector<std::shared_ptr<System>> connected_systems = systems();
43✔
865
    return std::any_of(connected_systems.cbegin(), connected_systems.cend(), [](auto& system) {
43✔
866
        return system->is_connected();
×
867
    });
43✔
868
}
43✔
869

870
void MavsdkImpl::work_thread()
87✔
871
{
872
    while (!_should_exit) {
18,685✔
873
        // Process incoming messages
874
        process_messages();
18,424✔
875

876
        // Run timers
877
        timeout_handler.run_once();
18,656✔
878
        call_every_handler.run_once();
18,582✔
879

880
        // Do component work
881
        {
882
            std::lock_guard lock(_server_components_mutex);
18,667✔
883
            for (auto& it : _server_components) {
37,180✔
884
                if (it.second != nullptr) {
18,703✔
885
                    it.second->_impl->do_work();
18,540✔
886
                }
887
            }
888
        }
18,315✔
889

890
        // Deliver outgoing messages
891
        deliver_messages();
18,405✔
892

893
        // If no messages to send, check if there are messages to receive
894
        std::unique_lock lock_received(_received_messages_mutex);
18,522✔
895
        if (_received_messages.empty()) {
18,572✔
896
            // No messages to process, wait for a signal or timeout
897
            _received_messages_cv.wait_for(lock_received, std::chrono::milliseconds(10), [this]() {
18,374✔
898
                return !_received_messages.empty() || _should_exit;
37,218✔
899
            });
900
        }
901
    }
18,485✔
902
}
124✔
903

904
void MavsdkImpl::call_user_callback_located(
1,045✔
905
    const std::string& filename, const int linenumber, const std::function<void()>& func)
906
{
907
    // Don't enqueue callbacks if we're shutting down
908
    if (_should_exit) {
1,045✔
NEW
909
        return;
×
910
    }
911

912
    auto callback_size = _user_callback_queue.size();
1,045✔
913
    if (callback_size == 10) {
1,045✔
914
        LogWarn()
×
915
            << "User callback queue too slow.\n"
×
916
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
917

918
    } else if (callback_size == 99) {
1,045✔
919
        LogErr()
×
920
            << "User callback queue overflown\n"
×
921
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
922

923
    } else if (callback_size == 100) {
1,045✔
924
        return;
×
925
    }
926

927
    // We only need to keep track of filename and linenumber if we're actually debugging this.
928
    UserCallback user_callback =
1,045✔
929
        _callback_debugging ? UserCallback{func, filename, linenumber} : UserCallback{func};
2,090✔
930

931
    _user_callback_queue.enqueue(user_callback);
1,045✔
932
}
1,045✔
933

934
void MavsdkImpl::process_user_callbacks_thread()
87✔
935
{
936
    while (!_should_exit) {
1,219✔
937
        auto callback = _user_callback_queue.dequeue();
1,132✔
938
        if (!callback) {
1,132✔
939
            continue;
87✔
940
        }
941

942
        // Check if we're in the process of shutting down before executing the callback
943
        if (_should_exit) {
1,045✔
NEW
944
            continue;
×
945
        }
946

947
        const double timeout_s = 1.0;
1,045✔
948
        auto cookie = timeout_handler.add(
1,045✔
949
            [&]() {
×
950
                if (_callback_debugging) {
×
951
                    LogWarn() << "Callback called from " << callback.value().filename << ":"
×
952
                              << callback.value().linenumber << " took more than " << timeout_s
×
953
                              << " second to run.";
×
954
                    fflush(stdout);
×
955
                    fflush(stderr);
×
956
                    abort();
×
957
                } else {
958
                    LogWarn()
×
959
                        << "Callback took more than " << timeout_s << " second to run.\n"
×
960
                        << "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
961
                }
962
            },
×
963
            timeout_s);
1,045✔
964
        callback.value().func();
1,045✔
965
        timeout_handler.remove(cookie);
1,045✔
966
    }
1,132✔
967
}
87✔
968

969
void MavsdkImpl::start_sending_heartbeats()
215✔
970
{
971
    // Check if we're in the process of shutting down
972
    if (_should_exit) {
215✔
NEW
973
        return;
×
974
    }
975

976
    // Before sending out first heartbeats we need to make sure we have a
977
    // default server component.
978
    default_server_component_impl();
215✔
979

980
    {
981
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
215✔
982
        call_every_handler.remove(_heartbeat_send_cookie);
215✔
983
        _heartbeat_send_cookie =
430✔
984
            call_every_handler.add([this]() { send_heartbeats(); }, HEARTBEAT_SEND_INTERVAL_S);
446✔
985
    }
215✔
986
}
987

988
void MavsdkImpl::stop_sending_heartbeats()
×
989
{
990
    if (!_configuration.get_always_send_heartbeats()) {
×
NEW
991
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
×
UNCOV
992
        call_every_handler.remove(_heartbeat_send_cookie);
×
UNCOV
993
    }
×
994
}
×
995

996
ServerComponentImpl& MavsdkImpl::default_server_component_impl()
1,071✔
997
{
998
    std::lock_guard lock(_server_components_mutex);
1,071✔
999
    return default_server_component_with_lock();
1,071✔
1000
}
1,071✔
1001

1002
ServerComponentImpl& MavsdkImpl::default_server_component_with_lock()
1,071✔
1003
{
1004
    if (_default_server_component == nullptr) {
1,071✔
NEW
1005
        _default_server_component = server_component_by_id_with_lock(_our_component_id);
×
1006
    }
1007
    return *_default_server_component->_impl;
1,071✔
1008
}
1009

1010
void MavsdkImpl::send_heartbeats()
230✔
1011
{
1012
    std::lock_guard lock(_server_components_mutex);
230✔
1013

1014
    for (auto& it : _server_components) {
464✔
1015
        if (it.second != nullptr) {
233✔
1016
            it.second->_impl->send_heartbeat();
230✔
1017
        }
1018
    }
1019
}
230✔
1020

1021
void MavsdkImpl::intercept_incoming_messages_async(std::function<bool(mavlink_message_t&)> callback)
22✔
1022
{
1023
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
22✔
1024
    _intercept_incoming_messages_callback = callback;
22✔
1025
}
22✔
1026

1027
void MavsdkImpl::intercept_outgoing_messages_async(std::function<bool(mavlink_message_t&)> callback)
14✔
1028
{
1029
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
14✔
1030
    _intercept_outgoing_messages_callback = callback;
14✔
1031
}
14✔
1032

1033
Mavsdk::ConnectionErrorHandle
1034
MavsdkImpl::subscribe_connection_errors(Mavsdk::ConnectionErrorCallback callback)
×
1035
{
NEW
1036
    std::lock_guard lock(_mutex);
×
1037

1038
    const auto handle = _connections_errors_subscriptions.subscribe(callback);
×
1039

1040
    return handle;
×
1041
}
×
1042

1043
void MavsdkImpl::unsubscribe_connection_errors(Mavsdk::ConnectionErrorHandle handle)
×
1044
{
NEW
1045
    std::lock_guard lock(_mutex);
×
1046
    _connections_errors_subscriptions.unsubscribe(handle);
×
1047
}
×
1048

1049
uint8_t MavsdkImpl::get_target_system_id(const mavlink_message_t& message)
2,093✔
1050
{
1051
    // Checks whether connection knows target system ID by extracting target system if set.
1052
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
2,093✔
1053

1054
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_SYSTEM)) {
2,093✔
1055
        return 0;
371✔
1056
    }
1057

1058
    // Don't look at the target system offset if it is outside the payload length.
1059
    // This can happen if the fields are trimmed.
1060
    if (meta->target_system_ofs >= message.len) {
1,722✔
1061
        return 0;
5✔
1062
    }
1063

1064
    return (_MAV_PAYLOAD(&message))[meta->target_system_ofs];
1,717✔
1065
}
1066

1067
uint8_t MavsdkImpl::get_target_component_id(const mavlink_message_t& message)
×
1068
{
1069
    // Checks whether connection knows target system ID by extracting target system if set.
1070
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
×
1071

1072
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_COMPONENT)) {
×
1073
        return 0;
×
1074
    }
1075

1076
    // Don't look at the target component offset if it is outside the payload length.
1077
    // This can happen if the fields are trimmed.
1078
    if (meta->target_component_ofs >= message.len) {
×
1079
        return 0;
×
1080
    }
1081

1082
    return (_MAV_PAYLOAD(&message))[meta->target_component_ofs];
×
1083
}
1084

1085
Sender& MavsdkImpl::sender()
×
1086
{
NEW
1087
    std::lock_guard lock(_server_components_mutex);
×
NEW
1088
    return default_server_component_with_lock().sender();
×
UNCOV
1089
}
×
1090

1091
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc